Account analytics
A persistent subscriber
On the top level, Commanded has convenient things for projectors, event handlers, etc. However, they work on individual events and often we want to “power through” all events (currently a tad over half a billion of them). The lower level EventStore library has ways to batch things.
Let’s print all events. We subscribe to the special $all
stream UUID,
which is an event stream that holds all events. Apart from this stream, there is a stream per
aggregate root so you can use this to go through all events of an account or a user or a monitor
too in a very efficient way.
The EventStore docs have an example of a persistent subscriber in a module but we go the simple way for now using a receive loop.
alias Backend.EventStore
subs_id = "livebook#{:erlang.unique_integer()}"
IO.puts("Subscribing as #{subs_id}")
batch_size = 1000
{:ok, subscription} =
EventStore.subscribe_to_all_streams(
subs_id,
self(),
buffer_size: batch_size
)
receive do
{:subscribed, ^subscription} ->
IO.puts("Successfully subscribed to $all stream")
end
loop = fn loop ->
receive do
{:events, events} ->
num = Enum.count(events)
IO.puts("Received #{num} events, first: #{inspect(hd(events))}")
last = List.last(events)
last_id = last.event_number
# Acknowledge receipt
IO.puts("Ack with event number #{last_id}")
:ok = EventStore.ack(subscription, last_id)
if num == batch_size do
loop.(loop)
end
end
end
loop.(loop)
IO.puts("All done, deleting subscription")
EventStore.delete_subscription("$all", subs_id)
Stream Linking
One option to avoid having to go through the whole backlog is to use stream linking and have a stream per event type. The code below is not for us so won’t run, I got it from a nice person on the Elixir-lang Slack.
Current idea is to run the equivalent of this on dev1 and see what happens.
defmodule Ranger.Results.StreamLinker do
use Commanded.Event.Handler,
application: Ranger.App,
name: __MODULE__
require Logger
alias Ranger.Results.Events.{
BreachedCredentialsFound,
DNSRecordsFound,
OpenTCPPortsFound,
RawNessusVulnsFound,
RegisteredTyposquatsFound,
SecurityHeadersFound,
SRIHashesFound,
SubdomainTakeoversFound,
SubdomainsFound,
TLSVulnsFound,
WebServersFound
}
@events_to_link [
BreachedCredentialsFound,
DNSRecordsFound,
OpenTCPPortsFound,
RawNessusVulnsFound,
RegisteredTyposquatsFound,
SecurityHeadersFound,
SRIHashesFound,
SubdomainTakeoversFound,
SubdomainsFound,
TLSVulnsFound,
WebServersFound
]
def handle(%event_type{}, metadata) when event_type in @events_to_link do
Logger.info("Linking #{event_type} to #{event_type_to_stream(event_type)}")
event_type
|> event_type_to_stream()
|> link_to_stream(metadata)
:ok
end
def event_type_to_stream(event_type) do
event_type
|> to_string()
|> String.split(".")
|> Enum.reverse()
|> hd()
|> Macro.underscore()
end
defp link_to_stream(stream_name, %{event_id: event_id}) do
Ranger.EventStore.link_to_stream(stream_name, :any_version, [event_id])
end
end