Change Data Capture with Broadway
Mix.install([
{:tigerbeetlex, "~> 0.16.47"},
{:broadway, "~> 1.2"},
{:broadway_rabbitmq, "~> 0.7"},
{:jason, "~> 1.2"}
])
Introduction
Since version 0.16.43, TigerBeetle can stream changes (transfers and balance updates) on RabbitMQ. TigerBeetlex provides structs and functions to help decode the raw JSON data of CDC events into well-defined struct.
This guide shows how to use Broadway and its RabbitMQ connector to easily create a data pipeline that consumes TigerBeetle CDC data. This can be used as a starting point to build your own pipeline. The guide can also be executed in Livebook.
Requirements
RabbitMQ
The guide assumes you run RabbitMQ locally on port 5672 with default credentials (guest:guest
). You can run an instance of RabbitMQ using Docker with this command:
docker run -it --rm --name rabbitmq -d -p 5672:5672 -p 15672:15672 rabbitmq:4-management
This also exposes the RabbitMQ management dashboard on http://localhost:15672
.
There’s no need to declare any exchange or queue since you pipeline will be responsible for its own setup.
Setup
In your mix.exs
add TigerBeetlex, Broadway and Jason as dependencies
[
{:tigerbeetlex, "~> 0.16.47"},
{:broadway, "~> 1.2"},
{:broadway_rabbitmq, "~> 0.7"},
{:jason, "~> 1.2"}
]
Building the pipeline
Here’s the basic Broadway pipeline to consume TigerBeetle CDC data
defmodule MyApp.CDCPipeline do
use Broadway
alias TigerBeetlex.CDC.Event
alias Broadway.Message
@exchange "tigerbeetle"
@queue "tigerbeetle_cdc_broadway"
def start_link(_opts \\ []) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
connection: [
host: "localhost",
username: "guest",
password: "guest",
],
after_connect: &declare_rabbitmq_topology/1,
queue: @queue,
declare: [durable: true],
bindings: [{@exchange, []}],
on_failure: :reject_and_requeue}
],
processors: [
default: []
]
)
end
defp declare_rabbitmq_topology(amqp_channel) do
AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true)
end
@impl true
def handle_message(_processor, message, _context) do
message
|> Message.update_data(fn data ->
# The message data is the JSON string representing the event.
# We're going to decode it and then transform it in a `TigerBeetlex.CDC.Event` struct.
data
|> Jason.decode!()
|> Event.cast!()
end)
|> process_message()
end
defp process_message(message) do
# Here you would put your processing logic. We're just going to print the event.
IO.inspect(message.data)
# Note that `handle_message` must return the (possibly modified) message.
message
end
end
Here are some key highlights about the pipeline, feel free to consult the documentation of Broadway
and BroadwayRabbitMQ
for more information:
-
The
connection
key supports all options supported byAMQP.Connection.open
to configure your RabbitMQ connection -
The pipeline declares a
tigerbeetle
fanout exchange and atigerbeetle_cdc_broadway
queue, bound to the exchange. This means that all messages sent to the exchange will be sent to the queue, regardless of the routing key -
Both the exchange and the queue are created with
durable: true
, meaning they survive RabbitMQ restarts. Declaring exchanges and queues is idempotent so it’s ok for the pipeline to declare them again -
on_failure
is set to:reject_and_requeue
, which means that if a message fails to be processed it will be rejected and re-enqueued. The behavior you want to use here depends on your use case and your need to do strict in-order processing.
Once you have your pipeline defined, you can start it under your supervision tree. Here we’re just going to start it manually.
case MyApp.CDCPipeline.start_link() do
{:ok, pid} -> {:ok, pid}
# Since we're executing on Livebook, handle re-execution
{:error, {:already_started, pid}} -> {:ok, pid}
{:error, reason} -> {:error, reason}
end
Start TigerBeetle CDC Job
To start TigerBeetle CDC job issue the command below.
> #### ⚠️ Big data ahead
>
> The first time that it’s run, the CDC job will publish all data in your cluster since the beginning of time.
>
> If you want to only start publising from the current date, you have to pass the --timestamp-last
providing an appropriate epoch timestamp in nanoseconds.
./tigerbeetle amqp --addresses=127.0.0.1:3000 --cluster=0 \
--host=127.0.0.1 \
--vhost=/ \
--user=guest --password=guest \
--publish-exchange=tigerbeetle
This will connect to the TigerBeetle cluster pointed to by addresses
and cluster
and start publising CDC data on the tigerbeetle
exchange. Check out the full documentation of the tigerbeetle amqp
message here.
Pushing some data
Let’s create two accounts to transact with.
alias TigerBeetlex.Account
alias TigerBeetlex.Connection
alias TigerBeetlex.ID
address = "127.0.0.1:3000"
{:ok, _pid} = Connection.start_link(name: :tb, cluster_id: <<0::128>>, addresses: [address])
account_1 =
%Account{
id: ID.from_int(42_000),
ledger: 1,
code: 2
}
account_2 =
%Account{
id: ID.from_int(42_001),
ledger: 1,
code: 3
}
{:ok, _account_errors} = Connection.create_accounts(:tb, [account_1, account_2])
# Error handling omitted
Now let’s create a Transfer between the accounts.
alias TigerBeetlex.Transfer
transfers = [
%Transfer{
id: ID.generate(),
debit_account_id: ID.from_int(42_000),
credit_account_id: ID.from_int(42_001),
amount: 10_000,
ledger: 1,
code: 720
}
]
{:ok, []} = Connection.create_transfers(:tb, transfers)
You should now see the print output from the pipeline (if you’re executing this in Livebook, it’s below the cell where you create the transfer).
And that’s it, you are now consuming TigerBeetle CDC data! Be sure to read Broadway
and BroadwayRabbitMQ
documentation and check out all their features (like, for example, batching) to understand how to modify this basic pipeline to suit your usecase.