Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Real-World Event Sourcing

notes.livemd

Real-World Event Sourcing

Mix.install([
  {:kino, "~> 0.14.0"}
])

Event Sourced Calculator

defmodule EventSourcedCalculator.V3 do
  @max_state_value 10_000
  @min_state_value 0

  def handle_command(%{value: val}, %{cmd: :add, value: v}) do
    %{event_type: :value_added, value: min(@max_state_value - val, v)}
  end

  def handle_command(%{value: val}, %{cmd: :sub, value: v}) do
    %{event_type: :value_subtracted, value: max(@min_state_value, val - v)}
  end

  def handle_command(%{value: val}, %{cmd: :mul, value: v}) when val * v > @max_state_value do
    {:error, :mul_failed}
  end

  def handle_command(%{value: _val}, %{cmd: :mul, value: v}) do
    %{event_type: :value_multiplied, value: v}
  end

  def handle_command(%{value: _val}, %{cmd: :div, value: 0}) do
    {:error, :divide_failed}
  end

  def handle_command(%{value: _val}, %{cmd: :div, value: v}) do
    %{event_type: :value_divided, value: v}
  end

  def handle_event(%{value: val}, %{event_type: :value_added, value: v}) do
    %{value: val + v}
  end

  def handle_event(%{value: val}, %{event_type: :value_subtracted, value: v}) do
    %{value: val - v}
  end

  def handle_event(%{value: val}, %{event_type: :value_multiplied, value: v}) do
    %{value: val * v}
  end

  def handle_event(%{value: val}, %{event_type: :value_divided, value: v}) do
    %{value: val / v}
  end

  def handle_event(%{value: _val}, %{}) do
    %{}
  end

  def handle_event(%{value: _val} = state, _) do
    state
  end
end
cmds = [
  %{value: 50, cmd: :add},
  %{value: 10, cmd: :add},
  %{value: 0, cmd: :div},
  %{value: 2, cmd: :add}
]
initial = %{value: 0}
cmds
|> List.foldl(initial, fn cmd, acc ->
  EventSourcedCalculator.V3.handle_event(
    acc,
    EventSourcedCalculator.V3.handle_command(acc, cmd) |> IO.inspect()
  )
  |> IO.inspect()
end)

Projector - Bank Account

defmodule Projectors.AccountBalance do
  use GenServer
  require Logger

  def start_link(account_number) do
    GenServer.start_link(__MODULE__, account_number, name: via(account_number))
  end

  @impl true
  def init(account_number) do
    {:ok, %{account_number: account_number, balance: 0}}
  end

  def apply_event(%{account_number: account} = event) when is_binary(account) do
    case Registry.lookup(Registry.AccountProjectors, account) do
      [{pid, _}] ->
        apply_event(pid, event)

      _ ->
        Logger.debug("Attempt to apply event to non-existent account, starting projector")
        {:ok, pid} = start_link(account)
        apply_event(pid, event)
    end
  end

  def apply_event(pid, event) when is_pid(pid) do
    GenServer.cast(pid, {:handle_event, event})
  end

  @impl true
  def handle_cast({:handle_event, evt}, state) do
    {:noreply, handle_event(state, evt)}
  end

  def handle_event(%{balance: bal} = s, %{event_type: :amount_withdrawn, value: v}) do
    %{s | balance: bal - v}
  end

  def handle_event(%{balance: bal} = s, %{event_type: :amount_deposited, value: v}) do
    %{s | balance: bal + v}
  end

  def handle_event(%{balance: bal} = s, %{event_type: :fee_applied, value: v}) do
    %{s | balance: bal - v}
  end

  def lookup_balance(account_number) when is_binary(account_number) do
    with [{pid, _}] <-
           Registry.lookup(Registry.AccountProjectors, account_number) do
      {:ok, get_balance(pid)}
    else
      _ ->
        {:error, :unknown_account}
    end
  end

  def get_balance(pid) do
    GenServer.call(pid, :get_balance)
  end

  @impl true
  def handle_call(:get_balance, _from, state) do
    {:reply, state.balance, state}
  end

  defp via(account_number) do
    {:via, Registry, {Registry.AccountProjectors, account_number}}
  end
end
{:ok, registry_pid} = Registry.start_link(keys: :unique, name: Registry.AccountProjectors)
Projectors.AccountBalance.apply_event(%{
  event_type: :amount_deposited,
  account_number: "NEWACCOUNT",
  value: 12
})
Projectors.AccountBalance.apply_event(%{
  event_type: :amount_deposited,
  account_number: "NEWACCOUNT",
  value: 30
})
Projectors.AccountBalance.lookup_balance("NEWACCOUNT")
Projectors.AccountBalance.lookup_balance("NOT-AN-ACCOUNT")

Projector - Leaderboard

defmodule Projectors.Leaderboard do
  use GenServer
  require Logger

  # Client Api
  def start_link() do
    GenServer.start_link(__MODULE__, nil)
  end

  def apply_event(pid, evt) do
    GenServer.cast(pid, {:handle_event, evt})
  end

  def get_top10(pid) do
    GenServer.call(pid, :get_top10)
  end

  def get_score(pid, attacker) do
    GenServer.call(pid, {:get_score, attacker})
  end

  # Callbacks
  @impl true
  def init(_) do
    {:ok, scores: %{}, top10: []}
  end

  @impl true
  def handle_call({:get_score, attacker}, _from, state) do
    {:reply, Map.get(state.scores, attacker, 0), state}
  end

  @impl true
  def handle_call(:get_top10, _from, state) do
    {:reply, state.top10, state}
  end

  @impl true
  def handle_cast({:handle_event, %{event_type: :zombie_killed, attacker: att}}, state) do
    new_scores = Map.update(state.scores, att, 1, &amp;(&amp;1 + 1))
    {:noreply, %{state | scores: new_scores, top10: rerank(new_scores)}}
  end

  @impl true
  def handle_cast({:handle_event, %{event_type: :week_completed}}, _state) do
    {:noreply, %{scores: %{}, top10: []}}
  end

  defp rerank(scores) when is_map(scores) do
    scores
    |> Map.to_list()
    |> Enum.sort(fn {_k1, val1}, {_k2, val2} ->
      val1 >= val2
    end)
    |> Enum.take(10)
  end
end