Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

GenStage producer that doesn't feed the demand

genstage-producer-that-doesnt-feed-the-demand.livemd

GenStage producer that doesn’t feed the demand

Mix.install([
  {:gen_stage, "~> 1.1"}
])

Section

defmodule Producer do
  @moduledoc """
  GenStage producer whose data source may return fewer events than demanded.

  Every attempt to serve demand calls `get_data/1`. When the returned batch is
  smaller than the demand, the producer emits what it has and schedules a retry
  for the part of the demand that is still outstanding.
  """
  use GenStage

  # Delay between attempts to serve demand that is still outstanding.
  @retry_interval :timer.seconds(1)

  @impl true
  def init(_state), do: {:producer, :no_state}

  @impl true
  def handle_demand(demand, _state) when demand > 0, do: send_demand(demand)

  # Retry message scheduled by `send_demand/1`; `demand` is the number of
  # events that were still missing when the retry was scheduled.
  @impl true
  def handle_info({:send_demand, demand}, _state), do: send_demand(demand)

  # Fetches data for `demand`, emits it, and schedules a retry when the batch
  # came up short.
  defp send_demand(demand) do
    data = get_data(demand)
    remaining = demand - length(data)

    # Only the unserved remainder is retried. Re-requesting the full `demand`
    # would ask again for events that were already emitted, so the retry
    # chain would never terminate and more events than demanded would be sent.
    if remaining > 0 do
      Process.send_after(self(), {:send_demand, remaining}, @retry_interval)
    end

    {:noreply, data, :no_state}
  end

  @doc """
  Returns the events currently available for the given demand.

  This implementation ignores the demand and always returns a list with a
  single random float, so any demand greater than one is served over several
  retries.
  """
  @spec get_data(pos_integer()) :: [float()]
  def get_data(_demand), do: [:rand.uniform()]
end
defmodule Consumer do
  @moduledoc """
  GenStage consumer that subscribes to `Producer` and prints every event it
  receives.
  """
  use GenStage

  # The init argument is ignored; the subscription target is the process
  # registered under the name `Producer`.
  @impl true
  def init(_), do: {:consumer, :no_state, subscribe_to: [Producer]}

  # Prints each event with `IO.inspect/1`. The empty list in the return value
  # means this stage emits no events of its own; the state is passed through
  # unchanged.
  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end
# Start both stages. The producer is started first and registered under the
# name `Producer`, which is the name `Consumer.init/1` passes in `subscribe_to`.
GenStage.start_link(Producer, nil, name: Producer)
GenStage.start_link(Consumer, nil, name: Consumer)