Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Change Data Capture with Broadway

guides/change_data_capture.livemd

Change Data Capture with Broadway

Mix.install([
  {:tigerbeetlex, "~> 0.16.47"},
  {:broadway, "~> 1.2"},
  {:broadway_rabbitmq, "~> 0.7"},
  {:jason, "~> 1.2"}
])

Introduction

Since version 0.16.43, TigerBeetle can stream changes (transfers and balance updates) on RabbitMQ. TigerBeetlex provides structs and functions to help decode the raw JSON data of CDC events into well-defined struct.

This guide shows how to use Broadway and its RabbitMQ connector to easily create a data pipeline that consumes TigerBeetle CDC data. This can be used as a starting point to build your own pipeline. The guide can also be executed in Livebook.

Requirements

RabbitMQ

The guide assumes you run RabbitMQ locally on port 5672 with default credentials (guest:guest). You can run an instance of RabbitMQ using Docker with this command:

docker run -it --rm --name rabbitmq -d -p 5672:5672 -p 15672:15672 rabbitmq:4-management

This also exposes the RabbitMQ management dashboard on http://localhost:15672.

There’s no need to declare any exchange or queue since you pipeline will be responsible for its own setup.

Setup

In your mix.exs add TigerBeetlex, Broadway and Jason as dependencies

[
  {:tigerbeetlex, "~> 0.16.47"},
  {:broadway, "~> 1.2"},
  {:broadway_rabbitmq, "~> 0.7"},
  {:jason, "~> 1.2"}
]

Building the pipeline

Here’s the basic Broadway pipeline to consume TigerBeetle CDC data

defmodule MyApp.CDCPipeline do
  use Broadway

  alias TigerBeetlex.CDC.Event
  alias Broadway.Message

  @exchange "tigerbeetle"
  @queue "tigerbeetle_cdc_broadway"

  def start_link(_opts \\ []) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           connection: [
             host: "localhost",
             username: "guest",
             password: "guest",
           ],
           after_connect: &declare_rabbitmq_topology/1,
           queue: @queue,
           declare: [durable: true],
           bindings: [{@exchange, []}],
           on_failure: :reject_and_requeue}
      ],
      processors: [
        default: []
      ]
    )
  end

  defp declare_rabbitmq_topology(amqp_channel) do
    AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true)
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
    |> Message.update_data(fn data ->
      # The message data is the JSON string representing the event.
      # We're going to decode it and then transform it in a `TigerBeetlex.CDC.Event` struct.
      data
      |> Jason.decode!()
      |> Event.cast!()
    end)
    |> process_message()
  end

  defp process_message(message) do
    # Here you would put your processing logic. We're just going to print the event.
    IO.inspect(message.data)

    # Note that `handle_message` must return the (possibly modified) message.
    message
  end
end

Here are some key highlights about the pipeline, feel free to consult the documentation of Broadway and BroadwayRabbitMQ for more information:

  • The connection key supports all options supported by AMQP.Connection.open to configure your RabbitMQ connection
  • The pipeline declares a tigerbeetle fanout exchange and a tigerbeetle_cdc_broadway queue, bound to the exchange. This means that all messages sent to the exchange will be sent to the queue, regardless of the routing key
  • Both the exchange and the queue are created with durable: true, meaning they survive RabbitMQ restarts. Declaring exchanges and queues is idempotent so it’s ok for the pipeline to declare them again
  • on_failure is set to :reject_and_requeue, which means that if a message fails to be processed it will be rejected and re-enqueued. The behavior you want to use here depends on your use case and your need to do strict in-order processing.

Once you have your pipeline defined, you can start it under your supervision tree. Here we’re just going to start it manually.

case MyApp.CDCPipeline.start_link() do
  {:ok, pid} -> {:ok, pid}
  # Since we're executing on Livebook, handle re-execution
  {:error, {:already_started, pid}} -> {:ok, pid}
  {:error, reason} -> {:error, reason}
end

Start TigerBeetle CDC Job

To start TigerBeetle CDC job issue the command below.

> #### ⚠️ Big data ahead > > The first time that it’s run, the CDC job will publish all data in your cluster since the beginning of time. > > If you want to only start publising from the current date, you have to pass the --timestamp-last providing an appropriate epoch timestamp in nanoseconds.

./tigerbeetle amqp --addresses=127.0.0.1:3000 --cluster=0 \
    --host=127.0.0.1 \
    --vhost=/ \
    --user=guest --password=guest \
    --publish-exchange=tigerbeetle

This will connect to the TigerBeetle cluster pointed to by addresses and cluster and start publising CDC data on the tigerbeetle exchange. Check out the full documentation of the tigerbeetle amqp message here.

Pushing some data

Let’s create two accounts to transact with.

alias TigerBeetlex.Account
alias TigerBeetlex.Connection
alias TigerBeetlex.ID

address = "127.0.0.1:3000"

{:ok, _pid} = Connection.start_link(name: :tb, cluster_id: <<0::128>>, addresses: [address])

account_1 =
  %Account{
    id: ID.from_int(42_000),
    ledger: 1,
    code: 2
  }

account_2 =
  %Account{
    id: ID.from_int(42_001),
    ledger: 1,
    code: 3
  }

{:ok, _account_errors} = Connection.create_accounts(:tb, [account_1, account_2])
# Error handling omitted

Now let’s create a Transfer between the accounts.

alias TigerBeetlex.Transfer

transfers = [
  %Transfer{
    id: ID.generate(),
    debit_account_id: ID.from_int(42_000),
    credit_account_id: ID.from_int(42_001),
    amount: 10_000,
    ledger: 1,
    code: 720
  }
]

{:ok, []} = Connection.create_transfers(:tb, transfers)

You should now see the print output from the pipeline (if you’re executing this in Livebook, it’s below the cell where you create the transfer).

And that’s it, you are now consuming TigerBeetle CDC data! Be sure to read Broadway and BroadwayRabbitMQ documentation and check out all their features (like, for example, batching) to understand how to modify this basic pipeline to suit your usecase.