Using cqrs_tools with Commanded
Rationale
Commanded is a brilliant event sourcing library for Elixir. It makes implementing event sourcing pretty effortless.
It doesn’t, however, provide any prescriptions for creating commands or events. For simple projects this isn’t a huge deal. For those, we just create simple structs.
When a project grows without guidance, dealing with those parts can end up getting pretty messy, especially when it comes to data validation.
cqrs_tools
aims to make creating these messages very easy while offering a developer a lot of power.
Install Dependencies
# Turn off automatic jason encoder implementations for this demo
Application.put_env(:cqrs_tools, :create_jason_encoders, false)
# Install the packages this notebook uses.
Mix.install([
# Commanded is an event sourcing library
{:commanded, "~> 1.2"},
# CQRS Tools is a set of macros to make your life easy in a CQRS world.
{:cqrs_tools, "~> 0.3"},
# A toolkit for data mapping and language integrated query.
{:ecto, "~> 3.6"},
# The ETS adapter for Ecto.
{:etso, "~> 0.1.5"},
# A fast library for JSON encoding and decoding.
{:jason, "~> 1.2"}
])
Define an Ecto Repo
defmodule Cqrs.Repo do
  @moduledoc """
  Ecto repository used by the read model in this notebook.

  It is configured with `Etso.Adapter`, the ETS adapter for Ecto.
  """

  use Ecto.Repo,
    otp_app: :cqrs_tools_bank_account,
    adapter: Etso.Adapter
end

