Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Broadway

05-broadway.livemd

Broadway

Mix.install([
  :amqp,
  :broadway,
  :broadway_rabbitmq,
  :jason,
  :kino
])

case AMQP.Connection.open() do
  {:ok, conn} ->
    :ok = AMQP.Connection.close(conn)

  {:error, reason} ->
    raise """
    there doesn't seem to be a RabbitMQ instance accepting connections at
    localhost:5672: #{inspect(reason)}
    """
end

defmodule VisualHelpers do
  # Don't do this. It's Broadway private API and it could change any time!
  def broadway_sup_tree(pipeline) do
    sup = :sys.get_state(pipeline).supervisor_pid
    # Kino.Process.sup_tree/2 doesn't render non-alive processes correctly, so let's
    # remove them.
    Supervisor.delete_child(sup, Broadway.Topology.RateLimiter)
    Kino.Process.sup_tree(sup, direction: :left_right)
  end
end

Before We Start

In the setup block above, we try to connect to RabbitMQ to check whether you have it running. We use RabbitMQ in this livebook to feed messages to our Broadway examples.

If you don’t have RabbitMQ installed, the easiest way to quickly spin up an instance is through Docker:

$ docker run --rm -it -p 5672:5672 rabbitmq

The Big Stage

Broadway is a tool to build data ingestion and data processing pipelines. It’s a sibling of Flow that builds on top of GenStage. You can think of Flow as the solution to the problem “how do I use GenStage in a simple way to process collections?”. Broadway is the answer to the question “how do I use GenStage in a simple way to ingest and process data from different sources?”.

Compared to GenStage, Broadway is more declarative. It provides many abstractions that you could build yourself with GenStage, but it makes them easy to configure. Some of those are:

  • batching
  • (automatic) acknowledgements
  • graceful shutdown and draining
  • rate limiting
  • instrumentation

That all sounds fantastic. Let’s dive in.

Broadway’s Architecture

Broadway’s architecture is made of three main components:

  • a set of producers — these produce the messages that feed the pipeline
  • a set of processors — these do per-message processing
  • a (optional) set of batch processes — these, if present, work on batches of processed messages

In its simplest form, a Broadway pipeline is made of producers and processors.

Producers

A Broadway producer is a GenStage producer that emits Broadway.Message structs as its events. One of the coolest things about Broadway is that its ecosystem comes with several existing producers. These produce messages out of the most common message queues, databases, and more. Some of the existing producers are:

Pipeline Example

Let’s start with an example of a simple Broadway pipeline. We’ll use the RabbitMQ producer. Our pipeline will consume messages from a RabbitMQ queue and, guess what, print them to standard output! 😀 Read the RabbitMQ section above to make sure you have RabbitMQ running locally.

defmodule RabbitMQPrinterPipeline do
  use Broadway

  def start_link do
    producer_opts = [
      # The queue to consume from.
      queue: "my_queue",
      on_failure: :reject_and_requeue
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer, producer_opts},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  # The only callback you need to process messages.
  @impl true
  def handle_message(:default, message, _context) do
    IO.inspect(message, label: "Message in processor #{inspect(self())}")
    message
  end
end

Before running this, let’s start up a connection RabbitMQ using the AMQP library. We also declare an exchange, a queue, and bind the queue to the exchange. Take a look at CloudAMQP’s RabbitMQ introduction if you want to learn more about RabbitMQ concepts.

{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
:ok = AMQP.Exchange.declare(channel, "my_exchange", :topic)
{:ok, _info} = AMQP.Queue.declare(channel, "my_queue")
:ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", routing_key: "print.*")

# Publish messages
:ok = AMQP.Basic.publish(channel, "my_exchange", "print.this", "hello world!")
:ok = AMQP.Basic.publish(channel, "my_exchange", "print.that", "this is Broadway speaking!")

:ok = AMQP.Channel.close(channel)
:ok = AMQP.Connection.close(conn)

Okay, we’re ready to rumble. We already published a couple of messages in the code above. These are now sitting in the my_queue queue in RabbitMQ. When we’ll connect a consumer to it, RabbitMQ will deliver the messages to the consumer. We’ll start our Broadway pipeline below, which does exactly that: it subscribes a consumer to the queue.

# Start the pipeline
{:ok, pipeline} = RabbitMQPrinterPipeline.start_link()

# Shut it down after 3 seconds
Process.sleep(3_000)
Broadway.stop(pipeline)

As you can see, we return the Broadway.Message struct itself from the handle_message/3 callback. This functional approach is fantastic, because it lets us only operate on data. It also lets us tell Broadway when something went wrong, for example by returning a “failed” message using Broadway.Message.failed/2.

When handle_message/3 returns, Broadway acknowledges the message. What this means depends on the producer. In RabbitMQ’s case, it means that Broadway does a RabbitMQ ack of the message, which in turn causes RabbitMQ itself to consider the message as consumed. For other producers, semantics might be slightly different.

If there’s a crash in handle_message/3, Broadway will follow the :on_failure option we gave when starting the producer. In this case, it will reject the message and requeue it, using the RabbitMQ “nack” operation with requeue: true.

Batching

Broadway pipelines also support batching. You can declare different batchers when starting the Broadway pipeline. When you return a message from the handle_message/3 callback, you can put a batcher term in it. Broadway uses this term to batch the message to the right batcher. Let’s see an example where consume messages from RabbitMQ that contain a simple JSON like:

{"value": 42}

We’ll get the "value" key and batch it based on whether it’s even or odd.

defmodule RabbitMQBatchedPipeline do
  use Broadway

  require Integer

  def start_link do
    producer_opts = [
      queue: "my_queue",
      on_failure: :reject_and_requeue
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer, producer_opts},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        odd: [concurrency: 1, batch_size: 3],
        even: [concurrency: 1, batch_size: 3]
      ]
    )
  end

  @impl true
  def handle_message(:default, message, _context) do
    %{"value" => value} = Jason.decode!(message.data)
    message = Broadway.Message.put_data(message, value)

    if Integer.is_even(value) do
      Broadway.Message.put_batcher(message, :even)
    else
      Broadway.Message.put_batcher(message, :odd)
    end
  end

  @impl true
  def handle_batch(batcher, messages, _batch_info, _context) do
    messages
    |> Enum.map(& &1.data)
    |> IO.inspect(label: "Batch of messages in #{inspect(batcher)} batcher")

    messages
  end
end

Let’s run this code. We’ll also visualize the supervision tree started by Broadway at the end, which is pretty cool and gives you an idea of what’s going on under the hood.

{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
:ok = AMQP.Exchange.declare(channel, "my_exchange", :topic)
{:ok, _info} = AMQP.Queue.declare(channel, "my_queue")
:ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", routing_key: "print.*")

# Publish messages
for int <- 500..520 do
  :ok = AMQP.Basic.publish(channel, "my_exchange", "print.this", ~s({"value": #{int}}))
end

# Start the pipeline
{:ok, pipeline} = RabbitMQBatchedPipeline.start_link()

# Shut it down after 5 seconds
Task.start(fn ->
  Process.sleep(5_000)
  Broadway.stop(pipeline)
end)

# Visualize the supervision tree started by Broadway.
VisualHelpers.broadway_sup_tree(pipeline)

Batching works on batch size plus batch timeout. If a batches reaches the configured size, then it gets handed to the batcher. If it doesn’t reach the configured size within a configurable timeout, it gets handed to the batcher anyways when the timeout expires, as we can see in the example above with the last two batches.