Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us
Notesclub

Is Scaling Up Simple?

8_naive_parition_supervisor.livemd

Is Scaling Up Simple?

Mix.install([
  {:kino, "~> 0.7.0"}
])

Same code as before

defmodule User do
  defstruct [:id, :name, :plan]
end

defmodule EventCollector do
  use GenServer

  # ---- Client API ----

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def count_event(%User{} = user) do
    GenServer.cast(__MODULE__, {:count_event, user})
  end

  def flush_event_counts do
    GenServer.call(__MODULE__, :flush_event_counts)
  end

  # ---- Server Callbacks ----

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:count_event, %User{} = user}, event_totals) do
    updated_event_totals = Map.update(event_totals, user.id, 1, &(&1 + 1))

    {:noreply, updated_event_totals}
  end

  @impl true
  def handle_call(:flush_event_counts, _from, event_totals) do
    {:reply, event_totals, %{}}
  end
end

defmodule EventFlusher do
  use GenServer

  require Logger

  # ---- Client API ----

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # ---- Server Callbacks ----

  @impl true
  def init(opts) do
    flush_interval = Keyword.get(opts, :flush_interval, 5_000)

    {:ok, flush_interval, {:continue, :schedule_next_run}}
  end

  @impl true
  def handle_continue(:schedule_next_run, flush_interval) do
    Process.send_after(self(), :perform_cron_work, flush_interval)

    {:noreply, flush_interval}
  end

  @impl true
  def handle_info(:perform_cron_work, flush_interval) do
    write_data_to_db = EventCollector.flush_event_counts()

    unless Map.keys(write_data_to_db) == [] do
      Logger.info(write_data_to_db)
    end

    {:noreply, flush_interval, {:continue, :schedule_next_run}}
  end
end

defmodule SummarizerSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      {EventCollector, []},
      {EventFlusher, []}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

PartitionSupervisor

if Process.whereis(SummarizerPartitionSupervisor) do
  Supervisor.stop(SummarizerPartitionSupervisor)
end

children = [
  {
    PartitionSupervisor,
    # System.schedulers_online()
    child_spec: SummarizerSupervisor, name: SummarizerPartitionSupervisor, partitions: 1
  }
]

Supervisor.start_link(children, strategy: :one_for_one)

With all of the components of our existing supervision tree being singletons (using the :name option in start_link/1) we cannot start multiple copies of the supervision trees. Let’s adjust our approach and scale this up!

<- Previous | Next ->