Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 3 - Introducing GenStage

chapter3-introducing-genstage.livemd

Chapter 3 - Introducing GenStage

Mix.install([
  {:gen_stage, "~> 1.2"}
])

Intro

GenStage provides fundamental building blocks for data processing pipelines that include support for back-pressure: a critical capability of reliable data pipelines.

Just as GenServer lets you build servers, GenStage lets you build stages, which are the individual pieces of a data pipeline.

Which can be simple

graph LR;
  Stage1(Stage) --> Stage2(Stage) --> Stage3(Stage) --> Stage4(Stage);

or complex

graph LR;
  Stage1A(Stage)-->Stage2A(Stage);
  Stage1A(Stage)-->Stage2B(Stage);
  
  Stage1B(Stage)-->Stage2A(Stage);
  Stage1B(Stage)-->Stage2B(Stage);

  Stage2A(Stage)-->Stage3A(Stage);
  Stage2A(Stage)-->Stage3B(Stage);
  Stage2A(Stage)-->Stage3C(Stage);

  Stage2B(Stage)-->Stage3A(Stage);
  Stage2B(Stage)-->Stage3B(Stage);
  Stage2B(Stage)-->Stage3C(Stage);

  Stage3A(Stage)-->Stage4(Stage)
  Stage3B(Stage)-->Stage4(Stage)
  Stage3C(Stage)-->Stage4(Stage)

In GenStage it’s the last stage of a pipeline that controls the flow of data. Its demand for data is what drives data moving through the stages. If the last stage does not ask for data then the data does not flow.

graph LR;
  Stage1(Stage)-->Stage2(Stage)-->Stage3(Stage)-->Stage4(Stage);
  Stage4-.req data.->Stage3
  Stage3-.req data.->Stage2
  Stage2-.req data.->Stage1

This design decision ensures that back-pressure is always present in the system. If any stage is too busy to ask for data, then the data flow naturally stops as well.

There are three different types of stages

  • producer: produces events (any Elixir data type)
  • consumer: receives events, subscribes to a producer
  • producer/consumer: acts as both; it receives events from a producer and emits events to the consumers subscribed to it

Build a pipeline

defmodule Scraper do
  @moduledoc """
  Stand-in for real scraping work, used by the consumers in this notebook.
  """

  @doc """
  Simulates scraping a page by sleeping for a random whole number of
  seconds between 1 and 5 (inclusive).

  Returns `:ok`, the result of `Process.sleep/1`.
  """
  def work do
    seconds = Enum.random(1..5)
    Process.sleep(:timer.seconds(seconds))
  end
end
defmodule PageProducer do
  @moduledoc """
  GenStage producer registered under its own module name.

  This first version never emits events: it only logs the demand it
  receives, which makes the consumer's initial demand visible in the logs.
  """
  use GenStage
  require Logger

  @doc """
  Starts the producer with an empty list as its state and registers it
  as `PageProducer`. The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("PageProducer init")
    {:producer, initial_state}
  end

  # Logs the requested amount and answers with no events.
  @impl true
  def handle_demand(demand, state) do
    Logger.info("PageProducer received demand for #{demand} pages")
    events = []
    {:noreply, events, state}
  end
end
defmodule PageConsumer do
  @moduledoc """
  GenStage consumer that subscribes to `PageProducer` with the default
  subscription options and runs `Scraper.work/0` once per received event.
  """
  use GenStage
  require Logger

  @doc """
  Starts an unnamed consumer with an empty list as its state.
  The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("PageConsumer init")
    {:consumer, initial_state, subscribe_to: [PageProducer]}
  end

  # Processes the batch sequentially; a consumer never emits events,
  # hence the empty list in the reply.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("PageConsumer received #{inspect(events)}")

    Enum.each(events, fn _page ->
      Scraper.work()
    end)

    {:noreply, [], state}
  end
