
ES/CQRS's Anatomy

short-es-cqrs-anatomy.livemd


Mix.install([
  {:kino_db, "~> 0.2.13"},
  {:postgrex, "~> 0.19.1"}
])

Introduction

docker compose:

(image: docker compose configuration)

What’s Event Sourcing?

Event sourcing is like a historical timeline for your data. Instead of storing a snapshot of the current state, event sourcing keeps a record of every change that has happened to your data. This allows you to reconstruct the data’s past state and track its evolution over time.

Think of it as a medical record for your data. Just as a doctor can review a patient’s medical history to diagnose an illness, you can use event sourcing to understand how your data has changed and identify any anomalies.

This approach is especially useful for applications that need strong audit trails, complex state tracking, or flexible scalability. However, it also comes with higher storage requirements and computational complexity.

graph LR;
  A(Order Created) -->B(Item 1 Added) -->C(Item 2 Added) --> D(Order Paid) --> E(Order Shipped);
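The timeline above can be sketched in a few lines of Elixir (the event terms are hypothetical, not the project's real event structs): the current state is rebuilt by folding the recorded events, oldest first.

```elixir
defmodule OrderReplay do
  # Minimal sketch: rebuild an order's current state from its event history.
  # Every recorded event is applied in order to an empty initial state.

  def replay(events) do
    Enum.reduce(events, %{id: nil, items: [], status: nil}, &apply_event(&2, &1))
  end

  defp apply_event(state, {:order_created, id}), do: %{state | id: id, status: :open}
  defp apply_event(state, {:item_added, item}), do: %{state | items: state.items ++ [item]}
  defp apply_event(state, :order_paid), do: %{state | status: :paid}
  defp apply_event(state, :order_shipped), do: %{state | status: :shipped}
end

OrderReplay.replay([
  {:order_created, "o-1"},
  {:item_added, "item 1"},
  {:item_added, "item 2"},
  :order_paid,
  :order_shipped
])
# => %{id: "o-1", items: ["item 1", "item 2"], status: :shipped}
```

Replaying a prefix of the list gives the state at any point in the past — that is the "time traveling" event sourcing is known for.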

What’s CQRS?

CQRS (Command Query Responsibility Segregation) is like having two separate sets of instructions for your data: one for changing it (commands) and one for reading it (queries). This separation allows for specializing each set of instructions, making data manipulation and retrieval more efficient, especially for applications with high write or read loads.
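As a minimal sketch of the split (hypothetical module and function names): commands go through a write-side module that validates and emits events, while queries only ever touch a read model.

```elixir
defmodule WriteSide do
  # Command handling: validate the command, return the events it produces.
  def create_order(id, items) when items != [] do
    {:ok, [{:order_created, id, items}]}
  end

  def create_order(_id, _items), do: {:error, :empty_order}
end

defmodule ReadSide do
  # Query handling: answer questions from a read model shaped for reading
  # (a plain map here), never touching the write path.
  def order_count(read_model), do: map_size(read_model)
  def find_order(read_model, id), do: Map.fetch(read_model, id)
end
```

Because the two sides share nothing but the events, each can be stored and scaled independently.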

Let’s put it ALL TOGETHER

graph LR;
  C(Command) -->|Invoked on| AGG([Aggregate]) -->|Generates|E(Event) -->|Stored to| ES[(Event Store)];
  PR(Projector) -->|Read from| ES;
  Q(Query) -->|Query to| PN([Projection]) -->|Retrieve from| RS[(Read Store)];
  PR -->|Write to| RS

A Command is routed to an Aggregate, which emits an Event. The Event is then stored in the Event Store. The Projector subscribes to some events and builds one or more tables in the Read Store. The Projection retrieves data from the Read Store based on a Query.
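The whole loop can be walked through in memory (hypothetical names and a single event type, for illustration only):

```elixir
defmodule MiniPipeline do
  # In-memory walk-through of the diagram:
  # Command -> Aggregate -> Event -> Event Store -> Projector -> Read Store -> Query.

  # The aggregate decides: a command produces an event.
  def execute(:create_order, id), do: {:order_created, id}

  # The event store is just an append-only list here.
  def append(event_store, event), do: event_store ++ [event]

  # The projector folds stored events into a read-store table (a map).
  def project(event_store) do
    Enum.reduce(event_store, %{}, fn {:order_created, id}, table ->
      Map.put(table, id, %{status: :open})
    end)
  end

  # A query reads only from the read store.
  def get_order(read_store, id), do: Map.get(read_store, id)
end

event = MiniPipeline.execute(:create_order, "o-1")
read_store = MiniPipeline.project(MiniPipeline.append([], event))
MiniPipeline.get_order(read_store, "o-1")
# => %{status: :open}
```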

Ok, but why all this complex stuff?

Elixir, for its actor model and pattern matching, which fit this job very well.

Event Sourcing, for its built-in auditing, time traveling, and the ability to build new projections over the full history.

CQRS, to scale reads and writes using the best database for our needs.

What’s Event Storming

Event Storming is a collaborative workshop technique for understanding and designing complex business processes or software systems. It involves a visual approach where participants use sticky notes to represent events, commands, aggregates, and other key elements on a large workspace. The goal is to foster communication, shared understanding, and uncover insights into the dynamics of the system or process being modeled. Event Storming is commonly used in agile development, domain-driven design, and other contexts where a shared understanding among team members is crucial.

The picture that explains everything (image)

Yet Another ERP

We put all the Domain Experts, Developers, Product Owners, etc. in the same room and run a big-picture Event Storming session.


Then at the end we have a clear idea of what the system should do (or at least we think so).


Let’s play with Orders

Let’s create an Order invoking the relative command and take the aggregate id from the command response, for the next steps.

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

aggregate_uuid =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.CreateOrder
    alias EsCqrsAnatomy.Order.Commands.OrderItem

    %CreateOrder{
      id: Faker.UUID.v4(), 
      order_number: Faker.String.base64(5), 
      business_partner: Faker.Internet.email(),
      items: [
        %OrderItem{
          product_id: Faker.UUID.v4(),
          quantity: 1,
          uom: "KG"
        }
      ]
    } 
    |> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
    |> elem(1)
    |> Map.get(:aggregate_uuid)
    """,
    file: __ENV__.file
  )

Create a connection to the Event Store

opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "event_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})

Fetch all the events of the Aggregate.
Here we cheat a bit by using the event payload to identify all the events related to our Aggregate.
We’ll see the correct way to do it soon.

result_events =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      event_id::text,
      event_type,
      data
    FROM events 
    WHERE encode(data, 'escape')::json ->> 'id' = $1
    """,
    [aggregate_uuid]
  )

Every Aggregate has its own Stream

result =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      stream_id,
      stream_uuid,
      stream_version 
    FROM streams 
    WHERE stream_uuid = $1
    """,
    [aggregate_uuid]
  )

Let’s invoke another command on the Aggregate

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  alias EsCqrsAnatomy.Order.Commands.CompleteOrder

  %CompleteOrder{
    id: aggregate_uuid
  } 
  |> EsCqrsAnatomy.App.dispatch()
  """,
  file: __ENV__.file
)

Check the Stream now

stream =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      stream_id,
      stream_uuid,
      stream_version 
    FROM streams 
    WHERE stream_uuid = $1
    """,
    [aggregate_uuid]
  )

Take the Stream Id of our Aggregate

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

stream_id =
  Kino.RPC.eval_string(
    node,
    ~S"""
    %{rows: [[stream_id | _]]} = stream
    stream_id
    """,
    file: __ENV__.file
  )

Check the events in that Stream

result4 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      event_id::text,
      stream_id,
      stream_version
    FROM stream_events se
    WHERE stream_id = $1
    ORDER BY stream_version ASC
    """,
    [stream_id]
  )

Put it all together to see all events for a given Aggregate

result3 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      e.event_id::text,
      event_type,
      data
    FROM events e
      JOIN stream_events se
        ON e.event_id = se.event_id
      JOIN streams s
        ON s.stream_id = se.stream_id
    WHERE s.stream_uuid = $1
    ORDER BY se.stream_version ASC
    """,
    [aggregate_uuid]
  )

The C stands for Command

How is a command routed to the right Aggregate?
With the Router, of course!

We define which field identifies the Aggregate and which Commands should be routed to which Aggregate.

Note! We have removed CreateOrder intentionally. Let’s see what happens…

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Router do
    use Commanded.Commands.Router

    alias EsCqrsAnatomy.Order.Aggregate.Order

    alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}
    
    identify(Order, by: :id)

    dispatch([CompleteOrder], to: Order)
    
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

command_result =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.CreateOrder
    alias EsCqrsAnatomy.Order.Commands.OrderItem

    %CreateOrder{
      id: Faker.UUID.v4(), 
      order_number: Faker.String.base64(5), 
      business_partner: Faker.Internet.email(),
      items: [
        %OrderItem{
          product_id: Faker.UUID.v4(),
          quantity: 1,
          uom: "KG"
        }
      ]
    } 
    |> EsCqrsAnatomy.App.dispatch()
    """,
    file: __ENV__.file
  )

Cool! The Command is not registered to any Aggregate, so the dispatch fails.

Finally, the Aggregate

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Aggregate.Order do
    use TypedStruct

    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder, DeleteOrder}
    alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted, OrderDeleted}
    alias EsCqrsAnatomy.Order.Aggregate.OrderStatus

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:status, String.t())
    end

    def execute(%__MODULE__{id: nil}, %CreateOrder{} = command) do
      %OrderCreated{
        id: command.id,
        order_number: command.order_number,
        business_partner: command.business_partner,
        items: command.items
      }
    end

    def execute(%__MODULE__{}, %CreateOrder{}), do: {:error, "Order already created"}

    def execute(%__MODULE__{status: "COMPLETED"}, %CompleteOrder{}),
      do: {:error, "Order already completed"}

    def execute(%__MODULE__{}, %CompleteOrder{blocked_product_ids: blocked_product_ids})
        when length(blocked_product_ids) > 0 do
      {:error, "Order contains blocked products: #{inspect(blocked_product_ids)}"}
    end

    def execute(%__MODULE__{id: id}, %CompleteOrder{}), do: %OrderCompleted{id: id}

    def execute(%__MODULE__{id: id}, %DeleteOrder{}), do: %OrderDeleted{id: id}

    def apply(%__MODULE__{} = order, %OrderCreated{} = event) do
      %__MODULE__{
        order
        | id: event.id,
          status: OrderStatus.open()
      }
    end

    def apply(%__MODULE__{} = order, %OrderCompleted{}) do
      %__MODULE__{
        order | status: OrderStatus.completed()
      }
    end

    def apply(%__MODULE__{} = order, _), do: order
  end

  """,
  file: __ENV__.file
)

Event Handlers

images

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Router do
    use Commanded.Commands.Router

    alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
    alias Commanded.Middleware.Uniqueness
    alias EsCqrsAnatomy.Order.Aggregate.Order
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
    alias EsCqrsAnatomy.Shipment.Aggregate.Shipment
    alias EsCqrsAnatomy.Shipment.Commands.{CreateShipment, CompleteShipment}

    middleware(Enrichment)
    middleware(Validate)
    middleware(Uniqueness)

    identify(Order, by: :id)
    identify(Shipment, by: :id)

    dispatch([CreateOrder, CompleteOrder], to: Order)
    dispatch([CreateShipment, CompleteShipment], to: Shipment)
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Shipment.Policies.Shipment do
    use Commanded.Event.Handler,
      application: EsCqrsAnatomy.App,
      name: "shipment",
      start_from: :current
      
    use EsCqrsAnatomy.Base.EventHandler
    
    alias EsCqrsAnatomy.Order.Events.OrderCompleted
    alias EsCqrsAnatomy.Shipment.Commands.CreateShipment

    def handle(%OrderCompleted{id: id}, %{
          event_id: causation_id,
          correlation_id: correlation_id
        }) do
      %CreateShipment{
        id: UUID.uuid4(),
        order_id: id
      }
      |> EsCqrsAnatomy.App.dispatch(causation_id: causation_id, correlation_id: correlation_id)
    end

  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.EventHandlerSupervisor do
    use Supervisor

    def start_link(_args) do
      Supervisor.start_link(__MODULE__, [], name: __MODULE__)
    end

    def init(_args) do
      Supervisor.init(
        [
          EsCqrsAnatomy.Order.Projectors.Orders,
          EsCqrsAnatomy.Shipment.Policies.Shipment
        ],
        strategy: :one_for_one
      )
    end
  end

  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
  file: __ENV__.file
)
require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

order_id =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
    alias EsCqrsAnatomy.Order.Commands.OrderItem
    alias EsCqrsAnatomy.Order.Constants

    order_id = 
      %CreateOrder{
        id: Faker.UUID.v4(), 
        order_number: Faker.String.base64(5), 
        business_partner: Faker.Internet.email(),
        items: [
          %OrderItem{
            product_id: Faker.UUID.v4(),
            quantity: 1,
            uom: "KG"
          }
        ]
      } 
      |> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
      |> elem(1)
      |> Map.get(:aggregate_uuid)

    %CompleteOrder{
      id: order_id
    } 
    |> EsCqrsAnatomy.App.dispatch()

    order_id
    """,
    file: __ENV__.file
  )
result5 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      event_id::text,
      event_type,
      data
    FROM events 
    WHERE event_type = 'Shipment.ShipmentCreated' 
    AND encode(data, 'escape')::json ->> 'order_id' = $1
    """,
    [order_id]
  )

Projector

require Kino.RPC
node = :"escqrsanatomy@host1.com"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Projectors.Orders do
    use Commanded.Projections.Ecto,
      application: EsCqrsAnatomy.App,
      repo: EsCqrsAnatomy.Repo,
      name: "orders"

    use EsCqrsAnatomy.Base.EventHandler

    alias EsCqrsAnatomy.Order.Events.{
      OrderCreated,
      OrderCompleted,
      OrderDeleted
    }

    alias EsCqrsAnatomy.Order.Projections.{Order, OrderItem}
    alias EsCqrsAnatomy.Order.Aggregate.OrderStatus

    project(
      %OrderCreated{} = event,
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.insert(
          :orders,
          %Order{
            id: event.id,
            order_number: event.order_number,
            business_partner: event.business_partner,
            status: OrderStatus.open(),
            items:
              event.items
              |> Enum.map(
                &%OrderItem{
                  id: UUID.uuid4(),
                  order_id: event.id,
                  product_id: &1.product_id,
                  quantity: &1.quantity,
                  uom: &1.uom
                }
              )
          }
        )
      end
    )

    project(
      %OrderCompleted{id: id},
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.run(:order_to_update, fn repo, _changes ->
          {:ok, repo.get(Order, id)}
        end)
        |> Ecto.Multi.update(:order, fn %{order_to_update: order} ->
          Ecto.Changeset.change(order,
            status: OrderStatus.completed()
          )
        end)
      end
    )

    project(
      %OrderDeleted{id: id},
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.run(:order_to_delete, fn repo, _changes ->
          {rows_deleted, _} = from(o in Order, where: o.id == ^id) |> repo.delete_all()
          {:ok, rows_deleted}
        end)
      end
    )

  end
  """,
  file: __ENV__.file
)
opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "read_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})
result9 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      o.order_number, 
      o.business_partner, 
      o.status, 
      oi.product_id, 
      oi.quantity, 
      oi.uom 
    FROM orders o 
    	JOIN order_items oi 
    		ON o.id = oi.order_id 
    ORDER BY 
      o.order_number, 
      oi.quantity 
    """,
    []
  )

Performance

Aggregate Snapshot

Replaying a long event stream on every load gets slow. Snapshotting periodically stores the aggregate’s current state, so rebuilding can start from the latest snapshot instead of the very first event.

opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "event_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})
config :es_cqrs_anatomy, EsCqrsAnatomy.App,
  snapshotting: %{
    EsCqrsAnatomy.Order.Aggregate.Order => [
      snapshot_every: 1,
      snapshot_version: 1
    ]
  }
result12 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
    	s.source_version,
    	s.source_type,
      s.metadata,
    	s.data
    FROM snapshots s 
    """,
    []
  )