Session 24: Phoenix.PubSub
Mix.install([])
Introduction
In Session 23, you connected BEAM nodes and made remote calls. But how do you broadcast an event, such as “Agent-1 just completed a task”, to every interested listener across the cluster?
You could loop through all nodes and send messages manually. Or you could use Phoenix.PubSub, a publish-subscribe system that delivers across connected nodes without extra code: subscribe a process to a topic, and it receives every message published to that topic from any node in the cluster.
Sources for This Session
This session synthesizes concepts from:
Learning Goals
By the end of this session, you’ll be able to:
- Understand the publish-subscribe pattern
- Subscribe to and publish on topics
- Use PubSub for agent events (started, stopped, task completed)
- Understand how PubSub propagates across connected nodes
- Build an event system for the agent framework
Section 1: What is PubSub?
🤔 Opening Reflection
# Consider: You have 5 processes that need to know when ANY agent
# completes a task. How would you notify them?
notification_approaches = %{
  direct_send: """
  Maintain a list of listener PIDs.
  Loop through and send/2 to each one.
  Problem: Who maintains the list? What if a listener crashes?
  """,
  genserver_broadcast: """
  A GenServer maintains the subscriber list.
  call(:subscribe) to register, cast({:broadcast, msg}) to notify.
  Problem: Bottleneck - every message goes through one process.
  """,
  pubsub: """
  Processes subscribe to a "topic" (a string).
  Any process publishes to that topic.
  PubSub handles delivery to all subscribers.
  Bonus: Works across connected nodes automatically!
  """
}
# PubSub separates publishers from subscribers.
# Neither needs to know about the other.
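The direct_send approach above leaves the crash question open. A registry-backed subscriber list answers it: Elixir's built-in Registry links itself to every process that registers and drops that process's entries when it exits, and Phoenix.PubSub stores its local subscriptions in a Registry as well. The sketch below runs on a single node; the registry name :crash_demo_pubsub and the listener are hypothetical.

```elixir
# Stand-in subscriber list: a duplicate-key Registry (one topic, many subscribers).
{:ok, _} = Registry.start_link(keys: :duplicate, name: :crash_demo_pubsub)
parent = self()

# Hypothetical listener: subscribes, reports that it is ready, then crashes on request.
listener =
  spawn(fn ->
    {:ok, _} = Registry.register(:crash_demo_pubsub, "agent:events", [])
    send(parent, :subscribed)

    receive do
      :crash -> exit(:boom)
    end
  end)

receive do
  :subscribed -> :ok
end

# While the listener is alive, it is the topic's only subscriber.
[{^listener, []}] = Registry.lookup(:crash_demo_pubsub, "agent:events")

send(listener, :crash)

# The Registry removes the dead process asynchronously, so poll briefly.
wait_until_empty = fn wait_until_empty, attempts_left ->
  case Registry.lookup(:crash_demo_pubsub, "agent:events") do
    [] ->
      :cleaned_up

    entries when attempts_left == 0 ->
      {:still_registered, entries}

    _entries ->
      Process.sleep(10)
      wait_until_empty.(wait_until_empty, attempts_left - 1)
  end
end

wait_until_empty.(wait_until_empty, 100)
# => :cleaned_up
```

Nobody removed the dead listener by hand; a later broadcast on "agent:events" simply no longer sees it.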
The PubSub Pattern
# Publisher-Subscriber decouples event producers from consumers:
#
#   Publishers                             Subscribers
#   ┌───────────┐                          ┌───────────┐
#   │ Agent-1   │──publish──┐         ┌───>│ Logger    │
#   │ starts    │           │         │    └───────────┘
#   └───────────┘           ▼         │    ┌───────────┐
#                    "agent:events" ──├───>│ Monitor   │
#   ┌───────────┐           ▲         │    └───────────┘
#   │ Agent-2   │──publish──┘         │    ┌───────────┐
#   │ completes │                     └───>│ Dashboard │
#   └───────────┘                          └───────────┘
#
# Key properties:
# 1. Publishers don't know who's listening
# 2. Subscribers don't know who's publishing
# 3. Multiple subscribers per topic
# 4. Multiple publishers per topic
# 5. Topics are just strings
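To make these properties concrete, here is a minimal single-node sketch of the pattern built on Elixir's Registry. MiniPubSub and the :mini_pubsub registry name are hypothetical, and unlike Phoenix.PubSub this version never leaves the local node. Two publishers and two subscribers share one topic without referencing each other:

```elixir
defmodule MiniPubSub do
  # Hypothetical local-only pub/sub: a duplicate-key Registry maps topic => subscriber pids.
  @registry :mini_pubsub

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Subscribes the CALLING process to `topic`.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, [])
    :ok
  end

  # Sends `message` to every process currently subscribed to `topic`.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _} = MiniPubSub.start_link()
parent = self()

# Each subscriber collects two events and reports them sorted, because the
# relative order of two independent publishers is not guaranteed.
for name <- [:logger, :monitor] do
  spawn(fn ->
    :ok = MiniPubSub.subscribe("agent:events")
    send(parent, {:ready, name})

    events =
      for _ <- 1..2 do
        receive do
          {:agent_event, %{event: type}} -> type
        end
      end

    send(parent, {:report, name, Enum.sort(events)})
  end)
end

# Wait until both subscribers are registered before publishing.
for _ <- 1..2 do
  receive do
    {:ready, _name} -> :ok
  end
end

# Two independent publishers; neither knows who is listening.
spawn(fn ->
  MiniPubSub.broadcast("agent:events", {:agent_event, %{event: :agent_started, agent: "Agent-1"}})
end)

spawn(fn ->
  MiniPubSub.broadcast("agent:events", {:agent_event, %{event: :task_completed, agent: "Agent-2"}})
end)

reports =
  for _ <- 1..2 do
    receive do
      {:report, name, events} -> {name, events}
    after
      1_000 -> :timeout
    end
  end

Enum.sort(reports)
# => [logger: [:agent_started, :task_completed], monitor: [:agent_started, :task_completed]]
```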
Section 2: Topics and Subscriptions
Topics as Strings
# Topics are just strings. Convention uses colon-separated namespaces:
topics = [
  "agent:events",           # All agent events
  "agent:events:Worker-1",  # Events for Worker-1 only
  "cluster:events",         # Cluster-level events
  "task:completed",         # All task completions
  "node:status"             # Node status changes
]
# This is just convention - topics can be any string.
# The namespacing helps organize subscriptions.
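One consequence worth checking: topics are matched as exact strings, so the colon namespacing is purely for humans. A subscriber to "agent:events" does not receive messages published to "agent:events:Worker-1"; Phoenix.PubSub has no wildcard or prefix subscriptions. A quick single-node check, with a local Registry (hypothetical name :topic_demo) standing in for the PubSub server:

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: :topic_demo)

# The current process subscribes ONLY to the general topic.
{:ok, _} = Registry.register(:topic_demo, "agent:events", [])

publish = fn topic, message ->
  # Registry.dispatch/3 runs the callback in the caller, so the sends happen immediately.
  Registry.dispatch(:topic_demo, topic, fn entries ->
    for {pid, _} <- entries, do: send(pid, message)
  end)
end

publish.("agent:events:Worker-1", {:agent_event, %{agent: "Worker-1", topic: :specific}})
publish.("agent:events", {:agent_event, %{agent: "Worker-1", topic: :general}})

# Collect every agent event that actually arrived.
drain = fn drain, acc ->
  receive do
    {:agent_event, %{topic: topic}} -> drain.(drain, [topic | acc])
  after
    100 -> Enum.reverse(acc)
  end
end

received = drain.(drain, [])
# => [:general]
```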
Subscribing to Topics
# Phoenix.PubSub.subscribe/2 subscribes the CURRENT PROCESS to a topic.
# The process will receive messages as regular Erlang messages.
# In your agent_api project (which has PubSub configured):
# Phoenix.PubSub.subscribe(AgentApi.PubSub, "agent:events")
# The process now receives messages like:
# {:agent_event, %{event: :agent_started, agent: "Worker-1", ...}}
# In a GenServer, handle these in handle_info:
defmodule EventListenerExample do
  use GenServer
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end
  @impl true
  def init(_) do
    # Subscribe in init/1 so the GenServer process itself is the subscriber:
    # Phoenix.PubSub.subscribe(AgentApi.PubSub, "agent:events")
    {:ok, %{events: []}}
  end
  @impl true
  def handle_info({:agent_event, event}, state) do
    IO.puts("Got event: #{inspect(event)}")
    {:noreply, %{state | events: [event | state.events]}}
  end
  # Ignore unrelated messages instead of crashing with a FunctionClauseError
  def handle_info(_other, state), do: {:noreply, state}
end
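EventListenerExample keeps its subscribe call commented out because AgentApi.PubSub only exists in the agent_api project. To watch the same shape receive a message here, the hypothetical ListenerSketch below registers in a local Registry (named :listener_demo, an assumption standing in for AgentApi.PubSub) from inside init/1, so the GenServer process, not its caller, becomes the subscriber.

```elixir
defmodule ListenerSketch do
  use GenServer

  # Assumption: a local Registry named :listener_demo plays the role of AgentApi.PubSub.
  @registry :listener_demo

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_) do
    # Runs inside the new GenServer process, so THAT process becomes the subscriber.
    {:ok, _} = Registry.register(@registry, "agent:events", [])
    {:ok, %{events: []}}
  end

  @impl true
  def handle_info({:agent_event, event}, state) do
    {:noreply, %{state | events: [event | state.events]}}
  end

  # Anything else is ignored rather than crashing the listener.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: :listener_demo)
{:ok, listener} = ListenerSketch.start_link(nil)

# Publish through the Registry, then send an unrelated message.
Registry.dispatch(:listener_demo, "agent:events", fn entries ->
  for {pid, _} <- entries do
    send(pid, {:agent_event, %{event: :agent_started, agent: "Worker-1"}})
  end
end)

send(listener, :unrelated_message)

# :sys.get_state/1 is handled after the earlier messages sent by this process.
events = :sys.get_state(listener).events
```

The listener recorded the one agent event and survived the unrelated message.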
Section 3: Broadcasting Messages
Phoenix.PubSub.broadcast/3
# Broadcasting sends a message to ALL subscribers of a topic:
# Phoenix.PubSub.broadcast(
# AgentApi.PubSub, # PubSub server name
# "agent:events", # Topic
# {:agent_event, %{...}} # Message (any term)
# )
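# The message is delivered to every process subscribed to "agent:events"
# on every connected node that runs a PubSub server with the same name
# (AgentApi.PubSub here). broadcast/3 returns :ok or {:error, reason}.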
# Example: Broadcasting an agent start event
broadcast_example = """
Phoenix.PubSub.broadcast(
  AgentApi.PubSub,
  "agent:events",
  {:agent_event, %{
    event: :agent_started,
    agent: "Worker-1",
    node: Node.self(),
    timestamp: DateTime.utc_now()
  }}
)
"""
🤔 Message Format Design
# Question: Why wrap the event in a tagged tuple like {:agent_event, map}
# instead of just sending the map?
format_reasoning = """
Your answer: ???
"""
# Answer:
# Tagged tuples help subscribers distinguish message types in handle_info:
#
# def handle_info({:agent_event, event}, state) do
# # Handle agent events
# end
#
# def handle_info({:cluster_event, event}, state) do
# # Handle cluster events
# end
#
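# def handle_info(_other, state) do
# # Unknown messages
# end
#
# Without tags, you'd need to pattern match on map contents, which is
# fragile and less clear. For example, agent events and cluster events
# here both carry :event and :node keys, so a clause matching
# %{node: _} would catch both kinds.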
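The hypothetical TaggedRouter below puts this reasoning into runnable form: it routes purely on the tuple tag and counts which clause each message hits. The untagged map falls through to the catch-all, because nothing in its shape says which kind of event it is.

```elixir
defmodule TaggedRouter do
  # Hypothetical subscriber that routes purely on the tuple tag.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{agent: 0, cluster: 0, other: 0})

  @impl true
  def init(counts), do: {:ok, counts}

  @impl true
  def handle_info({:agent_event, _event}, counts),
    do: {:noreply, Map.update!(counts, :agent, &(&1 + 1))}

  def handle_info({:cluster_event, _event}, counts),
    do: {:noreply, Map.update!(counts, :cluster, &(&1 + 1))}

  def handle_info(_other, counts),
    do: {:noreply, Map.update!(counts, :other, &(&1 + 1))}
end

{:ok, pid} = TaggedRouter.start_link()

send(pid, {:agent_event, %{event: :agent_started, agent: "Worker-1"}})
send(pid, {:cluster_event, %{event: :node_up, node: :"node2@host"}})
# An untagged map: nothing in its shape says whether it is an agent or a cluster event.
send(pid, %{event: :agent_started, agent: "Worker-2"})

# One message landed in each clause: agent: 1, cluster: 1, other: 1
counts = :sys.get_state(pid)
```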
Section 4: PubSub Across Nodes
Automatic Distribution
# This is the magic of Phoenix.PubSub:
# when nodes are connected via distributed Erlang and each node starts
# a PubSub server with the same name, broadcasts propagate to ALL nodes.
cross_node_pubsub = """
Node 1                            Node 2
┌──────────────────────┐          ┌──────────────────────┐
│ AgentApi.PubSub      │◄────────►│ AgentApi.PubSub      │
│                      │   auto   │                      │
│ Publisher:           │ forward  │ Subscriber:          │
│   broadcast(         │          │   receives the       │
│     "agent:events",  │          │   message!           │
│     {:agent_event,   │          │                      │
│      ...})           │          │                      │
└──────────────────────┘          └──────────────────────┘
No extra code needed! Just connect the nodes, and PubSub
handles cross-node delivery using distributed Erlang.
"""
# How it works under the hood:
# 1. Node 1 broadcasts to "agent:events"
# 2. PubSub delivers to local subscribers on Node 1
# 3. PubSub also sends the message to Node 2's PubSub process
# 4. Node 2's PubSub delivers to its local subscribers
# All transparent to your code!
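The four steps can be simulated on a single BEAM node. In this sketch each SimNodePubSub server (a hypothetical module) plays one node's PubSub process, and Erlang's :pg process groups (OTP 23+) let the servers find each other, similar in spirit to the default Phoenix.PubSub adapter, which is built on Erlang process groups. It is a teaching model under those assumptions, not Phoenix.PubSub's implementation; for one, it never removes subscribers that die.

```elixir
defmodule SimNodePubSub do
  # Hypothetical stand-in for ONE node's PubSub server. Starting two of them
  # on a single BEAM node simulates "Node 1" and "Node 2".
  use GenServer

  @scope :sim_pubsub_scope
  @group :agent_api_pubsub

  def start_link(name), do: GenServer.start_link(__MODULE__, nil, name: name)

  def subscribe(server, topic), do: GenServer.call(server, {:subscribe, topic, self()})

  def broadcast(server, topic, message),
    do: GenServer.call(server, {:broadcast, topic, message})

  @impl true
  def init(nil) do
    # Every simulated node's server joins the same :pg group to find its peers.
    :ok = :pg.join(@scope, @group, self())
    {:ok, %{}}
  end

  @impl true
  def handle_call({:subscribe, topic, pid}, _from, subs) do
    {:reply, :ok, Map.update(subs, topic, [pid], &[pid | &1])}
  end

  def handle_call({:broadcast, topic, message}, _from, subs) do
    # Steps 1-2: deliver to this node's own subscribers.
    deliver_locally(subs, topic, message)

    # Step 3: forward one copy to every OTHER node's server.
    for peer <- :pg.get_members(@scope, @group), peer != self() do
      send(peer, {:forward, topic, message})
    end

    {:reply, :ok, subs}
  end

  @impl true
  def handle_info({:forward, topic, message}, subs) do
    # Step 4: the receiving server delivers to its local subscribers.
    deliver_locally(subs, topic, message)
    {:noreply, subs}
  end

  defp deliver_locally(subs, topic, message) do
    for pid <- Map.get(subs, topic, []), do: send(pid, message)
  end
end

{:ok, _} = :pg.start_link(:sim_pubsub_scope)
{:ok, _} = SimNodePubSub.start_link(:node1_pubsub)
{:ok, _} = SimNodePubSub.start_link(:node2_pubsub)

# This process plays a subscriber living on "Node 2".
:ok = SimNodePubSub.subscribe(:node2_pubsub, "agent:events")

# A publisher on "Node 1" broadcasts; nothing here mentions Node 2.
:ok = SimNodePubSub.broadcast(:node1_pubsub, "agent:events", {:agent_event, %{event: :agent_started}})

received =
  receive do
    {:agent_event, event} -> event
  after
    1_000 -> :timeout
  end

# received is %{event: :agent_started}
```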
🤔 Why Not Just Use :rpc.call?
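# You could manually broadcast to every node. Note that Node.list/0 excludes
# the local node, so it has to be added explicitly, and each :rpc.call
# blocks until that node replies:
manual_broadcast = """
for node <- [Node.self() | Node.list()] do
  :rpc.call(node, Phoenix.PubSub, :local_broadcast, [
    AgentApi.PubSub,
    "agent:events",
    {:agent_event, event}
  ])
end
"""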
# But PubSub is better because:
advantages = [
  "Automatic - no need to enumerate nodes",
  "Handles nodes joining and leaving the cluster without code changes",
  "Efficient - local subscribers are looked up in an ETS-backed Registry",
  "Battle-tested - the same layer Phoenix Channels and LiveView use",
  "Decoupled - publishers don't know about subscribers"
]
Section 5: Agent Events System
Designing the Event System
# Our AgentEvents module (in agent_api/lib/agent_api/) wraps PubSub
# with a domain-specific API:
defmodule AgentEventsExample do
  @pubsub AgentApi.PubSub
  # === Subscriptions ===
  def subscribe_agent_events do
    Phoenix.PubSub.subscribe(@pubsub, "agent:events")
  end
  def subscribe_agent(agent_name) do
    Phoenix.PubSub.subscribe(@pubsub, "agent:events:#{agent_name}")
  end
  def subscribe_cluster_events do
    Phoenix.PubSub.subscribe(@pubsub, "cluster:events")
  end
  # === Broadcasting ===
  def broadcast_agent_started(agent_name) do
    event = %{
      event: :agent_started,
      agent: agent_name,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end
  def broadcast_agent_stopped(agent_name) do
    event = %{
      event: :agent_stopped,
      agent: agent_name,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end
  def broadcast_task_completed(agent_name, result \\ %{}) do
    event = %{
      event: :task_completed,
      agent: agent_name,
      result: result,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end
  # Broadcasts to both the general topic and the agent-specific topic.
  # Returns :ok only if both succeed, otherwise the first {:error, reason}.
  defp broadcast_agent_event(agent_name, event) do
    message = {:agent_event, event}
    with :ok <- Phoenix.PubSub.broadcast(@pubsub, "agent:events", message) do
      Phoenix.PubSub.broadcast(@pubsub, "agent:events:#{agent_name}", message)
    end
  end
end
🤔 Dual-Topic Broadcasting
# Notice we broadcast to TWO topics:
# 1. "agent:events" - for processes watching ALL agents
# 2. "agent:events:Worker-1" - for processes watching ONE agent
# Question: Why not just use "agent:events" for everything?
dual_topic_reasoning = """
Your answer: ???
"""
# Answer:
# Selective subscription reduces noise.
#
# A dashboard monitoring all agents subscribes to "agent:events".
# It gets EVERY event from EVERY agent.
#
# A process that only cares about Worker-1 subscribes to
# "agent:events:Worker-1". It ignores events from other agents.
#
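# Without agent-specific topics, every subscriber would receive every
# agent's events and have to discard most of them in handle_info,
# wasting mailbox traffic and CPU cycles. (The flip side: a process
# subscribed to BOTH topics receives each event twice.)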
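The selective-subscription argument can be checked directly. Below, a local Registry (hypothetical name :dual_topic_demo) stands in for AgentApi.PubSub, and broadcast_agent_event mirrors the dual-topic helper in AgentEventsExample. The current process subscribes only to Worker-1's topic, so Worker-2's event never reaches its mailbox and no filtering code is needed:

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: :dual_topic_demo)

publish = fn topic, message ->
  Registry.dispatch(:dual_topic_demo, topic, fn entries ->
    for {pid, _} <- entries, do: send(pid, message)
  end)
end

# Same idea as broadcast_agent_event/2: general topic plus per-agent topic.
broadcast_agent_event = fn agent_name, event ->
  publish.("agent:events", {:agent_event, event})
  publish.("agent:events:#{agent_name}", {:agent_event, event})
end

# This process only watches Worker-1.
{:ok, _} = Registry.register(:dual_topic_demo, "agent:events:Worker-1", [])

broadcast_agent_event.("Worker-1", %{event: :agent_started, agent: "Worker-1"})
broadcast_agent_event.("Worker-2", %{event: :agent_started, agent: "Worker-2"})
broadcast_agent_event.("Worker-1", %{event: :task_completed, agent: "Worker-1"})

drain = fn drain, acc ->
  receive do
    {:agent_event, %{agent: agent, event: type}} -> drain.(drain, [{agent, type} | acc])
  after
    100 -> Enum.reverse(acc)
  end
end

received = drain.(drain, [])
# => [{"Worker-1", :agent_started}, {"Worker-1", :task_completed}]
```

If the same process also registered under "agent:events", each Worker-1 event would arrive twice, once per topic, so a subscriber usually picks the one topic that matches its concern.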
Section 6: Event Logger
AgentEventLogger
# A practical subscriber that logs all events:
defmodule AgentEventLoggerExample do
  use GenServer
  require Logger
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  @impl true
  def init(_opts) do
    # Subscribe to both agent and cluster events
    # AgentEvents.subscribe_agent_events()
    # AgentEvents.subscribe_cluster_events()
    {:ok, %{event_count: 0}}
  end
  @impl true
  def handle_info({:agent_event, event}, state) do
    log_agent_event(event)
    {:noreply, %{state | event_count: state.event_count + 1}}
  end
  def handle_info({:cluster_event, event}, state) do
    log_cluster_event(event)
    {:noreply, %{state | event_count: state.event_count + 1}}
  end
  # Ignore any other message instead of crashing the logger
  def handle_info(_other, state), do: {:noreply, state}
  defp log_agent_event(%{event: :agent_started, agent: agent, node: node}) do
    Logger.info("[Events] agent_started: #{agent} on #{node}")
  end
  defp log_agent_event(%{event: :agent_stopped, agent: agent, node: node}) do
    Logger.info("[Events] agent_stopped: #{agent} on #{node}")
  end
  defp log_agent_event(%{event: :task_completed, agent: agent, node: node}) do
    Logger.info("[Events] task_completed: #{agent} on #{node}")
  end
  defp log_agent_event(%{event: type, agent: agent}) do
    Logger.info("[Events] #{type}: #{agent}")
  end
  defp log_agent_event(event) do
    Logger.info("[Events] unrecognized agent event: #{inspect(event)}")
  end
  defp log_cluster_event(%{event: :node_up, node: node}) do
    Logger.info("[Events] cluster: node_up #{node}")
  end
  defp log_cluster_event(%{event: :node_down, node: node}) do
    Logger.warning("[Events] cluster: node_down #{node}")
  end
  defp log_cluster_event(event) do
    Logger.info("[Events] cluster: #{inspect(event)}")
  end
end
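Because the logger only depends on the message format, other subscribers can be added the same way. The hypothetical MetricsCollectorSketch below counts events per agent, subscribing in init/1 just like the logger; a local Registry named :metrics_demo is an assumption standing in for AgentApi.PubSub.

```elixir
defmodule MetricsCollectorSketch do
  # Hypothetical subscriber: counts events per agent.
  use GenServer

  # Assumption: a local Registry named :metrics_demo stands in for AgentApi.PubSub.
  @registry :metrics_demo

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil) do
    {:ok, _} = Registry.register(@registry, "agent:events", [])
    {:ok, %{}}
  end

  @impl true
  def handle_info({:agent_event, %{agent: agent}}, counts) do
    {:noreply, Map.update(counts, agent, 1, &(&1 + 1))}
  end

  def handle_info(_other, counts), do: {:noreply, counts}
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: :metrics_demo)
{:ok, collector} = MetricsCollectorSketch.start_link()

# Publish three events; the publishing code knows nothing about the collector.
for {agent, type} <- [{"W-1", :agent_started}, {"W-1", :task_completed}, {"W-2", :agent_started}] do
  Registry.dispatch(:metrics_demo, "agent:events", fn entries ->
    for {pid, _} <- entries, do: send(pid, {:agent_event, %{event: type, agent: agent}})
  end)
end

counts = :sys.get_state(collector)
# => %{"W-1" => 2, "W-2" => 1}
```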
🤔 Event Logger as a Pattern
# The EventLogger follows a common pattern:
# GenServer that subscribes to PubSub topics in init/1
pattern = """
1. init/1 → Subscribe to topics
2. handle_info({:agent_event, event}, state) → Process events
3. handle_info({:cluster_event, event}, state) → Process events
This pattern can be reused for:
- Metrics collection (count events, measure latency)
- Alerting (notify when agents fail)
- Audit logging (persist events to database)
- Dashboard updates (push to LiveView)
All without changing the publishing code!
That's the power of decoupled PubSub.
"""
Section 7: Integration with TaskManager
Events in the Task Lifecycle
# Our updated TaskManager broadcasts events at key points:
task_lifecycle_events = """
TaskManager.start_agent("Worker-1")
→ AgentEvents.broadcast_agent_started("Worker-1")
TaskManager.send_message("Worker-1", :search, %{query: "OTP"})
→ AgentEvents.broadcast_task_received("Worker-1", task_info)
TaskManager.process_next("Worker-1")
→ AgentEvents.broadcast_task_completed("Worker-1", result)
"""
# This means ANY subscriber gets notified automatically:
# - EventLogger logs it
# - Dashboard updates in real-time
# - Metrics collector counts it
# - Cross-node listeners receive it too!
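A stripped-down model of this flow, under loud assumptions: TaskLifecycleSketch is a hypothetical stand-in for both TaskManager and AgentEvents (the real modules live in agent_api and are not shown here), broadcasting through a local Registry at agent start and task completion. The subscriber never calls into it and still sees both events, in order.

```elixir
defmodule TaskLifecycleSketch do
  # Hypothetical stand-in for TaskManager + AgentEvents; broadcasts via a local Registry.
  @registry :lifecycle_demo

  def start_agent(name) do
    # (real agent startup would happen here)
    broadcast(%{event: :agent_started, agent: name})
    {:ok, name}
  end

  def process_next(name, task) do
    result = %{task: task, status: :done}
    broadcast(%{event: :task_completed, agent: name, result: result})
    {:ok, result}
  end

  defp broadcast(event) do
    Registry.dispatch(@registry, "agent:events", fn entries ->
      for {pid, _} <- entries, do: send(pid, {:agent_event, event})
    end)
  end
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: :lifecycle_demo)

# Any process can subscribe; here it is the current one.
{:ok, _} = Registry.register(:lifecycle_demo, "agent:events", [])

{:ok, "Worker-1"} = TaskLifecycleSketch.start_agent("Worker-1")
{:ok, _result} = TaskLifecycleSketch.process_next("Worker-1", :search)

seen =
  for _ <- 1..2 do
    receive do
      {:agent_event, %{event: type}} -> type
    after
      100 -> :missing
    end
  end

# => [:agent_started, :task_completed]
```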
Section 8: Interactive Exercises
Exercise 1: Build a Simple PubSub Subscriber
# This exercise works within a single IEx session, using Elixir's
# built-in Registry as a simple pub/sub stand-in. Unlike Phoenix.PubSub,
# a Registry only delivers to processes on the local node.
# Step 1: Start a Registry for pub/sub
{:ok, _} = Registry.start_link(keys: :duplicate, name: :simple_pubsub)
# Step 2: Subscribe the current process
{:ok, _} = Registry.register(:simple_pubsub, "events", [])
# Step 3: Publish from a spawned process
spawn(fn ->
  Registry.dispatch(:simple_pubsub, "events", fn entries ->
    for {pid, _} <- entries do
      send(pid, {:event, "Hello from spawned process!"})
    end
  end)
end)
# Step 4: Receive the message
receive do
  {:event, msg} -> IO.puts("Got: #{msg}")
after
  1000 -> IO.puts("No message received")
end
Exercise 2: Event Counter
# Build a GenServer that counts events by type
defmodule EventCounter do
  use GenServer
  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def get_counts, do: GenServer.call(__MODULE__, :get_counts)
  def receive_event(event), do: GenServer.cast(__MODULE__, {:event, event})
  @impl true
  def init(_), do: {:ok, %{}}
  @impl true
  def handle_call(:get_counts, _from, counts) do
    {:reply, counts, counts}
  end
  @impl true
  def handle_cast({:event, %{event: type}}, counts) do
    new_counts = Map.update(counts, type, 1, &(&1 + 1))
    {:noreply, new_counts}
  end
end
# Test it:
{:ok, _} = EventCounter.start_link(nil)
EventCounter.receive_event(%{event: :agent_started, agent: "W-1"})
EventCounter.receive_event(%{event: :task_completed, agent: "W-1"})
EventCounter.receive_event(%{event: :agent_started, agent: "W-2"})
EventCounter.receive_event(%{event: :task_completed, agent: "W-1"})
EventCounter.get_counts()
# => %{agent_started: 2, task_completed: 2}
# Clean up
GenServer.stop(EventCounter)
Exercise 3: Filtered Subscriber
# Build a subscriber that only cares about specific event types
defmodule FilteredSubscriber do
  use GenServer
  def start_link(filter_types) do
    GenServer.start_link(__MODULE__, filter_types)
  end
  def get_events(pid), do: GenServer.call(pid, :get_events)
  @impl true
  def init(filter_types) do
    {:ok, %{filter: MapSet.new(filter_types), events: []}}
  end
  @impl true
  def handle_call(:get_events, _from, state) do
    {:reply, Enum.reverse(state.events), state}
  end
  @impl true
  def handle_info({:agent_event, %{event: type} = event}, state) do
    if MapSet.member?(state.filter, type) do
      {:noreply, %{state | events: [event | state.events]}}
    else
      {:noreply, state}
    end
  end
  def handle_info(_, state), do: {:noreply, state}
end
# Only listen for task completions:
{:ok, sub} = FilteredSubscriber.start_link([:task_completed])
# Simulate events
send(sub, {:agent_event, %{event: :agent_started, agent: "W-1"}})
send(sub, {:agent_event, %{event: :task_completed, agent: "W-1", result: "done"}})
send(sub, {:agent_event, %{event: :agent_stopped, agent: "W-1"}})
# Only the task_completed event was captured:
FilteredSubscriber.get_events(sub)
# => [%{event: :task_completed, agent: "W-1", result: "done"}]
GenServer.stop(sub)
Key Takeaways
- PubSub decouples publishers from subscribers - Neither knows about the other
- Topics are just strings - Use namespaces like "agent:events:Worker-1"
- Phoenix.PubSub.broadcast/3 sends to all subscribers - On local and remote nodes
- Cross-node delivery is automatic - Once nodes are connected and each runs the same named PubSub server, no extra code is needed
- AgentEvents wraps PubSub - Domain-specific API for agent lifecycle events
- EventLogger is a subscriber pattern - GenServer that subscribes in init/1
What’s Next?
In the next session, we’ll build the full Distributed Agents system:
- Automatic node discovery with libcluster
- AgentRouter for cross-node agent lookup
- Route A2A requests to the correct node
- Handle node failures gracefully
PubSub will broadcast events across your cluster automatically!