Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

GenStage

03-genstage.livemd

GenStage

Mix.install([:gen_stage])

{:ok, _} = Application.ensure_all_started(:crypto)

What Is GenStage

GenStage is a library maintained by the Elixir core team. It provides an abstraction over asynchronous computation that happens through multiple stages.

The idea is this: you have something generating events (whatever an event is). You want to feed those events through a pipeline of stages with varying topologies. That’s what GenStage gives you.

A GenStage stage is an OTP behaviour, similar to GenServer or :gen_statem. Below is a small example of a minimal producer stage.

defmodule SampleStage do
  use GenStage

  @impl true
  def init(_), do: {:producer, :nostate}

  @impl true
  def handle_demand(_demand, :nostate) do
    {:noreply, _events = [], :nostate}
  end
end

Stage Types

GenStage provides three stage types:

  • Producer stages
  • Producer-consumer stages
  • Consumer stages

A GenStage pipeline can only have one producer and one consumer, plus any number of producer-consumers. A pipeline could look something like this:

GenStage stages signal their type by returning it from the init/1 callback.

Demand

A foundational notion in GenStage is demand. We say that GenStage pipelines are demand-driven. Pipelines don’t flow from producers to consumers directly: the flow starts the opposite way. The very consuming end of the pipeline starts to “send demand” upstream, declaring itself ready to consume n events.

The demand flows upstream through any producer-consumers and eventually up to the producer. The producer should always generate events according to the demand that arrived to it. Only then events flow downstream through the stages.

The point of this demand-driven flow is to provide backpressure. Events will flow through the pipeline only as fast as stages can consume them.

A Simple Pipeline

Producer

Let’s start with producers. A producer’s job is to produce events according to the downstream demand. It has to implement the handle_demand/2 callback. GenStage invokes this callback whenever there is downstream demand, and passes the demand as an integer to it.

As an example, let’s build a producer that just produces random binaries, indefinitely.

defmodule RandomBinaryProducer do
  use GenStage

  def start_link(binary_size) do
    GenStage.start_link(__MODULE__, binary_size)
  end

  @impl true
  def init(binary_size) do
    {:producer, binary_size}
  end

  @impl true
  def handle_demand(demand, binary_size = _state) do
    # Processing is expensive! Let's simulate that by sleeping for a bit.
    Process.sleep(Enum.random(1000..5000))

    events =
      Stream.repeatedly(fn -> :crypto.strong_rand_bytes(binary_size) end)
      |> Enum.take(demand)

    {:noreply, events, binary_size}
  end
end

Consumer

Now, let’s add the dumbest consumer: it’ll consume these random binaries and print them to the standard output. So much for interesting examples, right?!

Consumers implement the handle_events/3 callback, which is invoked when there are new events coming from the producer. handle_events/3 can return events to pass downstream to the pipeline. For consumers, however, that list of events must always be []. We’ll see it in action in producer-consumers.

defmodule PrinterConsumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :nostate)
  end

  @impl true
  def init(:nostate) do
    {:consumer, :nostate}
  end

  @impl true
  def handle_events(binaries, _from, state) do
    Enum.each(binaries, &IO.inspect(&1, label: "Binary consumed in #{inspect(self())}"))
    {:noreply, _events = [], state}
  end
end

Wiring It Up

Alright, we’re ready to run our pipeline. As we mentioned, events flow downstream but the pipeline is “kicked off” by demand going upstream, from consumers all the way to producers.

For this reason, we have to glue the pipeline together starting from the consumer. GenStage provides functions to subscribe a consumer to a producer, such as GenStage.sync_subscribe/3.

{:ok, producer} = RandomBinaryProducer.start_link(_size = 12)
{:ok, consumer1} = PrinterConsumer.start_link()
{:ok, consumer2} = PrinterConsumer.start_link()

IO.puts("Ready, set, go!")

