Fixing Our PartitionSupervisor
Mix.install([
{:kino, "~> 0.7.0"}
])
Section
defmodule User do
defstruct [:id, :name, :plan]
end
defmodule EventCollector do
use GenServer
# ---- Client API ----
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
def count_event(%User{} = user) do
GenServer.cast(
{:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, user}},
{:count_event, user}
)
end
def flush_event_counts(partition) do
GenServer.call(
{:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, partition}},
:flush_event_counts
)
end
# ---- Server Callbacks ----
@impl true
def init(_) do
{:ok, %{}}
end
@impl true
def handle_cast({:count_event, %User{} = user}, event_totals) do
updated_event_totals = Map.update(event_totals, user.id, 1, &(&1 + 1))
{:noreply, updated_event_totals}
end
@impl true
def handle_call(:flush_event_counts, _from, event_totals) do
{:reply, event_totals, %{}}
end
end
defmodule EventFlusher do
use GenServer
require Logger
# ---- Client API ----
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
# ---- Server Callbacks ----
@impl true
def init(opts) do
state = %{
flush_interval: Keyword.get(opts, :flush_interval, 5_000),
partition: Keyword.fetch!(opts, :partition)
}
{:ok, state, {:continue, :schedule_next_run}}
end
@impl true
def handle_continue(:schedule_next_run, state) do
Process.send_after(self(), :perform_cron_work, state.flush_interval)
{:noreply, state}
end
@impl true
def handle_info(:perform_cron_work, state) do
write_data_to_db = EventCollector.flush_event_counts(state.partition)
unless Map.keys(write_data_to_db) == [] do
Logger.info(write_data_to_db)
end
{:noreply, state, {:continue, :schedule_next_run}}
end
end
defmodule SummarizerSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
partitions = 3
children = [
{
PartitionSupervisor,
child_spec: EventCollector,
name: EventCollectorPartitionSupervisor,
partitions: partitions
},
{
PartitionSupervisor,
child_spec: EventFlusher,
name: EventFlusherPartitionSupervisor,
partitions: partitions,
with_arguments: fn [opts], partition ->
[Keyword.put(opts, :partition, partition)]
end
}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
if Process.whereis(SummarizerSupervisor) do
Supervisor.stop(SummarizerSupervisor)
end
SummarizerSupervisor.start_link([]) |> IO.inspect()
Kino.Process.render_sup_tree(SummarizerSupervisor)
test_users = [
%User{id: "1", name: "MegaCorp", plan: :enterprise},
%User{id: "2", name: "Gundam", plan: :basic},
%User{id: "3", name: "CoffeCentral", plan: :free},
%User{id: "4", name: "CodersUnite", plan: :enterprise},
%User{id: "5", name: "FPFunHouse", plan: :basic}
]
1..1_000_000
|> Task.async_stream(
fn _ ->
user = Enum.random(test_users)
EventCollector.count_event(user)
end,
max_concurrency: 2_000
)
|> Stream.run()
<- Previous