
Account analytics

met-402-account-analytics.livemd

A persistent subscriber

At the top level, Commanded offers convenient abstractions such as projectors and event handlers. However, they process one event at a time, and we often want to “power through” all events (currently a little over half a billion of them). The lower-level EventStore library supports batching.

Let’s print all events. We subscribe to the special $all stream, which holds every event. In addition to $all there is one stream per aggregate root, so the same approach is also a very efficient way to go through all events of a single account, user, or monitor.

The EventStore docs have an example of a persistent subscriber written as a module, but for now we take the simple route and use a receive loop.
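For comparison, a module-based subscriber could look roughly like the sketch below. It is not the example from the EventStore docs, only a minimal GenServer in the same spirit. The module name `BatchSubscriber` and the `:event_store`, `:name`, `:handler`, and `:buffer_size` options are assumptions made for this illustration; only `subscribe_to_all_streams/3` and `ack/2` are calls on the event store itself.

```elixir
defmodule BatchSubscriber do
  # Hypothetical module: a minimal persistent subscriber to the $all stream.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # All option names here are assumptions made for this sketch.
    event_store = Keyword.fetch!(opts, :event_store)
    name = Keyword.fetch!(opts, :name)
    handler = Keyword.get(opts, :handler, fn _events -> :ok end)
    buffer_size = Keyword.get(opts, :buffer_size, 1000)

    {:ok, subscription} =
      event_store.subscribe_to_all_streams(name, self(), buffer_size: buffer_size)

    {:ok, %{event_store: event_store, subscription: subscription, handler: handler}}
  end

  @impl true
  def handle_info({:subscribed, subscription}, %{subscription: subscription} = state) do
    # Subscription confirmed; batches of events will follow.
    {:noreply, state}
  end

  def handle_info({:events, events}, state) do
    # The handler must return :ok, otherwise the process crashes before acking.
    :ok = state.handler.(events)
    # Acknowledge the whole batch so the next one is sent.
    :ok = state.event_store.ack(state.subscription, events)
    {:noreply, state}
  end
end
```

Under these assumptions it would be started with something like `BatchSubscriber.start_link(event_store: Backend.EventStore, name: "livebook-analytics", handler: fn events -> IO.puts("got #{length(events)} events") end)`. The notebook itself sticks with the receive loop that follows.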

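alias Backend.EventStore

# Unique name so that re-running this cell starts a fresh subscription
subs_id = "livebook#{:erlang.unique_integer([:positive])}"
IO.puts("Subscribing as #{subs_id}")
# Upper bound on the number of events sent to us before we acknowledge
batch_size = 1000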

{:ok, subscription} =
  EventStore.subscribe_to_all_streams(
    subs_id,
    self(),
    buffer_size: batch_size
  )

receive do
  {:subscribed, ^subscription} ->
    IO.puts("Successfully subscribed to $all stream")
end

loop = fn loop ->
  receive do
    {:events, events} ->
      num = Enum.count(events)
      IO.puts("Received #{num} events, first: #{inspect(hd(events))}")
      last = List.last(events)
      last_id = last.event_number

      # Acknowledge receipt
      IO.puts("Ack with event number #{last_id}")
      :ok = EventStore.ack(subscription, last_id)

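      # We treat a short batch as the sign that we have caught up with the backlog
      if num == batch_size do
        loop.(loop)
      end
  after
    # Assumption: 5 seconds of silence means there is nothing left to read.
    # Without this, a backlog that is an exact multiple of batch_size blocks forever.
    5_000 -> IO.puts("No events received for 5 seconds, stopping")
  end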
end

loop.(loop)
IO.puts("All done, deleting subscription")
EventStore.delete_subscription("$all", subs_id)
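The loop above only prints each batch. For analytics, each batch would more likely be folded into a running result before it is acknowledged. The sketch below counts events per type. `BatchTally` is a hypothetical helper, and the batches are stand-in maps with made-up event type names; real batches contain `%EventStore.RecordedEvent{}` structs, which also carry an `event_type` field.

```elixir
defmodule BatchTally do
  # Hypothetical helper: fold one batch into a map of event_type => count.
  def add_batch(tally, events) do
    events
    |> Enum.frequencies_by(& &1.event_type)
    |> Map.merge(tally, fn _type, new, old -> new + old end)
  end
end

# Stand-in batches with made-up event type names.
batch1 = [
  %{event_type: "AccountCreated"},
  %{event_type: "MonitorAdded"},
  %{event_type: "AccountCreated"}
]

batch2 = [%{event_type: "MonitorAdded"}]

tally =
  %{}
  |> BatchTally.add_batch(batch1)
  |> BatchTally.add_batch(batch2)

IO.inspect(tally)
# → %{"AccountCreated" => 2, "MonitorAdded" => 2}
```

In the receive loop, the tally would have to be threaded through as an extra argument of `loop`, since an anonymous function cannot keep state between calls on its own.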

Stream linking

One way to avoid going through the whole backlog is stream linking: keep one stream per event type. The code below is not ours and won’t run here; I got it from a kind person on the Elixir-lang Slack.

The current idea is to run the equivalent of this on dev1 and see what happens.

defmodule Ranger.Results.StreamLinker do
  use Commanded.Event.Handler,
    application: Ranger.App,
    name: __MODULE__

  require Logger

  alias Ranger.Results.Events.{
    BreachedCredentialsFound,
    DNSRecordsFound,
    OpenTCPPortsFound,
    RawNessusVulnsFound,
    RegisteredTyposquatsFound,
    SecurityHeadersFound,
    SRIHashesFound,
    SubdomainTakeoversFound,
    SubdomainsFound,
    TLSVulnsFound,
    WebServersFound
  }

  @events_to_link [
    BreachedCredentialsFound,
    DNSRecordsFound,
    OpenTCPPortsFound,
    RawNessusVulnsFound,
    RegisteredTyposquatsFound,
    SecurityHeadersFound,
    SRIHashesFound,
    SubdomainTakeoversFound,
    SubdomainsFound,
    TLSVulnsFound,
    WebServersFound
  ]

  def handle(%event_type{}, metadata) when event_type in @events_to_link do
    Logger.info("Linking #{event_type} to #{event_type_to_stream(event_type)}")

    event_type
    |> event_type_to_stream()
    |> link_to_stream(metadata)

    :ok
  end

  def event_type_to_stream(event_type) do
    event_type
    |> to_string()
    |> String.split(".")
    |> Enum.reverse()
    |> hd()
    |> Macro.underscore()
  end

  defp link_to_stream(stream_name, %{event_id: event_id}) do
    Ranger.EventStore.link_to_stream(stream_name, :any_version, [event_id])
  end
end
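To see which stream names this handler would produce, the mapping can be tried on its own. The sketch below mirrors `event_type_to_stream/1` but uses `Module.split/1` instead of splitting the string on dots; `StreamNames` is a hypothetical module name used only for this illustration.

```elixir
defmodule StreamNames do
  # Hypothetical helper: last segment of the module name, in snake_case.
  def for_event(event_type) do
    event_type
    |> Module.split()
    |> List.last()
    |> Macro.underscore()
  end
end

# The aliases are plain atoms here; the event modules need not be loaded.
IO.inspect(StreamNames.for_event(Ranger.Results.Events.SubdomainsFound))
# → "subdomains_found"
IO.inspect(StreamNames.for_event(Ranger.Results.Events.DNSRecordsFound))
# → "dns_records_found"
```

Each event type therefore ends up in a stream named after its last module segment, and a subscriber interested in one type could presumably subscribe to that stream by name instead of reading $all.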