# Start the repo process so later cells can read and write through it.
Cqrs.Repo.start_link([])
Define a Commanded Application
# First version of the Commanded application. No router is registered yet;
# the module is redefined with one later in this notebook.
defmodule Cqrs.App do
@moduledoc false
use Commanded.Application,
otp_app: :cqrs_tools_bank_account,
# Uses Commanded's InMemory event store adapter.
event_store: [
adapter: Commanded.EventStore.Adapters.InMemory
],
# Dispatch options used when a caller does not pass its own.
default_dispatch_opts: [
consistency: :strong,
returning: :execution_result
]
end
Commands and Events
Commands
# Command that requests opening a bank account. It is dispatched through Cqrs.App.
defmodule Cqrs.BankAccount.Protocol.OpenAccount do
use Cqrs.Command, dispatcher: Cqrs.App
# Identifier of the account to open.
field :account_number, :string
# Starting balance, declared as a decimal.
field :initial_balance, :decimal
end
alias Cqrs.BankAccount.Protocol.OpenAccount
Create a Command
# Build an OpenAccount command and bind it to `command`; later cells reuse this binding.
command = OpenAccount.new!(account_number: "abc789", initial_balance: 1000)
Events
# Redefinition of the OpenAccount command that also derives its event.
defmodule Cqrs.BankAccount.Protocol.OpenAccount do
use Cqrs.Command, dispatcher: Cqrs.App
field :account_number, :string
field :initial_balance, :decimal
# The derived event is referenced as Cqrs.BankAccount.Protocol.AccountOpened
# in the rest of this notebook.
derive_event AccountOpened
end
alias Cqrs.BankAccount.Protocol.{OpenAccount, AccountOpened}
Create an Event
# Build an AccountOpened event from the command created earlier.
AccountOpened.new(command)
Dispatch a Command
# Dispatch while Cqrs.App has no router registered (one is added later in this notebook).
OpenAccount.dispatch(command)
Nothing happens quite yet.
The rest of the commands and events
# Command that requests depositing money into an account.
defmodule Cqrs.BankAccount.Protocol.DepositMoney do
use Cqrs.Command, dispatcher: Cqrs.App
field :account_number, :string
field :amount, :decimal
# MoneyDeposited additionally carries :balance, which the aggregate supplies
# when it builds the event.
derive_event MoneyDeposited, with: [:balance]
end
# Command that requests withdrawing money from an account.
defmodule Cqrs.BankAccount.Protocol.WithdrawMoney do
use Cqrs.Command, dispatcher: Cqrs.App
field :account_number, :string
field :amount, :decimal
# Both events additionally carry :balance, which the aggregate supplies
# when it builds them.
derive_event MoneyWithdrawn, with: [:balance]
# NOTE(review): `drop: [:amount]` presumably omits :amount from this event; confirm against cqrs_tools.
derive_event AccountOverdrawn, with: [:balance], drop: [:amount]
end
# Command that requests closing an account.
defmodule Cqrs.BankAccount.Protocol.CloseAccount do
use Cqrs.Command, dispatcher: Cqrs.App
field :account_number, :string
derive_event BankAccountClosed
end
alias Cqrs.BankAccount.Protocol.{DepositMoney, WithdrawMoney, CloseAccount}
Aggregate Roots and Command Dispatching
The Aggregate Root
defmodule Cqrs.BankAccount.BankAccountAggregate do
  @moduledoc """
  Aggregate root for a single bank account.

  `execute/2` receives the current state and a command, and returns an event,
  a list of events, or `{:error, reason}`. `apply/2` folds an event into the
  state.

  Rules enforced by `execute/2`:

    * An account can only be opened once, with an initial balance greater than zero.
    * Any other command against an account that was never opened returns
      `{:error, :account_not_found}`.
    * Any other command against an account whose state is not `:active`
      returns `{:error, :account_has_been_closed}`.
    * A deposit must be greater than zero.
    * A withdrawal that takes the balance below zero emits `AccountOverdrawn`
      in addition to `MoneyWithdrawn`.
  """

  defstruct [:account_number, :state, balance: 0]

  alias Cqrs.BankAccount.Protocol
  alias Protocol.{OpenAccount, AccountOpened}
  alias Protocol.{DepositMoney, MoneyDeposited}
  alias Protocol.{WithdrawMoney, MoneyWithdrawn, AccountOverdrawn}
  alias Protocol.{CloseAccount, BankAccountClosed}
  alias Cqrs.BankAccount.BankAccountAggregate, as: State

  # Command handling

  def execute(%State{account_number: nil}, %OpenAccount{initial_balance: balance} = command) do
    # Decimal values are structs, so the structural `>` operator cannot be used:
    # a struct always sorts above an integer, making `balance > 0` always true.
    if Decimal.gt?(balance, 0),
      do: AccountOpened.new(command),
      else: {:error, :invalid_initial_balance}
  end

  def execute(_state, %OpenAccount{}) do
    {:error, :bank_account_already_exists}
  end

  # Catch all when trying to execute a command to a non-existent account
  def execute(%State{account_number: nil}, _), do: {:error, :account_not_found}

  # Catch all when trying to execute a command to a closed account
  def execute(%State{state: state}, _) when state != :active do
    {:error, :account_has_been_closed}
  end

  def execute(%State{balance: balance}, %DepositMoney{amount: amount} = command) do
    # Same Decimal comparison concern as in the OpenAccount clause.
    if Decimal.gt?(amount, 0),
      do: MoneyDeposited.new(command, balance: Decimal.add(balance, amount)),
      else: {:error, :what_are_you_thinking}
  end

  def execute(%State{balance: balance}, %WithdrawMoney{amount: amount} = command) do
    new_balance = Decimal.sub(balance, amount)
    withdrawn = MoneyWithdrawn.new(command, balance: new_balance)

    if Decimal.lt?(new_balance, 0),
      do: [withdrawn, AccountOverdrawn.new(command, balance: new_balance)],
      else: withdrawn
  end

  def execute(_state, %CloseAccount{} = command), do: BankAccountClosed.new(command)

  # State Mutations

  def apply(state, %AccountOpened{account_number: number, initial_balance: balance}) do
    %{state | account_number: number, balance: balance, state: :active}
  end

  # Both money events carry the resulting balance, so the state takes it
  # directly from the event instead of recomputing it.
  def apply(%State{} = state, %MoneyDeposited{balance: balance}) do
    %{state | balance: balance}
  end

  def apply(%State{} = state, %MoneyWithdrawn{balance: balance}) do
    %{state | balance: balance}
  end

  # The balance change is already recorded by the accompanying MoneyWithdrawn event.
  def apply(state, %AccountOverdrawn{}), do: state

  def apply(state, %BankAccountClosed{}), do: %{state | state: :closed}
