

Session 26: Checkpoint - Full Distributed Integration

Mix.install([])

Introduction

This is the final checkpoint for Phase 5. You’ve built a distributed multi-agent system across Sessions 22-25. Now it’s time to verify that everything works together - from single-node operations through cross-node communication in a multi-node cluster.

This session walks through structured verification of every Phase 5 component.

Learning Goals

By the end of this session, you’ll have verified:

  • All Phase 4 functionality still works (backward compatibility)
  • ETS-backed AgentDirectory operates correctly
  • Multi-node cluster formation via libcluster
  • Cross-node agent discovery and routing
  • PubSub event broadcasting across nodes
  • Node failure recovery

Section 1: Project Structure Review

What Changed in Phase 5

phase_5_changes = """
New Files:
  agent_framework/
    lib/agent_framework/
      agent_directory.ex         ← ETS-backed name→{node,pid} directory

  agent_api/
    lib/agent_api/
      agent_events.ex            ← PubSub event broadcasting
      cluster/
        cluster_monitor.ex       ← Node up/down monitoring
        agent_event_logger.ex    ← Event logging subscriber
        agent_router.ex          ← Cross-node agent discovery

Modified Files:
  agent_framework/
    lib/agent_framework/
      application.ex             ← Added AgentDirectory to supervision tree

  agent_api/
    mix.exs                      ← Added libcluster dependency
    config/dev.exs               ← Added libcluster Gossip topology
    lib/agent_api/
      application.ex             ← Added cluster supervision children
      a2a/task_manager.ex        ← Uses AgentRouter for cross-node lookup
    lib/agent_api_web/
      controllers/a2a_controller.ex ← Added GetClusterInfo, enhanced ListAgents
"""

Section 2: New Module Overview

Architecture Diagram

architecture = """
Phase 5 Architecture:

  ┌─────────── Node 1 ──────────────────────────────────────────┐
  │                                                             │
  │  AgentApi.Supervisor                                        │
  │    ├── Phoenix.PubSub ◄──── cross-node sync ────►           │
  │    ├── Cluster.Supervisor (libcluster)                      │
  │    ├── ClusterMonitor (node events)                         │
  │    ├── AgentEventLogger (logs events)                       │
  │    └── AgentApiWeb.Endpoint                                 │
  │          └── A2AController                                  │
  │                └── TaskManager                              │
  │                      └── AgentRouter                        │
  │                            ├── AgentDirectory (ETS)         │
  │                            ├── Local Registry               │
  │                            └── Remote RPC ──────────►       │
  │                                                             │
  │  AgentFramework.Supervisor                                  │
  │    ├── Registry                                             │
  │    ├── AgentDirectory (ETS table owner)                     │
  │    └── AgentSupervisor (DynamicSupervisor)                  │
  │          ├── AgentServer "Worker-1"                         │
  │          └── AgentServer "Worker-2"                         │
  │                                                             │
  └─────────────────────────────────────────────────────────────┘
                         │
                    TCP connection
                    (distributed Erlang)
                         │
  ┌─────────── Node 2 ──────────────────────────────────────────┐
  │  (Same structure, different agents)                         │
  │    AgentServer "Analyzer-1"                                 │
  │    AgentServer "Worker-3"                                   │
  └─────────────────────────────────────────────────────────────┘
"""

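The diagram maps directly onto the two applications' supervision trees. Here is a minimal sketch of how agent_api might start its Phase 5 children - the child order and supervisor names are illustrative, not copied from the project:

# lib/agent_api/application.ex - illustrative sketch of the Phase 5 children
defmodule AgentApi.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # libcluster topologies come from config (the Gossip topology in config/dev.exs)
    topologies = Application.get_env(:libcluster, :topologies, [])

    children = [
      {Phoenix.PubSub, name: AgentApi.PubSub},
      {Cluster.Supervisor, [topologies, [name: AgentApi.ClusterSupervisor]]},
      AgentApi.Cluster.ClusterMonitor,
      AgentApi.Cluster.AgentEventLogger,
      AgentApiWeb.Endpoint
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: AgentApi.Supervisor)
  end
end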
Module Responsibilities

module_responsibilities = %{
  "AgentDirectory" => """
    ETS-backed directory: name → {node, pid}
    GenServer owns the ETS table, coordinates writes.
    Any process reads directly from ETS (concurrent).
    Provides: register, unregister, lookup, all_agents, agents_on_node
  """,

  "ClusterMonitor" => """
    GenServer monitoring node connections via :net_kernel.monitor_nodes/1.
    Tracks connected nodes in state.
    Cleans up AgentDirectory on node disconnect.
    Broadcasts cluster events via PubSub.
  """,

  "AgentEvents" => """
    PubSub wrapper for domain-specific event broadcasting.
    Topics: "agent:events", "agent:events:", "cluster:events"
    Events: agent_started, agent_stopped, task_completed, task_received
  """,

  "AgentEventLogger" => """
    GenServer that subscribes to all agent and cluster events.
    Logs events with appropriate severity levels.
    Pattern: subscribe in init, handle in handle_info.
  """,

  "AgentRouter" => """
    Cross-node agent discovery and routing.
    Tiered lookup: ETS → local Registry → remote RPC.
    Caches results in AgentDirectory.
    Provides: find_agent, get_agent_state, send_task, list_all_agents
  """
}
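The AgentDirectory is the GenServer + ETS pattern in its simplest form: one process owns the table and serializes writes, while any process reads from ETS directly. A minimal sketch - the table name, option list, and function arities are assumptions:

# Sketch of the GenServer + ETS ownership pattern (table name and arities illustrative)
defmodule AgentFramework.AgentDirectory do
  use GenServer

  @table :agent_directory

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Writes go through the owning GenServer so they are serialized
  def register(name, node, pid), do: GenServer.call(__MODULE__, {:register, name, node, pid})
  def unregister(name), do: GenServer.call(__MODULE__, {:unregister, name})

  # Reads hit ETS directly - concurrent, no GenServer bottleneck
  def lookup(name) do
    case :ets.lookup(@table, name) do
      [{^name, node, pid}] -> {:ok, {node, pid}}
      [] -> :error
    end
  end

  def all_agents, do: :ets.tab2list(@table)

  def agents_on_node(node), do: :ets.match_object(@table, {:_, node, :_})

  @impl true
  def init(_opts) do
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, %{}}
  end

  @impl true
  def handle_call({:register, name, node, pid}, _from, state) do
    :ets.insert(@table, {name, node, pid})
    {:reply, :ok, state}
  end

  def handle_call({:unregister, name}, _from, state) do
    :ets.delete(@table, name)
    {:reply, :ok, state}
  end
end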

Section 3: Single-Node Verification

Phase 4 Tests Must Still Pass

# Run agent_framework tests (from the project root)
cd agent_framework && mix test

# Run agent_api tests (again from the project root)
cd agent_api && mix test
# All existing functionality should work identically.
# The Phase 5 changes are additive - nothing was removed.

backward_compatibility = """
Checklist:
  [ ] agent_framework: mix test passes
  [ ] agent_api: mix test passes
  [ ] Health endpoint works: GET /health/
  [ ] Agent card works: GET /.well-known/agent.json
  [ ] StartAgent works: POST /a2a
  [ ] SendMessage works: POST /a2a
  [ ] GetAgentState works: POST /a2a
  [ ] ListAgents works: POST /a2a
  [ ] ProcessNext works: POST /a2a
"""

Single-Node curl Tests

# Start the server
cd agent_api
iex -S mix phx.server

# 1. Health check
curl http://localhost:4000/health/ | jq

# 2. Agent card
curl http://localhost:4000/.well-known/agent.json | jq

# 3. Start an agent
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq

# 4. Send a task
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"OTP"}},"id":2}' | jq

# 5. Get agent state
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":3}' | jq

# 6. Process the task
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":4}' | jq

# 7. List agents (with new node info)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":5}' | jq

# 8. NEW: Get cluster info
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":6}' | jq

Section 4: Multi-Node Setup

Starting 2 Nodes with libcluster

# Terminal 1:
cd agent_api
PORT=4000 iex --sname node1 --cookie agent_secret -S mix phx.server

# Terminal 2:
cd agent_api
PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server

# libcluster's Gossip strategy will automatically connect them.
# Watch the console for connection messages.
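The nodes find each other because of the Gossip topology added to config/dev.exs in Session 25. A sketch of what that configuration looks like - the topology name and option values here are illustrative defaults, not copied from the project:

# config/dev.exs - illustrative libcluster Gossip topology
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1,
        secret: "agent_secret"
      ]
    ]
  ]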

Verify Connection

# On node1:
Node.list()
# Should show: [:node2@yourmachine]

AgentApi.Cluster.ClusterMonitor.cluster_info()
# Should show: %{self: :node1@..., connected: [:node2@...], total: 2}
# Via HTTP:
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":1}' | jq

# Expected:
# {
#   "result": {
#     "self": "node1@yourmachine",
#     "connected": ["node2@yourmachine"],
#     "total_nodes": 2
#   }
# }

Section 5: Cross-Node Agent Communication Tests

Start Agents on Different Nodes

# Start Worker-1 on node1
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq

# Start Analyzer-1 on node2
curl -s -X POST http://localhost:4001/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Analyzer-1"},"id":2}' | jq

# List agents from node1 (should show BOTH)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":3}' | jq

Cross-Node Task Dispatch

# Send a task to Analyzer-1 via node1 (cross-node!)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Analyzer-1","action":"analyze","params":{"data":"cross-node test"}},"id":4}' | jq

# Process the task via node1 (routed to node2)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Analyzer-1"},"id":5}' | jq

# Get Analyzer-1's state from node1 (cross-node state query)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Analyzer-1"},"id":6}' | jq

IEx Cross-Node Verification

# On node1:
AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => {:ok, {:node2@yourmachine, #PID<...>}}

AgentFramework.AgentDirectory.all_agents()
# Shows agents on both nodes

AgentApi.Cluster.AgentRouter.list_all_agents()
# => [{"Worker-1", :node1@host, pid1}, {"Analyzer-1", :node2@host, pid2}]
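find_agent/1 is the tiered lookup described in Section 2: try the ETS directory, then the local Registry, then ask the other nodes over RPC and cache the answer. A rough sketch under assumed module internals (the Registry name and the directory's register/3 arity are illustrative):

# Illustrative sketch of the tiered lookup; not the project's actual module
defmodule AgentRouterSketch do
  def find_agent(name) do
    with :error <- AgentFramework.AgentDirectory.lookup(name),
         :error <- local_lookup(name) do
      remote_lookup(name)
    end
  end

  defp local_lookup(name) do
    case Registry.lookup(AgentFramework.Registry, name) do
      [{pid, _}] -> {:ok, {node(), pid}}
      [] -> :error
    end
  end

  defp remote_lookup(name) do
    # Ask each connected node's Registry; cache the first hit in the directory
    Enum.find_value(Node.list(), :error, fn remote_node ->
      case :rpc.call(remote_node, Registry, :lookup, [AgentFramework.Registry, name]) do
        [{pid, _}] ->
          AgentFramework.AgentDirectory.register(name, remote_node, pid)
          {:ok, {remote_node, pid}}

        _ ->
          nil
      end
    end)
  end
end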

Section 6: PubSub Event Broadcasting Tests

Subscribe to Events in IEx

# On node1, subscribe to agent events:
AgentApi.AgentEvents.subscribe_agent_events()

# On node2, start a new agent:
# (from terminal 2 or via curl to port 4001)
# curl -s -X POST http://localhost:4001/a2a \
#   -H "Content-Type: application/json" \
#   -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-3"},"id":1}'

# Back on node1, check for events:
receive do
  {:agent_event, event} ->
    IO.inspect(event, label: "Received event")
after
  5000 -> IO.puts("No event received")
end

# You should see:
# Received event: %{event: :agent_started, agent: "Worker-3", node: :node2@host, ...}
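That event arrived because Phoenix.PubSub delivers broadcasts to subscribers on every connected node. The AgentEvents module is a thin wrapper around subscribe/broadcast; a minimal sketch with one event, matching the message shape shown above (function bodies are assumptions):

# Sketch of the AgentEvents wrapper around Phoenix.PubSub (details illustrative)
defmodule AgentApi.AgentEvents do
  @pubsub AgentApi.PubSub
  @topic "agent:events"

  def subscribe_agent_events, do: Phoenix.PubSub.subscribe(@pubsub, @topic)

  def broadcast_agent_started(agent_name) do
    event = %{event: :agent_started, agent: agent_name, node: node(), at: DateTime.utc_now()}
    # Delivered to subscribers on every connected node, not just the local one
    Phoenix.PubSub.broadcast(@pubsub, @topic, {:agent_event, event})
  end
end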

Check Event Logger Output

# The AgentEventLogger on both nodes should be logging events.
# Check your console output for messages like:
#
# [info] [AgentEventLogger] agent_started: Worker-3 on node2@yourmachine
# [info] [AgentEventLogger] task_received: Analyzer-1 on node2@yourmachine
# [info] [AgentEventLogger] task_completed: Analyzer-1 on node2@yourmachine
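Those log lines come from the subscribe-in-init / log-in-handle_info pattern described in Section 2. A minimal sketch (the exact log format and state are assumptions):

# Sketch of the event-logging subscriber (log format and details illustrative)
defmodule AgentEventLoggerSketch do
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Subscribe once; every matching broadcast arrives as a handle_info message
    AgentApi.AgentEvents.subscribe_agent_events()
    {:ok, %{}}
  end

  @impl true
  def handle_info({:agent_event, %{event: event, agent: agent, node: node}}, state) do
    Logger.info("[AgentEventLogger] #{event}: #{agent} on #{node}")
    {:noreply, state}
  end
end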

Section 7: Node Failure Recovery Tests

Simulate Node Failure

# 1. Note the current state:
AgentFramework.AgentDirectory.all_agents()
# Shows agents on both nodes

# 2. Kill node2 (Ctrl+C twice in Terminal 2)

# 3. Check ClusterMonitor:
AgentApi.Cluster.ClusterMonitor.cluster_info()
# connected should be empty: []

# 4. Check AgentDirectory:
AgentFramework.AgentDirectory.all_agents()
# node2's agents should be REMOVED automatically

# 5. Try to reach Analyzer-1:
AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => :error (agent not reachable)

# 6. Restart node2 (in Terminal 2):
# PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server

# 7. After reconnection, verify:
Node.list()
# => [:node2@yourmachine]

AgentApi.Cluster.ClusterMonitor.cluster_info()
# total should be 2 again
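The automatic cleanup in step 4 is the ClusterMonitor reacting to :nodedown. A rough sketch of the pattern, using only the AgentDirectory functions listed in Section 2 (the state shape and module name are assumptions):

# Illustrative sketch of node monitoring + directory cleanup; not the project's actual module
defmodule ClusterMonitorSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def cluster_info, do: GenServer.call(__MODULE__, :cluster_info)

  @impl true
  def init(_opts) do
    # Ask the kernel to deliver {:nodeup, node} / {:nodedown, node} messages
    :net_kernel.monitor_nodes(true)
    {:ok, %{connected: Node.list()}}
  end

  @impl true
  def handle_call(:cluster_info, _from, state) do
    info = %{self: node(), connected: state.connected, total: length(state.connected) + 1}
    {:reply, info, state}
  end

  @impl true
  def handle_info({:nodedown, node}, state) do
    # Drop directory entries for agents that lived on the lost node
    node
    |> AgentFramework.AgentDirectory.agents_on_node()
    |> Enum.each(fn {name, _node, _pid} -> AgentFramework.AgentDirectory.unregister(name) end)

    {:noreply, %{state | connected: List.delete(state.connected, node)}}
  end

  def handle_info({:nodeup, node}, state) do
    {:noreply, %{state | connected: Enum.uniq([node | state.connected])}}
  end
end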

Section 8: Verification Checklist

checklist = """
Phase 5 Verification Checklist:

Unit Tests:
  [ ] cd agent_framework && mix test  (all pass)
  [ ] cd agent_api && mix test         (all pass)

Single-Node (Phase 4 backward compatibility):
  [ ] GET /health/ returns healthy
  [ ] GET /.well-known/agent.json returns agent card
  [ ] StartAgent creates an agent
  [ ] SendMessage dispatches a task
  [ ] GetAgentState returns agent state
  [ ] ListAgents lists all agents
  [ ] ProcessNext processes a task

Phase 5 New Features (Single-Node):
  [ ] GetClusterInfo returns cluster status
  [ ] ListAgents includes node information
  [ ] AgentDirectory stores agent entries
  [ ] AgentEventLogger logs events

Multi-Node:
  [ ] Two nodes connect automatically (libcluster Gossip)
  [ ] GetClusterInfo shows both nodes
  [ ] ListAgents shows agents from ALL nodes
  [ ] Cross-node SendMessage works (agent on other node)
  [ ] Cross-node ProcessNext works
  [ ] Cross-node GetAgentState works
  [ ] PubSub events received on remote nodes

Node Failure:
  [ ] Node disconnect detected by ClusterMonitor
  [ ] AgentDirectory entries cleaned up on disconnect
  [ ] Node reconnect detected
  [ ] Agents accessible again after reconnection
"""

Section 9: Complete Test Script

#!/bin/bash
# Phase 5 Integration Test Script
# Run with: bash test_phase5.sh

echo "=== Phase 5: Distributed Agents Integration Test ==="
echo ""

BASE="http://localhost:4000"

echo "--- Single-Node Tests ---"
echo ""

echo "1. Health check:"
curl -s $BASE/health/ | jq .status
echo ""

echo "2. Agent card:"
curl -s $BASE/.well-known/agent.json | jq .name
echo ""

echo "3. Start Worker-1:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":1}' | jq .result.status
echo ""

echo "4. Send task to Worker-1:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Worker-1","action":"search","params":{"query":"distributed"}},"id":2}' | jq .result.status
echo ""

echo "5. Get Worker-1 state:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetAgentState","params":{"agent":"Worker-1"},"id":3}' | jq .result.name
echo ""

echo "6. Process Worker-1 task:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Worker-1"},"id":4}' | jq .result.status
echo ""

echo "7. List agents:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":5}' | jq .result
echo ""

echo "8. Cluster info:"
curl -s -X POST $BASE/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":6}' | jq .result
echo ""

echo "=== Single-node tests complete ==="
echo ""
echo "For multi-node tests, start node2 on PORT=4001 and run:"
echo "  curl -s -X POST http://localhost:4001/a2a ..."
echo ""

Section 10: Key Learnings from Phase 5

🤔 Final Reflection

phase_5_summary = """
Phase 5 brought together four major concepts:

1. ETS (Session 22)
   - In-memory key-value storage
   - Concurrent reads without bottleneck
   - GenServer + ETS pattern for safe ownership

2. Distributed Erlang (Session 23)
   - Named nodes, cookies, connections
   - :rpc.call for remote execution
   - Location transparency

3. Phoenix.PubSub (Session 24)
   - Publish-subscribe across nodes
   - Decoupled event system
   - Automatic cross-node propagation

4. Distributed Agents (Session 25)
   - libcluster for automatic discovery
   - AgentRouter for tiered lookups
   - Graceful degradation on failure

Together, these create a system where:
  - Agents run on any node
  - HTTP requests route to the correct node
  - Events broadcast to all nodes
  - Node failures are handled automatically

This is the BEAM's sweet spot: building distributed,
fault-tolerant systems with surprisingly little code.
"""

What You’ve Built (Phases 1-5)

full_journey = """
Phase 1: Agent struct (immutable data)
Phase 2: Process-based agents (concurrency, manual supervision)
Phase 3: OTP agents (GenServer, DynamicSupervisor, Application)
Phase 4: Phoenix API (HTTP endpoints, A2A protocol, JSON-RPC)
Phase 5: Distribution (ETS, clustering, PubSub, cross-node routing)

From a simple struct to a distributed multi-agent system
that communicates via HTTP, discovers peers automatically,
and recovers from failures.

The BEAM made this possible with:
  - Lightweight processes (millions possible)
  - Location transparency (same API local/remote)
  - Let-it-crash philosophy (supervisors handle failure)
  - Built-in distribution (clustering is a feature, not an add-on)
"""

Navigation

Previous: Session 25 - Distributed Agents