ES/CQRS’s Anatomy
Mix.install([
{:kino_db, "~> 0.2.8"},
{:postgrex, "~> 0.18.0"}
])
Introduction
What’s Event Sourcing?
Event sourcing is like a historical timeline for your data. Instead of storing a snapshot of the current state, event sourcing keeps a record of every change that has happened to your data. This allows you to reconstruct the data’s past state and track its evolution over time.
Think of it as a medical record for your data. Just as a doctor can review a patient’s medical history to diagnose an illness, you can use event sourcing to understand how your data has changed and identify any anomalies.
This approach is especially useful for applications that need strong audit trails, complex state tracking, or flexible scalability. However, it also comes with higher storage requirements and computational complexity.
graph LR;
A(Order Created) -->B(Item 1 Added) -->C(Item 2 Added) --> D(Order Paid) --> E(Order Shipped);
What’s CQRS?
CQRS (Command Query Responsibility Segregation) is like having two separate sets of instructions for your data: one for changing it (commands) and one for reading it (queries). This separation allows for specializing each set of instructions, making data manipulation and retrieval more efficient, especially for applications with high write or read loads.
Let’s put it ALL TOGETHER
graph LR;
C(Command) -->|Invoked on| AGG([Aggregate]) -->|Generates|E(Event) -->|Stored to| ES[(Event Store)];
PR(Projector) -->|Read from| ES;
Q(Query) -->|Query to| PN([Projection]) -->|Retrieve from| RS[(Read Store)];
PR -->|Write to| RS
A Command is routed to an Aggregate, which emits an Event. The event is then stored in the Event Store. The Projector subscribes to some events and builds one or more tables in the Read Store. The Projection retrieves data from the Read Store based on a Query.
What’s Event Storming?
Event Storming is a collaborative workshop technique for understanding and designing complex business processes or software systems. It involves a visual approach where participants use sticky notes to represent events, commands, aggregates, and other key elements on a large workspace. The goal is to foster communication, shared understanding, and uncover insights into the dynamics of the system or process being modeled. Event Storming is commonly used in agile development, domain-driven design, and other contexts where a shared understanding among team members is crucial.
The picture that explains everything
Yet Another ERP
We put all the Domain Experts, Developers, Product Owners, etc. in the same room and start doing a big-picture event storming.
Then at the end we have a clear idea of what the system should do (or at least we think so).
Play with Orders
Let’s create an Order by invoking the related command, and take the aggregate id from the command response for the next steps.
# Dispatch a CreateOrder command (filled with Faker data) on the remote node.
# The cookie is read from the LB_COOKIE environment variable (raises if unset).
# The snippet pipes the dispatch result through `elem(1)` and reads
# `:aggregate_uuid`, which is what ends up bound to `aggregate_uuid` here.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
aggregate_uuid =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
|> elem(1)
|> Map.get(:aggregate_uuid)
""",
file: __ENV__.file
)
Create a connection to the Event Store
# Connection options for the `event_store` database on host `postgres`.
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "event_store"
]
# Start Postgrex through Kino.start_child/1; `conn` is used by the query cells below.
{:ok, conn} = Kino.start_child({Postgrex, opts})
Fetch all the events of the Aggregate.
Here we cheat a bit, using the event payload to identify all the events related to our Aggregate.
We will see the correct way to do it soon.
# Select events whose JSON payload has an `id` field equal to the aggregate id.
# The `data` column is decoded with `encode(data, 'escape')::json`.
events_by_payload_sql = """
SELECT
event_id::text,
event_type,
data
FROM events
WHERE encode(data, 'escape')::json ->> 'id' = $1
"""

result_events = Postgrex.query!(conn, events_by_payload_sql, [aggregate_uuid])
Every Aggregate has its own Stream
# Look up the stream row whose `stream_uuid` equals the aggregate id.
stream_by_uuid_sql = """
SELECT
stream_id,
stream_uuid,
stream_version
FROM streams
WHERE stream_uuid = $1
"""

result = Postgrex.query!(conn, stream_by_uuid_sql, [aggregate_uuid])
Let’s invoke another command on the Aggregate
# Dispatch a CompleteOrder command for the order created earlier.
# `aggregate_uuid` is not defined inside the snippet: per the Kino.RPC docs,
# variables referenced by the evaluated string are taken from the notebook binding.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CompleteOrder
%CompleteOrder{
id: aggregate_uuid
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Check the Stream now
# Re-read the aggregate's stream row after the second command.
# The result is bound to `stream` for the cells that follow.
stream_by_uuid_sql = """
SELECT
stream_id,
stream_uuid,
stream_version
FROM streams
WHERE stream_uuid = $1
"""

stream = Postgrex.query!(conn, stream_by_uuid_sql, [aggregate_uuid])
Take the Stream Id of our Aggregate
# Extract the stream id (first column of the single row) from the `stream`
# query result. This is a plain pattern match on local data, so it is done in
# the notebook process instead of being shipped to the remote node.
# Raises MatchError unless the result holds exactly one row.
%{rows: [[stream_id | _]]} = stream
stream_id
Check the events in that Stream
# List the stream_events rows of the given stream, oldest version first.
events_in_stream_sql = """
SELECT
event_id::text,
stream_id,
stream_version
FROM stream_events se
WHERE stream_id = $1
ORDER BY stream_version ASC
"""

result4 = Postgrex.query!(conn, events_in_stream_sql, [stream_id])
Put it all together to see all the events for a given Aggregate
# Join events -> stream_events -> streams to list every event of the stream
# whose `stream_uuid` is the aggregate id, ordered by stream version.
aggregate_events_sql = """
SELECT
e.event_id::text,
event_type,
data
FROM events e
JOIN stream_events se
ON e.event_id = se.event_id
JOIN streams s
ON s.stream_id = se.stream_id
WHERE s.stream_uuid = $1
ORDER BY se.stream_version ASC
"""

result3 = Postgrex.query!(conn, aggregate_events_sql, [aggregate_uuid])
The C stands for Command
How is a command routed to the right Aggregate?
With the Router of course!
We define which fields are the key of the Aggregate and which Command should be routed to which Aggregate
Note! We have removed CreateOrder intentionally. Let’s see what happens…
# Redefine the Router on the remote node with only CompleteOrder dispatched to
# the Order aggregate (identified by `:id`). CreateOrder is not registered.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Router do
use Commanded.Commands.Router
alias EsCqrsAnatomy.Order.Aggregate.Order
alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}
identify(Order, by: :id)
dispatch([CompleteOrder], to: Order)
end
""",
file: __ENV__.file
)
# Dispatch a CreateOrder command and bind whatever `dispatch/1` returns to
# `command_result`.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
command_result =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Cool! The Command is not registered to any Aggregate
Middlewares
Middlewares are components used to perform the following tasks:
- Command Validation (static and dynamic)
- Uniqueness Check
- Command Enrichment
Note! This time we’ve added some Middlewares that run before the command reaches the Aggregate
# Redefine the Router on the remote node with three middlewares, declared in
# the order Enrichment, Validate, Uniqueness. Both CreateOrder and
# CompleteOrder are dispatched to the Order aggregate.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Router do
use Commanded.Commands.Router
alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
alias Commanded.Middleware.Uniqueness
alias EsCqrsAnatomy.Order.Aggregate.Order
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
middleware(Enrichment)
middleware(Validate)
middleware(Uniqueness)
identify(Order, by: :id)
dispatch([CreateOrder, CompleteOrder], to: Order)
end
""",
file: __ENV__.file
)
Command Validation
We can check mandatory fields and their format.
# Redefine the CreateOrder command and its nested OrderItem on the remote node.
# Each struct declares its fields with TypedStruct and its validation rules
# with Vex (`validates/2`).
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Commands.CreateOrder do
    use TypedStruct
    use Vex.Struct
    use StructAccess

    alias EsCqrsAnatomy.Order.Commands.OrderItem

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:order_number, String.t())
      field(:business_partner, String.t())
      # A list of OrderItem structs (not a list of the module atom).
      field(:items, list(OrderItem.t()))
    end

    use ExConstructor

    validates(:id, presence: true, uuid: true)
    validates(:order_number, presence: true, string: true)
    validates(:business_partner, presence: true, email: true)
    validates(:items, presence: true, list_of_structs: true)
  end

  defmodule EsCqrsAnatomy.Order.Commands.OrderItem do
    use TypedStruct
    use Vex.Struct
    use StructAccess

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:product_id, String.t())
      # Typed as a number to agree with the `number: [greater_than: 0]` rule below.
      field(:quantity, number())
      field(:uom, String.t())
    end

    use ExConstructor

    validates(:product_id, presence: true, uuid: true)
    validates(:quantity, presence: true, number: [greater_than: 0])
    validates(:uom, presence: true, inclusion: ["KG", "MT"])
  end
  """,
  file: __ENV__.file
)
We can validate standard formats like String, Number, etc., or create custom validators. In this example we have two custom validators, for UUID and Email.
# Define a custom Vex validator for emails on the remote node.
# Binaries are matched against the regular expression in `regexp/0`; any
# non-binary value is rejected with the same "must be a valid email" message.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Validator.Email do
use Vex.Validator
def validate(value, _options) when is_binary(value) do
if String.match?(value, regexp()) do
:ok
else
{:error, "must be a valid email"}
end
end
def validate(_, _options), do: {:error, "must be a valid email"}
defp regexp(),
do: ~r/^[_A-Za-z0-9-\+]+(\.[_A-Za-z0-9-]+)*@[A-Za-z0-9-]+(\.[A-Za-z0-9]+)*(\.[A-Za-z]{2,})$/
end
""",
file: __ENV__.file
)
# Dispatch a deliberately invalid CreateOrder: integer order number, a
# business partner that is not an email, a product id that is not a UUID,
# quantity 0 and a unit of measure ("LT") outside the "KG"/"MT" inclusion list.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
%CreateOrder{
id: Faker.UUID.v4(),
order_number: 12345,
business_partner: "i_wanna_be_an_email",
items: [
%OrderItem{
product_id: "Pen",
quantity: 0,
uom: "LT"
}
]
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Uniqueness Check
The order id is what is called a technical key. What identifies an order from a business perspective (the business key) is the order number.
So we must avoid having two orders with the same order number.
How can we handle this cross-aggregate rule?
# Implement the uniqueness middleware's UniqueFields protocol for CreateOrder.
# `unique/1` declares `:order_number` as the unique field and passes
# `order_number_is_unique?/4` as the `is_unique` callback, which delegates to
# the Order projection's `order_number_is_unique?/1`.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Projections.Order
alias EsCqrsAnatomy.Order.Commands.CreateOrder
defimpl Commanded.Middleware.Uniqueness.UniqueFields,
for: EsCqrsAnatomy.Order.Commands.CreateOrder do
def unique(%CreateOrder{order_number: order_number}),
do: [
{:order_number, "already exist", order_number,
ignore_case: false, is_unique: &order_number_is_unique?/4, order_number: order_number}
]
def order_number_is_unique?(_field, value, _owner, _opts),
do: Order.order_number_is_unique?(value)
end
""",
file: __ENV__.file
)
The first Command that arrives checks whether the order number already exists in the Projection. If it is available, the value is stored in a cache to keep it locked for a given time. This way, another Command finds the order number in the cache and is rejected.
# Dispatch a CreateOrder that reuses an order number read from the Order
# projection (`Order.first_order_number() |> List.first()`).
# NOTE(review): the notebook-level `already_existing_order_number` receives the
# value of the snippet's last expression, i.e. the dispatch result, not the
# order number itself.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
already_existing_order_number =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Projections.Order
alias EsCqrsAnatomy.Order.Commands.CreateOrder
alias EsCqrsAnatomy.Order.Commands.OrderItem
already_existing_order_number = Order.first_order_number() |> List.first()
%CreateOrder{
id: Faker.UUID.v4(),
order_number: already_existing_order_number,
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Command Enrichment
# Redefine the CompleteOrder command (with an optional `blocked_product_ids`
# field) and implement EnrichmentProtocol for it.
# `enrich/1` reads the products of the order from the OrderItem projection and
# sets `blocked_product_ids` to `[Constants.blocked_product_id()]` when that
# product is in the order, otherwise to `[]`.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Order.Commands.CompleteOrder do
use TypedStruct
use Vex.Struct
@derive Jason.Encoder
typedstruct enforce: true do
field(:id, String.t())
field(:blocked_product_ids, list(String.t()), enforce: false)
end
validates(:id, presence: true, uuid: true)
end
alias EsCqrsAnatomy.Middleware.Enrichment.EnrichmentProtocol
alias EsCqrsAnatomy.Order.Commands.CompleteOrder
alias EsCqrsAnatomy.Order.Constants
alias EsCqrsAnatomy.Order.Projections.OrderItem
defimpl EnrichmentProtocol, for: CompleteOrder do
def enrich(%CompleteOrder{} = command) do
%CompleteOrder{id: id} = command
products_in_order = OrderItem.products_in_order(id)
command = %CompleteOrder{
command
| blocked_product_ids: lookup_external_data(products_in_order)
}
{:ok, command}
end
defp lookup_external_data(products_in_order) do
if Enum.member?(products_in_order, Constants.blocked_product_id()) do
[Constants.blocked_product_id()]
else
[]
end
end
end
""",
file: __ENV__.file
)
# Create an order whose single item uses `Constants.blocked_product_id()` and
# keep its aggregate id (read from the execution result) in `order_id_to_block`.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
order_id_to_block =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.{CreateOrder}
alias EsCqrsAnatomy.Order.Commands.OrderItem
alias EsCqrsAnatomy.Order.Constants
order_id_to_block =
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Constants.blocked_product_id(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
|> elem(1)
|> Map.get(:aggregate_uuid)
""",
file: __ENV__.file
)
# Try to complete the order that contains the blocked product.
# `order_id_to_block` comes from the notebook binding (see Kino.RPC docs on
# variables referenced by the evaluated string).
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}
%CompleteOrder{
id: order_id_to_block
}
|> EsCqrsAnatomy.App.dispatch()
""",
file: __ENV__.file
)
Finally the Aggregate
# Redefine the Order aggregate on the remote node.
# `execute/2` maps a command to an event or an `{:error, message}` tuple;
# `apply/2` folds an event into the aggregate state.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Aggregate.Order do
    use TypedStruct

    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder, DeleteOrder}
    alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted, OrderDeleted}
    alias EsCqrsAnatomy.Order.Aggregate.OrderStatus

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:order_number, String.t())
      field(:business_partner, String.t())
      field(:status, String.t())
    end

    # CreateOrder is accepted only while the aggregate has no id yet.
    def execute(%__MODULE__{id: nil}, %CreateOrder{} = command) do
      %OrderCreated{
        id: command.id,
        order_number: command.order_number,
        business_partner: command.business_partner,
        items: command.items
      }
    end

    def execute(%__MODULE__{}, %CreateOrder{}), do: {:error, "Order already created"}

    def execute(%__MODULE__{status: "COMPLETED"}, %CompleteOrder{}),
      do: {:error, "Order already completed"}

    # Reject completion when the enriched command carries at least one blocked
    # product. Matching `[_ | _]` avoids walking the list with `length/1`, and
    # `inspect/1` renders the list without printing it to stdout.
    def execute(%__MODULE__{}, %CompleteOrder{
          blocked_product_ids: [_ | _] = blocked_product_ids
        }) do
      {:error, "Order contains blocked products: #{inspect(blocked_product_ids)}"}
    end

    def execute(%__MODULE__{id: id}, %CompleteOrder{}), do: %OrderCompleted{id: id}

    def execute(%__MODULE__{id: id}, %DeleteOrder{}), do: %OrderDeleted{id: id}

    def apply(%__MODULE__{} = order, %OrderCreated{} = event) do
      %__MODULE__{
        order
        | id: event.id,
          order_number: event.order_number,
          business_partner: event.business_partner,
          status: OrderStatus.open()
      }
    end

    def apply(%__MODULE__{} = order, %OrderCompleted{} = event) do
      %__MODULE__{
        order
        | id: event.id,
          status: OrderStatus.completed()
      }
    end

    # Any other event leaves the state untouched.
    def apply(%__MODULE__{} = order, _), do: order
  end
  """,
  file: __ENV__.file
)
Event Handlers
# Redefine the Router on the remote node, adding the Shipment aggregate:
# CreateShipment and CompleteShipment are dispatched to Shipment, identified
# by `:id` like Order.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Router do
use Commanded.Commands.Router
alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
alias Commanded.Middleware.Uniqueness
alias EsCqrsAnatomy.Order.Aggregate.Order
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
alias EsCqrsAnatomy.Shipment.Aggregate.Shipment
alias EsCqrsAnatomy.Shipment.Commands.{CreateShipment, CompleteShipment}
middleware(Enrichment)
middleware(Validate)
middleware(Uniqueness)
identify(Order, by: :id)
identify(Shipment, by: :id)
dispatch([CreateOrder, CompleteOrder], to: Order)
dispatch([CreateShipment, CompleteShipment], to: Shipment)
end
""",
file: __ENV__.file
)
# Define the "shipment" event handler on the remote node.
# On OrderCompleted it dispatches a CreateShipment with a freshly generated
# UUID and the order id, forwarding the event id as `causation_id` together
# with the incoming `correlation_id`.
# NOTE(review): the shipment id is random, so a re-delivered event would
# presumably create a second shipment — confirm whether that matters here.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Shipment.Policies.Shipment do
use Commanded.Event.Handler,
application: EsCqrsAnatomy.App,
name: "shipment",
start_from: :current
use EsCqrsAnatomy.Base.EventHandler
alias EsCqrsAnatomy.Order.Events.OrderCompleted
alias EsCqrsAnatomy.Shipment.Commands.CreateShipment
def handle(%OrderCompleted{id: id}, %{
event_id: causation_id,
correlation_id: correlation_id
}) do
%CreateShipment{
id: UUID.uuid4(),
order_id: id
}
|> EsCqrsAnatomy.App.dispatch(causation_id: causation_id, correlation_id: correlation_id)
end
end
""",
file: __ENV__.file
)
# Redefine the event handler supervisor on the remote node so that it starts
# the Orders projector and the Shipment policy with a :one_for_one strategy.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.EventHandlerSupervisor do
use Supervisor
def start_link(_args) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_args) do
Supervisor.init(
[
EsCqrsAnatomy.Order.Projectors.Orders,
EsCqrsAnatomy.Shipment.Policies.Shipment
],
strategy: :one_for_one
)
end
end
""",
file: __ENV__.file
)
# Kill the registered EventHandlerSupervisor process on the remote node.
# NOTE(review): presumably its parent supervisor restarts it with the
# redefined module — confirm. If the name is not registered,
# `Process.whereis/1` returns nil and `Process.exit/2` raises.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
file: __ENV__.file
)
# Create an order, complete it right away, and return the order id so the
# following queries can look for the resulting shipment event.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
order_id =
Kino.RPC.eval_string(
node,
~S"""
alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
alias EsCqrsAnatomy.Order.Commands.OrderItem
alias EsCqrsAnatomy.Order.Constants
order_id =
%CreateOrder{
id: Faker.UUID.v4(),
order_number: Faker.String.base64(5),
business_partner: Faker.Internet.email(),
items: [
%OrderItem{
product_id: Faker.UUID.v4(),
quantity: 1,
uom: "KG"
}
]
}
|> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
|> elem(1)
|> Map.get(:aggregate_uuid)
%CompleteOrder{
id: order_id
}
|> EsCqrsAnatomy.App.dispatch()
order_id
""",
file: __ENV__.file
)
# Find 'Shipment.ShipmentCreated' events whose JSON payload references the
# order id in its `order_id` field.
shipment_created_sql = """
SELECT
event_id::text,
event_type,
data
FROM events
WHERE event_type = 'Shipment.ShipmentCreated'
AND encode(data, 'escape')::json ->> 'order_id' = $1
"""

result5 = Postgrex.query!(conn, shipment_created_sql, [order_id])
# Show the subscription row named 'shipment' (name, stream and last_seen).
shipment_subscription_sql = """
SELECT
subscription_name,
stream_uuid,
last_seen
FROM subscriptions
WHERE subscription_name = 'shipment'
"""

result6 = Postgrex.query!(conn, shipment_subscription_sql, [])
# List every stream, ordered by ascending stream id.
all_streams_sql = """
SELECT
stream_id,
stream_uuid,
stream_version
FROM streams
ORDER BY stream_id ASC
"""

result7 = Postgrex.query!(conn, all_streams_sql, [])
# List every stream_events row with its event type, ordered by original stream
# id and version, then by stream version and descending stream id.
stream_events_overview_sql = """
SELECT
e.event_type,
se.stream_id,
se.stream_version,
se.original_stream_id,
se.original_stream_version
FROM stream_events se
JOIN events e
ON se.event_id = e.event_id
ORDER BY
se.original_stream_id ASC,
se.original_stream_version ASC,
se.stream_version ASC,
se.stream_id DESC
"""

result8 = Postgrex.query!(conn, stream_events_overview_sql, [])
Projector
# Redefine the "orders" projector on the remote node. It handles three events:
#   * OrderCreated   - inserts an Order row with its OrderItem rows (status open)
#   * OrderCompleted - loads the order and updates its status to completed
#   * OrderDeleted   - deletes the order rows matching the id
# NOTE(review): in the OrderCompleted step `repo.get/2` may return nil, in
# which case `Ecto.Changeset.change/2` would raise — confirm the order is
# always projected before it is completed.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Order.Projectors.Orders do
use Commanded.Projections.Ecto,
application: EsCqrsAnatomy.App,
repo: EsCqrsAnatomy.Repo,
name: "orders"
use EsCqrsAnatomy.Base.EventHandler
alias EsCqrsAnatomy.Order.Events.{
OrderCreated,
OrderCompleted,
OrderDeleted
}
alias EsCqrsAnatomy.Order.Projections.{Order, OrderItem}
alias EsCqrsAnatomy.Order.Aggregate.OrderStatus
project(
%OrderCreated{} = event,
_metadata,
fn multi ->
multi
|> Ecto.Multi.insert(
:orders,
%Order{
id: event.id,
order_number: event.order_number,
business_partner: event.business_partner,
status: OrderStatus.open(),
items:
event.items
|> Enum.map(
&%OrderItem{
id: UUID.uuid4(),
order_id: event.id,
product_id: &1.product_id,
quantity: &1.quantity,
uom: &1.uom
}
)
}
)
end
)
project(
%OrderCompleted{id: id},
_metadata,
fn multi ->
multi
|> Ecto.Multi.run(:order_to_update, fn repo, _changes ->
{:ok, repo.get(Order, id)}
end)
|> Ecto.Multi.update(:order, fn %{order_to_update: order} ->
Ecto.Changeset.change(order,
status: OrderStatus.completed()
)
end)
end
)
project(
%OrderDeleted{id: id},
_metadata,
fn multi ->
multi
|> Ecto.Multi.run(:order_to_delete, fn repo, _changes ->
{rows_deleted, _} = from(o in Order, where: o.id == ^id) |> repo.delete_all()
{:ok, rows_deleted}
end)
end
)
end
""",
file: __ENV__.file
)
# Connection options for the `read_store` database on host `postgres`.
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "read_store"
]
# Rebinds `conn`: from here on the query cells talk to the read store.
{:ok, conn} = Kino.start_child({Postgrex, opts})
Remember to switch to conn_rs
# Read the projected orders joined with their items, ordered by order number
# and item quantity.
orders_with_items_sql = """
SELECT
o.order_number,
o.business_partner,
o.status,
oi.product_id,
oi.quantity,
oi.uom
FROM orders o
JOIN order_items oi
ON o.id = oi.order_id
ORDER BY
o.order_number,
oi.quantity
"""

