Using cqrs_tools with Commanded

livebooks/commanded.livemd

Rationale

Commanded is a brilliant event sourcing library for Elixir. It makes implementing event sourcing pretty effortless.

It doesn’t, however, provide any prescriptions for creating commands or events. For simple projects this isn’t a huge deal. For those, we just create simple structs.

When a project grows without guidance, those parts can get pretty messy, especially when it comes to data validation.

cqrs_tools aims to make creating these messages very easy while offering a developer a lot of power.
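
To make the validation problem concrete, here is a minimal plain-Elixir sketch of a hand-written command. The module name `OpenAccountByHand` and the error atoms are made up for illustration, and the integer check is a simplification. Every additional command would need its own version of this boilerplate.

```elixir
# A hand-written command with manual validation (plain Elixir, no libraries).
# OpenAccountByHand and its error atoms are illustrative names only.
defmodule OpenAccountByHand do
  @fields [:account_number, :initial_balance]
  @enforce_keys @fields
  defstruct @fields

  # Build the command, returning {:ok, command} or {:error, reasons}.
  def new(attrs) do
    attrs = attrs |> Map.new() |> Map.take(@fields)

    errors =
      []
      |> check(is_binary(attrs[:account_number]), :account_number_must_be_a_string)
      |> check(is_integer(attrs[:initial_balance]), :initial_balance_must_be_an_integer)

    case errors do
      [] -> {:ok, struct!(__MODULE__, attrs)}
      _ -> {:error, Enum.reverse(errors)}
    end
  end

  # Accumulate a reason whenever a check fails.
  defp check(errors, true, _reason), do: errors
  defp check(errors, false, reason), do: [reason | errors]
end

IO.inspect(OpenAccountByHand.new(account_number: "abc789", initial_balance: 1000))
IO.inspect(OpenAccountByHand.new(account_number: 123, initial_balance: "lots"))
```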

Install Dependencies

# Turn off automatic jason encoder implementations for this demo
Application.put_env(:cqrs_tools, :create_jason_encoders, false)

Mix.install([
  # Commanded is an event sourcing library
  {:commanded, "~> 1.2"},

  # CQRS Tools is a set of macros to make your life easy in a CQRS world.
  {:cqrs_tools, "~> 0.3"},

  # A toolkit for data mapping and language integrated query.
  {:ecto, "~> 3.6"},

  # The ETS adapter for Ecto.
  {:etso, "~> 0.1.5"},

  # A fast library for JSON encoding and decoding.
  {:jason, "~> 1.2"}
])

Define an Ecto Repo

defmodule Cqrs.Repo do
  use Ecto.Repo, otp_app: :cqrs_tools_bank_account, adapter: Etso.Adapter
end

Cqrs.Repo.start_link([])

Define a Commanded Application

defmodule Cqrs.App do
  @moduledoc false
  use Commanded.Application,
    otp_app: :cqrs_tools_bank_account,
    event_store: [
      adapter: Commanded.EventStore.Adapters.InMemory
    ],
    default_dispatch_opts: [
      consistency: :strong,
      returning: :execution_result
    ]
end

Commands and Events

Commands

defmodule Cqrs.BankAccount.Protocol.OpenAccount do
  use Cqrs.Command, dispatcher: Cqrs.App

  field :account_number, :string
  field :initial_balance, :decimal
end

alias Cqrs.BankAccount.Protocol.OpenAccount

Create a Command

command = OpenAccount.new!(account_number: "abc789", initial_balance: 1000)

Events

defmodule Cqrs.BankAccount.Protocol.OpenAccount do
  use Cqrs.Command, dispatcher: Cqrs.App

  field :account_number, :string
  field :initial_balance, :decimal

  derive_event AccountOpened
end

alias Cqrs.BankAccount.Protocol.{OpenAccount, AccountOpened}

Create an Event

AccountOpened.new(command)

Dispatch a Command

OpenAccount.dispatch(command)

Nothing happens quite yet.

The rest of the commands and events

defmodule Cqrs.BankAccount.Protocol.DepositMoney do
  use Cqrs.Command, dispatcher: Cqrs.App

  field :account_number, :string
  field :amount, :decimal

  derive_event MoneyDeposited, with: [:balance]
end

defmodule Cqrs.BankAccount.Protocol.WithdrawMoney do
  use Cqrs.Command, dispatcher: Cqrs.App

  field :account_number, :string
  field :amount, :decimal

  derive_event MoneyWithdrawn, with: [:balance]
  derive_event AccountOverdrawn, with: [:balance], drop: [:amount]
end

defmodule Cqrs.BankAccount.Protocol.CloseAccount do
  use Cqrs.Command, dispatcher: Cqrs.App

  field :account_number, :string

  derive_event BankAccountClosed
end

alias Cqrs.BankAccount.Protocol.{DepositMoney, WithdrawMoney, CloseAccount}

Aggregate Roots and Command Dispatching

The Aggregate Root

defmodule Cqrs.BankAccount.BankAccountAggregate do
  defstruct [:account_number, :state, balance: 0]

  alias Cqrs.BankAccount.Protocol
  alias Protocol.{OpenAccount, AccountOpened}
  alias Protocol.{DepositMoney, MoneyDeposited}
  alias Protocol.{WithdrawMoney, MoneyWithdrawn, AccountOverdrawn}
  alias Protocol.{CloseAccount, BankAccountClosed}

  alias Cqrs.BankAccount.BankAccountAggregate, as: State

  def execute(%State{account_number: nil}, %OpenAccount{initial_balance: balance} = command) do
    if Decimal.gt?(balance, 0),
      do: AccountOpened.new(command),
      else: {:error, :invalid_initial_balance}
  end

  def execute(_state, %OpenAccount{}) do
    {:error, :bank_account_already_exists}
  end

  # Catch all when trying to execute a command to a non-existent account
  def execute(%State{account_number: nil}, _), do: {:error, :account_not_found}

  # Catch all when trying to execute a command to a closed account
  def execute(%State{state: state}, _) when state != :active do
    {:error, :account_has_been_closed}
  end

  def execute(%State{balance: balance}, %DepositMoney{amount: amount} = command) do
    new_balance = Decimal.add(balance, amount)

    if Decimal.gt?(amount, 0),
      do: MoneyDeposited.new(command, balance: new_balance),
      else: {:error, :what_are_you_thinking}
  end

  def execute(%State{balance: balance}, %WithdrawMoney{amount: amount} = command) do
    new_balance = Decimal.sub(balance, amount)
    withdrawn = MoneyWithdrawn.new(command, balance: new_balance)

    if Decimal.lt?(new_balance, 0),
      do: [withdrawn, AccountOverdrawn.new(command, balance: new_balance)],
      else: withdrawn
  end

  def execute(_state, %CloseAccount{} = command), do: BankAccountClosed.new(command)

  # State Mutations

  def apply(state, %AccountOpened{account_number: number, initial_balance: balance}) do
    %{state | account_number: number, balance: balance, state: :active}
  end

  def apply(%State{balance: balance} = state, %MoneyDeposited{amount: amount}) do
    %{state | balance: Decimal.add(balance, amount)}
  end

  def apply(%State{} = state, %MoneyWithdrawn{balance: balance}) do
    %{state | balance: balance}
  end

  def apply(state, %AccountOverdrawn{}), do: state
  def apply(state, %BankAccountClosed{}), do: %{state | state: :closed}
end
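
One detail matters when an aggregate checks decimal amounts. The `:decimal` fields hold structs, and Elixir's comparison operators order a struct against a number by term type, not by numeric value. The plain-Elixir sketch below uses a hypothetical `Amount` struct as a stand-in for a decimal to show the difference. With real decimals, the `Decimal` comparison functions such as `Decimal.lt?/2` play the role of `Amount.gt?/2` here.

```elixir
# Plain-Elixir illustration; Amount is a hypothetical stand-in for a decimal struct.
defmodule Amount do
  defstruct [:value]

  # Compare by the wrapped value rather than by term order.
  def gt?(%Amount{value: a}, b) when is_number(b), do: a > b
end

negative = %Amount{value: -5}

# A struct is a map, and in Erlang term ordering every number sorts before
# every map, so this is true regardless of the wrapped value.
structural = negative > 0

# An explicit comparison function looks at the value itself.
semantic = Amount.gt?(negative, 0)

IO.inspect({structural, semantic})
# → {true, false}
```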

The Router

defmodule Cqrs.BankAccount.Router do
  use Commanded.Commands.Router

  alias Cqrs.BankAccount.BankAccountAggregate

  alias Cqrs.BankAccount.Protocol.{
    OpenAccount,
    DepositMoney,
    WithdrawMoney,
    CloseAccount
  }

  dispatch [OpenAccount, DepositMoney, WithdrawMoney, CloseAccount],
    to: BankAccountAggregate,
    identity: :account_number,
    identity_prefix: "account-"
end

Commanded Application

defmodule Cqrs.App do
  @moduledoc false
  use Commanded.Application,
    otp_app: :cqrs_tools_bank_account,
    event_store: [
      adapter: Commanded.EventStore.Adapters.InMemory
    ],
    default_dispatch_opts: [
      consistency: :strong,
      returning: :execution_result
    ]

  router Cqrs.BankAccount.Router
end

Cqrs.App.start_link([])

We can now dispatch the commands to the aggregate root.

OpenAccount.dispatch(command)
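
Conceptually, a dispatch rebuilds the aggregate state from its past events, asks `execute` what should happen, and records the resulting events. The sketch below mimics that cycle in plain Elixir. `Counter`, `Increment`, `Incremented` and `TinyDispatcher` are hypothetical names, and the real library does considerably more (process management, persistence, consistency guarantees).

```elixir
# Plain-Elixir sketch of a dispatch cycle; no Commanded modules are involved.
defmodule Increment do
  defstruct [:by]
end

defmodule Incremented do
  defstruct [:by]
end

defmodule Counter do
  defstruct total: 0

  # Decide: check the command against current state, return events or an error.
  def execute(%Counter{}, %Increment{by: by}) when is_integer(by) and by > 0,
    do: [%Incremented{by: by}]

  def execute(%Counter{}, %Increment{}), do: {:error, :invalid_increment}

  # Evolve: fold one event into the state.
  def apply(%Counter{total: total} = state, %Incremented{by: by}),
    do: %{state | total: total + by}
end

defmodule TinyDispatcher do
  # Rebuild state from history, run the command, and append any new events.
  def dispatch(history, command) do
    state = Enum.reduce(history, %Counter{}, &Counter.apply(&2, &1))

    case Counter.execute(state, command) do
      {:error, reason} -> {:error, reason}
      events -> {:ok, history ++ List.wrap(events)}
    end
  end
end

{:ok, history} = TinyDispatcher.dispatch([], %Increment{by: 2})
{:ok, history} = TinyDispatcher.dispatch(history, %Increment{by: 3})

state = Enum.reduce(history, %Counter{}, &Counter.apply(&2, &1))
IO.inspect(state.total)
# → 5
```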

Bounded Contexts

After creating our commands and events, I like to expose them through a single module. But having to type the same thing over and over again is tedious AF. And since they are all executed the same way (call new with some attributes, then dispatch the result), we can generate that code.

defmodule Cqrs.BankAccount do
  use Cqrs.BoundedContext
  use Cqrs.BoundedContext.Commanded

  alias Cqrs.BankAccount.Router

  import_commands Router
end

alias Cqrs.BankAccount

All of our commands are now visible from the BankAccount module!

BankAccount.__info__(:functions)
account_number = "123"

BankAccount.open_account(account_number: account_number, initial_balance: 100)
BankAccount.deposit_money(account_number: account_number, amount: 15)
BankAccount.withdraw_money(account_number: account_number, amount: 40)
BankAccount.close_account(account_number: account_number)
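
For a rough idea of what this kind of code generation involves, here is a plain-Elixir macro sketch. It is not how cqrs_tools is implemented. `TinyContext`, `Ping` and `Demo` are hypothetical names, and the macro assumes it is given a literal list of modules that each define `new/1` and `dispatch/1`.

```elixir
# A stand-in command module with the new/dispatch shape described above.
defmodule Ping do
  defstruct [:target]
  def new(attrs), do: struct(__MODULE__, attrs)
  def dispatch(%__MODULE__{target: target}), do: {:ok, {:pinged, target}}
end

defmodule TinyContext do
  # For each module in a literal list, define a function named after its last
  # segment (Ping -> ping/1) that builds the command and dispatches it.
  defmacro expose(modules) do
    defs =
      for module_ast <- modules do
        module = Macro.expand(module_ast, __CALLER__)

        name =
          module
          |> Module.split()
          |> List.last()
          |> Macro.underscore()
          |> String.to_atom()

        quote do
          def unquote(name)(attrs) do
            unquote(module).dispatch(unquote(module).new(attrs))
          end
        end
      end

    {:__block__, [], defs}
  end
end

defmodule Demo do
  require TinyContext
  TinyContext.expose([Ping])
end

IO.inspect(Demo.ping(target: "db"))
# → {:ok, {:pinged, "db"}}
```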

The Read Model

Data Schema

defmodule Cqrs.ReadModel.BankAccount do
  use Ecto.Schema

  import Ecto.Changeset

  @primary_key false
  schema "bank_accounts" do
    field :account_number, :string, primary_key: true
    field :balance, :decimal
    field :state, Ecto.Enum, values: [:active, :closed]
    timestamps()
  end

  def changeset(account \\ %__MODULE__{}, attrs) do
    account
    |> cast(attrs, [:account_number, :balance, :state])
    |> validate_required([:account_number, :balance])
  end
end

Projectors

defmodule Cqrs.ReadModel.BankAccountProjector do
  use Commanded.Event.Handler,
    application: Cqrs.App,
    name: "bank_account_projector-v1",
    consistency: :strong

  alias Cqrs.Repo
  alias Cqrs.ReadModel.BankAccount

  alias Cqrs.BankAccount.Protocol.{
    AccountOpened,
    MoneyDeposited,
    MoneyWithdrawn,
    BankAccountClosed
  }

  def handle(%AccountOpened{initial_balance: balance} = event, _metadata) do
    event
    |> Map.from_struct()
    |> Map.put(:state, :active)
    |> Map.put(:balance, balance)
    |> BankAccount.changeset()
    |> Repo.insert!()

    :ok
  end

  def handle(%MoneyDeposited{account_number: number} = event, _metadata) do
    attrs = Map.from_struct(event)

    BankAccount
    |> Repo.get!(number)
    |> BankAccount.changeset(attrs)
    |> Repo.update!()

    :ok
  end

  def handle(%MoneyWithdrawn{account_number: number} = event, _metadata) do
    attrs = Map.from_struct(event)

    BankAccount
    |> Repo.get!(number)
    |> BankAccount.changeset(attrs)
    |> Repo.update!()

    :ok
  end

  def handle(%BankAccountClosed{account_number: number}, _metadata) do
    BankAccount
    |> Repo.get!(number)
    |> BankAccount.changeset(%{state: :closed})
    |> Repo.update!()

    :ok
  end
end

Supervisor

A real-world application will have many projectors, so it is good practice to start them under a supervisor.

defmodule Cqrs.ReadModel do
  @moduledoc false
  use Supervisor

  alias Cqrs.ReadModel.BankAccountProjector

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    children = [BankAccountProjector]
    Supervisor.init(children, strategy: :one_for_one)
  end
end

Cqrs.ReadModel.start_link([])

Let’s test this out.

alias Cqrs.{ReadModel, Repo}

account_number = "ABC123"

BankAccount.open_account(account_number: account_number, initial_balance: 100)

Repo.get(ReadModel.BankAccount, account_number)

Now deposit some money

BankAccount.deposit_money(account_number: account_number, amount: 50.23)
Repo.get(ReadModel.BankAccount, account_number)

Queries

Now that we have everything working so far, it’s time that we define what queries our application supports.

Queries should be as well defined as your commands and events. You should be able to create a query without executing it, not just run it. This helps with query composition and with deferring execution.
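
The separation between creating and executing a query can be sketched without any library. In the plain-Elixir example below, `TinyQuery` is a hypothetical module that filters an in-memory list. It stands in for the idea only and uses no cqrs_tools or Ecto API.

```elixir
# Plain-Elixir sketch: a query is a data structure first, and only runs on request.
defmodule TinyQuery do
  defstruct filters: []

  # Creating a query validates input and returns data; nothing is executed here.
  def new(min_balance: minimum) when is_number(minimum),
    do: {:ok, %TinyQuery{filters: [&(&1.balance >= minimum)]}}

  def new(_invalid), do: {:error, :invalid_filters}

  # Composition adds another filter to an existing query value.
  def where(%TinyQuery{filters: filters} = query, fun) when is_function(fun, 1),
    do: %{query | filters: filters ++ [fun]}

  # Execution is a separate, explicit step against some data source.
  def execute(%TinyQuery{filters: filters}, rows),
    do: Enum.filter(rows, fn row -> Enum.all?(filters, & &1.(row)) end)
end

rows = [
  %{account_number: "a", balance: 10, state: :active},
  %{account_number: "b", balance: 500, state: :closed},
  %{account_number: "c", balance: 900, state: :active}
]

{:ok, query} = TinyQuery.new(min_balance: 100)
active_only = TinyQuery.where(query, &(&1.state == :active))

active_only
|> TinyQuery.execute(rows)
|> Enum.map(& &1.account_number)
|> IO.inspect()
# → ["c"]
```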

Our First Query

For our first query, let’s allow a user to load a bank account by account number.

This is the simplest implementation of a query that you can make. It defines one required filter and uses Repo.one to execute itself.

defmodule Cqrs.Queries.GetBankAccount do
  use Cqrs.Query

  alias Cqrs.{Repo, ReadModel.BankAccount}

  filter :account_number, :string, required: true

  @impl true
  def handle_create([account_number: number], _opts) do
    from(a in BankAccount, where: a.account_number == ^number)
  end

  @impl true
  def handle_execute(query, _opts) do
    Repo.one(query)
  end
end

alias Cqrs.Queries.GetBankAccount

You create a query by calling the new function.

Queries share the same validation characteristics as Commands. You cannot execute an invalid query.

GetBankAccount.new(account_number: 123) |> GetBankAccount.execute()
GetBankAccount.new(account_number: account_number)
GetBankAccount.new(account_number: account_number)
|> GetBankAccount.execute()

Let’s create another query to get a list of bank accounts.

This time we’ll define three filters, each of them optional. We can also support dynamically setting options for the query. Here we’re defaulting the limit to 25 but a caller can override it. You can do the same for ordering or anything else you want to support.

Two new macros are introduced in this query. They not only generate documentation but also make it clear to the reader what is involved in this query.

  • option is used to document the supported options of the query. Options declared here are guaranteed to be present in the opts list; if the caller does not provide one, its default value is used.

  • binding is used to document the named bindings used in the query. This is helpful if you want to use this query to compose a larger query. There are no guarantees here: as the author of the query, it is up to you to actually define the named binding.
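
The guarantee about option defaults can be pictured as a keyword merge in which caller-supplied values win. The sketch below is plain Elixir with a hypothetical `OptionDefaults` helper. It illustrates the behavior described and is not the cqrs_tools implementation.

```elixir
# Plain-Elixir sketch of default options.
defmodule OptionDefaults do
  @defaults [limit: 25]

  # Caller-supplied values win; anything missing falls back to the default,
  # so a later Keyword.fetch!(opts, :limit) always finds a value.
  def normalize(opts), do: Keyword.merge(@defaults, opts)
end

IO.inspect(OptionDefaults.normalize([]))
# → [limit: 25]

IO.inspect(OptionDefaults.normalize(limit: 5))
# → [limit: 5]
```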

defmodule Cqrs.Queries.ListBankAccounts do
  use Cqrs.Query

  alias Cqrs.{Repo, ReadModel.BankAccount}

  filter :balance_gt, :decimal
  filter :balance_lt, :decimal
  filter :state, :enum, values: [:active, :closed]

  option :limit, :integer, default: 25

  binding :account, BankAccount

  @impl true
  def handle_create(filters, opts) do
    limit = Keyword.fetch!(opts, :limit)

    base_query = from(a in BankAccount, as: :account, limit: ^limit)

    Enum.reduce(filters, base_query, fn
      {:balance_gt, balance}, query -> from q in query, where: q.balance > ^balance
      {:balance_lt, balance}, query -> from q in query, where: q.balance < ^balance
      {:state, state}, query -> from q in query, where: q.state == ^state
    end)
  end

  @impl true
  def handle_execute(query, _opts) do
    Repo.all(query)
  end
end

alias Cqrs.BankAccount
alias Cqrs.Queries.ListBankAccounts

Play time!

BankAccount.open_account(account_number: "checking123", initial_balance: 100)
BankAccount.open_account(account_number: "savings123", initial_balance: 50_000)

ListBankAccounts.new() |> ListBankAccounts.execute()
ListBankAccounts.new(balance_lt: 105) |> ListBankAccounts.execute()
ListBankAccounts.new(balance_lt: 20_000, balance_gt: 50) |> ListBankAccounts.execute()
BankAccount.close_account(account_number: "checking123")
ListBankAccounts.new(state: :closed) |> ListBankAccounts.execute()

Adding Queries to a Bounded Context

While having to type all that out to expose a query through a module isn’t the worst thing in the world, I personally don’t like having to do it.

Just like we did for commands, we can expose our queries pretty easily.

The BoundedContext.query macro is made for this.

Let’s recompile our BankAccount module.

defmodule Cqrs.BankAccount do
  use Cqrs.BoundedContext
  use Cqrs.BoundedContext.Commanded

  alias Cqrs.Queries
  alias Cqrs.BankAccount.Router

  import_commands Router

  query Queries.GetBankAccount
  query Queries.ListBankAccounts
end

All of our queries are now visible from the BankAccount module!

Cqrs.BankAccount.__info__(:functions)

We can now create and execute our queries directly from the BankAccount module.

BankAccount.list_bank_accounts(balance_lt: 20_000, balance_gt: 50)

Passing invalid data returns an error, or raises an exception if we use the bang variant.

BankAccount.list_bank_accounts(balance_lt: :what, balance_gt: :this_is_no_good)
BankAccount.list_bank_accounts!(balance_lt: :what, balance_gt: :this_is_no_good)
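
The two function flavors follow the usual Elixir convention: the plain function returns a tagged tuple, and the `!` variant unwraps the result or raises. Below is a minimal plain-Elixir sketch of that convention. The `Lookup` module and its data are hypothetical, and the exact error shapes cqrs_tools returns are not shown here.

```elixir
# Plain-Elixir sketch of the bang/non-bang convention.
defmodule Lookup do
  @accounts %{"ABC123" => 100}

  # Non-bang: report failure as data the caller can pattern match on.
  def balance(number) do
    case Map.fetch(@accounts, number) do
      {:ok, balance} -> {:ok, balance}
      :error -> {:error, :account_not_found}
    end
  end

  # Bang: return the bare value, or raise when the lookup fails.
  def balance!(number) do
    case balance(number) do
      {:ok, balance} -> balance
      {:error, reason} -> raise ArgumentError, "lookup failed: #{inspect(reason)}"
    end
  end
end

IO.inspect(Lookup.balance("ABC123"))
IO.inspect(Lookup.balance("nope"))
IO.inspect(Lookup.balance!("ABC123"))
```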