Powered by AppSignal & Oban Pro

Streaming

guides/getting_started/stream.livemd

Streaming

gRPC streaming in Elixir introduces a fully composable way to process data as it flows between client and server. Instead of treating each request as an isolated transaction, streaming allows messages to be transformed incrementally, combined with other streams and enriched through side-effects while still in transit.

The objective here is to demonstrate how GRPC.Stream enables functional composition applied over live data, allowing pipelines to evolve, react and continue processing even under constant input.

By exploring unary, server streaming and bidirectional streaming examples, this document highlights how state, concurrency and data transformation can interact seamlessly in an event-driven communication model.


Setup

app_root = Path.join(__DIR__, "..")

Mix.install(
  [
    {:grpc, path: app_root, env: :dev}
  ],
  config_path: Path.join(app_root, "config/config.exs"),
  lockfile: Path.join(app_root, "mix.lock")
)

Proto Messages & Service

For simplicity, all proto definitions are inline.

defmodule Stream.HelloRequest do
  use Protobuf, syntax: :proto3
  field :name, 1, type: :string
end

defmodule Stream.HelloReply do
  use Protobuf, syntax: :proto3
  field :message, 1, type: :string
end

defmodule Stream.EchoServer.Service do
  use GRPC.Service, name: "stream.EchoServer"

  rpc :SayUnaryHello, Stream.HelloRequest, Stream.HelloReply
  rpc :SayServerHello, Stream.HelloRequest, stream(Stream.HelloReply)
  rpc :SayBidStreamHello, stream(Stream.HelloRequest), stream(Stream.HelloReply)
end

defmodule Stream.EchoServer.Stub do
  use GRPC.Stub, service: Stream.EchoServer.Service
end

Transformer (Helper Process)

Used for unary example composition.

defmodule Transformer do
  use GenServer

  alias Stream.HelloRequest
  alias Stream.HelloReply

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_), do: {:ok, %{}}

  def handle_info({:request, %HelloRequest{} = value, from}, state) do
    Process.send(from, {:response, %HelloReply{message: "Hello #{value.name}"}}, [])
    {:noreply, state}
  end
end

{:ok, _} = Transformer.start_link(nil)

Server Implementation

defmodule EchoStreamServer do
  use GRPC.Server, service: Stream.EchoServer.Service

  alias GRPC.Stream, as: GRPCStream
  alias Stream.HelloRequest
  alias Stream.HelloReply

  # Unary example
  def say_unary_hello(%HelloRequest{name: name}, _stream) do
    GRPCStream.unary(request)
    |> GRPCStream.ask(Transformer)
    |> GRPCStream.map(fn
      %HelloReply{} = reply ->
        %HelloReply{message: "[Reply] #{reply.message}"}

      {:error, reason} ->
        {:error, GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")}
    end)
    |> GRPCStream.run()
  end

  # Server‑Side streaming
  def say_server_hello(%HelloRequest{name: name} = _req, stream) do
    Stream.repeatedly(fn ->
      %HelloReply{message: "Hello from server → #{name}"}
    end)
    |> Stream.take(5)
    |> GRPCStream.from()
    |> GRPCStream.run_with(stream)
  end

  # Bidirectional streaming
  def say_bid_stream_hello(request_stream, stream) do
    GRPCStream.from(request_stream, join_with: output_join_stream())
    |> GRPCStream.map(fn
      %HelloRequest{name: name} ->
        %HelloReply{message: "Welcome #{name}!"}
      msg ->
        msg
    end)
    |> GRPCStream.run_with(stream)
  end

  defp output_join_stream() do
    Stream.repeatedly(fn ->
      %Stream.HelloReply{message: "↔ Server heartbeat"}
    end)
  end
end

Endpoint + Supervisor

defmodule StreamingEndpoint do
  use GRPC.Endpoint
  intercept(GRPC.Server.Interceptors.Logger)
  run(EchoStreamServer)
end

{:ok, _pid} =
  GRPC.Server.Supervisor.start_link(
    endpoint: StreamingEndpoint,
    port: 50054,
    start_server: true
  )

IO.puts("Streaming gRPC Server running on :50054")

Client Tests

Unary

{:ok, _} = GRPC.Client.Supervisor.start_link()
{:ok, channel} = GRPC.Stub.connect("localhost:50054")

{:ok, reply} =
  Stream.EchoServer.Stub.say_unary_hello(
    channel,
    %Stream.HelloRequest{name: "Unary Test"}
  )

IO.inspect(reply, label: "Unary reply")

Server Streaming

{:ok, stream} =
  Stream.EchoServer.Stub.say_server_hello(
    channel,
    %Stream.HelloRequest{name: "Server Stream"}
  )

Enum.each(stream, fn msg ->
  IO.inspect(msg, label: "◀ Server message")
end)

Bidirectional Streaming

{:ok, bidi_stream} =
  Stream.EchoServer.Stub.say_bid_stream_hello(channel)

# Send 3 input messages
Enum.each(~w(Alice Bob Carol)a, fn name ->
  GRPC.Stub.send_request(
    bidi_stream,
    %Stream.HelloRequest{name: name}
  )
  Process.sleep(150)
end)

# Close input stream
GRPC.Stub.end_stream(bidi_stream)

# Receive responses
{:ok, ex_stream} = GRPC.Stub.recv(bidi_stream)
Enum.each(ex_stream, fn msg ->
  IO.inspect(msg, label: "↔ Stream reply")
end)

The Stream API supports composable stream transformations via ask, map, run and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see here.