Session 25: Distributed Agents
Mix.install([])
Introduction
You’ve built the pieces: ETS-backed AgentDirectory (Session 22), distributed Erlang hands-on (Session 23), and PubSub events (Session 24). Now it’s time to put them all together into a distributed multi-agent system.
This session tackles the hardest problem: finding and communicating with agents that could be on any node in the cluster. You’ll use libcluster for automatic node discovery, build an AgentRouter for cross-node agent lookup, and handle node failures gracefully.
Sources for This Session
This session synthesizes concepts from the previous three sessions: the ETS-backed AgentDirectory (Session 22), distributed Erlang (Session 23), and Phoenix PubSub events (Session 24).
Learning Goals
By the end of this session, you’ll be able to:
- Use libcluster for automatic node discovery
- Understand cluster topologies (Gossip, Epmd, Kubernetes)
- Build an AgentRouter for cross-node agent lookup
- Update TaskManager for distributed routing
- Add cluster-aware A2A methods
- Handle node failures and agent recovery
Section 1: The Problem - Finding Agents Across Nodes
🤔 Opening Reflection
# You have a 3-node cluster:
# Node 1: Worker-1, Worker-2
# Node 2: Worker-3, Analyzer-1
# Node 3: Worker-4
# An HTTP request arrives at Node 1:
# POST /a2a {"method": "SendMessage", "params": {"agent": "Analyzer-1"}}
# Problem: Analyzer-1 is on Node 2, not Node 1!
problem = """
Current TaskManager uses AgentSupervisor.whereis("Analyzer-1")
which only checks the LOCAL Registry on Node 1.
Result: {:error, :agent_not_found}
But the agent EXISTS - just on a different node.
How do we find it?
"""
# Approaches:
approaches = %{
  brute_force: """
  Loop through all nodes, :rpc.call each one to check.
  Slow: O(n) network calls per lookup.
  """,
  cached_directory: """
  Use AgentDirectory (ETS) to cache name → {node, pid}.
  Fast: O(1) ETS lookup.
  Challenge: Keep the directory in sync when agents start/stop.
  """,
  hybrid: """
  1. Check AgentDirectory first (fast, cached)
  2. Fall back to the local Registry
  3. Fall back to querying remote nodes
  Best of both worlds.
  """
}
# We'll implement the hybrid approach in AgentRouter.
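For contrast, here is a minimal sketch of the brute-force approach, assuming the AgentFramework.AgentSupervisor.whereis/1 API from earlier sessions. Every lookup pays one RPC round trip per node, which is exactly the cost the hybrid approach amortizes away.
defmodule BruteForceLookup do
  # Ask every node in turn until one answers: O(n) RPC calls per lookup.
  def find(agent_name) do
    [Node.self() | Node.list()]
    |> Enum.find_value(:error, fn node ->
      case :rpc.call(node, AgentFramework.AgentSupervisor, :whereis, [agent_name]) do
        {:ok, pid} -> {:ok, {node, pid}}
        _ -> nil
      end
    end)
  end
end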
Section 2: libcluster for Automatic Node Discovery
What is libcluster?
# Without libcluster, you manually connect nodes:
# Node.connect(:node2@hostname)
# With libcluster, nodes discover and connect AUTOMATICALLY.
libcluster_overview = """
libcluster provides automatic node discovery and connection.
You configure a "topology" (discovery strategy), and libcluster
handles finding and connecting to other nodes.
Think of it as:
Manual: You call each friend to invite them to the party.
libcluster: You post the party address, friends find it themselves.
"""
Configuration
# In config/dev.exs:
libcluster_config = """
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_if: "127.0.0.1",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1
      ]
    ]
  ]
"""
# The Gossip strategy uses UDP multicast to discover nodes
# on the local network. Perfect for development!
Adding to Supervision Tree
# In application.ex, libcluster runs as a supervisor:
application_setup = """
def start(_type, _args) do
  topologies = Application.get_env(:libcluster, :topologies, [])

  children = [
    {Phoenix.PubSub, name: AgentApi.PubSub},
    {Cluster.Supervisor, [topologies, [name: AgentApi.ClusterSupervisor]]},
    AgentApi.Cluster.ClusterMonitor,
    AgentApi.Cluster.AgentEventLogger,
    AgentApiWeb.Endpoint
  ]

  opts = [strategy: :one_for_one, name: AgentApi.Supervisor]
  Supervisor.start_link(children, opts)
end
"""
# Order matters:
# 1. PubSub first (other services need it)
# 2. Cluster.Supervisor (connects nodes)
# 3. ClusterMonitor (tracks connections)
# 4. AgentEventLogger (logs events)
# 5. Endpoint (serves HTTP)
Section 3: Cluster Topologies
🤔 Choosing a Topology
# libcluster supports multiple discovery strategies:
topologies = %{
  gossip: %{
    description: "UDP multicast - nodes announce themselves on the network",
    best_for: "Development, local network",
    config: "Multicast address and port",
    pros: ["Zero configuration", "Automatic discovery"],
    cons: ["Only works on local network", "Not for production clouds"]
  },
  epmd: %{
    description: "Static list of node names",
    best_for: "Known, fixed infrastructure",
    config: "List of node names",
    pros: ["Simple", "Predictable"],
    cons: ["Must know nodes in advance", "No dynamic scaling"]
  },
  kubernetes: %{
    description: "Uses the Kubernetes API to discover pods",
    best_for: "Kubernetes deployments",
    config: "Kubernetes namespace and labels",
    pros: ["Cloud-native", "Dynamic scaling"],
    cons: ["Kubernetes only"]
  },
  dns: %{
    description: "DNS-based discovery (DNS SRV records)",
    best_for: "Docker Compose, service mesh",
    config: "DNS name to query",
    pros: ["Works with any orchestrator", "Standard protocol"],
    cons: ["Requires DNS infrastructure"]
  }
}
# For development, we use Gossip.
# For production, you'd choose based on your deployment platform.
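As a concrete comparison with the Gossip config above, here is roughly what a static topology using the Epmd strategy could look like; the host names are placeholders.
# Static node list via Cluster.Strategy.Epmd (host names are placeholders):
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"node1@hostname", :"node2@hostname"]]
    ]
  ]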
Section 4: Building AgentRouter
The Lookup Strategy
# AgentRouter implements a tiered lookup strategy:
lookup_strategy = """
find_agent("Worker-1"):

  Step 1: Check AgentDirectory (ETS)
    Fast O(1) lookup.
    If found AND process is alive → return {node, pid}
    If found BUT process is dead → remove stale entry, continue

  Step 2: Check local Registry
    AgentSupervisor.whereis("Worker-1")
    If found → register in directory, return {node, pid}

  Step 3: Search local agents by name
    Loop through AgentSupervisor.list_agents()
    If found → register in directory, return {node, pid}

  Step 4: Query remote nodes via RPC
    For each connected node:
      :rpc.call(node, AgentSupervisor, :whereis, ["Worker-1"])
    If found → register in directory, return {node, pid}

  Step 5: Not found → return :error
"""
The Implementation
# Here's the core of AgentRouter:
defmodule AgentRouterExample do
  alias AgentFramework.{AgentDirectory, AgentSupervisor}

  def find_agent(agent_name) do
    # Step 1: Check directory (fast path)
    case AgentDirectory.lookup(agent_name) do
      {:ok, {node, pid}} ->
        if process_reachable?(node, pid) do
          {:ok, {node, pid}}
        else
          # Stale entry: the cached process is gone, so drop it and re-resolve.
          AgentDirectory.unregister(agent_name)
          find_agent_fallback(agent_name)
        end

      :error ->
        find_agent_fallback(agent_name)
    end
  end

  defp find_agent_fallback(agent_name) do
    # Step 2: Check the local Registry
    case AgentSupervisor.whereis(agent_name) do
      {:ok, pid} ->
        AgentDirectory.register(agent_name, Node.self(), pid)
        {:ok, {Node.self(), pid}}

      :error ->
        # Step 4: Query remote nodes (the Step 3 list_agents scan is omitted here)
        find_agent_remote(agent_name)
    end
  end

  defp find_agent_remote(agent_name) do
    Node.list()
    |> Enum.find_value(:error, fn node ->
      case :rpc.call(node, AgentFramework.AgentSupervisor, :whereis, [agent_name]) do
        {:ok, pid} ->
          AgentDirectory.register(agent_name, node, pid)
          {:ok, {node, pid}}

        _ ->
          # Returning nil makes Enum.find_value continue to the next node
          nil
      end
    end)
  end

  defp process_reachable?(node, pid) do
    if node == Node.self() do
      Process.alive?(pid)
    else
      # Trust the directory for remote processes; stale remote entries are
      # cleaned up by ClusterMonitor when the owning node goes down.
      true
    end
  end
end
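As a quick usage check, assuming Worker-1 runs locally and Analyzer-1 lives on node2 (the node names and pids below are illustrative):
# Illustrative results - actual node names and pids will differ:
AgentRouterExample.find_agent("Worker-1")
# => {:ok, {:node1@host, #PID<0.245.0>}}

AgentRouterExample.find_agent("Analyzer-1")
# => {:ok, {:node2@host, #PID<0.312.0>}}

AgentRouterExample.find_agent("Nonexistent")
# => :error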
🤔 Why Cache in AgentDirectory?
# Question: After finding an agent via RPC, why cache it in AgentDirectory?
caching_rationale = """
Without caching:
Every lookup → RPC call to all nodes → O(n) network calls
100 requests/sec × 3 nodes = 300 RPC calls/sec
With caching:
First lookup → RPC call → cache result
Subsequent lookups → ETS read → O(1), no network
100 requests/sec → 1 RPC + 99 ETS reads
The tradeoff:
+ Much faster for repeated lookups
- Cache can become stale (agent moved, node crashed)
Staleness is handled by:
1. Process.alive? check for local agents
2. ClusterMonitor removes entries when nodes disconnect
3. Re-lookup on RPC failure (transparent retry)
"""
Section 5: Updating TaskManager for Distributed Routing
Before and After
# BEFORE (Phase 4 - local only):
before_code = """
def send_message(agent_name, action, params) do
  case find_agent(agent_name) do
    {:ok, pid} ->
      AgentServer.send_task(pid, action, params)
      {:ok, %{status: :created, agent: agent_name}}

    :error ->
      {:error, :agent_not_found}
  end
end

defp find_agent(name) do
  case AgentSupervisor.whereis(name) do
    {:ok, pid} -> {:ok, pid}
    :error -> search_agents(name) # local only
  end
end
"""
# AFTER (Phase 5 - distributed):
after_code = """
def send_message(agent_name, action, params) do
  case AgentRouter.find_agent(agent_name) do
    {:ok, {node, pid}} ->
      if node == Node.self() do
        AgentServer.send_task(pid, action, params)
      else
        :rpc.call(node, AgentServer, :send_task, [pid, action, params])
      end

      AgentEvents.broadcast_task_received(agent_name, %{action: action})
      {:ok, %{status: :created, agent: agent_name, node: node}}

    :error ->
      {:error, :agent_not_found}
  end
end
"""
# Key changes:
# 1. find_agent → AgentRouter.find_agent (cross-node)
# 2. Returns {node, pid} instead of just pid
# 3. Local vs remote dispatch based on node
# 4. Events broadcast via PubSub
# 5. Node info included in response
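One thing the condensed after_code glosses over: :rpc.call returns {:badrpc, reason} when the target node is unreachable or the remote call crashes. A hedged sketch of the dispatch step with that case handled (the module and function names here are illustrative, not the project's actual TaskManager):
defmodule DispatchSketch do
  alias AgentFramework.AgentServer

  # Route a task to the node that owns the agent, handling the RPC failure
  # case that the condensed example above ignores.
  def dispatch_task(node, pid, action, params) do
    if node == Node.self() do
      AgentServer.send_task(pid, action, params)
    else
      case :rpc.call(node, AgentServer, :send_task, [pid, action, params]) do
        {:badrpc, reason} ->
          # The node became unreachable or the remote call crashed; surfacing
          # an error lets the caller drop the stale directory entry and retry.
          {:error, {:rpc_failed, reason}}

        result ->
          result
      end
    end
  end
end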
Section 6: Cluster-Aware A2A Methods
GetClusterInfo
# New A2A method for cluster status:
get_cluster_info = """
Request:
  POST /a2a
  {"jsonrpc": "2.0", "method": "GetClusterInfo", "params": {}, "id": 1}

Response:
  {
    "jsonrpc": "2.0",
    "result": {
      "self": "node1@hostname",
      "connected": ["node2@hostname"],
      "total_nodes": 2
    },
    "id": 1
  }
"""
Enhanced ListAgents
# ListAgents now includes node information:
enhanced_list_agents = """
Request:
  POST /a2a
  {"jsonrpc": "2.0", "method": "ListAgents", "params": {}, "id": 1}

Response:
  {
    "jsonrpc": "2.0",
    "result": {
      "agents": ["Worker-1", "Worker-2"],
      "agents_detail": [
        {"name": "Worker-1", "node": "node1@hostname"},
        {"name": "Worker-2", "node": "node2@hostname"}
      ],
      "count": 2
    },
    "id": 1
  }
"""
# Now you can see which node each agent lives on!
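A sketch of how agents_detail could be assembled from every connected node. It assumes AgentFramework.AgentSupervisor.list_agents/0 returns the local agent names; the real return shape may differ.
defmodule ListAgentsExample do
  def agents_detail do
    [Node.self() | Node.list()]
    |> Enum.flat_map(fn node ->
      case :rpc.call(node, AgentFramework.AgentSupervisor, :list_agents, []) do
        names when is_list(names) ->
          Enum.map(names, fn name -> %{name: name, node: to_string(node)} end)

        _badrpc_or_other ->
          # Skip unreachable nodes instead of failing the whole request.
          []
      end
    end)
  end
end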
Section 7: Node Failure and Agent Recovery
🤔 What Happens When a Node Dies?
# Scenario: Node 2 crashes while running Analyzer-1
failure_sequence = """
1. Node 2 crashes
2. :net_kernel detects loss of connection
3. ClusterMonitor receives {:nodedown, :node2@host}
4. ClusterMonitor calls AgentDirectory.remove_node_agents(:node2@host)
5. PubSub broadcasts {:cluster_event, %{event: :node_down, ...}}
6. AgentEventLogger logs the event
Result:
- AgentDirectory no longer has stale entries for node2
- Subsequent lookups for "Analyzer-1" return :error
- HTTP requests for "Analyzer-1" return -32001 (agent not found)
"""
# When node 2 comes back:
recovery_sequence = """
1. libcluster discovers node 2
2. :net_kernel connects automatically
3. ClusterMonitor receives {:nodeup, :node2@host}
4. PubSub broadcasts {:cluster_event, %{event: :node_up, ...}}
5. Node 2's agents restart under their local supervisor
6. Future lookups find agents via RPC and re-cache
Key insight: Agents on node 2 restart automatically because
they're supervised by DynamicSupervisor on that node.
The cluster layer just needs to clean up stale directory entries.
"""
Graceful Degradation
# The system degrades gracefully:
degradation = %{
  one_node_down: """
  Agents on that node are unavailable.
  Agents on other nodes work normally.
  No cascading failures.
  """,
  network_partition: """
  Two groups of nodes can't see each other.
  Each group works independently.
  When the partition heals, nodes reconnect via libcluster.
  """,
  all_nodes_down: """
  No agents available.
  HTTP requests return agent_not_found.
  On restart, agents are re-created by supervisors.
  """
}
# This is the BEAM's philosophy:
# "Let it crash, but make recovery automatic."
Section 8: Full Multi-Node Demo
Setup (Requires 2 Terminals)
# Terminal 1:
cd agent_api
PORT=4000 iex --sname node1 --cookie agent_secret -S mix phx.server
# Terminal 2:
cd agent_api
PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server
Test Commands
# 1. Check cluster info (from node1)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":1}' | jq
# 2. Start an agent on node1
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":2}' | jq
# 3. Start an agent on node2
curl -s -X POST http://localhost:4001/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Analyzer-1"},"id":3}' | jq
# 4. List agents from node1 (should show BOTH agents)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":4}' | jq
# 5. Send task to Analyzer-1 via node1 (cross-node routing!)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Analyzer-1","action":"analyze","params":{"data":"test"}},"id":5}' | jq
# 6. Process the task on Analyzer-1 (via node1, routed to node2)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Analyzer-1"},"id":6}' | jq
IEx Verification
# On node1:
Node.list()
# => [:node2@yourmachine]
AgentFramework.AgentDirectory.all_agents()
# => [{"Worker-1", :node1@host, #PID<0.xxx.0>}, ...]
AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => {:ok, {:node2@host, #PID<0.yyy.0>}}
AgentApi.Cluster.ClusterMonitor.cluster_info()
# => %{self: :node1@host, connected: [:node2@host], total: 2}
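To exercise the failure path from Section 7, you can stop node2 (Ctrl+C twice in Terminal 2) and check again from node1; the results sketched below are what Sections 4 and 7 predict.
# After stopping node2, back on node1:
Node.list()
# => []

AgentFramework.AgentDirectory.all_agents()
# => only node1 entries remain (ClusterMonitor removed node2's)

AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => :error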
Key Takeaways
- libcluster automates node discovery - No manual Node.connect needed
- AgentRouter uses tiered lookup - ETS cache → local Registry → remote RPC
- TaskManager routes transparently - Finds agents on any node, dispatches via RPC
- ClusterMonitor cleans up on failure - Removes stale directory entries
- PubSub broadcasts across nodes - Events propagate automatically
- Graceful degradation - Node failures don’t cascade; recovery is automatic
What’s Next?
In the final session of Phase 5, we’ll verify everything works with the Checkpoint project:
- Run all Phase 4 tests (must still pass)
- Multi-node setup and cross-node tests
- PubSub event broadcasting verification
- Node failure recovery tests
- Complete verification checklist