end
defmodule Scraper.Application do
  @moduledoc """
  Starts the producer/consumer pair under a supervisor named
  `Scraper.Supervisor`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor`, it is
  stopped and the start is retried, so the cell can be re-evaluated.
  Any other result of `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    [PageProducer, PageConsumer]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor)
    |> restart_if_running()
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
# Boot the first pipeline (PageProducer + PageConsumer); the log lines
# that follow show the consumer's initial demand reaching the producer.
Scraper.Application.start()
15:25:19.955 [info]  PageProducer init

15:25:19.955 [info]  PageConsumer init

15:25:19.955 [info]  PageProducer received demand for 1000 pages

GenStage optimizes the data flow for peak efficiency, keeping both producers and consumers busy. By default, the max_demand of a consumer is 1000 and the min_demand is 500. When it starts up, a consumer begins by asking for its max_demand.

You can set max_demand of 1 if you want to ensure jobs aren’t processed in batches.

sequenceDiagram
    Consumer-->>Producer: Request 1000
    Producer->>Consumer: Supply 1000
    Consumer->>Consumer: 1000 remain
    Consumer-->>Consumer: Process 500
    Consumer->>Consumer: 500 remain
    Consumer-->>Producer: Request 500
    Producer->>Consumer: Supply 500
    Consumer->>Consumer: 1000 remain
    Consumer-->>Consumer: Process 500
# Shut down the supervisor registered as Scraper.Supervisor before moving on.
Supervisor.stop(Scraper.Supervisor)

GenStage Pipeline 2

defmodule PageConsumer2 do
  @moduledoc """
  GenStage consumer that subscribes to `PageProducer2` with
  `min_demand: 0` and `max_demand: 3`, and runs `Scraper.work/0` once per
  received event.
  """
  use GenStage
  require Logger

  @doc """
  Starts an unnamed consumer with an empty list as its state.
  The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("PageConsumer init")

    subscriptions = [
      {PageProducer2, min_demand: 0, max_demand: 3}
    ]

    {:consumer, initial_state, subscribe_to: subscriptions}
  end

  # Processes the batch sequentially; consumers reply with no events.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("PageConsumer received #{inspect(events)}")

    Enum.each(events, fn _page ->
      Scraper.work()
    end)

    {:noreply, [], state}
  end
end
defmodule PageProducer2 do
  @moduledoc """
  GenStage producer registered under its own module name.

  It emits nothing in response to demand; events enter the pipeline only
  when `scrape_pages/1` casts a list of pages to the producer.
  """
  use GenStage
  require Logger

  # -- Client API ------------------------------------------------------------

  @doc """
  Starts the producer with an empty list as its state and registers it
  as `PageProducer2`. The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  @doc """
  Asynchronously hands `pages` (a list) to the producer, which emits them
  as events. Returns the result of `GenStage.cast/2`.
  """
  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  # -- GenStage callbacks ----------------------------------------------------
  # `@impl true` lets the compiler verify these are real GenStage callbacks.

  @impl true
  def init(initial_state) do
    Logger.info("PageProducer init")
    {:producer, initial_state}
  end

  # Demand is only logged; there is nothing to emit until pages are cast.
  @impl true
  def handle_demand(demand, state) do
    Logger.info("PageProducer received demand for #{demand} pages")
    events = []
    {:noreply, events, state}
  end

  # The cast pages become the events emitted by this producer.
  @impl true
  def handle_cast({:pages, pages}, state) do
    Logger.info("cast some pages #{inspect(pages)}")
    {:noreply, pages, state}
  end
end
defmodule Scraper.Application2 do
  @moduledoc """
  Starts `PageProducer2` and `PageConsumer2` under a supervisor named
  `Scraper.Supervisor2`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor2`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    [PageProducer2, PageConsumer2]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor2)
    |> restart_if_running()
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
# Boot pipeline 2, feed it two batches of pages, then stop its supervisor.
Scraper.Application2.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer2.scrape_pages(pages)
PageProducer2.scrape_pages(~w(page6 page7))
Supervisor.stop(Scraper.Supervisor2)

Adding more consumers

Reduce consumer demand to min: 0, max: 1

defmodule PageConsumer3 do
  @moduledoc """
  GenStage consumer that subscribes to `PageProducer2` with
  `min_demand: 0` and `max_demand: 1`, and runs `Scraper.work/0` once per
  received event.
  """
  use GenStage
  require Logger

  @doc """
  Starts an unnamed consumer with an empty list as its state.
  The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("PageConsumer init")

    subscriptions = [
      {PageProducer2, min_demand: 0, max_demand: 1}
    ]

    {:consumer, initial_state, subscribe_to: subscriptions}
  end

  # Processes the received events sequentially; consumers reply with no events.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("PageConsumer received #{inspect(events)}")

    Enum.each(events, fn _page ->
      Scraper.work()
    end)

    {:noreply, [], state}
  end
end

Start two parallel consumers.

defmodule Scraper.Application3 do
  @moduledoc """
  Starts `PageProducer2` and two `PageConsumer3` children under a
  supervisor named `Scraper.Supervisor3`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor3`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    # The same consumer module is started twice, so each child spec needs
    # its own id.
    consumers =
      for id <- [:consumer_a, :consumer_b] do
        Supervisor.child_spec(PageConsumer3, id: id)
      end

    [PageProducer2 | consumers]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor3)
    |> restart_if_running()
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
# Boot pipeline 3 (two consumers), feed it two batches, then stop its supervisor.
Scraper.Application3.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer2.scrape_pages(pages)
PageProducer2.scrape_pages(~w(page6 page7))
Supervisor.stop(Scraper.Supervisor3)