result9 = Postgrex.query!(conn, orders_with_items_sql, [])
# Read the projection_versions row of the 'orders' projection (its name and
# `last_seen_event_number`).
orders_projection_version_sql = """
select
projection_name,
last_seen_event_number
from projection_versions
where projection_name = 'orders'
"""

projection_version = Postgrex.query!(conn, orders_projection_version_sql, [])
# Extract `last_seen_event_number` (second column of the single row) from the
# `projection_version` query result. This is a plain pattern match on local
# data, so it is done in the notebook process instead of on the remote node.
# Raises MatchError unless the result holds exactly one two-column row.
%{rows: [[_, last_seen_event_number]]} = projection_version
last_seen_event_number
# Connection options for the `event_store` database on host `postgres`.
opts = [
hostname: "postgres",
port: 5432,
username: "postgres",
password: "postgres",
database: "event_store"
]
# Rebinds `conn` back to the event store for the cells that follow.
{:ok, conn} = Kino.start_child({Postgrex, opts})
# Fetch the event linked into stream 0 at the version last seen by the
# projection.
# NOTE(review): stream_id 0 is presumably the global "all events" stream — confirm.
last_projected_event_sql = """
SELECT
event_type,
data
FROM events e
JOIN stream_events se
ON e.event_id = se.event_id
WHERE
se.stream_id = 0
AND se.stream_version = $1
"""

