Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us
Notesclub

The Single Supervisor Solution

7_single_supervisor_solution.livemd

The Single Supervisor Solution

# Install Kino, which provides the Kino.Process helpers used later in this
# notebook to render the supervision tree and the message sequence trace.
Mix.install([
  {:kino, "~> 0.7.0"}
])

The User Struct

defmodule User do
  @moduledoc """
  A customer account whose events are counted by the notebook.

  Fields (all default to `nil`):

    * `:id`   - identifier used as the key when tallying events
    * `:name` - display name of the account
    * `:plan` - subscription plan of the account
  """

  defstruct id: nil, name: nil, plan: nil
end

EventCollector GenServer

defmodule EventCollector do
  @moduledoc """
  GenServer that keeps an in-memory tally of events per user id.

  The state is a map of `user.id => count`. The process is registered under
  the module name, so the client functions need no pid.
  """

  use GenServer

  # ---- Client API ----

  @doc """
  Starts the collector and registers it under the module name.

  `opts` is passed through to `init/1`, which ignores it.
  """
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @doc """
  Asynchronously records one event for `user` (fire-and-forget cast).
  """
  def count_event(%User{} = user), do: GenServer.cast(__MODULE__, {:count_event, user})

  @doc """
  Synchronously returns the current tallies and resets them to an empty map.
  """
  def flush_event_counts, do: GenServer.call(__MODULE__, :flush_event_counts)

  # ---- Server Callbacks ----

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_cast({:count_event, %User{id: user_id}}, counts) do
    # First event for an id starts at 1; later events increment the tally.
    {:noreply, Map.update(counts, user_id, 1, fn seen -> seen + 1 end)}
  end

  @impl true
  def handle_call(:flush_event_counts, _from, counts) do
    # Hand the accumulated tallies to the caller and start over from empty.
    {:reply, counts, %{}}
  end
end

EventFlusher GenServer

defmodule EventFlusher do
  @moduledoc """
  GenServer that periodically drains `EventCollector` and logs the tallies.

  The state is the flush interval in milliseconds. After every run the next
  one is scheduled via `handle_continue/2`.
  """

  use GenServer

  require Logger

  # Interval used when no `:flush_interval` option is given.
  @default_flush_interval 5_000

  # ---- Client API ----

  @doc """
  Starts the flusher and registers it under the module name.

  ## Options

    * `:flush_interval` - milliseconds between flushes (default `5_000`)
  """
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # ---- Server Callbacks ----

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :flush_interval, @default_flush_interval)

    # Defer the first timer to handle_continue so scheduling lives in one place.
    {:ok, interval, {:continue, :schedule_next_run}}
  end

  @impl true
  def handle_continue(:schedule_next_run, interval) do
    Process.send_after(self(), :perform_cron_work, interval)

    {:noreply, interval}
  end

  @impl true
  def handle_info(:perform_cron_work, interval) do
    event_counts = EventCollector.flush_event_counts()

    # Stay quiet when nothing was collected since the previous flush.
    if map_size(event_counts) != 0 do
      Logger.info(event_counts)
    end

    {:noreply, interval, {:continue, :schedule_next_run}}
  end
end

SummarizerSupervisor

defmodule SummarizerSupervisor do
  @moduledoc """
  Supervisor that runs `EventCollector` and `EventFlusher` side by side.

  Children are started in the listed order under a `:one_for_one` strategy.
  """

  use Supervisor

  @doc """
  Starts the supervisor and registers it under the module name.
  """
  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    # The collector is listed first so it is started before the flusher.
    Supervisor.init(
      [
        {EventCollector, []},
        {EventFlusher, []}
      ],
      strategy: :one_for_one
    )
  end
end

Time for a Test Drive

# Stop any supervisor left over from a previous evaluation of this cell so
# the start below does not collide with an already-registered name.
if Process.whereis(SummarizerSupervisor) != nil do
  Supervisor.stop(SummarizerSupervisor)
end

# Start a fresh tree, print the start result, and render the tree.
IO.inspect(SummarizerSupervisor.start_link([]))
Kino.Process.render_sup_tree(SummarizerSupervisor)

# Sample accounts that the generated events are attributed to.
test_users = [
  %User{id: "1", name: "MegaCorp", plan: :enterprise},
  %User{id: "2", name: "Gundam", plan: :basic},
  %User{id: "3", name: "CoffeCentral", plan: :free},
  %User{id: "4", name: "CodersUnite", plan: :enterprise},
  %User{id: "5", name: "FPFunHouse", plan: :basic}
]

# Record 500_000 events, each from its own task and for a randomly picked
# user, with at most 2_000 tasks in flight at any time.
event_stream =
  Task.async_stream(
    1..500_000,
    fn _index ->
      test_users
      |> Enum.random()
      |> EventCollector.count_event()
    end,
    max_concurrency: 2_000
  )

Stream.run(event_stream)

Visualizing messages

# Make sure no supervisor is running, so that starting the tree happens
# inside the traced function below.
if Process.whereis(SummarizerSupervisor) != nil do
  Supervisor.stop(SummarizerSupervisor)
end

Kino.Process.render_seq_trace(:all, fn ->
  SummarizerSupervisor.start_link([])

  # Record one event for each of two users while the trace is active.
  [
    %User{id: "1", name: "MegaCorp", plan: :enterprise},
    %User{id: "2", name: "Gundam", plan: :basic}
  ]
  |> Enum.each(&EventCollector.count_event/1)
end)

<- Previous | Next ->