What happens when producer buffer limit is exceeded?

defmodule PageProducer4 do
  @moduledoc """
  GenStage producer registered under its own module name and configured
  with `buffer_size: 1`, to show what happens when more events are cast
  than the producer's buffer can hold.
  """
  use GenStage
  require Logger

  # -- Client API ------------------------------------------------------------

  @doc """
  Starts the producer with an empty list as its state and registers it
  as `PageProducer4`. The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  @doc """
  Asynchronously hands `pages` (a list) to the producer, which emits them
  as events. Returns the result of `GenStage.cast/2`.
  """
  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  # -- GenStage callbacks ----------------------------------------------------
  # `@impl true` lets the compiler verify these are real GenStage callbacks.

  @impl true
  def init(initial_state) do
    Logger.info("PageProducer init with buffer size 1")
    {:producer, initial_state, buffer_size: 1}
  end

  # Demand is only logged; there is nothing to emit until pages are cast.
  @impl true
  def handle_demand(demand, state) do
    Logger.info("PageProducer received demand for #{demand} pages")
    events = []
    {:noreply, events, state}
  end

  # The cast pages become the events emitted by this producer.
  @impl true
  def handle_cast({:pages, pages}, state) do
    Logger.info("cast some pages #{inspect(pages)}")
    {:noreply, pages, state}
  end
end
defmodule PageConsumer4 do
  @moduledoc """
  GenStage consumer that subscribes to `PageProducer4` with
  `min_demand: 0` and `max_demand: 1`, and runs `Scraper.work/0` once per
  received event.
  """
  use GenStage
  require Logger

  @doc """
  Starts an unnamed consumer with an empty list as its state.
  The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("PageConsumer init")

    subscriptions = [
      {PageProducer4, min_demand: 0, max_demand: 1}
    ]

    {:consumer, initial_state, subscribe_to: subscriptions}
  end

  # Processes the received events sequentially; consumers reply with no events.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("PageConsumer received #{inspect(events)}")

    Enum.each(events, fn _page ->
      Scraper.work()
    end)

    {:noreply, [], state}
  end
end
defmodule Scraper.Application4 do
  @moduledoc """
  Starts `PageProducer4` and two `PageConsumer4` children under a
  supervisor named `Scraper.Supervisor4`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor4`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    # The same consumer module is started twice, so each child spec needs
    # its own id.
    consumers =
      for id <- [:consumer_a, :consumer_b] do
        Supervisor.child_spec(PageConsumer4, id: id)
      end

    [PageProducer4 | consumers]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor4)
    |> restart_if_running()
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
# Boot pipeline 4 (producer buffer of 1), feed it two batches, then stop it.
Scraper.Application4.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer4.scrape_pages(pages)
PageProducer4.scrape_pages(~w(page6 page7))
Supervisor.stop(Scraper.Supervisor4)

Adding concurrency with ConsumerSupervisor

defmodule PageProducer do
  @moduledoc """
  GenStage producer registered under its own module name.

  It emits nothing in response to demand; events enter the pipeline only
  when `scrape_pages/1` casts a list of pages to the producer.
  """
  use GenStage
  require Logger

  # -- Client API ------------------------------------------------------------

  @doc """
  Starts the producer with an empty list as its state and registers it
  as `PageProducer`. The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  @doc """
  Asynchronously hands `pages` (a list) to the producer, which emits them
  as events. Returns the result of `GenStage.cast/2`.
  """
  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  # -- GenStage callbacks ----------------------------------------------------
  # `@impl true` lets the compiler verify these are real GenStage callbacks.

  @impl true
  def init(initial_state) do
    Logger.info("PageProducer init")
    {:producer, initial_state}
  end

  # Demand is only logged. The noun is pluralised by a helper instead of
  # duplicating the whole clause for the `demand == 1` case.
  @impl true
  def handle_demand(demand, state) do
    Logger.info("PageProducer received demand for #{demand} #{page_noun(demand)}")
    {:noreply, [], state}
  end

  # The cast pages become the events emitted by this producer.
  @impl true
  def handle_cast({:pages, pages}, state) do
    Logger.info("cast some pages #{inspect(pages)}")
    {:noreply, pages, state}
  end

  # "page" for exactly 1, "pages" for any other demand.
  defp page_noun(1), do: "page"
  defp page_noun(_demand), do: "pages"
