Session 26: Checkpoint - Full Distributed Integration
Mix.install([])
Introduction
This is the final checkpoint for Phase 5. You’ve built a distributed multi-agent system across Sessions 22-25. Now it’s time to verify that everything works together - from single-node operations through cross-node communication in a multi-node cluster.
This session walks through a structured verification of every Phase 5 component.
Learning Goals
By the end of this session, you’ll have verified:
- All Phase 4 functionality still works (backward compatibility)
- ETS-backed AgentDirectory operates correctly
- Multi-node cluster formation via libcluster
- Cross-node agent discovery and routing
- PubSub event broadcasting across nodes
- Node failure recovery
Section 1: Project Structure Review
What Changed in Phase 5
phase_5_changes = """
New Files:
agent_framework/
lib/agent_framework/
agent_directory.ex ← ETS-backed name→{node,pid} directory
agent_api/
lib/agent_api/
agent_events.ex ← PubSub event broadcasting
cluster/
cluster_monitor.ex ← Node up/down monitoring
agent_event_logger.ex ← Event logging subscriber
agent_router.ex ← Cross-node agent discovery
Modified Files:
agent_framework/
lib/agent_framework/
application.ex ← Added AgentDirectory to supervision tree
agent_api/
mix.exs ← Added libcluster dependency
config/dev.exs ← Added libcluster Gossip topology
lib/agent_api/
application.ex ← Added cluster supervision children
a2a/task_manager.ex ← Uses AgentRouter for cross-node lookup
lib/agent_api_web/
controllers/a2a_controller.ex ← Added GetClusterInfo, enhanced ListAgents
"""
Section 2: New Module Overview
Architecture Diagram
architecture = """
Phase 5 Architecture:
┌─────────── Node 1 ──────────────────────────────────────────┐
│ │
│ AgentApi.Supervisor │
│ ├── Phoenix.PubSub ◄──── cross-node sync ────► │
│ ├── Cluster.Supervisor (libcluster) │
│ ├── ClusterMonitor (node events) │
│ ├── AgentEventLogger (logs events) │
│ └── AgentApiWeb.Endpoint │
│ └── A2AController │
│ └── TaskManager │
│ └── AgentRouter │
│ ├── AgentDirectory (ETS) │
│ ├── Local Registry │
│ └── Remote RPC ──────────► │
│ │
│ AgentFramework.Supervisor │
│ ├── Registry │
│ ├── AgentDirectory (ETS table owner) │
│ └── AgentSupervisor (DynamicSupervisor) │
│ ├── AgentServer "Worker-1" │
│ └── AgentServer "Worker-2" │
│ │
└─────────────────────────────────────────────────────────────┘
│
TCP connection
(distributed Erlang)
│
┌─────────── Node 2 ──────────────────────────────────────────┐
│ (Same structure, different agents) │
│ AgentServer "Analyzer-1" │
│ AgentServer "Worker-3" │
└─────────────────────────────────────────────────────────────┘
"""
Module Responsibilities
module_responsibilities = %{
"AgentDirectory" => """
ETS-backed directory: name → {node, pid}
GenServer owns the ETS table, coordinates writes.
Any process reads directly from ETS (concurrent).
Provides: register, unregister, lookup, all_agents, agents_on_node
""",
"ClusterMonitor" => """
GenServer monitoring node connections via :net_kernel.monitor_nodes/1.
Tracks connected nodes in state.
Cleans up AgentDirectory on node disconnect.
Broadcasts cluster events via PubSub.
""",
"AgentEvents" => """
PubSub wrapper for domain-specific event broadcasting.
Topics: "agent:events", "agent:events:", "cluster:events"
Events: agent_started, agent_stopped, task_completed, task_received
""",
"AgentEventLogger" => """
GenServer that subscribes to all agent and cluster events.
Logs events with appropriate severity levels.
Pattern: subscribe in init, handle in handle_info.
""",
"AgentRouter" => """
Cross-node agent discovery and routing.
Tiered lookup: ETS → local Registry → remote RPC.
Caches results in AgentDirectory.
Provides: find_agent, get_agent_state, send_task, list_all_agents
"""
}
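The first entry, the GenServer + ETS ownership pattern, is what keeps lookups fast under load: one process owns the table and serializes writes, while any process reads ETS directly. A simplified sketch assuming a public named table (function names follow the list above; exact signatures and table options are illustrative):
# Sketch: GenServer owns the ETS table; reads bypass the server entirely
defmodule AgentDirectorySketch do
  use GenServer

  @table :agent_directory_sketch

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Writes go through the GenServer so they are serialized
  def register(name, node, pid), do: GenServer.call(__MODULE__, {:register, name, node, pid})

  # Reads hit ETS directly from the calling process - no bottleneck
  def lookup(name) do
    case :ets.lookup(@table, name) do
      [{^name, node, pid}] -> {:ok, {node, pid}}
      [] -> :error
    end
  end

  def all_agents, do: :ets.tab2list(@table)

  @impl true
  def init(_opts) do
    table = :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
    {:ok, %{table: table}}
  end

  @impl true
  def handle_call({:register, name, node, pid}, _from, state) do
    :ets.insert(@table, {name, node, pid})
    {:reply, :ok, state}
  end
end
The read_concurrency: true option is what makes simultaneous reads from many processes cheap.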
Section 3: Single-Node Verification
Phase 4 Tests Must Still Pass
# Run agent_framework tests
cd agent_framework && mix test
# Run agent_api tests
cd agent_api && mix test
# All existing functionality should work identically.
# The Phase 5 changes are additive - nothing was removed.
backward_compatibility = """
Checklist:
[ ] agent_framework: mix test passes
[ ] agent_api: mix test passes
[ ] Health endpoint works: GET /health/
[ ] Agent card works: GET /.well-known/agent.json
[ ] StartAgent works: POST /a2a
[ ] SendMessage works: POST /a2a
[ ] GetAgentState works: POST /a2a
[ ] ListAgents works: POST /a2a
[ ] ProcessNext works: POST /a2a
"""
Single-Node curl Tests
# Start the server
cd agent_api
iex -S mix phx.server
# 1. Health check
curl http://localhost:4000/health/ | jq
# 2. Agent card
curl http://localhost:4000/.well-known/agent.json | jq
# 3. Start an agent
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq
# 4. Send a task
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"OTP"}},"id":2}' | jq
# 5. Get agent state
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":3}' | jq
# 6. Process the task
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":4}' | jq
# 7. List agents (with new node info)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":5}' | jq
# 8. NEW: Get cluster info
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":6}' | jq
Section 4: Multi-Node Setup
Starting 2 Nodes with libcluster
# Terminal 1:
cd agent_api
PORT=4000 iex --sname node1 --cookie agent_secret -S mix phx.server
# Terminal 2:
cd agent_api
PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server
# libcluster's Gossip strategy will automatically connect them.
# Watch the console for connection messages.
Verify Connection
# On node1:
Node.list()
# Should show: [:node2@yourmachine]
AgentApi.Cluster.ClusterMonitor.cluster_info()
# Should show: %{self: :node1@..., connected: [:node2@...], total: 2}
# Via HTTP:
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":1}' | jq
# Expected:
# {
# "result": {
# "self": "node1@yourmachine",
# "connected": ["node2@yourmachine"],
# "total_nodes": 2
# }
# }
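The shape of cluster_info/0 is easy to cross-check against BEAM built-ins. Evaluating the expression below in IEx on node1 should agree with what ClusterMonitor reports (the monitor tracks nodes in its own state, so treat this as a sanity check rather than its implementation):
# Sanity check: Node.self()/Node.list() should agree with cluster_info/0
%{self: Node.self(), connected: Node.list(), total: length(Node.list()) + 1}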
Section 5: Cross-Node Agent Communication Tests
Start Agents on Different Nodes
# Start Worker-1 on node1
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq
# Start Analyzer-1 on node2
curl -s -X POST http://localhost:4001/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Analyzer-1"},"id":2}' | jq
# List agents from node1 (should show BOTH)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":3}' | jq
Cross-Node Task Dispatch
# Send a task to Analyzer-1 via node1 (cross-node!)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Analyzer-1","action":"analyze","params":{"data":"cross-node test"}},"id":4}' | jq
# Process the task via node1 (routed to node2)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Analyzer-1"},"id":5}' | jq
# Get Analyzer-1's state from node1 (cross-node state query)
curl -s -X POST http://localhost:4000/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Analyzer-1"},"id":6}' | jq
IEx Cross-Node Verification
# On node1:
AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => {:ok, {:node2@yourmachine, #PID<...>}}
AgentFramework.AgentDirectory.all_agents()
# Shows agents on both nodes
AgentApi.Cluster.AgentRouter.list_all_agents()
# => [{"Worker-1", :node1@host, pid1}, {"Analyzer-1", :node2@host, pid2}]
Section 6: PubSub Event Broadcasting Tests
Subscribe to Events in IEx
# On node1, subscribe to agent events:
AgentApi.AgentEvents.subscribe_agent_events()
# On node2, start a new agent:
# (from terminal 2 or via curl to port 4001)
# curl -s -X POST http://localhost:4001/a2a \
# -H "Content-Type: application/json" \
# -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-3"},"id":1}'
# Back on node1, check for events:
receive do
{:agent_event, event} ->
IO.inspect(event, label: "Received event")
after
5000 -> IO.puts("No event received")
end
# You should see:
# Received event: %{event: :agent_started, agent: "Worker-3", node: :node2@host, ...}
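What makes this work is that Phoenix.PubSub broadcasts reach subscribers on every connected node, not just the local one. A hedged sketch of the AgentEvents wrapper behind subscribe_agent_events/0 (the PubSub name AgentApi.PubSub and the exact event fields are assumptions based on the output shown above):
# Sketch of the PubSub wrapper - broadcast on one node, receive on all nodes
defmodule AgentEventsSketch do
  @pubsub AgentApi.PubSub
  @topic "agent:events"

  def subscribe_agent_events, do: Phoenix.PubSub.subscribe(@pubsub, @topic)

  def broadcast_agent_started(agent_name) do
    event = %{event: :agent_started, agent: agent_name, node: Node.self()}
    Phoenix.PubSub.broadcast(@pubsub, @topic, {:agent_event, event})
  end
end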
Check Event Logger Output
# The AgentEventLogger on both nodes should be logging events.
# Check your console output for messages like:
#
# [info] [AgentEventLogger] agent_started: Worker-3 on node2@yourmachine
# [info] [AgentEventLogger] task_received: Analyzer-1 on node2@yourmachine
# [info] [AgentEventLogger] task_completed: Analyzer-1 on node2@yourmachine
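Those log lines come from the subscribe-in-init / handle-in-handle_info pattern noted in Section 2. A minimal sketch of that shape (module and PubSub names are illustrative; the real logger also subscribes to cluster events and picks log levels per event type):
# Sketch: subscribe in init/1, react to broadcasts in handle_info/2
defmodule AgentEventLoggerSketch do
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    Phoenix.PubSub.subscribe(AgentApi.PubSub, "agent:events")
    {:ok, %{}}
  end

  @impl true
  def handle_info({:agent_event, %{event: event, agent: agent, node: node}}, state) do
    Logger.info("[AgentEventLogger] #{event}: #{agent} on #{node}")
    {:noreply, state}
  end
end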
Section 7: Node Failure Recovery Tests
Simulate Node Failure
# 1. Note the current state:
AgentFramework.AgentDirectory.all_agents()
# Shows agents on both nodes
# 2. Kill node2 (Ctrl+C twice in Terminal 2)
# 3. Check ClusterMonitor:
AgentApi.Cluster.ClusterMonitor.cluster_info()
# connected should be empty: []
# 4. Check AgentDirectory:
AgentFramework.AgentDirectory.all_agents()
# node2's agents should be REMOVED automatically
# 5. Try to reach Analyzer-1:
AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => :error (agent not reachable)
# 6. Restart node2 (in Terminal 2):
# PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server
# 7. After reconnection, verify:
Node.list()
# => [:node2@yourmachine]
AgentApi.Cluster.ClusterMonitor.cluster_info()
# total should be 2 again
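The cleanup in step 4 is driven by the {:nodedown, node} messages that :net_kernel.monitor_nodes/1 delivers to ClusterMonitor. A hedged sketch of the relevant callbacks (assumes agents_on_node/1 returns {name, node, pid} tuples; the PubSub broadcast of cluster events is omitted here):
# Sketch: ClusterMonitor reacting to node up/down
defmodule ClusterMonitorSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Ask the kernel to send {:nodeup, node} / {:nodedown, node} messages
    :net_kernel.monitor_nodes(true)
    {:ok, MapSet.new(Node.list())}
  end

  @impl true
  def handle_info({:nodeup, node}, connected) do
    {:noreply, MapSet.put(connected, node)}
  end

  def handle_info({:nodedown, node}, connected) do
    # Remove directory entries for agents that lived on the lost node
    for {name, _node, _pid} <- AgentFramework.AgentDirectory.agents_on_node(node) do
      AgentFramework.AgentDirectory.unregister(name)
    end

    {:noreply, MapSet.delete(connected, node)}
  end
end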
Section 8: Verification Checklist
checklist = """
Phase 5 Verification Checklist:
Unit Tests:
[ ] cd agent_framework && mix test (all pass)
[ ] cd agent_api && mix test (all pass)
Single-Node (Phase 4 backward compatibility):
[ ] GET /health/ returns healthy
[ ] GET /.well-known/agent.json returns agent card
[ ] StartAgent creates an agent
[ ] SendMessage dispatches a task
[ ] GetAgentState returns agent state
[ ] ListAgents lists all agents
[ ] ProcessNext processes a task
Phase 5 New Features (Single-Node):
[ ] GetClusterInfo returns cluster status
[ ] ListAgents includes node information
[ ] AgentDirectory stores agent entries
[ ] AgentEventLogger logs events
Multi-Node:
[ ] Two nodes connect automatically (libcluster Gossip)
[ ] GetClusterInfo shows both nodes
[ ] ListAgents shows agents from ALL nodes
[ ] Cross-node SendMessage works (agent on other node)
[ ] Cross-node ProcessNext works
[ ] Cross-node GetAgentState works
[ ] PubSub events received on remote nodes
Node Failure:
[ ] Node disconnect detected by ClusterMonitor
[ ] AgentDirectory entries cleaned up on disconnect
[ ] Node reconnect detected
[ ] Agents accessible again after reconnection
"""
Section 9: Complete Test Script
#!/bin/bash
# Phase 5 Integration Test Script
# Run with: bash test_phase5.sh
echo "=== Phase 5: Distributed Agents Integration Test ==="
echo ""
BASE="http://localhost:4000"
echo "--- Single-Node Tests ---"
echo ""
echo "1. Health check:"
curl -s $BASE/health/ | jq .status
echo ""
echo "2. Agent card:"
curl -s $BASE/.well-known/agent.json | jq .name
echo ""
echo "3. Start Worker-1:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq .result.status
echo ""
echo "4. Send task to Worker-1:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"distributed"}},"id":2}' | jq .result.status
echo ""
echo "5. Get Worker-1 state:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":3}' | jq .result.name
echo ""
echo "6. Process Worker-1 task:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":4}' | jq .result.status
echo ""
echo "7. List agents:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":5}' | jq .result
echo ""
echo "8. Cluster info:"
curl -s -X POST $BASE/a2a \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":6}' | jq .result
echo ""
echo "=== Single-node tests complete ==="
echo ""
echo "For multi-node tests, start node2 on PORT=4001 and run:"
echo " curl -s -X POST http://localhost:4001/a2a ..."
echo ""
Section 10: Key Learnings from Phase 5
🤔 Final Reflection
phase_5_summary = """
Phase 5 brought together four major concepts:
1. ETS (Session 22)
- In-memory key-value storage
- Concurrent reads without bottleneck
- GenServer + ETS pattern for safe ownership
2. Distributed Erlang (Session 23)
- Named nodes, cookies, connections
- :rpc.call for remote execution
- Location transparency
3. Phoenix.PubSub (Session 24)
- Publish-subscribe across nodes
- Decoupled event system
- Automatic cross-node propagation
4. Distributed Agents (Session 25)
- libcluster for automatic discovery
- AgentRouter for tiered lookups
- Graceful degradation on failure
Together, these create a system where:
- Agents run on any node
- HTTP requests route to the correct node
- Events broadcast to all nodes
- Node failures are handled automatically
This is the BEAM's sweet spot: building distributed,
fault-tolerant systems with surprisingly little code.
"""
What You’ve Built (Phases 1-5)
full_journey = """
Phase 1: Agent struct (immutable data)
Phase 2: Process-based agents (concurrency, manual supervision)
Phase 3: OTP agents (GenServer, DynamicSupervisor, Application)
Phase 4: Phoenix API (HTTP endpoints, A2A protocol, JSON-RPC)
Phase 5: Distribution (ETS, clustering, PubSub, cross-node routing)
From a simple struct to a distributed multi-agent system
that communicates via HTTP, discovers peers automatically,
and recovers from failures.
The BEAM made this possible with:
- Lightweight processes (millions possible)
- Location transparency (same API local/remote)
- Let-it-crash philosophy (supervisors handle failure)
- Built-in distribution (clustering is a feature, not an add-on)
"""