Powered by AppSignal & Oban Pro

Supervisor - replication

notebooks/supervisor_replication.livemd

Supervisor - replication

Mix.install([{:kino, "~> 0.9.4"}])

Introduction

Practicing supervision, reading the book https://elixirpatterns.dev

what data buffering supervision tree is

  • consumes data asynchronously from other processes
  • summarizes that data
  • flushes that data at a regular interval

three components

  • EventCollector
    • aggregates asynchronous cast messages from other processes
  • EventFlusher
    • flushes the buffered data from EventCollector
  • SummarizerSupervisor
    • monitors these two processes
    • :one_for_one strategy because there is no interdependence between child processes

PartitionSupervisor

  • used when you want to horizontally scale processes or even supervision trees
  • By default, starts one instance of the specified process for each scheduler that is available to the BEAM

CleanShot 2023-05-24 at 14 46 55

PartitionSupervisor via tuple

  • instructs the call and cast functions to first look up the PID before dispatching the message to the running process
  • Erlang phash2 function deterministically generates an integer clamped to the range of available partitions
defp via_tuple(term) do
  {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, term}}
end
System.schedulers_online()

Define modules

defmodule User do
  # these keys are required when creating a new struct
  @enforce_keys [:id, :name, :plan]
  defstruct [:id, :name, :plan]
end
defmodule EventCollector do
  use GenServer
  require Logger

  ## Client API

  def start_link(options) do
    GenServer.start_link(__MODULE__, options)
  end

  def record_event(%User{id: user_id} = user) do
    GenServer.cast(via_tuple(user_id), {:record_event, user})
  end

  def flush_events(partition) do
    GenServer.call(via_tuple(partition), :flush_events)
  end

  # instructs the call and cast functions to first look up the PID
  # before dispatching the message to the running process
  defp via_tuple(term) do
    {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, term}}
  end

  ## Server callbacks

  @initial_state %{count: 0, data: %{}}

  @impl true
  def init(_) do
    {:ok, @initial_state}
  end

  # Records an event
  @impl true
  def handle_cast({:record_event, %User{} = user}, state) do
    new_count = state.count + 1
    new_data = Map.update(state.data, user.id, 1, &(&1 + 1))
    {:noreply, %{state | count: new_count, data: new_data}}
  end

  # Flushes all the collected date
  @impl true
  def handle_call(:flush_events, _from, state) do
    if state.count > 0 do
      Logger.info("#{__MODULE__}:#{inspect(self())} - #{state.count} events flushed")
    end

    {:reply, state.data, @initial_state}
  end
end
defmodule EventFlusher do
  use GenServer
  require Logger

  ## Client API

  def start_link(options) do
    GenServer.start_link(__MODULE__, options)
  end

  ## Server callbacks

  @impl true
  def init(options) do
    flush_interval = Keyword.fetch!(options, :flush_interval)
    partition = Keyword.fetch!(options, :partition)

    initial_state = %{
      flush_interval: flush_interval,
      partition: partition
    }

    {:ok, initial_state, {:continue, :schedule_next_run}}
  end

  @impl true
  def handle_continue(:schedule_next_run, state) do
    Process.send_after(self(), :perform_cron_work, state.flush_interval)
    {:noreply, state}
  end

  @impl true
  def handle_info(:perform_cron_work, state) do
    buffered_events = EventCollector.flush_events(state.partition)

    # check the size of the map in constant time
    unless map_size(buffered_events) == 0 do
      # do something here
      Logger.info("#{__MODULE__}:#{inspect(self())} - Flushed data: #{inspect(buffered_events)}")
    end

    {:noreply, state, {:continue, :schedule_next_run}}
  end
end
defmodule SummarizerSupervisor do
  use Supervisor

  @partition_count 3

  def start_link(options) do
    Supervisor.start_link(__MODULE__, options, name: __MODULE__)
  end

  @impl true
  def init(options) do
    children = [
      {
        PartitionSupervisor,
        child_spec: EventCollector.child_spec(options),
        name: EventCollectorPartitionSupervisor,
        partitions: @partition_count
      },
      {
        PartitionSupervisor,
        child_spec: EventFlusher.child_spec(options),
        name: EventFlusherPartitionSupervisor,
        partitions: @partition_count,
        with_arguments: fn [options], partition ->
          [Keyword.put(options, :partition, partition)]
        end
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

Run

Supervisor.stop(SummarizerSupervisor)
{:ok, sup} = SummarizerSupervisor.start_link(flush_interval: 1_000)
# Supervisor.which_children(:c.pid(0, 584, 0))
Supervisor.which_children(sup)
test_users = [
  %User{id: "1", name: "MegaCorp", plan: :enterprise},
  %User{id: "2", name: "Gundam", plan: :basic},
  %User{id: "3", name: "CoffeeCentral", plan: :free},
  %User{id: "4", name: "CodeTogether", plan: :enterprise},
  %User{id: "5", name: "FPFunHouse", plan: :basic}
]

Kino.DataTable.new(test_users)
1..100_000
|> Task.async_stream(
  fn _ ->
    event = Enum.random(test_users)

    EventCollector.record_event(event)
  end,
  max_concurrency: 2000
)
|> Stream.run()