end
defmodule PageConsumerSupervisor do
  @moduledoc """
  ConsumerSupervisor subscribed to `PageProducer` with `max_demand: 2`.

  Its single child template starts `PageConsumer.start_link/1`; the
  template's empty argument list is presumably extended with the received
  event by ConsumerSupervisor (see its documentation).
  """
  use ConsumerSupervisor
  require Logger

  @doc """
  Starts the consumer supervisor. The argument is ignored.
  """
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # `@impl true` lets the compiler verify this is the ConsumerSupervisor callback.
  @impl true
  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        # :transient — a child that exits normally is not restarted.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [
        {PageProducer, max_demand: 2}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
defmodule PageConsumer do
  @moduledoc """
  Task-based worker started by `PageConsumerSupervisor`, one call to
  `start_link/1` per event.
  """
  require Logger

  @doc """
  Logs the received `event` and starts a linked task that runs
  `Scraper.work/0`. Returns the result of `Task.start_link/1`.
  """
  def start_link(event) do
    Logger.info("PageConsumer started by PageConsumerSupervisor with event #{event}")
    Task.start_link(&Scraper.work/0)
  end
end
defmodule Scraper.Application do
  @moduledoc """
  Starts `PageProducer` and `PageConsumerSupervisor` under a supervisor
  named `Scraper.Supervisor`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    [PageProducer, PageConsumerSupervisor]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor)
    |> restart_if_running()
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
# Boot the ConsumerSupervisor pipeline, feed it two batches, then stop it.
Scraper.Application.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(~w(page6 page7))
Supervisor.stop(Scraper.Supervisor)

Multi-stage data pipelines

defmodule Scraper do
  @moduledoc """
  Stand-in for real scraping work, used by the stages in this notebook.
  """

  @doc """
  Simulates scraping a page by sleeping for a random whole number of
  seconds between 1 and 5 (inclusive).

  Returns `:ok`, the result of `Process.sleep/1`.
  """
  def work do
    seconds = Enum.random(1..5)
    Process.sleep(:timer.seconds(seconds))
  end

  @doc """
  Simulates checking whether a URL is online.

  The URL is ignored: the function performs the simulated `work/0` delay
  and then returns a random boolean drawn from `[false, true, true]`.
  """
  def online?(_url) do
    work()

    [false, true, true]
    |> Enum.random()
  end
end
defmodule OnlinePageConsumer do
  @moduledoc """
  Task-based worker started by `OnlinePageConsumerSupervisor`, one call to
  `start_link/1` per event.
  """
  require Logger

  @doc """
  Logs the received `event` and starts a linked task that runs
  `Scraper.work/0`. Returns the result of `Task.start_link/1`.
  """
  def start_link(event) do
    Logger.info("OnlinePageConsumer started with event #{event}")
    Task.start_link(&Scraper.work/0)
  end
end
defmodule OnlinePageProducerConsumer do
  @moduledoc """
  GenStage producer-consumer registered under its own module name.

  It subscribes to `PageProducer` with `min_demand: 0` and
  `max_demand: 1`, and forwards only the events for which
  `Scraper.online?/1` returns `true`.
  """
  use GenStage
  require Logger

  @doc """
  Starts the stage with an empty list as its state and registers it as
  `OnlinePageProducerConsumer`. The argument is ignored.
  """
  def start_link(_args) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  # `@impl true` lets the compiler verify these are real GenStage callbacks.
  @impl true
  def init(initial_state) do
    Logger.info("OnlinePageProducerConsumer init")

    subscription = [
      {PageProducer, min_demand: 0, max_demand: 1}
    ]

    {:producer_consumer, initial_state, subscribe_to: subscription}
  end

  # Filters the incoming events; the surviving ones are emitted downstream.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("OnlinePageProducerConsumer received #{inspect(events)}")
    events = Enum.filter(events, &Scraper.online?/1)
    Logger.info("Online: #{inspect(events)}")
    {:noreply, events, state}
  end
end
defmodule OnlinePageConsumerSupervisor do
  @moduledoc """
  ConsumerSupervisor subscribed to `OnlinePageProducerConsumer` with
  `max_demand: 2`.

  Its single child template starts `OnlinePageConsumer.start_link/1`; the
  template's empty argument list is presumably extended with the received
  event by ConsumerSupervisor (see its documentation).
  """
  use ConsumerSupervisor
  require Logger

  @doc """
  Starts the consumer supervisor. The argument is ignored.
  """
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # `@impl true` lets the compiler verify this is the ConsumerSupervisor callback.
  @impl true
  def init(:ok) do
    # Log under this module's own name (the message previously named
    # PageConsumerSupervisor, a different module in this notebook).
    Logger.info("OnlinePageConsumerSupervisor init")

    children = [
      %{
        id: OnlinePageConsumer,
        start: {OnlinePageConsumer, :start_link, []},
        # :transient — a child that exits normally is not restarted.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [
        {OnlinePageProducerConsumer, max_demand: 2}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
defmodule Scraper.Application do
  @moduledoc """
  Starts the three-stage pipeline (`PageProducer`,
  `OnlinePageProducerConsumer`, `OnlinePageConsumerSupervisor`) under a
  supervisor named `Scraper.Supervisor`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start() do
    children = [
      PageProducer,
      OnlinePageProducerConsumer,
      OnlinePageConsumerSupervisor
    ]

    opts = [strategy: :one_for_one, name: Scraper.Supervisor]

    case Supervisor.start_link(children, opts) do
      {:error, {:already_started, pid}} ->
        Logger.info("Restarting supervisor #{inspect(pid)}")
        # Stop exactly the process reported as already started, matching
        # the other application modules, rather than re-resolving the name.
        Supervisor.stop(pid)
        start()

      result ->
        result
    end
  end