end
The Router
defmodule Cqrs.BankAccount.Router do
  # Routes every bank account command to the aggregate, identified by
  # its :account_number with an "account-" prefix.
  use Commanded.Commands.Router

  alias Cqrs.BankAccount.BankAccountAggregate
  alias Cqrs.BankAccount.Protocol

  dispatch [
             Protocol.OpenAccount,
             Protocol.DepositMoney,
             Protocol.WithdrawMoney,
             Protocol.CloseAccount
           ],
           to: BankAccountAggregate,
           identity: :account_number,
           identity_prefix: "account-"
end
Commanded Application
# Redefinition of the Commanded application, now with the bank account router registered.
defmodule Cqrs.App do
@moduledoc false
use Commanded.Application,
otp_app: :cqrs_tools_bank_account,
# Uses Commanded's InMemory event store adapter.
event_store: [
adapter: Commanded.EventStore.Adapters.InMemory
],
# Dispatch options used when a caller does not pass its own.
default_dispatch_opts: [
consistency: :strong,
returning: :execution_result
]
# Register the router so dispatched commands reach the aggregate.
router Cqrs.BankAccount.Router
end
# Start the application process.
Cqrs.App.start_link([])
We can now dispatch the commands to the aggregate root.
# Dispatch the same command again, now that Cqrs.App has a router registered.
OpenAccount.dispatch(command)
Bounded Contexts
After creating our commands and events, I like to expose them through a single module. But having to type the same thing over and over again is tedious AF. And since they are all executed the same way (call new
with some attributes and call execute
on the result of that), we can generate that code.
defmodule Cqrs.BankAccount do
  # Bounded context exposing the commands registered on the router.
  use Cqrs.BoundedContext
  use Cqrs.BoundedContext.Commanded

  import_commands Cqrs.BankAccount.Router
