Real-World Event Sourcing
Mix.install([
{:kino, "~> 0.14.0"}
])
Event Sourced Calculator
defmodule EventSourcedCalculator.V3 do
@max_state_value 10_000
@min_state_value 0
def handle_command(%{value: val}, %{cmd: :add, value: v}) do
%{event_type: :value_added, value: min(@max_state_value - val, v)}
end
def handle_command(%{value: val}, %{cmd: :sub, value: v}) do
%{event_type: :value_subtracted, value: max(@min_state_value, val - v)}
end
def handle_command(%{value: val}, %{cmd: :mul, value: v}) when val * v > @max_state_value do
{:error, :mul_failed}
end
def handle_command(%{value: _val}, %{cmd: :mul, value: v}) do
%{event_type: :value_multiplied, value: v}
end
def handle_command(%{value: _val}, %{cmd: :div, value: 0}) do
{:error, :divide_failed}
end
def handle_command(%{value: _val}, %{cmd: :div, value: v}) do
%{event_type: :value_divided, value: v}
end
def handle_event(%{value: val}, %{event_type: :value_added, value: v}) do
%{value: val + v}
end
def handle_event(%{value: val}, %{event_type: :value_subtracted, value: v}) do
%{value: val - v}
end
def handle_event(%{value: val}, %{event_type: :value_multiplied, value: v}) do
%{value: val * v}
end
def handle_event(%{value: val}, %{event_type: :value_divided, value: v}) do
%{value: val / v}
end
def handle_event(%{value: _val}, %{}) do
%{}
end
def handle_event(%{value: _val} = state, _) do
state
end
end
cmds = [
%{value: 50, cmd: :add},
%{value: 10, cmd: :add},
%{value: 0, cmd: :div},
%{value: 2, cmd: :add}
]
initial = %{value: 0}
cmds
|> List.foldl(initial, fn cmd, acc ->
EventSourcedCalculator.V3.handle_event(
acc,
EventSourcedCalculator.V3.handle_command(acc, cmd) |> IO.inspect()
)
|> IO.inspect()
end)
Projector - Bank Account
defmodule Projectors.AccountBalance do
use GenServer
require Logger
def start_link(account_number) do
GenServer.start_link(__MODULE__, account_number, name: via(account_number))
end
@impl true
def init(account_number) do
{:ok, %{account_number: account_number, balance: 0}}
end
def apply_event(%{account_number: account} = event) when is_binary(account) do
case Registry.lookup(Registry.AccountProjectors, account) do
[{pid, _}] ->
apply_event(pid, event)
_ ->
Logger.debug("Attempt to apply event to non-existent account, starting projector")
{:ok, pid} = start_link(account)
apply_event(pid, event)
end
end
def apply_event(pid, event) when is_pid(pid) do
GenServer.cast(pid, {:handle_event, event})
end
@impl true
def handle_cast({:handle_event, evt}, state) do
{:noreply, handle_event(state, evt)}
end
def handle_event(%{balance: bal} = s, %{event_type: :amount_withdrawn, value: v}) do
%{s | balance: bal - v}
end
def handle_event(%{balance: bal} = s, %{event_type: :amount_deposited, value: v}) do
%{s | balance: bal + v}
end
def handle_event(%{balance: bal} = s, %{event_type: :fee_applied, value: v}) do
%{s | balance: bal - v}
end
def lookup_balance(account_number) when is_binary(account_number) do
with [{pid, _}] <-
Registry.lookup(Registry.AccountProjectors, account_number) do
{:ok, get_balance(pid)}
else
_ ->
{:error, :unknown_account}
end
end
def get_balance(pid) do
GenServer.call(pid, :get_balance)
end
@impl true
def handle_call(:get_balance, _from, state) do
{:reply, state.balance, state}
end
defp via(account_number) do
{:via, Registry, {Registry.AccountProjectors, account_number}}
end
end
{:ok, registry_pid} = Registry.start_link(keys: :unique, name: Registry.AccountProjectors)
Projectors.AccountBalance.apply_event(%{
event_type: :amount_deposited,
account_number: "NEWACCOUNT",
value: 12
})
Projectors.AccountBalance.apply_event(%{
event_type: :amount_deposited,
account_number: "NEWACCOUNT",
value: 30
})
Projectors.AccountBalance.lookup_balance("NEWACCOUNT")
Projectors.AccountBalance.lookup_balance("NOT-AN-ACCOUNT")
Projector - Leaderboard
defmodule Projectors.Leaderboard do
use GenServer
require Logger
# Client Api
def start_link() do
GenServer.start_link(__MODULE__, nil)
end
def apply_event(pid, evt) do
GenServer.cast(pid, {:handle_event, evt})
end
def get_top10(pid) do
GenServer.call(pid, :get_top10)
end
def get_score(pid, attacker) do
GenServer.call(pid, {:get_score, attacker})
end
# Callbacks
@impl true
def init(_) do
{:ok, scores: %{}, top10: []}
end
@impl true
def handle_call({:get_score, attacker}, _from, state) do
{:reply, Map.get(state.scores, attacker, 0), state}
end
@impl true
def handle_call(:get_top10, _from, state) do
{:reply, state.top10, state}
end
@impl true
def handle_cast({:handle_event, %{event_type: :zombie_killed, attacker: att}}, state) do
new_scores = Map.update(state.scores, att, 1, &(&1 + 1))
{:noreply, %{state | scores: new_scores, top10: rerank(new_scores)}}
end
@impl true
def handle_cast({:handle_event, %{event_type: :week_completed}}, _state) do
{:noreply, %{scores: %{}, top10: []}}
end
defp rerank(scores) when is_map(scores) do
scores
|> Map.to_list()
|> Enum.sort(fn {_k1, val1}, {_k2, val2} ->
val1 >= val2
end)
|> Enum.take(10)
end
end