end
# Boot the multi-stage pipeline, feed it two batches, then stop it.
Scraper.Application.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(~w(page6 page7))
Supervisor.stop(Scraper.Supervisor)

Scaling up a stage with extra processes

defmodule Scraper.Application do
  @moduledoc """
  Starts the scaled-up pipeline under a supervisor named
  `Scraper.Supervisor`: a unique-key `Registry`, `PageProducer`, two
  `OnlinePageProducerConsumer` stages and `OnlinePageConsumerSupervisor`.
  """
  require Logger

  @doc """
  Starts the supervision tree.

  If a supervisor is already registered under `Scraper.Supervisor`, it is
  stopped and the start is retried. Any other result of
  `Supervisor.start_link/2` is returned unchanged.
  """
  def start do
    # The Registry comes first so the producer-consumers can register
    # their via-names when they start.
    [
      {Registry, keys: :unique, name: ProducerConsumerRegistry},
      PageProducer,
      producer_consumer_spec(id: 1),
      producer_consumer_spec(id: 2),
      OnlinePageConsumerSupervisor
    ]
    |> Supervisor.start_link(strategy: :one_for_one, name: Scraper.Supervisor)
    |> restart_if_running()
  end

  @doc """
  Builds the child spec for one `OnlinePageProducerConsumer`.

  The string `"online_page_producer_consumer_<id>"` is used both as the
  stage's start argument and as the child spec id.
  """
  def producer_consumer_spec(id: id) do
    stage_name = "online_page_producer_consumer_#{id}"
    Supervisor.child_spec({OnlinePageProducerConsumer, stage_name}, id: stage_name)
  end

  @doc """
  Stops the supervisor registered as `Scraper.Supervisor`.
  """
  def stop do
    Supervisor.stop(Scraper.Supervisor)
  end

  # A previous evaluation left a supervisor running: stop it and try again.
  defp restart_if_running({:error, {:already_started, pid}}) do
    Logger.info("Restarting supervisor #{inspect(pid)}")
    Supervisor.stop(pid)
    start()
  end

  defp restart_if_running(result), do: result
