Chapter 3 - Introducing GenStage
Mix.install([
{:gen_stage, "~> 1.2"}
])
Intro
GenStage provides fundamental building blocks for data processing pipelines that include support for back-pressure: a critical capability of reliable data pipelines.
GenServer allows building servers, GenStage allows building stages which are pieces of a data pipeline.
Which can be simple
graph LR;
Stage1(Stage) --> Stage2(Stage) --> Stage3(Stage) --> Stage4(Stage);
or complex
graph LR;
Stage1A(Stage)-->Stage2A(Stage);
Stage1A(Stage)-->Stage2B(Stage);
Stage1B(Stage)-->Stage2A(Stage);
Stage1B(Stage)-->Stage2B(Stage);
Stage2A(Stage)-->Stage3A(Stage);
Stage2A(Stage)-->Stage3B(Stage);
Stage2A(Stage)-->Stage3C(Stage);
Stage2B(Stage)-->Stage3A(Stage);
Stage2B(Stage)-->Stage3B(Stage);
Stage2B(Stage)-->Stage3C(Stage);
Stage3A(Stage)-->Stage4(Stage)
Stage3B(Stage)-->Stage4(Stage)
Stage3C(Stage)-->Stage4(Stage)
In GenStage it’s the last stage of a pipeline that controls the flow of data. Its demand for data is what drives data moving through the stages. If the last stage does not ask for data then the data does not flow.
graph LR;
Stage1(Stage)-->Stage2(Stage)-->Stage3(Stage)-->Stage4(Stage);
Stage4-.req data.->Stage3
Stage3-.req data.->Stage2
Stage2-.req data.->Stage1
This design decision ensure that back-pressure is always present in the system. If any stage is too busy to ask for data then the data flow naturally stops as well.
There are three different types of stages
- producer: produces events (any Elixir data type)
- consumer: receives events, subscribes to a producer
- producer/consumer: does what you think
Build a pipeline
defmodule Scraper do
def work() do
1..5
|> Enum.random()
|> :timer.seconds()
|> Process.sleep()
end
end
defmodule PageProducer do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def init(initial_state) do
Logger.info("PageProducer init")
{:producer, initial_state}
end
def handle_demand(demand, state) do
Logger.info("PageProducer received demand for #{demand} pages")
events = []
{:noreply, events, state}
end
end
defmodule PageConsumer do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state)
end
def init(initial_state) do
Logger.info("PageConsumer init")
{:consumer, initial_state, subscribe_to: [PageProducer]}
end
def handle_events(events, _from, state) do
Logger.info("PageConsumer received #{inspect(events)}")
Enum.each(events, fn _page ->
Scraper.work()
end)
{:noreply, [], state}
end
end
defmodule Scraper.Application do
require Logger
def start() do
children = [
PageProducer,
PageConsumer
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
end
Scraper.Application.start()
15:25:19.955 [info] PageProducer init
15:25:19.955 [info] PageConsumer init
15:25:19.955 [info] PageProducer received demand for 1000 pages
GenStage optimizes the data flow for peak efficiency that keeps producers and consumers busy. By default the max_demand of a consumer is 1000 and the min_demand is 500. When starting up a consumer will start off asking for its max_demand.
You can set max_demand of 1 if you want to ensure jobs aren’t processed in batches.
sequenceDiagram
Consumer-->>Producer: Request 1000
Producer->>Consumer: Supply 1000
Consumer->>Consumer: 1000 remain
Consumer-->>Consumer: Process 500
Consumer->>Consumer: 500 remain
Consumer-->>Producer: Request 500
Producer->>Consumer: Supply 500
Consumer->>Consumer: 1000 remain
Consumer-->>Consumer: Process 500
Supervisor.stop(Scraper.Supervisor)
GenStage Pipeline 2
defmodule PageConsumer2 do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state)
end
def init(initial_state) do
Logger.info("PageConsumer init")
subscriptions = [
{PageProducer2, min_demand: 0, max_demand: 3}
]
{:consumer, initial_state, subscribe_to: subscriptions}
end
def handle_events(events, _from, state) do
Logger.info("PageConsumer received #{inspect(events)}")
Enum.each(events, fn _page ->
Scraper.work()
end)
{:noreply, [], state}
end
end
defmodule PageProducer2 do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def init(initial_state) do
Logger.info("PageProducer init")
{:producer, initial_state}
end
def handle_demand(demand, state) do
Logger.info("PageProducer received demand for #{demand} pages")
events = []
{:noreply, events, state}
end
def scrape_pages(pages) when is_list(pages) do
GenStage.cast(__MODULE__, {:pages, pages})
end
def handle_cast({:pages, pages}, state) do
Logger.info("cast some pages #{inspect(pages)}")
{:noreply, pages, state}
end
end
defmodule Scraper.Application2 do
require Logger
def start() do
children = [
PageProducer2,
PageConsumer2
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor2]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
end
Scraper.Application2.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer2.scrape_pages(pages)
PageProducer2.scrape_pages(["page6", "page7"])
Supervisor.stop(Scraper.Supervisor2)
Adding more consumers
Reduce consumer demand to min: 0, max: 1
defmodule PageConsumer3 do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state)
end
def init(initial_state) do
Logger.info("PageConsumer init")
subscriptions = [
{PageProducer2, min_demand: 0, max_demand: 1}
]
{:consumer, initial_state, subscribe_to: subscriptions}
end
def handle_events(events, _from, state) do
Logger.info("PageConsumer received #{inspect(events)}")
Enum.each(events, fn _page ->
Scraper.work()
end)
{:noreply, [], state}
end
end
Start two parallel consumers.
defmodule Scraper.Application3 do
require Logger
def start() do
children = [
PageProducer2,
Supervisor.child_spec(PageConsumer3, id: :consumer_a),
Supervisor.child_spec(PageConsumer3, id: :consumer_b)
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor3]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
end
Scraper.Application3.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer2.scrape_pages(pages)
PageProducer2.scrape_pages(["page6", "page7"])
Supervisor.stop(Scraper.Supervisor3)
What happens when producer buffer limit is exceeded?
defmodule PageProducer4 do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def init(initial_state) do
Logger.info("PageProducer init with buffer size 1")
{:producer, initial_state, buffer_size: 1}
end
def handle_demand(demand, state) do
Logger.info("PageProducer received demand for #{demand} pages")
events = []
{:noreply, events, state}
end
def scrape_pages(pages) when is_list(pages) do
GenStage.cast(__MODULE__, {:pages, pages})
end
def handle_cast({:pages, pages}, state) do
Logger.info("cast some pages #{inspect(pages)}")
{:noreply, pages, state}
end
end
defmodule PageConsumer4 do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state)
end
def init(initial_state) do
Logger.info("PageConsumer init")
subscriptions = [
{PageProducer4, min_demand: 0, max_demand: 1}
]
{:consumer, initial_state, subscribe_to: subscriptions}
end
def handle_events(events, _from, state) do
Logger.info("PageConsumer received #{inspect(events)}")
Enum.each(events, fn _page ->
Scraper.work()
end)
{:noreply, [], state}
end
end
defmodule Scraper.Application4 do
require Logger
def start() do
children = [
PageProducer4,
Supervisor.child_spec(PageConsumer4, id: :consumer_a),
Supervisor.child_spec(PageConsumer4, id: :consumer_b)
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor4]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
end
Scraper.Application4.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer4.scrape_pages(pages)
PageProducer4.scrape_pages(["page6", "page7"])
Supervisor.stop(Scraper.Supervisor4)
Adding concurrency with ConsumerSupervisor
defmodule PageProducer do
use GenStage
require Logger
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def init(initial_state) do
Logger.info("PageProducer init")
{:producer, initial_state}
end
def handle_demand(demand = 1, state) do
Logger.info("PageProducer received demand for #{demand} page")
events = []
{:noreply, events, state}
end
def handle_demand(demand, state) do
Logger.info("PageProducer received demand for #{demand} pages")
events = []
{:noreply, events, state}
end
def scrape_pages(pages) when is_list(pages) do
GenStage.cast(__MODULE__, {:pages, pages})
end
def handle_cast({:pages, pages}, state) do
Logger.info("cast some pages #{inspect(pages)}")
{:noreply, pages, state}
end
end
defmodule PageConsumerSupervisor do
use ConsumerSupervisor
require Logger
def start_link(_args) do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
Logger.info("PageConsumerSupervisor init")
children = [
%{
id: PageConsumer,
start: {PageConsumer, :start_link, []},
restart: :transient
}
]
opts = [
strategy: :one_for_one,
subscribe_to: [
{PageProducer, max_demand: 2}
]
]
ConsumerSupervisor.init(children, opts)
end
end
defmodule PageConsumer do
require Logger
def start_link(event) do
Logger.info("PageConsumer started by PageConsumerSupervisor with event #{event}")
Task.start_link(fn ->
Scraper.work()
end)
end
end
defmodule Scraper.Application do
require Logger
def start() do
children = [
PageProducer,
PageConsumerSupervisor
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
end
Scraper.Application.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(["page6", "page7"])
Supervisor.stop(Scraper.Supervisor)
Multi-stage data pipelines
defmodule Scraper do
def work() do
1..5
|> Enum.random()
|> :timer.seconds()
|> Process.sleep()
end
def online?(_url) do
work()
Enum.random([false, true, true])
end
end
defmodule OnlinePageConsumer do
require Logger
def start_link(event) do
Logger.info("OnlinePageConsumer started with event #{event}")
Task.start_link(fn ->
Scraper.work()
end)
end
end
defmodule OnlinePageProducerConsumer do
require Logger
use GenStage
def start_link(_args) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def init(initial_state) do
Logger.info("OnlinePageProducerConsumer init")
subscription = [
{PageProducer, min_demand: 0, max_demand: 1}
]
{:producer_consumer, initial_state, subscribe_to: subscription}
end
def handle_events(events, _from, state) do
Logger.info("OnlinePageProducerConsumer received #{inspect(events)}")
events = Enum.filter(events, &Scraper.online?/1)
Logger.info("Online: #{inspect(events)}")
{:noreply, events, state}
end
end
defmodule OnlinePageConsumerSupervisor do
use ConsumerSupervisor
require Logger
def start_link(_args) do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
Logger.info("PageConsumerSupervisor init")
children = [
%{
id: OnlinePageConsumer,
start: {OnlinePageConsumer, :start_link, []},
restart: :transient
}
]
opts = [
strategy: :one_for_one,
subscribe_to: [
{OnlinePageProducerConsumer, max_demand: 2}
]
]
ConsumerSupervisor.init(children, opts)
end
end
defmodule Scraper.Application do
require Logger
def start() do
children = [
PageProducer,
OnlinePageProducerConsumer,
OnlinePageConsumerSupervisor
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(Scraper.Supervisor)
start()
result ->
result
end
end
end
Scraper.Application.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(["page6", "page7"])
Supervisor.stop(Scraper.Supervisor)
Scaling up a stage with extra processes
defmodule Scraper.Application do
require Logger
def start() do
children = [
{Registry, keys: :unique, name: ProducerConsumerRegistry},
PageProducer,
producer_consumer_spec(id: 1),
producer_consumer_spec(id: 2),
OnlinePageConsumerSupervisor
]
opts = [strategy: :one_for_one, name: Scraper.Supervisor]
case Supervisor.start_link(children, opts) do
{:error, {:already_started, pid}} ->
Logger.info("Restarting supervisor #{inspect(pid)}")
Supervisor.stop(pid)
start()
result ->
result
end
end
def producer_consumer_spec(id: id) do
id = "online_page_producer_consumer_#{id}"
Supervisor.child_spec({OnlinePageProducerConsumer, id}, id: id)
end
def stop() do
Supervisor.stop(Scraper.Supervisor)
end
end
defmodule OnlinePageProducerConsumer do
require Logger
use GenStage
def start_link(id) do
initial_state = []
GenStage.start_link(__MODULE__, initial_state, name: via(id))
end
def init(initial_state) do
Logger.info("OnlinePageProducerConsumer init")
subscription = [
{PageProducer, min_demand: 0, max_demand: 1}
]
{:producer_consumer, initial_state, subscribe_to: subscription}
end
def handle_events(events, _from, state) do
Logger.info("OnlinePageProducerConsumer received #{inspect(events)}")
events = Enum.filter(events, &Scraper.online?/1)
Logger.info("Online: #{inspect(events)}")
{:noreply, events, state}
end
def via(id) do
{:via, Registry, {ProducerConsumerRegistry, id}}
end
end
defmodule OnlinePageConsumerSupervisor do
use ConsumerSupervisor
require Logger
def start_link(_args) do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
Logger.info("PageConsumerSupervisor init (from scaling up)")
children = [
%{
id: OnlinePageConsumer,
start: {OnlinePageConsumer, :start_link, []},
restart: :transient
}
]
opts = [
strategy: :one_for_one,
subscribe_to: [
{OnlinePageProducerConsumer.via("online_page_producer_consumer_1"), []},
{OnlinePageProducerConsumer.via("online_page_producer_consumer_2"), []}
]
]
ConsumerSupervisor.init(children, opts)
end
end
defmodule OnlinePageConsumer do
require Logger
def start_link(event) do
Logger.info("OnlinePageConsumer #{inspect(self())} (scaling up) started with event #{event}")
Task.start_link(fn ->
Scraper.work()
end)
end
end
Scraper.Application.start()
pages = ["page1", "page2", "page3", "page4", "page5"]
PageProducer.scrape_pages(pages)
PageProducer.scrape_pages(["page6", "page7"])
Scraper.Application.stop()