Session 25: Distributed Agents

notebooks/25_distributed_agents.livemd

Mix.install([])

Introduction

You’ve built the pieces: an ETS-backed AgentDirectory (Session 22), hands-on distributed Erlang (Session 23), and PubSub events (Session 24). Now it’s time to put them all together into a distributed multi-agent system.

This session tackles the hardest problem: finding and communicating with agents that could be on any node in the cluster. You’ll use libcluster for automatic node discovery, build an AgentRouter for cross-node agent lookup, and handle node failures gracefully.

Sources for This Session

This session synthesizes concepts from Session 22 (the ETS-backed AgentDirectory), Session 23 (distributed Erlang), and Session 24 (Phoenix.PubSub events).

Learning Goals

By the end of this session, you’ll be able to:

  • Use libcluster for automatic node discovery
  • Understand cluster topologies (Gossip, Epmd, Kubernetes)
  • Build an AgentRouter for cross-node agent lookup
  • Update TaskManager for distributed routing
  • Add cluster-aware A2A methods
  • Handle node failures and agent recovery

Section 1: The Problem - Finding Agents Across Nodes

🤔 Opening Reflection

# You have a 3-node cluster:
#   Node 1: Worker-1, Worker-2
#   Node 2: Worker-3, Analyzer-1
#   Node 3: Worker-4

# An HTTP request arrives at Node 1:
# POST /a2a {"method": "SendMessage", "params": {"agent": "Analyzer-1"}}

# Problem: Analyzer-1 is on Node 2, not Node 1!

problem = """
Current TaskManager uses AgentSupervisor.whereis("Analyzer-1")
which only checks the LOCAL Registry on Node 1.

Result: {:error, :agent_not_found}

But the agent EXISTS - just on a different node.
How do we find it?
"""

# Approaches:
approaches = %{
  brute_force: """
    Loop through all nodes, :rpc.call each one to check.
    Slow: O(n) network calls per lookup.
  """,

  cached_directory: """
    Use AgentDirectory (ETS) to cache name→{node, pid}.
    Fast: O(1) ETS lookup.
    Challenge: Keep directory in sync when agents start/stop.
  """,

  hybrid: """
    1. Check AgentDirectory first (fast, cached)
    2. Fall back to local Registry
    3. Fall back to querying remote nodes
    Best of both worlds.
  """
}

# We'll implement the hybrid approach in AgentRouter.
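
# For contrast, here is a minimal sketch of the brute-force approach. It reuses
# the AgentSupervisor.whereis/1 API from earlier sessions and does no caching:
defmodule BruteForceLookupSketch do
  def find_agent(agent_name) do
    # Worst case: one :rpc.call per node in the cluster - O(n) network hops.
    [Node.self() | Node.list()]
    |> Enum.find_value(:error, fn node ->
      case :rpc.call(node, AgentFramework.AgentSupervisor, :whereis, [agent_name]) do
        {:ok, pid} -> {:ok, {node, pid}}
        _ -> nil
      end
    end)
  end
end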

Section 2: libcluster for Automatic Node Discovery

What is libcluster?

# Without libcluster, you manually connect nodes:
# Node.connect(:node2@hostname)

# With libcluster, nodes discover and connect AUTOMATICALLY.

libcluster_overview = """
libcluster provides automatic node discovery and connection.
You configure a "topology" (discovery strategy), and libcluster
handles finding and connecting to other nodes.

Think of it as:
  Manual:     You call each friend to invite them to the party.
  libcluster: You post the party address; friends find it themselves.
"""

Configuration

# In config/dev.exs:

libcluster_config = """
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_if: "127.0.0.1",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1
      ]
    ]
  ]
"""

# The Gossip strategy uses UDP multicast to discover nodes
# on the local network. Perfect for development!

Adding to Supervision Tree

# In application.ex, libcluster runs as a supervisor:

application_setup = """
def start(_type, _args) do
  topologies = Application.get_env(:libcluster, :topologies, [])

  children = [
    {Phoenix.PubSub, name: AgentApi.PubSub},
    {Cluster.Supervisor, [topologies, [name: AgentApi.ClusterSupervisor]]},
    AgentApi.Cluster.ClusterMonitor,
    AgentApi.Cluster.AgentEventLogger,
    AgentApiWeb.Endpoint
  ]

  opts = [strategy: :one_for_one, name: AgentApi.Supervisor]
  Supervisor.start_link(children, opts)
end
"""

# Order matters:
# 1. PubSub first (other services need it)
# 2. Cluster.Supervisor (connects nodes)
# 3. ClusterMonitor (tracks connections)
# 4. AgentEventLogger (logs events)
# 5. Endpoint (serves HTTP)

Section 3: Cluster Topologies

🤔 Choosing a Topology

# libcluster supports multiple discovery strategies:

topologies = %{
  gossip: %{
    description: "UDP multicast - nodes announce themselves on the network",
    best_for: "Development, local network",
    config: "Multicast address and port",
    pros: ["Zero configuration", "Automatic discovery"],
    cons: ["Only works on local network", "Not for production clouds"]
  },

  epmd: %{
    description: "Static list of node names",
    best_for: "Known, fixed infrastructure",
    config: "List of node names",
    pros: ["Simple", "Predictable"],
    cons: ["Must know nodes in advance", "No dynamic scaling"]
  },

  kubernetes: %{
    description: "Uses Kubernetes API to discover pods",
    best_for: "Kubernetes deployments",
    config: "Kubernetes namespace and labels",
    pros: ["Cloud-native", "Dynamic scaling"],
    cons: ["Kubernetes only"]
  },

  dns: %{
    description: "DNS-based discovery (DNS SRV records)",
    best_for: "Docker Compose, service mesh",
    config: "DNS name to query",
    pros: ["Works with any orchestrator", "Standard protocol"],
    cons: ["Requires DNS infrastructure"]
  }
}

# For development, we use Gossip.
# For production, you'd choose based on your deployment platform.
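
# Two alternative topology configs as sketches. The node names, selector, and
# namespace below are placeholders - adapt them to your infrastructure:

epmd_config = """
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"node1@127.0.0.1", :"node2@127.0.0.1"]]
    ]
  ]
"""

kubernetes_config = """
config :libcluster,
  topologies: [
    agent_cluster: [
      strategy: Cluster.Strategy.Kubernetes,
      config: [
        kubernetes_node_basename: "agent_api",
        kubernetes_selector: "app=agent-api",
        kubernetes_namespace: "default"
      ]
    ]
  ]
"""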

Section 4: Building AgentRouter

The Lookup Strategy

# AgentRouter implements a tiered lookup strategy:

lookup_strategy = """
find_agent("Worker-1"):

  Step 1: Check AgentDirectory (ETS)
          Fast O(1) lookup.
          If found AND process is alive → return {node, pid}
          If found BUT process is dead → remove stale entry, continue

  Step 2: Check local Registry
          AgentSupervisor.whereis("Worker-1")
          If found → register in directory, return {node, pid}

  Step 3: Search local agents by name
          Loop through AgentSupervisor.list_agents()
          If found → register in directory, return {node, pid}

  Step 4: Query remote nodes via RPC
          For each connected node:
            :rpc.call(node, AgentSupervisor, :whereis, ["Worker-1"])
          If found → register in directory, return {node, pid}

  Step 5: Not found → return :error
"""

The Implementation

# Here's the core of AgentRouter:

defmodule AgentRouterExample do
  alias AgentFramework.{AgentDirectory, AgentServer, AgentSupervisor}

  def find_agent(agent_name) do
    # Step 1: Check directory (fast path)
    case AgentDirectory.lookup(agent_name) do
      {:ok, {node, pid}} ->
        if process_reachable?(node, pid) do
          {:ok, {node, pid}}
        else
          AgentDirectory.unregister(agent_name)
          find_agent_fallback(agent_name)
        end

      :error ->
        find_agent_fallback(agent_name)
    end
  end

  defp find_agent_fallback(agent_name) do
    # Step 2: Check local registry
    case AgentSupervisor.whereis(agent_name) do
      {:ok, pid} ->
        AgentDirectory.register(agent_name, Node.self(), pid)
        {:ok, {Node.self(), pid}}

      :error ->
        # Steps 3-4: query remote nodes (local name-scan omitted in this example)
        find_agent_remote(agent_name)
    end
  end

  defp find_agent_remote(agent_name) do
    Node.list()
    |> Enum.find_value(:error, fn node ->
      case :rpc.call(node, AgentFramework.AgentSupervisor, :whereis, [agent_name]) do
        {:ok, pid} ->
          AgentDirectory.register(agent_name, node, pid)
          {:ok, {node, pid}}
        _ ->
          nil  # Continue to next node
      end
    end)
  end

  defp process_reachable?(node, pid) do
    if node == Node.self() do
      Process.alive?(pid)
    else
      true  # Trust the directory for remote pids; a stale entry surfaces as an RPC failure on dispatch
    end
  end
end

🤔 Why Cache in AgentDirectory?

# Question: After finding an agent via RPC, why cache it in AgentDirectory?

caching_rationale = """
Without caching:
  Every lookup → RPC call to all nodes → O(n) network calls
  100 requests/sec × 3 nodes = 300 RPC calls/sec

With caching:
  First lookup → RPC call → cache result
  Subsequent lookups → ETS read → O(1), no network
  100 requests/sec → 1 RPC + 99 ETS reads

The tradeoff:
  + Much faster for repeated lookups
  - Cache can become stale (agent moved, node crashed)

Staleness is handled by:
  1. Process.alive? check for local agents
  2. ClusterMonitor removes entries when nodes disconnect
  3. Re-lookup on RPC failure (transparent retry)
"""

Section 5: Updating TaskManager for Distributed Routing

Before and After

# BEFORE (Phase 4 - local only):
before_code = """
def send_message(agent_name, action, params) do
  case find_agent(agent_name) do
    {:ok, pid} ->
      AgentServer.send_task(pid, action, params)
      {:ok, %{status: :created, agent: agent_name}}
    :error ->
      {:error, :agent_not_found}
  end
end

defp find_agent(name) do
  case AgentSupervisor.whereis(name) do
    {:ok, pid} -> {:ok, pid}
    :error -> search_agents(name)  # local only
  end
end
"""

# AFTER (Phase 5 - distributed):
after_code = """
def send_message(agent_name, action, params) do
  case AgentRouter.find_agent(agent_name) do
    {:ok, {node, pid}} ->
      if node == Node.self() do
        AgentServer.send_task(pid, action, params)
      else
        :rpc.call(node, AgentServer, :send_task, [pid, action, params])
      end
      AgentEvents.broadcast_task_received(agent_name, %{action: action})
      {:ok, %{status: :created, agent: agent_name, node: node}}
    :error ->
      {:error, :agent_not_found}
  end
end
"""

# Key changes:
# 1. find_agent → AgentRouter.find_agent (cross-node)
# 2. Returns {node, pid} instead of just pid
# 3. Local vs remote dispatch based on node
# 4. Events broadcast via PubSub
# 5. Node info included in response
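
# The caching notes in Section 4 mention re-lookup on RPC failure. One way to
# wire that in is a dispatch helper like the sketch below (dispatch/5 and
# retry_dispatch/3 are hypothetical names, not part of the code above):

dispatch_sketch = """
defp dispatch(agent_name, node, pid, action, params) do
  result =
    if node == Node.self() do
      AgentServer.send_task(pid, action, params)
    else
      :rpc.call(node, AgentFramework.AgentServer, :send_task, [pid, action, params])
    end

  case result do
    {:badrpc, _reason} ->
      # Stale cache entry: evict it, then retry once with a fresh lookup.
      AgentDirectory.unregister(agent_name)
      retry_dispatch(agent_name, action, params)

    other ->
      other
  end
end
"""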

Section 6: Cluster-Aware A2A Methods

GetClusterInfo

# New A2A method for cluster status:

get_cluster_info = """
Request:
  POST /a2a
  {"jsonrpc": "2.0", "method": "GetClusterInfo", "params": {}, "id": 1}

Response:
  {
    "jsonrpc": "2.0",
    "result": {
      "self": "node1@hostname",
      "connected": ["node2@hostname"],
      "total_nodes": 2
    },
    "id": 1
  }
"""

Enhanced ListAgents

# ListAgents now includes node information:

enhanced_list_agents = """
Request:
  POST /a2a
  {"jsonrpc": "2.0", "method": "ListAgents", "params": {}, "id": 1}

Response:
  {
    "jsonrpc": "2.0",
    "result": {
      "agents": ["Worker-1", "Worker-2"],
      "agents_detail": [
        {"name": "Worker-1", "node": "node1@hostname"},
        {"name": "Worker-2", "node": "node2@hostname"}
      ],
      "count": 2
    },
    "id": 1
  }
"""

# Now you can see which node each agent lives on!
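
# A sketch of the handler clauses behind these two methods. The handle_method/2
# shape is an assumption about the A2A handler, and the ListAgents clause
# assumes AgentDirectory holds a cluster-wide view of registered agents
# ({name, node, pid} tuples, as shown in the IEx verification below):

cluster_methods_sketch = """
def handle_method("GetClusterInfo", _params) do
  {:ok,
   %{
     self: to_string(Node.self()),
     connected: Enum.map(Node.list(), &to_string/1),
     total_nodes: length(Node.list()) + 1
   }}
end

def handle_method("ListAgents", _params) do
  agents = AgentDirectory.all_agents()

  {:ok,
   %{
     agents: Enum.map(agents, fn {name, _node, _pid} -> name end),
     agents_detail:
       Enum.map(agents, fn {name, node, _pid} -> %{name: name, node: to_string(node)} end),
     count: length(agents)
   }}
end
"""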

Section 7: Node Failure and Agent Recovery

🤔 What Happens When a Node Dies?

# Scenario: Node 2 crashes while running Analyzer-1

failure_sequence = """
1. Node 2 crashes
2. :net_kernel detects loss of connection
3. ClusterMonitor receives {:nodedown, :node2@host}
4. ClusterMonitor calls AgentDirectory.remove_node_agents(:node2@host)
5. PubSub broadcasts {:cluster_event, %{event: :node_down, ...}}
6. AgentEventLogger logs the event

Result:
  - AgentDirectory no longer has stale entries for node2
  - Subsequent lookups for "Analyzer-1" return :error
  - HTTP requests for "Analyzer-1" return -32001 (agent not found)
"""

# When node 2 comes back:
recovery_sequence = """
1. libcluster discovers node 2
2. :net_kernel connects automatically
3. ClusterMonitor receives {:nodeup, :node2@host}
4. PubSub broadcasts {:cluster_event, %{event: :node_up, ...}}
5. Node 2's agents restart under their local supervisor
6. Future lookups find agents via RPC and re-cache

Key insight: Agents on node 2 restart automatically because
they're supervised by DynamicSupervisor on that node.
The cluster layer just needs to clean up stale directory entries.
"""

Graceful Degradation

# The system degrades gracefully:

degradation = %{
  one_node_down: """
    Agents on that node unavailable.
    Agents on other nodes work normally.
    No cascade failures.
  """,

  network_partition: """
    Two groups of nodes can't see each other.
    Each group works independently.
    When partition heals, nodes reconnect via libcluster.
  """,

  all_nodes_down: """
    No agents available.
    HTTP requests return agent_not_found.
    On restart, agents are re-created by supervisors.
  """
}

# This is the BEAM's philosophy:
# "Let it crash, but make recovery automatic."

Section 8: Full Multi-Node Demo

Setup (Requires 2 Terminals)

# Terminal 1:
cd agent_api
PORT=4000 iex --sname node1 --cookie agent_secret -S mix phx.server

# Terminal 2:
cd agent_api
PORT=4001 iex --sname node2 --cookie agent_secret -S mix phx.server

Test Commands

# 1. Check cluster info (from node1)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"GetClusterInfo","params":{},"id":1}' | jq

# 2. Start an agent on node1
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Worker-1"},"id":2}' | jq

# 3. Start an agent on node2
curl -s -X POST http://localhost:4001/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"StartAgent","params":{"name":"Analyzer-1"},"id":3}' | jq

# 4. List agents from node1 (should show BOTH agents)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ListAgents","params":{},"id":4}' | jq

# 5. Send task to Analyzer-1 via node1 (cross-node routing!)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"SendMessage","params":{"agent":"Analyzer-1","action":"analyze","params":{"data":"test"}},"id":5}' | jq

# 6. Process the task on Analyzer-1 (via node1, routed to node2)
curl -s -X POST http://localhost:4000/a2a \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"ProcessNext","params":{"agent":"Analyzer-1"},"id":6}' | jq

IEx Verification

# On node1:
Node.list()
# => [:node2@yourmachine]

AgentFramework.AgentDirectory.all_agents()
# => [{"Worker-1", :node1@host, #PID<0.xxx.0>}, ...]

AgentApi.Cluster.AgentRouter.find_agent("Analyzer-1")
# => {:ok, {:node2@host, #PID<0.yyy.0>}}

AgentApi.Cluster.ClusterMonitor.cluster_info()
# => %{self: :node1@host, connected: [:node2@host], total: 2}

Key Takeaways

  1. libcluster automates node discovery - No manual Node.connect needed

  2. AgentRouter uses tiered lookup - ETS cache → local Registry → remote RPC

  3. TaskManager routes transparently - Finds agents on any node, dispatches via RPC

  4. ClusterMonitor cleans up on failure - Removes stale directory entries

  5. PubSub broadcasts across nodes - Events propagate automatically

  6. Graceful degradation - Node failures don’t cascade; recovery is automatic


What’s Next?

In the final session of Phase 5, we’ll verify everything works with the Checkpoint project:

  • Run all Phase 4 tests (must still pass)
  • Multi-node setup and cross-node tests
  • PubSub event broadcasting verification
  • Node failure recovery tests
  • Complete verification checklist

Navigation

Previous: Session 24 - Phoenix.PubSub

Next: Session 26 - Checkpoint: Distributed Agents