end
defmodule OnlinePageProducerConsumer do
  @moduledoc """
  GenStage producer-consumer that can be started several times, each
  instance registered in `ProducerConsumerRegistry` under its own id.

  It subscribes to `PageProducer` with `min_demand: 0` and
  `max_demand: 1`, and forwards only the events for which
  `Scraper.online?/1` returns `true`.
  """
  use GenStage
  require Logger

  # -- Client API ------------------------------------------------------------

  @doc """
  Starts the stage with an empty list as its state, registered under the
  via-tuple built from `id` (see `via/1`).
  """
  def start_link(id) do
    initial_state = []
    GenStage.start_link(__MODULE__, initial_state, name: via(id))
  end

  @doc """
  Returns the `{:via, Registry, {ProducerConsumerRegistry, id}}` name
  tuple for the stage registered under `id`.
  """
  def via(id) do
    {:via, Registry, {ProducerConsumerRegistry, id}}
  end

  # -- GenStage callbacks ----------------------------------------------------
  # `@impl true` lets the compiler verify these are real GenStage callbacks.

  @impl true
  def init(initial_state) do
    Logger.info("OnlinePageProducerConsumer init")

    subscription = [
      {PageProducer, min_demand: 0, max_demand: 1}
    ]

    {:producer_consumer, initial_state, subscribe_to: subscription}
  end

  # Filters the incoming events; the surviving ones are emitted downstream.
  @impl true
  def handle_events(events, _from, state) do
    Logger.info("OnlinePageProducerConsumer received #{inspect(events)}")
    events = Enum.filter(events, &Scraper.online?/1)
    Logger.info("Online: #{inspect(events)}")
    {:noreply, events, state}
  end
end
defmodule OnlinePageConsumerSupervisor do
  @moduledoc """
  ConsumerSupervisor subscribed to both registered
  `OnlinePageProducerConsumer` stages, with no explicit subscription
  options.

  Its single child template starts `OnlinePageConsumer.start_link/1`; the
  template's empty argument list is presumably extended with the received
  event by ConsumerSupervisor (see its documentation).
  """
  use ConsumerSupervisor
  require Logger

  @doc """
  Starts the consumer supervisor. The argument is ignored.
  """
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # `@impl true` lets the compiler verify this is the ConsumerSupervisor callback.
  @impl true
  def init(:ok) do
    # Log under this module's own name (the message previously named
    # PageConsumerSupervisor, a different module in this notebook).
    Logger.info("OnlinePageConsumerSupervisor init (from scaling up)")

    children = [
      %{
        id: OnlinePageConsumer,
        start: {OnlinePageConsumer, :start_link, []},
        # :transient — a child that exits normally is not restarted.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      # The names must match the ids the stages were registered under.
      subscribe_to: [
        {OnlinePageProducerConsumer.via("online_page_producer_consumer_1"), []},
        {OnlinePageProducerConsumer.via("online_page_producer_consumer_2"), []}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
defmodule OnlinePageConsumer do
  @moduledoc """
  Task-based worker started by `OnlinePageConsumerSupervisor`, one call to
  `start_link/1` per event.
  """
  require Logger

  @doc """
  Starts a linked task that logs the received `event` together with the
  task's own PID and then runs `Scraper.work/0`.

  Returns the result of `Task.start_link/1`.
  """
  def start_link(event) do
    Task.start_link(fn ->
      # Log from inside the task so `self()` is the consumer's PID; outside
      # the task it would be the PID of whichever process called start_link/1.
      Logger.info("OnlinePageConsumer #{inspect(self())} (scaling up) started with event #{event}")
      Scraper.work()
    end)
  end
end
# Boot the scaled-up pipeline, feed it two batches, then stop it.
Scraper.Application.start()
pages = ~w(page1 page2 page3 page4 page5)

PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(~w(page6 page7))
Scraper.Application.stop()