{:ok, subscription_tag1} =
  GenStage.sync_subscribe(consumer1,
    to: producer,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

{:ok, subscription_tag2} =
  GenStage.sync_subscribe(consumer2,
    to: producer,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

# After 10s, we shut down the pipeline to avoid it printing forever.
Process.sleep(10_000)
GenStage.cancel({producer, subscription_tag1}, :shutdown)
GenStage.cancel({producer, subscription_tag2}, :shutdown)

Producer-consumer

Let’s add a producer-consumer. It’s going to add the MD5 hash of each event it consumes, and emit the event downstream as {original_event, md5_hash}.

defmodule Hasher do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :nostate)
  end

  @impl true
  def init(:nostate) do
    {:producer_consumer, :nostate}
  end

  @impl true
  def handle_events(events, _from, :nostate) do
    events =
      for event <- events do
        {event, Base.encode64(:erlang.md5(event))}
      end

    # Here, "events" is not empty.
    {:noreply, events, :nostate}
  end
end
{:ok, producer} = RandomBinaryProducer.start_link(_size = 12)
{:ok, producer_consumer} = Hasher.start_link()
{:ok, consumer} = PrinterConsumer.start_link()

IO.puts("Ready, set, go!")

{:ok, first_subscription_tag} =
  GenStage.sync_subscribe(consumer,
    to: producer_consumer,
    cancel: :temporary,
    min_demand: 2,
    max_demand: 5
  )

{:ok, second_subscription_tag} =
  GenStage.sync_subscribe(producer_consumer,
    to: producer,
    cancel: :temporary,
    min_demand: 2,
    max_demand: 5
  )

Process.sleep(10_000)
GenStage.cancel({producer, second_subscription_tag}, :shutdown)
GenStage.cancel({producer_consumer, first_subscription_tag}, :shutdown)

Dispatching

Event dispatching is the missing piece in our understanding of GenStage. GenStage producers and producer-consumers dispatch events downstream based on dispatchers which can implement different dispatching strategies.

The default dispatcher is called a “demand dispatcher”. It hands events to the downstream consumer with the highest demand. This is intuitive: if a consumer has high demand, it means it already processed events and has “bandwidth” to process more.

You can write your own dispatcher by writing a module that implements the GenStage.Dispatcher behaviour. GenStage ships with two useful dispatchers.

GenStage.BroadcastDispatcher

This dispatcher dispatches copies of events to all subscribed downstream consumers. It can be useful, for example, when the same events need to be consumed by consumers that perform different kinds of work.

In the example below, you’ll notice how the same random binary is printed twice, once for each consumer.

defmodule RandomBinaryBroadcaster do
  use GenStage

  def start_link(binary_size) do
    GenStage.start_link(__MODULE__, binary_size)
  end

  @impl true
  def init(binary_size) do
    {:producer, binary_size, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_demand(demand, binary_size = _state) do
    Process.sleep(Enum.random(1000..5000))

    events =
      Stream.repeatedly(fn -> :crypto.strong_rand_bytes(binary_size) end)
      |> Enum.take(demand)

    {:noreply, events, binary_size}
  end
end

{:ok, producer} = RandomBinaryBroadcaster.start_link(_size = 12)
{:ok, consumer1} = PrinterConsumer.start_link()
{:ok, consumer2} = PrinterConsumer.start_link()

IO.puts("Ready, set, go!")

{:ok, subscription_tag1} =
  GenStage.sync_subscribe(consumer1,
    to: producer,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

{:ok, subscription_tag2} =
  GenStage.sync_subscribe(consumer2,
    to: producer,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

Process.sleep(10_000)
GenStage.cancel({producer, subscription_tag1}, :shutdown)
GenStage.cancel({producer, subscription_tag2}, :shutdown)

GenStage.PartitionDispatcher

This dispatcher dispatches events based on a partitioning key on the event itself. Consumers can subscribe to a producer that uses this dispatcher and specify the partition they want to consume. This is useful to dispatch events deterministically, which can help with keeping state in the consumer (think of caching, ownership, and so on).

defmodule PartitionProducer do
  use GenStage

  require Integer

  def start_link do
    GenStage.start_link(__MODULE__, :no_state)
  end

  @impl true
  def init(:no_state) do
    dispatcher = {GenStage.PartitionDispatcher, partitions: [:odd, :even], hash: &amp;hash/1}
    {:producer, :no_state, dispatcher: dispatcher}
  end

  @impl true
  def handle_demand(demand, state) do
    Process.sleep(Enum.random(1000..5000))
    {:noreply, Enum.take_random(1..1000, demand), state}
  end

  defp hash(event) when Integer.is_even(event), do: {event, :even}
  defp hash(event) when Integer.is_odd(event), do: {event, :odd}
end

{:ok, producer} = PartitionProducer.start_link()
{:ok, consumer1} = PrinterConsumer.start_link()
{:ok, consumer2} = PrinterConsumer.start_link()

IO.puts("Ready, set, go!")

{:ok, subscription_tag1} =
  GenStage.sync_subscribe(consumer1,
    to: producer,
    partition: :even,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

{:ok, subscription_tag2} =
  GenStage.sync_subscribe(consumer2,
    to: producer,
    partition: :odd,
    cancel: :temporary,
    min_demand: 5,
    max_demand: 10
  )

Process.sleep(10_000)
GenStage.cancel({producer, subscription_tag1}, :shutdown)
GenStage.cancel({producer, subscription_tag2}, :shutdown)

As you can see, all odd integers are printed by the same consumer, and all the even ones are printed by the same (other) consumer.