Supervisor - replication
# Notebook dependency: Kino, used below to render the test users as a table.
Mix.install(kino: "~> 0.9.4")
Introduction
Practicing supervision while reading the *Elixir Patterns* book (https://elixirpatterns.dev).
What a data-buffering supervision tree is — a tree that:
- consumes data asynchronously from other processes
- summarizes that data
- flushes that data at a regular interval
Three components:
- `EventCollector` - aggregates asynchronous cast messages from other processes
- `EventFlusher` - flushes the buffered data from `EventCollector`
- `SummarizerSupervisor` - monitors these two processes
  - uses the `:one_for_one` strategy because there is no interdependence between the child processes
PartitionSupervisor
- Used when you want to scale processes (or even whole supervision trees) horizontally
- By default, starts one instance of the specified child for each scheduler available to the BEAM (`System.schedulers_online()`)
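PartitionSupervisor via tuple
- Instructs the `call` and `cast` functions to first look up the PID before dispatching the message to the running process
- The routing key is mapped to a partition deterministically:
  - an integer key uses `rem(abs(key), partitions)`;
  - any other term uses Erlang's `:erlang.phash2/2`, which produces an integer clamped to the range of available partitions.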
# Routes call/cast through EventCollectorPartitionSupervisor, which resolves
# the pid of the partition owning `term` at dispatch time.
defp via_tuple(term),
  do: {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, term}}
# Number of schedulers currently online, which is the default partition count
# of a PartitionSupervisor.
:erlang.system_info(:schedulers_online)
Define modules
defmodule User do
  @moduledoc """
  A user whose activity is recorded via `EventCollector.record_event/1`.

  `:id`, `:name` and `:plan` are all mandatory: building the struct without
  any one of them fails at construction time.
  """

  # Every field is enforced, so the field list doubles as the struct definition
  # (each field defaults to nil, but can never actually be omitted).
  @enforce_keys [:id, :name, :plan]
  defstruct @enforce_keys
end
defmodule EventCollector do
  @moduledoc """
  Buffers `%User{}` events sent to it asynchronously.

  One instance runs per partition of `EventCollectorPartitionSupervisor`.
  State holds a running total (`:count`) and a tally keyed by user id
  (`:data`); `flush_events/1` returns the tally and resets both.
  """
  use GenServer
  require Logger

  @initial_state %{count: 0, data: %{}}

  ## Client API

  @doc "Starts a collector. `options` is passed to `init/1`, which ignores it."
  def start_link(options), do: GenServer.start_link(__MODULE__, options)

  @doc """
  Asynchronously records one event for `user`.

  The user's id is the routing key, so all events for the same user land in
  the same partition.
  """
  def record_event(%User{id: user_id} = user),
    do: GenServer.cast(via_tuple(user_id), {:record_event, user})

  @doc """
  Synchronously drains the collector owning `partition` and returns its
  map of user id to event count (empty when nothing was recorded).
  """
  def flush_events(partition) do
    server = via_tuple(partition)
    GenServer.call(server, :flush_events)
  end

  # Wraps `term` in a via tuple so call/cast first look up the pid of the
  # partition owning `term`, then dispatch the message to that process.
  defp via_tuple(term),
    do: {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, term}}

  ## Server callbacks

  @impl true
  def init(_options), do: {:ok, @initial_state}

  # Adds one event to the total and to the tally of the sending user.
  @impl true
  def handle_cast({:record_event, %User{id: id}}, %{count: count, data: data} = state) do
    tally = Map.update(data, id, 1, fn seen -> seen + 1 end)
    {:noreply, %{state | count: count + 1, data: tally}}
  end

  # Replies with all collected data and starts over with an empty buffer;
  # only logs when at least one event was buffered.
  @impl true
  def handle_call(:flush_events, _from, %{count: count, data: data}) do
    if count > 0 do
      Logger.info("#{__MODULE__}:#{inspect(self())} - #{count} events flushed")
    end

    {:reply, data, @initial_state}
  end
end
defmodule EventFlusher do
  @moduledoc """
  Periodically drains one `EventCollector` partition.

  Required options:
    * `:flush_interval` - milliseconds to wait before each flush
    * `:partition` - the collector partition this flusher drains

  The next run is scheduled only after the current flush has returned.
  """
  use GenServer
  require Logger

  ## Client API

  def start_link(options), do: GenServer.start_link(__MODULE__, options)

  ## Server callbacks

  # Raises KeyError when either required option is missing.
  @impl true
  def init(options) do
    interval = Keyword.fetch!(options, :flush_interval)
    partition = Keyword.fetch!(options, :partition)

    {:ok, %{flush_interval: interval, partition: partition}, {:continue, :schedule_next_run}}
  end

  # Arms a one-shot timer that delivers :perform_cron_work to this process.
  @impl true
  def handle_continue(:schedule_next_run, %{flush_interval: interval} = state) do
    Process.send_after(self(), :perform_cron_work, interval)
    {:noreply, state}
  end

  # Drains the collector, logs any events returned, then schedules the next run.
  @impl true
  def handle_info(:perform_cron_work, %{partition: partition} = state) do
    flushed = EventCollector.flush_events(partition)

    # map_size/1 is O(1); an empty flush produces no log line.
    if map_size(flushed) > 0 do
      # do something here
      Logger.info("#{__MODULE__}:#{inspect(self())} - Flushed data: #{inspect(flushed)}")
    end

    {:noreply, state, {:continue, :schedule_next_run}}
  end
end
defmodule SummarizerSupervisor do
  @moduledoc """
  Top-level supervisor for the event-buffering tree.

  Starts two `PartitionSupervisor`s with the same number of partitions:
  one runs an `EventCollector` per partition, the other an `EventFlusher`
  per partition. Flusher `n` receives `partition: n` and drains collector `n`
  through `EventCollector.flush_events/1`, so both trees must always share a
  single partition count.
  """
  use Supervisor

  @partition_count 3

  @doc """
  Starts the supervisor, registered under its module name.

  Options:
    * `:flush_interval` - milliseconds between flushes (required by `EventFlusher`)
    * `:partitions` - number of partitions for both trees (default: 3)

  All options except `:partitions` are forwarded to the children.
  """
  def start_link(options) do
    Supervisor.start_link(__MODULE__, options, name: __MODULE__)
  end

  @impl true
  def init(options) do
    # Pop :partitions so the children receive exactly the options they always did.
    {partitions, child_options} = Keyword.pop(options, :partitions, @partition_count)

    children = [
      {PartitionSupervisor,
       child_spec: EventCollector.child_spec(child_options),
       name: EventCollectorPartitionSupervisor,
       partitions: partitions},
      {PartitionSupervisor,
       child_spec: EventFlusher.child_spec(child_options),
       name: EventFlusherPartitionSupervisor,
       partitions: partitions,
       # Give each flusher the index of the collector partition it must drain.
       with_arguments: fn [opts], partition ->
         [Keyword.put(opts, :partition, partition)]
       end}
    ]

    # :one_for_one - flushers resolve their collector by partition on every
    # call, so a restarted collector is found without restarting its flusher.
    Supervisor.init(children, strategy: :one_for_one)
  end
end
Run
# Stop a tree left over from a previous evaluation. On the first run nothing is
# registered, and an unguarded Supervisor.stop/1 would exit with :noproc.
if Process.whereis(SummarizerSupervisor), do: Supervisor.stop(SummarizerSupervisor)

{:ok, sup} = SummarizerSupervisor.start_link(flush_interval: 1_000)

Supervisor.which_children(sup)

test_users = [
  %User{id: "1", name: "MegaCorp", plan: :enterprise},
  %User{id: "2", name: "Gundam", plan: :basic},
  %User{id: "3", name: "CoffeeCentral", plan: :free},
  %User{id: "4", name: "CodeTogether", plan: :enterprise},
  %User{id: "5", name: "FPFunHouse", plan: :basic}
]

Kino.DataTable.new(test_users)

# Simulate many concurrent producers: each task casts one event for a random user.
1..100_000
|> Task.async_stream(
  fn _ ->
    event = Enum.random(test_users)
    EventCollector.record_event(event)
  end,
  max_concurrency: 2000
)
|> Stream.run()