ES/CQRS’s Anatomy
Mix.install([
{:kino_db, "~> 0.2.13"},
{:postgrex, "~> 0.19.1"}
])
Introduction
docker compose:
What’s Event Sourcing?
Event sourcing is like a historical timeline for your data. Instead of storing a snapshot of the current state, event sourcing keeps a record of every change that has happened to your data. This allows you to reconstruct the data’s past state and track its evolution over time.
Think of it as a medical record for your data. Just as a doctor can review a patient’s medical history to diagnose an illness, you can use event sourcing to understand how your data has changed and identify any anomalies.
This approach is especially useful for applications that need strong audit trails, complex state tracking, or flexible scalability. However, it also comes with higher storage requirements and computational complexity.
graph LR;
A(Order Created) -->B(Item 1 Added) -->C(Item 2 Added) --> D(Order Paid) --> E(Order Shipped);
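The timeline above can be sketched in a few lines of plain Elixir. This is only an illustration under stated assumptions: the module `OrderTimeline`, the event tuples, and the state map are hypothetical and are not part of the EsCqrsAnatomy application. The point is that the current state is never stored; it is rebuilt by folding over the recorded events.

```elixir
defmodule OrderTimeline do
  # Hypothetical initial state and event shapes, loosely mirroring the diagram.
  @initial %{id: nil, items: [], status: nil}

  # State is a left fold over the event history.
  def replay(events), do: Enum.reduce(events, @initial, &apply_event/2)

  defp apply_event({:order_created, id}, state), do: %{state | id: id, status: :open}
  defp apply_event({:item_added, item}, state), do: %{state | items: state.items ++ [item]}
  defp apply_event(:order_paid, state), do: %{state | status: :paid}
  defp apply_event(:order_shipped, state), do: %{state | status: :shipped}
end

history = [
  {:order_created, "order-1"},
  {:item_added, "item-1"},
  {:item_added, "item-2"},
  :order_paid,
  :order_shipped
]

# The state as of now, and the state as it was right after the second item was added.
current = OrderTimeline.replay(history)
past = OrderTimeline.replay(Enum.take(history, 3))
```

Replaying only a prefix of the history is what makes "time travel" possible: `past` is the order exactly as it looked before it was paid.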
What’s CQRS?
CQRS (Command Query Responsibility Segregation) is like having two separate sets of instructions for your data: one for changing it (commands) and one for reading it (queries). This separation allows for specializing each set of instructions, making data manipulation and retrieval more efficient, especially for applications with high write or read loads.
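A minimal sketch of that separation, assuming hypothetical modules `Shop.Commands` and `Shop.ReadModel` (neither exists in the EsCqrsAnatomy application): the write side only decides what happened, and the read side only answers questions from a view built out of those decisions.

```elixir
defmodule Shop.Commands do
  # Write side: validates the intent and returns events. It never answers queries.
  def handle({:create_order, id, number}) when is_binary(id) and is_binary(number),
    do: {:ok, [{:order_created, id, number}]}

  def handle(_other), do: {:error, :invalid_command}
end

defmodule Shop.ReadModel do
  # Read side: a denormalized view (a plain map here) kept up to date from events.
  def project(view, {:order_created, id, number}),
    do: Map.put(view, id, %{order_number: number, status: "OPEN"})

  # Queries only touch the view, never the write side.
  def order_by_id(view, id), do: Map.fetch(view, id)
end

{:ok, events} = Shop.Commands.handle({:create_order, "o-1", "A001"})
view = Enum.reduce(events, %{}, fn event, acc -> Shop.ReadModel.project(acc, event) end)
{:ok, order} = Shop.ReadModel.order_by_id(view, "o-1")
```

Because the two sides share nothing but events, each one can be stored and scaled independently.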
Let’s put it all together
graph LR;
C(Command) -->|Invoked on| AGG([Aggregate]) -->|Generates|E(Event) -->|Stored to| ES[(Event Store)];
PR(Projector) -->|Read from| ES;
Q(Query) -->|Query to| PN([Projection]) -->|Retrieve from| RS[(Read Store)];
PR -->|Write to| RS
A Command is routed to an Aggregate, which emits an Event. The Event is then stored in the Event Store. The Projector subscribes to some events and builds one or more tables in the Read Store. The Projection retrieves data from the Read Store based on a Query.
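The flow in the diagram can be walked through end to end with an in-memory sketch. Everything under the `Sketch` namespace is hypothetical and only stands in for the real pieces (the Postgres event store, the Commanded aggregate, the Ecto projector). It assumes the projector remembers its position in the store so it can resume where it stopped.

```elixir
defmodule Sketch.EventStore do
  # Hypothetical in-memory, append-only store.
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> [] end, name: __MODULE__)
  def append(event), do: Agent.update(__MODULE__, &(&1 ++ [event]))

  # Events after a given position, so a projector can catch up incrementally.
  def read_from(position), do: Agent.get(__MODULE__, &Enum.drop(&1, position))
end

defmodule Sketch.Aggregate do
  # Command in, event out: business rules live here.
  def execute(%{id: nil}, {:create_order, id}), do: {:ok, {:order_created, id}}
  def execute(_state, {:create_order, _id}), do: {:error, :already_created}
end

defmodule Sketch.Projector do
  # Reads new events and writes rows into the read store (a map here).
  def catch_up({position, read_store}) do
    events = Sketch.EventStore.read_from(position)

    rows =
      Enum.reduce(events, read_store, fn {:order_created, id}, acc ->
        Map.put(acc, id, "OPEN")
      end)

    {position + length(events), rows}
  end
end

{:ok, _pid} = Sketch.EventStore.start_link()
{:ok, event} = Sketch.Aggregate.execute(%{id: nil}, {:create_order, "o-1"})
:ok = Sketch.EventStore.append(event)
{position, read_store} = Sketch.Projector.catch_up({0, %{}})
```

A query would then read from `read_store` only. Calling `catch_up/1` again with the returned position finds no new events and leaves the read store unchanged.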
Ok, but why all this complex stuff?
Elixir, for its actor model and pattern matching, which fit this job very well
Event Sourcing, for its built-in auditing, time travel, and the ability to build new projections with full historical depth
CQRS, to scale reads and writes independently, using the best database for our needs
What’s Event Storming?
Event Storming is a collaborative workshop technique for understanding and designing complex business processes or software systems. It involves a visual approach where participants use sticky notes to represent events, commands, aggregates, and other key elements on a large workspace. The goal is to foster communication, shared understanding, and uncover insights into the dynamics of the system or process being modeled. Event Storming is commonly used in agile development, domain-driven design, and other contexts where a shared understanding among team members is crucial.
The picture that explains everything
Yet Another ERP
We put all the Domain Experts, Developers, Product Owners, etc. in the same room and start a big-picture Event Storming session.
Then at the end we have a clear idea of what the system should do (or at least we think so).
Let’s play with Orders
Let’s create an Order by invoking the relevant command, and take the aggregate id from the command response for the next steps.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
aggregate_uuid =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
|> elem(1)
|> Map.get(:aggregate_uuid)
""",
file: __ENV__.file
)
Create a connection to the Event Store
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "event_store"
]
{:ok, conn} = Kino.start_child({Postgrex, opts})
Fetch all the events of the Aggregate.
Here we cheat a bit, using the event payload to identify all the events related to our Aggregate.
We will see the correct way to do it soon.
result_events =
Postgrex.query!(
conn,
~S"""
SELECT
event_id::text,
event_type,
data
FROM events
WHERE encode(data, 'escape')::json ->> 'id' = $1
""",
[aggregate_uuid]
)
Every Aggregate has its own Stream
result =
Postgrex.query!(
conn,
~S"""
SELECT
stream_id,
stream_uuid,
stream_version
FROM streams
WHERE stream_uuid = $1
""",
[aggregate_uuid]
)
Let’s invoke another command on the Aggregate
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CompleteOrder
%CompleteOrder{
id: aggregate_uuid
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Check the Stream now
stream =
Postgrex.query!(
conn,
~S"""
SELECT
stream_id,
stream_uuid,
stream_version
FROM streams
WHERE stream_uuid = $1
""",
[aggregate_uuid]
)
Take the Stream Id of our Aggregate
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
stream_id =
Kino.RPC.eval_string(
node,
~S"""
%{rows: [[stream_id | _]]} = stream
stream_id
""",
file: __ENV__.file
)
Check the events in that Stream
result4 =
Postgrex.query!(
conn,
~S"""
SELECT
event_id::text,
stream_id,
stream_version
FROM stream_events se
WHERE stream_id = $1
ORDER BY stream_version ASC
""",
[stream_id]
)
Put it all together to see all the events for a given Aggregate
result3 =
Postgrex.query!(
conn,
~S"""
SELECT
e.event_id::text,
event_type,
data
FROM events e
JOIN stream_events se
ON e.event_id = se.event_id
JOIN streams s
ON s.stream_id = se.stream_id
WHERE s.stream_uuid = $1
ORDER BY se.stream_version ASC
""",
[aggregate_uuid]
)
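The relationship between the three tables queried above can be mirrored in memory. This is a sketch, assuming hypothetical rows: the module `StreamSketch`, the ids, and the event type names are made up for illustration. A stream is looked up by its uuid, its links are sorted by `stream_version`, and each link resolves to an event.

```elixir
defmodule StreamSketch do
  # Hypothetical in-memory rows shaped like the streams, stream_events and events tables.
  def events_for(stream_uuid, %{streams: streams, stream_events: links, events: events}) do
    case Enum.find(streams, &(&1.stream_uuid == stream_uuid)) do
      nil ->
        []

      %{stream_id: stream_id} ->
        links
        |> Enum.filter(&(&1.stream_id == stream_id))
        |> Enum.sort_by(& &1.stream_version)
        |> Enum.map(fn link -> Map.fetch!(events, link.event_id) end)
    end
  end
end

db = %{
  streams: [%{stream_id: 1, stream_uuid: "order-1", stream_version: 2}],
  stream_events: [
    %{event_id: "e2", stream_id: 1, stream_version: 2},
    %{event_id: "e1", stream_id: 1, stream_version: 1}
  ],
  events: %{"e1" => "Order.OrderCreated", "e2" => "Order.OrderCompleted"}
}

ordered = StreamSketch.events_for("order-1", db)
```

Sorting by `stream_version` is what guarantees the events come back in the order the Aggregate produced them, regardless of how the rows are stored.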
The C stands for Command
How is a command routed to the right Aggregate?
With the Router, of course!
We define which fields are the key of the Aggregate and which Commands should be routed to which Aggregate.
Note! We have removed CreateOrder intentionally. Let’s see what happens…
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Router do
use Commanded.Commands.Router
alias EsCqrsAnatomy.Order.Aggregate.Order
alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}
identify(Order, by: :id)
dispatch([CompleteOrder], to: Order)
end
""",
file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
command_result =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Cool! The dispatch fails because the Command is not registered to any Aggregate.
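What the Router does can be approximated with a lookup table. This is a sketch under stated assumptions, not Commanded's implementation: `RouterSketch`, its nested command structs, and the `:order_aggregate` atom are hypothetical. The `{:error, :unregistered_command}` tuple is, to my knowledge, what Commanded returns for a command that has no route.

```elixir
defmodule RouterSketch do
  # Hypothetical commands; only CompleteOrder is registered, as in the cell above.
  defmodule CreateOrder, do: defstruct([:id])
  defmodule CompleteOrder, do: defstruct([:id])

  # Routing table: command module => target aggregate.
  @routes %{CompleteOrder => :order_aggregate}

  # The :id field plays the role of `identify(Order, by: :id)`.
  def dispatch(%command_module{id: id}) do
    case Map.fetch(@routes, command_module) do
      {:ok, aggregate} -> {:ok, aggregate, id}
      :error -> {:error, :unregistered_command}
    end
  end
end

rejected = RouterSketch.dispatch(struct!(RouterSketch.CreateOrder, id: "o-1"))
routed = RouterSketch.dispatch(struct!(RouterSketch.CompleteOrder, id: "o-1"))
```

The unregistered command never reaches an aggregate, so no event is produced and nothing is written to the Event Store.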
Finally the Aggregate
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Order.Aggregate.Order do
use TypedStruct
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder, DeleteOrder}
alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted, OrderDeleted}
alias EsCqrsAnatomy.Order.Aggregate.OrderStatus
@derive Jason.Encoder
typedstruct enforce: true do
field(:id, String.t())
field(:status, String.t())
end
def execute(%__MODULE__{id: nil}, %CreateOrder{} = command) do
%OrderCreated{
id: command.id,
order_number: command.order_number,
business_partner: command.business_partner,
items: command.items
}
end
def execute(%__MODULE__{}, %CreateOrder{}), do: {:error, "Order already created"}
def execute(%__MODULE__{status: "COMPLETED"}, %CompleteOrder{}),
do: {:error, "Order already completed"}
def execute(%__MODULE__{}, %CompleteOrder{blocked_product_ids: blocked_product_ids})
when length(blocked_product_ids) > 0 do
{:error, "Order contains blocked products: #{inspect(blocked_product_ids)}"}
end
def execute(%__MODULE__{id: id}, %CompleteOrder{}), do: %OrderCompleted{id: id}
def execute(%__MODULE__{id: id}, %DeleteOrder{}), do: %OrderDeleted{id: id}
def apply(%__MODULE__{} = order, %OrderCreated{} = event) do
%__MODULE__{
order
| id: event.id,
status: OrderStatus.open()
}
end
def apply(%__MODULE__{} = order, %OrderCompleted{}) do
%__MODULE__{
order | status: OrderStatus.completed()
}
end
def apply(%__MODULE__{} = order, _), do: order
end
""",
file: __ENV__.file
)
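The aggregate exposes two kinds of functions: `execute/2` decides (command in, event or error out) and `apply/2` evolves the state (event in, new state out). How a framework might drive them can be sketched as follows. `AggregateLoop` and `TinyOrder` are hypothetical, and the tuples stand in for the real command and event structs.

```elixir
defmodule AggregateLoop do
  # Hypothetical stand-in for what happens around an aggregate module on each command.
  def handle(aggregate_module, state, command) do
    case aggregate_module.execute(state, command) do
      {:error, _reason} = error -> error
      event -> {:ok, event, aggregate_module.apply(state, event)}
    end
  end
end

defmodule TinyOrder do
  # Decide: validate the command against the current state.
  def execute(%{id: nil}, {:create, id}), do: {:created, id}
  def execute(_state, {:create, _id}), do: {:error, "Order already created"}
  def execute(%{status: "COMPLETED"}, :complete), do: {:error, "Order already completed"}
  def execute(%{id: id}, :complete), do: {:completed, id}

  # Evolve: no validation here, the event has already happened.
  def apply(state, {:created, id}), do: %{state | id: id, status: "OPEN"}
  def apply(state, {:completed, _id}), do: %{state | status: "COMPLETED"}
end

state0 = %{id: nil, status: nil}
{:ok, _created, state1} = AggregateLoop.handle(TinyOrder, state0, {:create, "o-1"})
{:ok, _completed, state2} = AggregateLoop.handle(TinyOrder, state1, :complete)
rejected = AggregateLoop.handle(TinyOrder, state2, :complete)
```

A rejected command leaves the state untouched and produces no event, which is why the second `:complete` has no effect.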
Event Handlers
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Router do
use Commanded.Commands.Router
alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
alias Commanded.Middleware.Uniqueness
alias EsCqrsAnatomy.Order.Aggregate.Order
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
alias EsCqrsAnatomy.Shipment.Aggregate.Shipment
alias EsCqrsAnatomy.Shipment.Commands.{CreateShipment, CompleteShipment}
middleware(Enrichment)
middleware(Validate)
middleware(Uniqueness)
identify(Order, by: :id)
identify(Shipment, by: :id)
dispatch([CreateOrder, CompleteOrder], to: Order)
dispatch([CreateShipment, CompleteShipment], to: Shipment)
end
""",
file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Shipment.Policies.Shipment do
use Commanded.Event.Handler,
application: EsCqrsAnatomy.App,
name: "shipment",
start_from: :current
use EsCqrsAnatomy.Base.EventHandler
alias EsCqrsAnatomy.Order.Events.OrderCompleted
alias EsCqrsAnatomy.Shipment.Commands.CreateShipment
def handle(%OrderCompleted{id: id}, %{
event_id: causation_id,
correlation_id: correlation_id
}) do
%CreateShipment{
id: UUID.uuid4(),
order_id: id
}
|> EsCqrsAnatomy.App.dispatch(causation_id: causation_id, correlation_id: correlation_id)
end
end
""",
file: __ENV__.file
)
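The policy above reacts to an event by dispatching a new command, passing along the ids that link cause and effect. A side-effect-free sketch of the same idea follows. `PolicySketch` and its tuples are hypothetical, and instead of dispatching it returns a description of what would be dispatched.

```elixir
defmodule PolicySketch do
  # Translates "something happened" into "do something next", keeping the tracing ids:
  # the triggering event becomes the causation of the new command.
  def react({:order_completed, order_id}, %{event_id: event_id, correlation_id: correlation_id}) do
    {:dispatch, {:create_shipment, order_id},
     [causation_id: event_id, correlation_id: correlation_id]}
  end

  # Events the policy does not care about are ignored.
  def react(_event, _metadata), do: :ignore
end

metadata = %{event_id: "evt-1", correlation_id: "corr-1"}
decision = PolicySketch.react({:order_completed, "o-1"}, metadata)
ignored = PolicySketch.react({:order_created, "o-1"}, metadata)
```

Keeping the same `correlation_id` across the whole chain makes it possible to trace a shipment back to the order command that started it.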
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.EventHandlerSupervisor do
use Supervisor
def start_link(_args) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_args) do
Supervisor.init(
[
EsCqrsAnatomy.Order.Projectors.Orders,
EsCqrsAnatomy.Shipment.Policies.Shipment
],
strategy: :one_for_one
)
end
end
""",
file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
order_id =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
alias EsCqrsAnatomy.Order.Commands.OrderItem
alias EsCqrsAnatomy.Order.Constants
order_id =
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
|> elem(1)
|> Map.get(:aggregate_uuid)
%CompleteOrder{
id: order_id
}
|> EsCqrsAnatomy.App.dispatch()
order_id
""",
file: __ENV__.file
)
result5 =
Postgrex.query!(
conn,
~S"""
SELECT
event_id::text,
event_type,
data
FROM events
WHERE event_type = 'Shipment.ShipmentCreated'
AND encode(data, 'escape')::json ->> 'order_id' = $1
""",
[order_id]
)
Projector
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Order.Projectors.Orders do
use Commanded.Projections.Ecto,
application: EsCqrsAnatomy.App,
repo: EsCqrsAnatomy.Repo,
name: "orders"
use EsCqrsAnatomy.Base.EventHandler
alias EsCqrsAnatomy.Order.Events.{
OrderCreated,
OrderCompleted,
OrderDeleted
}
alias EsCqrsAnatomy.Order.Projections.{Order, OrderItem}
alias EsCqrsAnatomy.Order.Aggregate.OrderStatus
# from/2 is used by the OrderDeleted projection below
import Ecto.Query, only: [from: 2]
project(
%OrderCreated{} = event,
_metadata,
fn multi ->
multi
|> Ecto.Multi.insert(
:orders,
%Order{
id: event.id,
order_number: event.order_number,
business_partner: event.business_partner,
status: OrderStatus.open(),
items:
event.items
|> Enum.map(
&%OrderItem{
id: UUID.uuid4(),
order_id: event.id,
product_id: &1.product_id,
quantity: &1.quantity,
uom: &1.uom
}
)
}
)
end
)
project(
%OrderCompleted{id: id},
_metadata,
fn multi ->
multi
|> Ecto.Multi.run(:order_to_update, fn repo, _changes ->
{:ok, repo.get(Order, id)}
end)
|> Ecto.Multi.update(:order, fn %{order_to_update: order} ->
Ecto.Changeset.change(order,
status: OrderStatus.completed()
)
end)
end
)
project(
%OrderDeleted{id: id},
_metadata,
fn multi ->
multi
|> Ecto.Multi.run(:order_to_delete, fn repo, _changes ->
{rows_deleted, _} = from(o in Order, where: o.id == ^id) |> repo.delete_all()
{:ok, rows_deleted}
end)
end
)
end
""",
file: __ENV__.file
)
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "read_store"
]
{:ok, conn} = Kino.start_child({Postgrex, opts})
result9 =
Postgrex.query!(
conn,
~S"""
SELECT
o.order_number,
o.business_partner,
o.status,
oi.product_id,
oi.quantity,
oi.uom
FROM orders o
JOIN order_items oi
ON o.id = oi.order_id
ORDER BY
o.order_number,
oi.quantity
""",
[]
)
Performance
Aggregate Snapshot
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "event_store"
]
{:ok, conn} = Kino.start_child({Postgrex, opts})
config :es_cqrs_anatomy, EsCqrsAnatomy.App,
snapshotting: %{
EsCqrsAnatomy.Order.Aggregate.Order => [
snapshot_every: 1,
snapshot_version: 1
]
}
result12 =
Postgrex.query!(
conn,
~S"""
SELECT
s.source_version,
s.source_type,
s.metadata,
s.data
FROM snapshots s
""",
[]
)
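Why snapshots help performance can be shown with a small sketch. It assumes a snapshot row carries the aggregate state in `data` and the stream version it was taken at in `source_version`, as the columns queried above suggest. `SnapshotSketch`, the event tuples, and `apply_fun` are hypothetical. Rebuilding starts from the snapshot and replays only the events recorded after it.

```elixir
defmodule SnapshotSketch do
  # Rebuild state from a snapshot (if any) plus only the events newer than it.
  def rebuild(snapshot, versioned_events, apply_fun) do
    {state, version} =
      case snapshot do
        nil -> {%{id: nil, status: nil}, 0}
        %{source_version: source_version, data: data} -> {data, source_version}
      end

    versioned_events
    |> Enum.filter(fn {event_version, _event} -> event_version > version end)
    |> Enum.reduce(state, fn {_version, event}, acc -> apply_fun.(acc, event) end)
  end
end

apply_fun = fn
  state, {:order_created, id} -> %{state | id: id, status: "OPEN"}
  state, :order_completed -> %{state | status: "COMPLETED"}
end

events = [{1, {:order_created, "o-1"}}, {2, :order_completed}]
snapshot = %{source_version: 1, data: %{id: "o-1", status: "OPEN"}}

from_scratch = SnapshotSketch.rebuild(nil, events, apply_fun)
from_snapshot = SnapshotSketch.rebuild(snapshot, events, apply_fun)
```

Both paths end in the same state; the snapshot path simply skips the events it already accounts for, which matters once a stream holds thousands of events.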