result11 = Postgrex.query!(conn, last_projected_event_sql, [last_seen_event_number])
Process Manager (aka Saga)
Performance
Stream Linking
# Define the "stream_linker" event handler on the remote node (starting from
# the origin). OrderCreated and OrderCompleted events are linked into a stream
# whose id is `UUID.uuid5(:oid, "orders")`.
# `error/3` counts failures in the handler context: it retries until the
# counter reaches 3, then logs the event and skips it.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.Policies.StreamLinker do
use Commanded.Event.Handler,
application: EsCqrsAnatomy.App,
name: "stream_linker",
start_from: :origin
require Logger
alias Commanded.Event.FailureContext
alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted}
def handle(%OrderCreated{}, %{event_id: event_id}) do
orders_stream_id = UUID.uuid5(:oid, "orders")
EsCqrsAnatomy.EventStore.link_to_stream(orders_stream_id, :any_version, [event_id])
end
def handle(%OrderCompleted{}, %{event_id: event_id}) do
orders_stream_id = UUID.uuid5(:oid, "orders")
EsCqrsAnatomy.EventStore.link_to_stream(orders_stream_id, :any_version, [event_id])
end
def error({:error, reason}, event, %FailureContext{context: context}) do
context = record_failure(context)
case Map.get(context, :failures) do
too_many when too_many >= 3 ->
# skip bad event after third failure
Logger.error(
"#{__MODULE__} Skipping bad event, too many failures: #{inspect(event)} for reason: #{inspect(reason)}"
)
:skip
_ ->
# retry event, failure count is included in context map
{:retry, context}
end
end
defp record_failure(context) do
Map.update(context, :failures, 1, fn failures -> failures + 1 end)
end
end
""",
file: __ENV__.file
)
# Redefine the event handler supervisor on the remote node, now also starting
# the StreamLinker policy next to the Orders projector and the Shipment policy.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
defmodule EsCqrsAnatomy.EventHandlerSupervisor do
use Supervisor
def start_link(_args) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_args) do
Supervisor.init(
[
EsCqrsAnatomy.Order.Projectors.Orders,
EsCqrsAnatomy.Shipment.Policies.Shipment,
EsCqrsAnatomy.Policies.StreamLinker
],
strategy: :one_for_one
)
end
end
""",
file: __ENV__.file
)
# Kill the registered EventHandlerSupervisor process on the remote node.
# NOTE(review): presumably its parent supervisor restarts it with the new
# child list — confirm. If the name is not registered, `Process.whereis/1`
# returns nil and `Process.exit/2` raises.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
Kino.RPC.eval_string(
node,
~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
file: __ENV__.file
)
# Compute the id of the linked "orders" stream on the remote node with the
# same expression the StreamLinker uses: `UUID.uuid5(:oid, "orders")`.
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_COOKIE")))
orders_stream_id =
Kino.RPC.eval_string(node, ~S"orders_stream_id = UUID.uuid5(:oid, \"orders\")",
file: __ENV__.file
)
# Look up the linked "orders" stream by its uuid and show its version.
orders_stream_sql = """
SELECT
stream_uuid,
stream_version
FROM streams
WHERE stream_uuid = $1
"""

result13 = Postgrex.query!(conn, orders_stream_sql, [orders_stream_id])
Event Handlers Pool
Aggregate Snapshot
# Snapshotting configuration for the Order aggregate: `snapshot_every: 1` and
# `snapshot_version: 1`.
# NOTE(review): `snapshot_every: 1` presumably means a snapshot after every
# event, which suits a demo — confirm before reusing elsewhere.
config :es_cqrs_anatomy, EsCqrsAnatomy.App,
snapshotting: %{
EsCqrsAnatomy.Order.Aggregate.Order => [
snapshot_every: 1,
snapshot_version: 1
]
}
# Dump the snapshots table: source version and type, metadata and data.
snapshots_sql = """
SELECT
s.source_version,
s.source_type,
s.metadata,
s.data
FROM snapshots s
"""

result12 = Postgrex.query!(conn, snapshots_sql, [])