The Single Supervisor Solution
Mix.install([
{:kino, "~> 0.7.0"}
])
The User Struct
defmodule User do
defstruct [:id, :name, :plan]
end
EventCollector GenServer
defmodule EventCollector do
use GenServer
# ---- Client API ----
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def count_event(%User{} = user) do
GenServer.cast(__MODULE__, {:count_event, user})
end
def flush_event_counts do
GenServer.call(__MODULE__, :flush_event_counts)
end
# ---- Server Callbacks ----
@impl true
def init(_) do
{:ok, %{}}
end
@impl true
def handle_cast({:count_event, %User{} = user}, event_totals) do
updated_event_totals = Map.update(event_totals, user.id, 1, &(&1 + 1))
{:noreply, updated_event_totals}
end
@impl true
def handle_call(:flush_event_counts, _from, event_totals) do
{:reply, event_totals, %{}}
end
end
EventFlusher GenServer
defmodule EventFlusher do
use GenServer
require Logger
# ---- Client API ----
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
# ---- Server Callbacks ----
@impl true
def init(opts) do
flush_interval = Keyword.get(opts, :flush_interval, 5_000)
{:ok, flush_interval, {:continue, :schedule_next_run}}
end
@impl true
def handle_continue(:schedule_next_run, flush_interval) do
Process.send_after(self(), :perform_cron_work, flush_interval)
{:noreply, flush_interval}
end
@impl true
def handle_info(:perform_cron_work, flush_interval) do
write_data_to_db = EventCollector.flush_event_counts()
unless map_size(write_data_to_db) == 0 do
Logger.info(write_data_to_db)
end
{:noreply, flush_interval, {:continue, :schedule_next_run}}
end
end
SummarizerSupervisor
defmodule SummarizerSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{EventCollector, []},
{EventFlusher, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
Time for a Test Drive
if Process.whereis(SummarizerSupervisor) do
Supervisor.stop(SummarizerSupervisor)
end
SummarizerSupervisor.start_link([]) |> IO.inspect()
Kino.Process.render_sup_tree(SummarizerSupervisor)
test_users = [
%User{id: "1", name: "MegaCorp", plan: :enterprise},
%User{id: "2", name: "Gundam", plan: :basic},
%User{id: "3", name: "CoffeCentral", plan: :free},
%User{id: "4", name: "CodersUnite", plan: :enterprise},
%User{id: "5", name: "FPFunHouse", plan: :basic}
]
1..500_000
|> Task.async_stream(
fn _ ->
user = Enum.random(test_users)
EventCollector.count_event(user)
end,
max_concurrency: 2_000
)
|> Stream.run()
Visualizing messages
if Process.whereis(SummarizerSupervisor) do
Supervisor.stop(SummarizerSupervisor)
end
Kino.Process.render_seq_trace(:all, fn ->
SummarizerSupervisor.start_link([])
EventCollector.count_event(%User{id: "1", name: "MegaCorp", plan: :enterprise})
EventCollector.count_event(%User{id: "2", name: "Gundam", plan: :basic})
end)
<- Previous | Next ->