end
alias Cqrs.BankAccount
All of our commands are now visible from the BankAccount module!
# List the functions the bounded context exposes.
BankAccount.__info__(:functions)
# Walk one account through open, deposit, withdraw and close.
account_number = "123"
BankAccount.open_account(account_number: account_number, initial_balance: 100)
BankAccount.deposit_money(account_number: account_number, amount: 15)
BankAccount.withdraw_money(account_number: account_number, amount: 40)
BankAccount.close_account(account_number: account_number)
The Read Model
Data Schema
defmodule Cqrs.ReadModel.BankAccount do
  @moduledoc """
  Read-model schema for the `bank_accounts` source, keyed by `account_number`.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @castable_fields [:account_number, :balance, :state]
  @required_fields [:account_number, :balance]

  @primary_key false
  schema "bank_accounts" do
    field :account_number, :string, primary_key: true
    field :balance, :decimal
    field :state, Ecto.Enum, values: [:active, :closed]
    timestamps()
  end

  @doc """
  Builds a changeset for `account` from `attrs`.

  Casts `:account_number`, `:balance` and `:state`, and requires
  `:account_number` and `:balance`. `account` defaults to an empty struct.
  """
  def changeset(account \\ %__MODULE__{}, attrs) do
    casted = cast(account, attrs, @castable_fields)
    validate_required(casted, @required_fields)
  end
end
Projectors
defmodule Cqrs.ReadModel.BankAccountProjector do
  # Projects bank account events into the Cqrs.ReadModel.BankAccount read model.
  #
  # AccountOpened inserts a new row; every other handled event loads the
  # existing row by account number and updates it. The bang Repo functions
  # are used, so a failed write raises instead of returning an error.
  use Commanded.Event.Handler,
    application: Cqrs.App,
    name: "bank_account_projector-v1",
    consistency: :strong

  alias Cqrs.Repo
  alias Cqrs.ReadModel.BankAccount

  alias Cqrs.BankAccount.Protocol.{
    AccountOpened,
    MoneyDeposited,
    MoneyWithdrawn,
    BankAccountClosed
  }

  def handle(%AccountOpened{initial_balance: balance} = event, _metadata) do
    event
    |> Map.from_struct()
    |> Map.put(:state, :active)
    # The event names the field :initial_balance; the read model calls it :balance.
    |> Map.put(:balance, balance)
    |> BankAccount.changeset()
    |> Repo.insert!()

    :ok
  end

  # Both money events carry the resulting :balance, which the changeset casts.
  def handle(%MoneyDeposited{account_number: number} = event, _metadata) do
    update_account(number, Map.from_struct(event))
  end

  def handle(%MoneyWithdrawn{account_number: number} = event, _metadata) do
    update_account(number, Map.from_struct(event))
  end

  def handle(%BankAccountClosed{account_number: number}, _metadata) do
    update_account(number, %{state: :closed})
  end

  # Loads the account row by its account number, applies attrs through the
  # changeset and persists the result.
  defp update_account(number, attrs) do
    BankAccount
    |> Repo.get!(number)
    |> BankAccount.changeset(attrs)
    |> Repo.update!()

    :ok
  end
end
Supervisor
A real-world application will have many projectors. So it is good practice to start them under a supervisor.
defmodule Cqrs.ReadModel do
  @moduledoc false

  use Supervisor

  alias Cqrs.ReadModel.BankAccountProjector

  # Starts the supervisor, registered under this module's name.
  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  # Supervises the projectors; each child is restarted independently.
  @impl true
  def init(_opts) do
    Supervisor.init([BankAccountProjector], strategy: :one_for_one)
  end
end

# Start the read-model supervisor so the projector begins handling events.
Cqrs.ReadModel.start_link([])
Let’s test this out.
alias Cqrs.{ReadModel, Repo}
# Open a fresh account, then read its row back from the read model.
account_number = "ABC123"
BankAccount.open_account(account_number: account_number, initial_balance: 100)
Repo.get(ReadModel.BankAccount, account_number)
Now deposit some money
# Deposit a fractional amount, then re-read the row to see the updated balance.
BankAccount.deposit_money(account_number: account_number, amount: 50.23)
Repo.get(ReadModel.BankAccount, account_number)
Queries
Now that we have everything working so far, it’s time that we define what queries our application supports.
Queries should be as well defined as your commands and events. It should be possible not only to execute them, but also to create them without executing them. This helps with query composition and with deferring execution.
Our First Query
For our first query, let’s allow a user to load a bank account by account number.
This is the simplest implementation of a query that you can make. It defines one required filter and uses Repo.one
to execute itself.
defmodule Cqrs.Queries.GetBankAccount do
  # Query that loads a single bank account by its account number.
  use Cqrs.Query
  alias Cqrs.{Repo, ReadModel.BankAccount}

  filter :account_number, :string, required: true

  # Builds the Ecto query from the single required filter.
  @impl true
  def handle_create([account_number: account_number], _opts) do
    from account in BankAccount, where: account.account_number == ^account_number
  end

  # Runs the query, returning whatever `Repo.one/1` returns.
  @impl true
  def handle_execute(query, _opts), do: Repo.one(query)
end
alias Cqrs.Queries.GetBankAccount
You create a query by calling the new
function.
Queries share the same validation characteristics as Commands. You cannot execute an invalid query.
# The filter is declared as :string but an integer is passed here.
# NOTE(review): presumably this yields a validation error rather than a result; confirm.
GetBankAccount.new(account_number: 123) |> GetBankAccount.execute()
# Create the query without executing it.
GetBankAccount.new(account_number: account_number)
# Create the query and execute it.
GetBankAccount.new(account_number: account_number)
|> GetBankAccount.execute()
Let’s create another query to get a list of bank accounts.
This time we’ll define three filters; each of them optional. We can also support dynamically setting options for the query. Here we’re defaulting the limit to 25
but a caller can override it. You can do the same for ordering or anything else you want to support.
Two new macros are introduced in this query. They are used to not only generate documentation but to also make it clear to the reader what is involved in this query.
-
option
is used to document the supported options of the query.
Options declared here are guaranteed to be present with the default value set, if not provided by the caller, in the opts list.
-
binding
is used to document the named bindings used in the query.
This is helpful if you want to use this query to compose a larger query.
There are no guarantees here. As the author of the query, it is up to you to actually define the named binding.
defmodule Cqrs.Queries.ListBankAccounts do
  # Query that lists bank accounts, optionally narrowed by balance bounds and state.
  use Cqrs.Query
  alias Cqrs.{Repo, ReadModel.BankAccount}

  filter :balance_gt, :decimal
  filter :balance_lt, :decimal
  filter :state, :enum, values: [:active, :closed]

  option :limit, :integer, default: 25

  binding :account, BankAccount

  # Starts from a limited query with the :account named binding, then adds
  # one where clause per supplied filter.
  @impl true
  def handle_create(filters, opts) do
    max_rows = Keyword.fetch!(opts, :limit)
    initial = from(a in BankAccount, as: :account, limit: ^max_rows)

    Enum.reduce(filters, initial, &apply_filter/2)
  end

  @impl true
  def handle_execute(query, _opts), do: Repo.all(query)

  # One clause per declared filter; an unknown filter key raises a FunctionClauseError.
  defp apply_filter({:balance_gt, lower}, query) do
    from q in query, where: q.balance > ^lower
  end

  defp apply_filter({:balance_lt, upper}, query) do
    from q in query, where: q.balance < ^upper
  end

  defp apply_filter({:state, wanted}, query) do
    from q in query, where: q.state == ^wanted
  end
end
alias Cqrs.BankAccount
alias Cqrs.Queries.ListBankAccounts
Play time!
# Seed two more accounts with very different balances.
BankAccount.open_account(account_number: "checking123", initial_balance: 100)
BankAccount.open_account(account_number: "savings123", initial_balance: 50_000)
# No filters; the :limit option falls back to its declared default of 25.
ListBankAccounts.new() |> ListBankAccounts.execute()
# Upper bound only.
ListBankAccounts.new(balance_lt: 105) |> ListBankAccounts.execute()
# Both bounds combined.
ListBankAccounts.new(balance_lt: 20_000, balance_gt: 50) |> ListBankAccounts.execute()
# Close one account, then filter by state.
BankAccount.close_account(account_number: "checking123")
ListBankAccounts.new(state: :closed) |> ListBankAccounts.execute()
Adding Queries to a Bounded Context
While having to type all that out to expose a query through a module isn’t the worst thing in the world, I personally don’t like having to do it.
Just like we did for commands, we can expose our queries pretty easily.
The BoundedContext.query
macro is made for this.
Let’s recompile our BankAccount
module.
defmodule Cqrs.BankAccount do
  # Bounded context exposing both the router's commands and the two queries.
  use Cqrs.BoundedContext
  use Cqrs.BoundedContext.Commanded

  import_commands Cqrs.BankAccount.Router

  query Cqrs.Queries.GetBankAccount
  query Cqrs.Queries.ListBankAccounts
end
All of our queries are now visible from the BankAccount
module!
# List the functions exposed by the recompiled bounded context.
Cqrs.BankAccount.__info__(:functions)
We can now create and execute our queries directly from the BankAccount
module.
# Run the list query through the bounded context instead of the query module.
BankAccount.list_bank_accounts(balance_lt: 20_000, balance_gt: 50)
And return errors or raise an exception if we pass invalid data
# Both filters are declared as :decimal, so these atoms are invalid input.
# Per the surrounding text, the first form returns errors and the bang form raises.
BankAccount.list_bank_accounts(balance_lt: :what, balance_gt: :this_is_no_good)
BankAccount.list_bank_accounts!(balance_lt: :what, balance_gt: :this_is_no_good)