Session 24: Phoenix.PubSub

Mix.install([])

Introduction

In Session 23, you connected BEAM nodes and made remote calls. But how do you broadcast an event such as “Agent-1 just completed a task” to every interested listener across the cluster?

You could loop through all nodes and send messages manually. Or you could use Phoenix.PubSub, a publish-subscribe system that works automatically across connected nodes. Subscribe to a topic, and you’ll receive messages published from any node in the cluster.

Learning Goals

By the end of this session, you’ll be able to:

  • Understand the publish-subscribe pattern
  • Subscribe to and publish on topics
  • Use PubSub for agent events (started, stopped, task completed)
  • Understand how PubSub propagates across connected nodes
  • Build an event system for the agent framework

Section 1: What is PubSub?

🤔 Opening Reflection

# Consider: You have 5 processes that need to know when ANY agent
# completes a task. How would you notify them?

notification_approaches = %{
  direct_send: """
    Maintain a list of listener PIDs.
    Loop through and send() to each one.
    Problem: Who maintains the list? What if a listener crashes?
  """,

  genserver_broadcast: """
    A GenServer maintains subscribers.
    GenServer.call(server, :subscribe) to register,
    GenServer.cast(server, {:broadcast, msg}) to notify.
    Problem: Bottleneck - all messages go through one process.
  """,

  pubsub: """
    Processes subscribe to a "topic" (a string).
    Any process publishes to that topic.
    PubSub handles delivery to all subscribers.
    Bonus: Works across nodes automatically!
  """
}

# PubSub separates publishers from subscribers.
# Neither needs to know about the other.
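To make the cost of the `genserver_broadcast` option concrete, here is a minimal sketch of it. `HandRolledBroadcaster` is a hypothetical module, not part of any library. Notice the bookkeeping it needs: it monitors every listener so crashed ones drop out, and every broadcast still funnels through its single mailbox.

```elixir
# Hypothetical module: the hand-rolled "genserver_broadcast" approach.
defmodule HandRolledBroadcaster do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Registers the CALLING process as a listener.
  def subscribe, do: GenServer.call(__MODULE__, :subscribe)

  # Every broadcast passes through this one process (the bottleneck).
  def broadcast(msg), do: GenServer.cast(__MODULE__, {:broadcast, msg})

  def subscribers, do: GenServer.call(__MODULE__, :subscribers)

  @impl true
  def init(nil), do: {:ok, %{}}

  @impl true
  def handle_call(:subscribe, {pid, _tag}, subs) do
    # Monitor the listener so we hear about it if it crashes.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(subs, ref, pid)}
  end

  def handle_call(:subscribers, _from, subs), do: {:reply, Map.values(subs), subs}

  @impl true
  def handle_cast({:broadcast, msg}, subs) do
    Enum.each(Map.values(subs), &send(&1, msg))
    {:noreply, subs}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, subs) do
    # A listener died: drop it so we stop sending to it.
    {:noreply, Map.delete(subs, ref)}
  end

  def handle_info(_other, subs), do: {:noreply, subs}
end

{:ok, _} = HandRolledBroadcaster.start_link([])
:ok = HandRolledBroadcaster.subscribe()
HandRolledBroadcaster.broadcast({:agent_event, %{event: :task_completed, agent: "Worker-1"}})
```

This is roughly the work PubSub takes off your hands, and the sketch does not even attempt cross-node delivery.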

The PubSub Pattern

# Publisher-Subscriber decouples event producers from consumers:
#
#   Publishers                             Subscribers
#   ┌───────────┐                          ┌───────────┐
#   │ Agent-1   │──publish──┐          ┌──>│ Logger    │
#   │ starts    │           │          │   └───────────┘
#   └───────────┘           ▼          │   ┌───────────┐
#                    "agent:events"────┼──>│ Monitor   │
#   ┌───────────┐           ▲          │   └───────────┘
#   │ Agent-2   │──publish──┘          │   ┌───────────┐
#   │ completes │                      └──>│ Dashboard │
#   └───────────┘                          └───────────┘
#
# Key properties:
# 1. Publishers don't know who's listening
# 2. Subscribers don't know who's publishing
# 3. Multiple subscribers per topic
# 4. Multiple publishers per topic
# 5. Topics are just strings
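You can see these properties in a small single-node sketch built on Elixir's `Registry` with duplicate keys (Phoenix.PubSub also keeps each node's local subscriptions in a Registry). `LocalPubSub` and the `:local_pubsub_demo` name are hypothetical, and this version never talks to other nodes.

```elixir
# Hypothetical single-node stand-in for the pattern above.
defmodule LocalPubSub do
  @registry :local_pubsub_demo

  # :duplicate lets many processes register under the same topic key.
  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Subscribes the CALLING process to `topic`.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends `message` to every process registered under `topic`.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _} = LocalPubSub.start_link()

# The current process subscribes without knowing who will publish.
:ok = LocalPubSub.subscribe("agent:events")

# Two independent publishers on the same topic; neither knows who listens.
spawn(fn -> LocalPubSub.broadcast("agent:events", {:agent_event, :agent_started}) end)
spawn(fn -> LocalPubSub.broadcast("agent:events", {:agent_event, :task_completed}) end)

# Nobody subscribed to this topic, so nothing is delivered.
LocalPubSub.broadcast("node:status", :ignored)
```

Publishers only name a topic, so adding or removing subscribers never touches publishing code.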

Section 2: Topics and Subscriptions

Topics as Strings

# Topics are just strings. Convention uses colon-separated namespaces:

topics = [
  "agent:events",           # All agent events
  "agent:events:Worker-1",  # Events for Worker-1 only
  "cluster:events",         # Cluster-level events
  "task:completed",         # All task completions
  "node:status"             # Node status changes
]

# This is just convention - topics can be any string.
# The namespacing helps organize subscriptions.
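Because topics are arbitrary strings, a misspelled topic is not an error: the process just subscribes to a topic nobody publishes on. One way to guard against that is to build topic strings in a single place. `AgentTopics` below is a hypothetical helper, not something the agent_api project is known to contain.

```elixir
# Hypothetical helper: every topic string is built here, so publishers
# and subscribers can't drift apart through a typo.
defmodule AgentTopics do
  def agent_events, do: "agent:events"
  def agent_events(agent_name), do: "agent:events:#{agent_name}"
  def cluster_events, do: "cluster:events"
end

AgentTopics.agent_events("Worker-1")
# => "agent:events:Worker-1"
```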

Subscribing to Topics

# Phoenix.PubSub.subscribe/2 subscribes the CURRENT PROCESS to a topic.
# The process will receive messages as regular Erlang messages.

# In your agent_api project (which has PubSub configured):
# Phoenix.PubSub.subscribe(AgentApi.PubSub, "agent:events")

# The process now receives messages like:
# {:agent_event, %{event: :agent_started, agent: "Worker-1", ...}}

# In a GenServer, handle these in handle_info:
defmodule EventListenerExample do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  @impl true
  def init(_) do
    # Subscribe on init
    # Phoenix.PubSub.subscribe(AgentApi.PubSub, "agent:events")
    {:ok, %{events: []}}
  end

  @impl true
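  def handle_info({:agent_event, event}, state) do
    IO.puts("Got event: #{inspect(event)}")
    {:noreply, %{state | events: [event | state.events]}}
  end

  # Catch-all: defining handle_info/2 replaces GenServer's default, so
  # without this clause an unexpected message would crash the process.
  def handle_info(_other, state), do: {:noreply, state}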
end
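Since this notebook's `Mix.install([])` pulls in no dependencies, the subscribe call above stays commented out. The sketch below runs the same shape with a `Registry` standing in for `AgentApi.PubSub` (an assumption for the demo; `RegistryListenerDemo` and `:listener_demo_registry` are hypothetical names). The key detail is registering inside `init/1`: like `Phoenix.PubSub.subscribe/2`, `Registry.register/3` subscribes the calling process, and `init/1` runs in the GenServer itself rather than in the process that called `start_link`.

```elixir
defmodule RegistryListenerDemo do
  use GenServer

  @registry :listener_demo_registry

  def start_link(topic), do: GenServer.start_link(__MODULE__, topic)
  def events(pid), do: GenServer.call(pid, :events)

  @impl true
  def init(topic) do
    # Runs inside the GenServer, so the GenServer is what gets subscribed.
    {:ok, _} = Registry.register(@registry, topic, nil)
    {:ok, %{events: []}}
  end

  @impl true
  def handle_call(:events, _from, state), do: {:reply, Enum.reverse(state.events), state}

  @impl true
  def handle_info({:agent_event, event}, state) do
    {:noreply, %{state | events: [event | state.events]}}
  end

  # Ignore unrelated messages instead of crashing.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: :listener_demo_registry)
{:ok, listener} = RegistryListenerDemo.start_link("agent:events")

# Publish: deliver to everyone registered under the topic.
Registry.dispatch(:listener_demo_registry, "agent:events", fn entries ->
  for {pid, _} <- entries do
    send(pid, {:agent_event, %{event: :agent_started, agent: "Worker-1"}})
  end
end)

RegistryListenerDemo.events(listener)
# => [%{agent: "Worker-1", event: :agent_started}]
```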

Section 3: Broadcasting Messages

Phoenix.PubSub.broadcast/3

# Broadcasting sends a message to ALL subscribers of a topic:

# Phoenix.PubSub.broadcast(
#   AgentApi.PubSub,        # PubSub server name
#   "agent:events",          # Topic
#   {:agent_event, %{...}}   # Message (any term)
# )

# The message is delivered to every process subscribed to "agent:events"
# on EVERY connected node that runs a PubSub server with the same name.

# Example: Broadcasting an agent start event
broadcast_example = """
Phoenix.PubSub.broadcast(
  AgentApi.PubSub,
  "agent:events",
  {:agent_event, %{
    event: :agent_started,
    agent: "Worker-1",
    node: Node.self(),
    timestamp: DateTime.utc_now()
  }}
)
"""

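One detail worth knowing: if the publishing process is itself subscribed to the topic, `broadcast` delivers the message back to it too. Phoenix.PubSub provides `broadcast_from`, which takes the sender's pid and skips it, and `local_broadcast`, which only delivers on the current node. The sketch below imitates the first two semantics with a `Registry`; `BroadcastDemo` is a hypothetical stand-in, not Phoenix.PubSub's implementation.

```elixir
defmodule BroadcastDemo do
  @registry :broadcast_demo_registry

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  def subscribe(topic) do
    {:ok, _} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Like broadcast: every subscriber, including the caller if subscribed.
  def broadcast(topic, msg), do: broadcast_except(topic, msg, nil)

  # Like broadcast_from: every subscriber except `from`.
  def broadcast_from(from, topic, msg), do: broadcast_except(topic, msg, from)

  defp broadcast_except(topic, msg, excluded) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _} <- entries, pid != excluded, do: send(pid, msg)
    end)
  end
end

{:ok, _} = BroadcastDemo.start_link()
:ok = BroadcastDemo.subscribe("agent:events")

# We are subscribed, so only the plain broadcast comes back to us.
BroadcastDemo.broadcast_from(self(), "agent:events", {:agent_event, :skipped})
BroadcastDemo.broadcast("agent:events", {:agent_event, :delivered})
```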
🤔 Message Format Design

# Question: Why wrap the event in a tagged tuple like {:agent_event, map}
# instead of just sending the map?

format_reasoning = """
Your answer: ???
"""

# Answer:
# Tagged tuples help subscribers distinguish message types in handle_info:
#
#   def handle_info({:agent_event, event}, state) do
#     # Handle agent events
#   end
#
#   def handle_info({:cluster_event, event}, state) do
#     # Handle cluster events
#   end
#
#   def handle_info(other, state) do
#     # Unknown messages
#   end
#
# Without tags, you'd need to pattern match on map contents,
# which is more fragile and less clear.
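The clause shapes above can be tried directly as a pure function. `EventRouter` is a hypothetical module that mirrors the `handle_info/2` clauses, including the catch-all for messages a GenServer may receive that have nothing to do with PubSub, such as `:DOWN` notifications from monitors.

```elixir
defmodule EventRouter do
  # The tag alone says which kind of event this is.
  def route({:agent_event, %{event: type}}), do: {:agent, type}
  def route({:cluster_event, %{event: type}}), do: {:cluster, type}
  # Monitor notifications, stray replies, untagged maps...
  def route(_other), do: :ignored
end

EventRouter.route({:agent_event, %{event: :agent_started, agent: "Worker-1"}})
# => {:agent, :agent_started}

EventRouter.route({:cluster_event, %{event: :node_up, node: :"a@host"}})
# => {:cluster, :node_up}

EventRouter.route({:DOWN, make_ref(), :process, self(), :normal})
# => :ignored
```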

Section 4: PubSub Across Nodes

Automatic Distribution

# This is the magic of Phoenix.PubSub:
# When nodes are connected via distributed Erlang,
# PubSub broadcasts automatically propagate to ALL nodes.

cross_node_pubsub = """
  Node 1                              Node 2
  ┌────────────────────────┐          ┌────────────────────────┐
  │ AgentApi.PubSub        │◄────────►│ AgentApi.PubSub        │
  │                        │ forwards │                        │
  │ Publisher:             │ messages │ Subscriber:            │
  │   broadcast(           │          │   receives the         │
  │     "agent:events",    │          │   message!             │
  │     {:agent_event, ...}│          │                        │
  │   )                    │          │                        │
  └────────────────────────┘          └────────────────────────┘

No extra broadcasting code needed! As long as the nodes are connected
and each one starts a PubSub server with the same name (AgentApi.PubSub),
PubSub handles cross-node delivery using distributed Erlang.
"""

# How it works under the hood:
# 1. Node 1 broadcasts to "agent:events"
# 2. PubSub delivers to local subscribers on Node 1
# 3. PubSub also forwards the message to the PubSub server on Node 2
#    (the default PG2 adapter finds remote servers via an Erlang process group)
# 4. Node 2's PubSub delivers to its local subscribers
# All transparent to your code!
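The four steps can be modeled in a single VM by letting two `Registry` instances play the subscriber tables of Node 1 and Node 2. This is a toy model under that assumption, not how Phoenix.PubSub is implemented: the real library sends a message to the remote PubSub server, which then dispatches locally, while this sketch simply calls the second table directly. `TwoNodeModel`, `:node1_subs` and `:node2_subs` are hypothetical names.

```elixir
defmodule TwoNodeModel do
  # Each Registry stands in for one node's local subscriber table.
  @nodes [:node1_subs, :node2_subs]

  def start do
    for name <- @nodes do
      {:ok, _} = Registry.start_link(keys: :duplicate, name: name)
    end

    :ok
  end

  def subscribe(node_table, topic) do
    {:ok, _} = Registry.register(node_table, topic, nil)
    :ok
  end

  # Steps 1-2: deliver on the publishing "node".
  # Steps 3-4: hand the message to every other "node" for local delivery.
  def broadcast(from_table, topic, msg) do
    local_deliver(from_table, topic, msg)
    for other <- @nodes, other != from_table, do: local_deliver(other, topic, msg)
    :ok
  end

  # Tags each delivery with the table that made it, to show where it came from.
  defp local_deliver(table, topic, msg) do
    Registry.dispatch(table, topic, fn entries ->
      for {pid, _} <- entries, do: send(pid, {table, msg})
    end)
  end
end

:ok = TwoNodeModel.start()
# The current process subscribes on "Node 2" only...
:ok = TwoNodeModel.subscribe(:node2_subs, "agent:events")
# ...and still receives a broadcast published on "Node 1".
:ok = TwoNodeModel.broadcast(:node1_subs, "agent:events", :agent_started)
```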

🤔 Why Not Just Use :rpc.call?

# You could manually broadcast to all nodes:
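manual_broadcast = """
# Node.list/0 excludes the current node, so add it explicitly.
for node <- [Node.self() | Node.list()] do
  # :rpc.call/4 blocks until each node replies, one node at a time.
  :rpc.call(node, Phoenix.PubSub, :local_broadcast, [
    AgentApi.PubSub,
    "agent:events",
    {:agent_event, event}
  ])
end
"""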

# But PubSub is better because:
advantages = [
  "Automatic - no need to enumerate nodes",
  "Handles node joins/leaves gracefully",
  "Non-blocking - remote delivery is a plain message send, so a slow node doesn't stall the publisher",
  "Efficient - local subscriber lookup uses a Registry (backed by ETS)",
  "Battle-tested - production-ready implementation",
  "Decoupled - publishers don't know about subscribers"
]

Section 5: Agent Events System

Designing the Event System

# Our AgentEvents module (in agent_api/lib/agent_api/) wraps PubSub
# with a domain-specific API:

defmodule AgentEventsExample do
  @pubsub AgentApi.PubSub

  # === Subscriptions ===

  def subscribe_agent_events do
    Phoenix.PubSub.subscribe(@pubsub, "agent:events")
  end

  def subscribe_agent(agent_name) do
    Phoenix.PubSub.subscribe(@pubsub, "agent:events:#{agent_name}")
  end

  def subscribe_cluster_events do
    Phoenix.PubSub.subscribe(@pubsub, "cluster:events")
  end

  # === Broadcasting ===

  def broadcast_agent_started(agent_name) do
    event = %{
      event: :agent_started,
      agent: agent_name,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end

  def broadcast_agent_stopped(agent_name) do
    event = %{
      event: :agent_stopped,
      agent: agent_name,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end

  def broadcast_task_completed(agent_name, result \\ %{}) do
    event = %{
      event: :task_completed,
      agent: agent_name,
      result: result,
      node: Node.self(),
      timestamp: DateTime.utc_now()
    }
    broadcast_agent_event(agent_name, event)
  end

  # Broadcasts to both the general topic and agent-specific topic
  defp broadcast_agent_event(agent_name, event) do
    Phoenix.PubSub.broadcast(@pubsub, "agent:events", {:agent_event, event})
    Phoenix.PubSub.broadcast(@pubsub, "agent:events:#{agent_name}", {:agent_event, event})
  end
end

🤔 Dual-Topic Broadcasting

# Notice we broadcast to TWO topics:
# 1. "agent:events" - for processes watching ALL agents
# 2. "agent:events:Worker-1" - for processes watching ONE agent

# Question: Why not just use "agent:events" for everything?

dual_topic_reasoning = """
Your answer: ???
"""

# Answer:
# Selective subscription reduces noise.
#
# A dashboard monitoring all agents subscribes to "agent:events".
# It gets EVERY event from EVERY agent.
#
# A process that only cares about Worker-1 subscribes to
# "agent:events:Worker-1". It ignores events from other agents.
#
# Without agent-specific topics, every subscriber would need
# to filter events in handle_info, wasting CPU cycles.
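Here is the dual-topic idea running against a `Registry` stand-in (assumption: `DualTopicDemo` and `:dual_topic_demo` replace `AgentEventsExample` and `AgentApi.PubSub` so the code runs in this notebook). It also surfaces a side effect of publishing to two topics: a process subscribed to both receives each matching event twice.

```elixir
defmodule DualTopicDemo do
  @registry :dual_topic_demo

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  def subscribe(topic) do
    {:ok, _} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Same shape as broadcast_agent_event/2: general topic, then agent topic.
  def broadcast_agent_event(agent_name, event) do
    publish("agent:events", {:agent_event, event})
    publish("agent:events:#{agent_name}", {:agent_event, event})
  end

  defp publish(topic, msg) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _} <- entries, do: send(pid, msg)
    end)
  end
end

{:ok, _} = DualTopicDemo.start_link()

# Watch Worker-1 only: Worker-2's event never reaches this process.
:ok = DualTopicDemo.subscribe("agent:events:Worker-1")
DualTopicDemo.broadcast_agent_event("Worker-2", %{event: :agent_started, agent: "Worker-2"})
DualTopicDemo.broadcast_agent_event("Worker-1", %{event: :agent_started, agent: "Worker-1"})

# Also subscribe to the general topic: Worker-1 events now arrive twice.
:ok = DualTopicDemo.subscribe("agent:events")
DualTopicDemo.broadcast_agent_event("Worker-1", %{event: :task_completed, agent: "Worker-1"})
```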

Section 6: Event Logger

AgentEventLogger

# A practical subscriber that logs all events:

defmodule AgentEventLoggerExample do
  use GenServer

  require Logger

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Subscribe to both agent and cluster events
    # AgentEvents.subscribe_agent_events()
    # AgentEvents.subscribe_cluster_events()
    {:ok, %{event_count: 0}}
  end

  @impl true
  def handle_info({:agent_event, event}, state) do
    log_agent_event(event)
    {:noreply, %{state | event_count: state.event_count + 1}}
  end

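  def handle_info({:cluster_event, event}, state) do
    log_cluster_event(event)
    {:noreply, %{state | event_count: state.event_count + 1}}
  end

  # Ignore anything else instead of crashing on an unmatched message.
  def handle_info(_other, state), do: {:noreply, state}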

  defp log_agent_event(%{event: :agent_started, agent: agent, node: node}) do
    Logger.info("[Events] agent_started: #{agent} on #{node}")
  end

  defp log_agent_event(%{event: :agent_stopped, agent: agent, node: node}) do
    Logger.info("[Events] agent_stopped: #{agent} on #{node}")
  end

  defp log_agent_event(%{event: :task_completed, agent: agent, node: node}) do
    Logger.info("[Events] task_completed: #{agent} on #{node}")
  end

  defp log_agent_event(%{event: type, agent: agent}) do
    Logger.info("[Events] #{type}: #{agent}")
  end

  defp log_cluster_event(%{event: :node_up, node: node}) do
    Logger.info("[Events] cluster: node_up #{node}")
  end

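  defp log_cluster_event(%{event: :node_down, node: node}) do
    Logger.warning("[Events] cluster: node_down #{node}")
  end

  # Fallback so a new cluster event type doesn't crash the logger.
  defp log_cluster_event(event) do
    Logger.info("[Events] cluster: #{inspect(event)}")
  end
end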

🤔 Event Logger as a Pattern

# The EventLogger follows a common pattern:
# GenServer that subscribes to PubSub topics in init/1

pattern = """
1. init/1 → Subscribe to topics
2. handle_info({:agent_event, event}, state) → Process events
3. handle_info({:cluster_event, event}, state) → Process events

This pattern can be reused for:
- Metrics collection (count events, measure latency)
- Alerting (notify when agents fail)
- Audit logging (persist events to database)
- Dashboard updates (push to LiveView)

All without changing the publishing code!
That's the power of decoupled PubSub.
"""

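As one example of reusing the pattern, here is a sketch of a metrics subscriber that measures delivery latency from the `:timestamp` field the AgentEvents broadcasts include. `LatencyCollector` is hypothetical; in agent_api it would subscribe in `init/1`, while here events are sent to it directly.

```elixir
defmodule LatencyCollector do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil)
  def latencies(pid), do: GenServer.call(pid, :latencies)

  @impl true
  def init(nil), do: {:ok, []}

  @impl true
  def handle_call(:latencies, _from, acc), do: {:reply, Enum.reverse(acc), acc}

  @impl true
  def handle_info({:agent_event, %{event: type, timestamp: sent_at}}, acc) do
    # Milliseconds between broadcast and receipt. Across nodes this
    # also includes any clock skew between the two machines.
    ms = DateTime.diff(DateTime.utc_now(), sent_at, :millisecond)
    {:noreply, [{type, ms} | acc]}
  end

  def handle_info(_other, acc), do: {:noreply, acc}
end

{:ok, collector} = LatencyCollector.start_link([])

# Simulate an event that was stamped 50 ms ago.
sent_at = DateTime.add(DateTime.utc_now(), -50, :millisecond)
send(collector, {:agent_event, %{event: :task_completed, agent: "W-1", timestamp: sent_at}})

LatencyCollector.latencies(collector)
# => [{:task_completed, ms}] with ms >= 50
```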
Section 7: Integration with TaskManager

Events in the Task Lifecycle

# Our updated TaskManager broadcasts events at key points:

task_lifecycle_events = """
TaskManager.start_agent("Worker-1")
  → AgentEvents.broadcast_agent_started("Worker-1")

TaskManager.send_message("Worker-1", :search, %{query: "OTP"})
  → AgentEvents.broadcast_task_received("Worker-1", task_info)

TaskManager.process_next("Worker-1")
  → AgentEvents.broadcast_task_completed("Worker-1", result)
"""

# This means ANY subscriber gets notified automatically:
# - EventLogger logs it
# - Dashboard updates in real-time
# - Metrics collector counts it
# - Cross-node listeners receive it too!

Section 8: Interactive Exercises

Exercise 1: Build a Simple PubSub Subscriber

# This exercise works within a single IEx session using
# Elixir's built-in Registry as a simple pub/sub stand-in.

# Step 1: Start a Registry for pub/sub
{:ok, _} = Registry.start_link(keys: :duplicate, name: :simple_pubsub)

# Step 2: Subscribe the current process
{:ok, _} = Registry.register(:simple_pubsub, "events", [])

# Step 3: Publish from a spawned process
spawn(fn ->
  Registry.dispatch(:simple_pubsub, "events", fn entries ->
    for {pid, _} <- entries do
      send(pid, {:event, "Hello from spawned process!"})
    end
  end)
end)

# Step 4: Receive the message
receive do
  {:event, msg} -> IO.puts("Got: #{msg}")
after
  1000 -> IO.puts("No message received")
end

Exercise 2: Event Counter

# Build a GenServer that counts events by type

defmodule EventCounter do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def get_counts, do: GenServer.call(__MODULE__, :get_counts)
  def receive_event(event), do: GenServer.cast(__MODULE__, {:event, event})

  @impl true
  def init(_), do: {:ok, %{}}

  @impl true
  def handle_call(:get_counts, _from, counts) do
    {:reply, counts, counts}
  end

  @impl true
  def handle_cast({:event, %{event: type}}, counts) do
    new_counts = Map.update(counts, type, 1, &(&1 + 1))
    {:noreply, new_counts}
  end
end

# Test it:
{:ok, _} = EventCounter.start_link(nil)

EventCounter.receive_event(%{event: :agent_started, agent: "W-1"})
EventCounter.receive_event(%{event: :task_completed, agent: "W-1"})
EventCounter.receive_event(%{event: :agent_started, agent: "W-2"})
EventCounter.receive_event(%{event: :task_completed, agent: "W-1"})

EventCounter.get_counts()
# => %{agent_started: 2, task_completed: 2}

# Clean up
GenServer.stop(EventCounter)

Exercise 3: Filtered Subscriber

# Build a subscriber that only cares about specific event types

defmodule FilteredSubscriber do
  use GenServer

  def start_link(filter_types) do
    GenServer.start_link(__MODULE__, filter_types)
  end

  def get_events(pid), do: GenServer.call(pid, :get_events)

  @impl true
  def init(filter_types) do
    {:ok, %{filter: MapSet.new(filter_types), events: []}}
  end

  @impl true
  def handle_call(:get_events, _from, state) do
    {:reply, Enum.reverse(state.events), state}
  end

  @impl true
  def handle_info({:agent_event, %{event: type} = event}, state) do
    if MapSet.member?(state.filter, type) do
      {:noreply, %{state | events: [event | state.events]}}
    else
      {:noreply, state}
    end
  end

  def handle_info(_, state), do: {:noreply, state}
end

# Only listen for task completions:
{:ok, sub} = FilteredSubscriber.start_link([:task_completed])

# Simulate events
send(sub, {:agent_event, %{event: :agent_started, agent: "W-1"}})
send(sub, {:agent_event, %{event: :task_completed, agent: "W-1", result: "done"}})
send(sub, {:agent_event, %{event: :agent_stopped, agent: "W-1"}})

# Only the task_completed event was captured:
FilteredSubscriber.get_events(sub)
# => [%{agent: "W-1", event: :task_completed, result: "done"}]

GenServer.stop(sub)

Key Takeaways

  1. PubSub decouples publishers from subscribers - Neither knows about the other

  2. Topics are just strings - Use namespaces like "agent:events:Worker-1"

  3. Phoenix.PubSub.broadcast/3 sends to all subscribers - On the local node and on remote nodes

  4. Cross-node delivery is automatic - Once nodes are connected and each runs a PubSub server with the same name, broadcasts reach every node with no extra code

  5. AgentEvents wraps PubSub - Domain-specific API for agent lifecycle events

  6. EventLogger is a subscriber pattern - GenServer that subscribes in init/1


What’s Next?

In the next session, we’ll build the full Distributed Agents system:

  • Automatic node discovery with libcluster
  • AgentRouter for cross-node agent lookup
  • Route A2A requests to the correct node
  • Handle node failures gracefully

PubSub will broadcast events across your cluster automatically!


Navigation

Previous: Session 23 - Distributed Erlang in Practice

Next: Session 25 - Distributed Agents