Streaming
gRPC streaming in Elixir introduces a fully composable way to process data as it flows between client and server. Instead of treating each request as an isolated transaction, streaming allows messages to be transformed incrementally, combined with other streams and enriched through side-effects while still in transit.
The objective here is to demonstrate how GRPC.Stream enables functional composition
applied over live data, allowing pipelines to evolve, react and continue processing
even under constant input.
By exploring unary, server streaming and bidirectional streaming examples, this document highlights how state, concurrency and data transformation can interact seamlessly in an event-driven communication model.
Setup
# Install the library from the local checkout (one directory above this
# notebook) instead of a published release.
app_root = Path.join(__DIR__, "..")

# Point Mix.install at the checkout's own config file and lockfile.
install_opts = [
  config_path: Path.join(app_root, "config/config.exs"),
  lockfile: Path.join(app_root, "mix.lock")
]

Mix.install([{:grpc, path: app_root, env: :dev}], install_opts)
Proto Messages & Service
For simplicity, all proto definitions are inline.
defmodule Stream.HelloRequest do
  @moduledoc "Request message with a single string field, `name`."

  use Protobuf, syntax: :proto3

  # Field number 1.
  field(:name, 1, type: :string)
end
defmodule Stream.HelloReply do
  @moduledoc "Reply message with a single string field, `message`."

  use Protobuf, syntax: :proto3

  # Field number 1.
  field(:message, 1, type: :string)
end
defmodule Stream.EchoServer.Service do
  @moduledoc """
  Service definition for `stream.EchoServer`.

  Declares one unary RPC, one server-streaming RPC and one bidirectional
  streaming RPC, all exchanging `Stream.HelloRequest` / `Stream.HelloReply`.
  """

  use GRPC.Service, name: "stream.EchoServer"

  # Unary: single request, single reply.
  rpc(:SayUnaryHello, Stream.HelloRequest, Stream.HelloReply)

  # Server streaming: single request, stream of replies.
  rpc(:SayServerHello, Stream.HelloRequest, stream(Stream.HelloReply))

  # Bidirectional: stream of requests, stream of replies.
  rpc(:SayBidStreamHello, stream(Stream.HelloRequest), stream(Stream.HelloReply))
end
defmodule Stream.EchoServer.Stub do
  @moduledoc "Client stub for the RPCs declared in `Stream.EchoServer.Service`."

  use GRPC.Stub, service: Stream.EchoServer.Service
end
Transformer (Helper Process)
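A helper GenServer used by the unary example: the server pipeline hands each request to it via GRPCStream.ask and continues with the reply it sends back.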
defmodule Transformer do
  @moduledoc """
  Helper process used by the unary example.

  Handles `{:request, %Stream.HelloRequest{}, from}` messages and answers the
  sender with `{:response, %Stream.HelloReply{}}`.
  """

  use GenServer

  alias Stream.HelloReply
  alias Stream.HelloRequest

  @doc """
  Starts the process registered under the module name. The argument is ignored.
  """
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_), do: {:ok, %{}}

  # Only well-formed requests are matched; any other message has no clause
  # here and is not handled by this module.
  @impl true
  def handle_info({:request, %HelloRequest{name: name}, from}, state) do
    send(from, {:response, %HelloReply{message: "Hello #{name}"}})
    {:noreply, state}
  end
end
# Start the helper process; anything other than {:ok, pid} fails the match
# right here.
{:ok, _transformer_pid} = Transformer.start_link(nil)
Server Implementation
defmodule EchoStreamServer do
  @moduledoc """
  Server implementation of `Stream.EchoServer.Service`.

  Each RPC is expressed as a `GRPC.Stream` pipeline:

    * `say_unary_hello/2` - forwards the request to `Transformer` and decorates
      the reply.
    * `say_server_hello/2` - emits five greetings for a single request.
    * `say_bid_stream_hello/2` - answers every incoming request and joins the
      output with a server-generated heartbeat stream.
  """

  use GRPC.Server, service: Stream.EchoServer.Service

  alias GRPC.Stream, as: GRPCStream
  alias Stream.HelloReply
  alias Stream.HelloRequest

  # Unary example
  #
  # The whole request struct is bound because the pipeline forwards it to
  # `Transformer`, which pattern-matches on `%HelloRequest{}`.
  def say_unary_hello(%HelloRequest{} = request, _stream) do
    request
    |> GRPCStream.unary()
    |> GRPCStream.ask(Transformer)
    |> GRPCStream.map(fn
      %HelloReply{} = reply ->
        %HelloReply{message: "[Reply] #{reply.message}"}

      {:error, reason} ->
        {:error, GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")}
    end)
    |> GRPCStream.run()
  end

  # Server-side streaming: five identical greetings built from the request name.
  def say_server_hello(%HelloRequest{name: name}, stream) do
    fn -> %HelloReply{message: "Hello from server → #{name}"} end
    |> Stream.repeatedly()
    |> Stream.take(5)
    |> GRPCStream.from()
    |> GRPCStream.run_with(stream)
  end

  # Bidirectional streaming
  #
  # Requests are turned into welcome replies; anything else flowing through
  # the pipeline (e.g. the joined heartbeat replies) is passed along untouched.
  def say_bid_stream_hello(request_stream, stream) do
    request_stream
    |> GRPCStream.from(join_with: output_join_stream())
    |> GRPCStream.map(fn
      %HelloRequest{name: name} ->
        %HelloReply{message: "Welcome #{name}!"}

      msg ->
        msg
    end)
    |> GRPCStream.run_with(stream)
  end

  # Heartbeat replies joined into the bidirectional output.
  # NOTE(review): this stream is unbounded and unthrottled; confirm how
  # `join_with:` handles termination once the client closes its side.
  defp output_join_stream do
    Stream.repeatedly(fn ->
      %HelloReply{message: "↔ Server heartbeat"}
    end)
  end
end
Endpoint + Supervisor
defmodule StreamingEndpoint do
  @moduledoc "Endpoint that runs `EchoStreamServer` behind the logger interceptor."

  use GRPC.Endpoint

  intercept GRPC.Server.Interceptors.Logger
  run EchoStreamServer
end
# Options for the server supervisor: which endpoint to serve, on which port,
# and that the server should actually be started.
server_opts = [endpoint: StreamingEndpoint, port: 50054, start_server: true]

{:ok, _pid} = GRPC.Server.Supervisor.start_link(server_opts)

IO.puts("Streaming gRPC Server running on :50054")
Client Tests
Unary
# Start the client supervisor, then open a channel to the local server.
{:ok, _} = GRPC.Client.Supervisor.start_link()
{:ok, channel} = GRPC.Stub.connect("localhost:50054")

# One request in, one reply out.
unary_request = %Stream.HelloRequest{name: "Unary Test"}
{:ok, reply} = Stream.EchoServer.Stub.say_unary_hello(channel, unary_request)

IO.inspect(reply, label: "Unary reply")
Server Streaming
# A single request; the call returns an enumerable of server messages.
server_request = %Stream.HelloRequest{name: "Server Stream"}
{:ok, stream} = Stream.EchoServer.Stub.say_server_hello(channel, server_request)

# Print every message as it is read from the stream.
Enum.each(stream, &IO.inspect(&1, label: "◀ Server message"))
Bidirectional Streaming
# Open the bidirectional call; requests are sent on the returned stream.
{:ok, bidi_stream} = Stream.EchoServer.Stub.say_bid_stream_hello(channel)

# Send 3 input messages. The names are plain strings because the `name`
# field of HelloRequest is declared with `type: :string`.
Enum.each(~w(Alice Bob Carol), fn name ->
  GRPC.Stub.send_request(bidi_stream, %Stream.HelloRequest{name: name})
  Process.sleep(150)
end)

# Close input stream
GRPC.Stub.end_stream(bidi_stream)

# Receive responses
{:ok, ex_stream} = GRPC.Stub.recv(bidi_stream)

Enum.each(ex_stream, fn msg ->
  IO.inspect(msg, label: "↔ Stream reply")
end)
The Stream API supports composable stream transformations via ask, map, run and other functions, enabling clean and declarative stream pipelines. For a complete list of available operators, see the GRPC.Stream module documentation.