# GenStage producer that doesn't fully satisfy the demand
Mix.install([
{:gen_stage, "~> 1.1"}
])
## Section
defmodule Producer do
  @moduledoc """
  GenStage producer whose data source may return fewer events than were demanded.

  Whatever `get_data/1` returns is emitted right away. If that is less than the
  demand, only the part that is still unmet is retried after a short delay, so
  the total number of emitted events never exceeds what was asked for.
  """
  use GenStage

  # Delay before retrying demand that could not be met yet.
  @retry_interval :timer.seconds(1)

  @impl true
  def init(_state), do: {:producer, :no_state}

  # Entry point for fresh demand; the state carries nothing, so it is ignored.
  @impl true
  def handle_demand(demand, _state) when demand > 0, do: send_demand(demand)

  # Timer message scheduled by send_demand/1 for demand that is still outstanding.
  @impl true
  def handle_info({:send_demand, demand}, _state), do: send_demand(demand)

  # Emits the currently available data and schedules a retry for the shortfall.
  defp send_demand(demand) do
    data = get_data(demand)

    # Retry only what is still missing. Re-sending the full `demand` would keep
    # the retry loop alive forever and emit more events than were requested.
    remaining = demand - length(data)

    if remaining > 0 do
      Process.send_after(self(), {:send_demand, remaining}, @retry_interval)
    end

    {:noreply, data, :no_state}
  end

  @doc """
  Fetches events for the given `demand`.

  This stub ignores `demand` and always returns a list with one random float.
  """
  @spec get_data(pos_integer()) :: [float()]
  def get_data(_demand), do: [:rand.uniform()]
end
defmodule Consumer do
  @moduledoc """
  GenStage consumer that subscribes to `Producer` and prints each received event.
  """
  use GenStage

  @impl true
  def init(_args) do
    {:consumer, :no_state, subscribe_to: [Producer]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Print every event of the batch; a consumer emits no events of its own.
    Enum.each(events, fn event -> IO.inspect(event) end)

    {:noreply, [], state}
  end
end
# Start the producer first and register it under the name `Producer`:
# the consumer's init/1 subscribes to `Producer` by that name.
GenStage.start_link(Producer, nil, name: Producer)
# Start the consumer, registered as `Consumer`; its init argument is ignored.
GenStage.start_link(Consumer, nil, name: Consumer)