Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us
Notesclub

Fixing Our PartitionSupervisor

9_correct_parition_supervisor.livemd

Fixing Our PartitionSupervisor

Mix.install([
  {:kino, "~> 0.7.0"}
])

Section

defmodule User do
  defstruct [:id, :name, :plan]
end
defmodule EventCollector do
  use GenServer

  # ---- Client API ----

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def count_event(%User{} = user) do
    GenServer.cast(
      {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, user}},
      {:count_event, user}
    )
  end

  def flush_event_counts(partition) do
    GenServer.call(
      {:via, PartitionSupervisor, {EventCollectorPartitionSupervisor, partition}},
      :flush_event_counts
    )
  end

  # ---- Server Callbacks ----

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:count_event, %User{} = user}, event_totals) do
    updated_event_totals = Map.update(event_totals, user.id, 1, &(&1 + 1))

    {:noreply, updated_event_totals}
  end

  @impl true
  def handle_call(:flush_event_counts, _from, event_totals) do
    {:reply, event_totals, %{}}
  end
end
defmodule EventFlusher do
  use GenServer

  require Logger

  # ---- Client API ----

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # ---- Server Callbacks ----

  @impl true
  def init(opts) do
    state = %{
      flush_interval: Keyword.get(opts, :flush_interval, 5_000),
      partition: Keyword.fetch!(opts, :partition)
    }

    {:ok, state, {:continue, :schedule_next_run}}
  end

  @impl true
  def handle_continue(:schedule_next_run, state) do
    Process.send_after(self(), :perform_cron_work, state.flush_interval)

    {:noreply, state}
  end

  @impl true
  def handle_info(:perform_cron_work, state) do
    write_data_to_db = EventCollector.flush_event_counts(state.partition)

    unless Map.keys(write_data_to_db) == [] do
      Logger.info(write_data_to_db)
    end

    {:noreply, state, {:continue, :schedule_next_run}}
  end
end
defmodule SummarizerSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    partitions = 3

    children = [
      {
        PartitionSupervisor,
        child_spec: EventCollector,
        name: EventCollectorPartitionSupervisor,
        partitions: partitions
      },
      {
        PartitionSupervisor,
        child_spec: EventFlusher,
        name: EventFlusherPartitionSupervisor,
        partitions: partitions,
        with_arguments: fn [opts], partition ->
          [Keyword.put(opts, :partition, partition)]
        end
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
if Process.whereis(SummarizerSupervisor) do
  Supervisor.stop(SummarizerSupervisor)
end

SummarizerSupervisor.start_link([]) |> IO.inspect()
Kino.Process.render_sup_tree(SummarizerSupervisor)

test_users = [
  %User{id: "1", name: "MegaCorp", plan: :enterprise},
  %User{id: "2", name: "Gundam", plan: :basic},
  %User{id: "3", name: "CoffeCentral", plan: :free},
  %User{id: "4", name: "CodersUnite", plan: :enterprise},
  %User{id: "5", name: "FPFunHouse", plan: :basic}
]

1..1_000_000
|> Task.async_stream(
  fn _ ->
    user = Enum.random(test_users)
    EventCollector.count_event(user)
  end,
  max_concurrency: 2_000
)
|> Stream.run()

<- Previous