Session 23: Distributed Erlang in Practice
Mix.install([])
Introduction
In Session 15, you learned the theory of distributed Erlang - nodes, cookies, and location transparency. In Session 22, you built an ETS-backed AgentDirectory for cross-node agent lookups.
Now it’s time to put it all together. This session is hands-on: you’ll start multiple BEAM nodes, connect them, make remote GenServer calls, and build a ClusterMonitor that tracks node connections.
Sources for This Session
This session synthesizes concepts from:
Learning Goals
By the end of this session, you’ll be able to:
- Start named BEAM nodes and connect them
- Make remote GenServer calls using {Module, node} tuples
- Use :rpc.call/4 for arbitrary remote execution
- Monitor node connections with :net_kernel.monitor_nodes/1
- Build a ClusterMonitor GenServer
- Start and communicate with agents on remote nodes
Section 1: Review - Nodes and Cookies
🤔 Quick Recall from Session 15
# Before we get hands-on, recall the basics:
recall = %{
what_is_a_node: """
A running BEAM instance with a name.
Started with: iex --sname mynode --cookie mysecret
Node.self() returns the node's name.
""",
what_is_a_cookie: """
A shared secret that nodes must have in common to connect.
Think of it as a simple password for the cluster.
""",
how_to_connect: """
Node.connect(:other_node@hostname)
Returns true if connected, false if failed.
""",
location_transparency: """
GenServer.call(pid, :msg) # local
GenServer.call({Name, node}, :msg) # remote
Same API, different locations. Code doesn't need to know.
"""
}
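Before starting named nodes, it's worth checking what a plain, unnamed BEAM looks like. This quick sketch uses only standard Node functions and runs anywhere:

```elixir
# Node.alive?/0 reports whether this BEAM was started with a name
# (--sname or --name). Without one, Node.self() is :nonode@nohost
# and the node cannot participate in a cluster.
IO.inspect(Node.alive?(), label: "distributed?")
IO.inspect(Node.self(), label: "node name")
IO.inspect(Node.list(), label: "connected nodes")
```

If you evaluate this in a notebook started without a name, expect `:nonode@nohost` and an empty connection list.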
Section 2: Starting Two Nodes with agent_framework
Hands-On: Two Terminals
This section requires two terminal windows. You’ll start two instances of the agent_framework application and connect them.
# Terminal 1:
cd agent_framework
iex --sname node1 --cookie agent_secret -S mix
# Terminal 2:
cd agent_framework
iex --sname node2 --cookie agent_secret -S mix
# On node1, check your identity:
Node.self()
# => :node1@yourmachine
# On node2, connect to node1:
Node.connect(:node1@yourmachine)
# => true
# On either node, list connections:
Node.list()
# => [:node1@yourmachine] (from node2's perspective)
# => [:node2@yourmachine] (from node1's perspective)
🤔 What Just Happened?
# When you called Node.connect, the BEAM:
# 1. Looked up node1's address using epmd (Erlang Port Mapper Daemon)
# 2. Verified the cookie matched
# 3. Established a TCP connection
# 4. Made both nodes aware of each other
# Connection is bidirectional and transitive:
# If A connects to B, and B connects to C,
# then A is also connected to C!
# epmd runs automatically when you start a named node.
# It maps node names to ports on the local machine.
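You can ask epmd directly which named nodes it knows about on this machine. :net_adm.names/0 is a standard OTP call; note that it returns an error tuple when no epmd is running, for example before any named node has been started:

```elixir
# Ask the local epmd for its registered node names and ports.
# Returns {:ok, [{name_charlist, port}, ...]} on success, or
# {:error, :address} if epmd isn't reachable.
case :net_adm.names() do
  {:ok, names} -> IO.inspect(names, label: "epmd registrations")
  {:error, reason} -> IO.puts("epmd not reachable: #{inspect(reason)}")
end
```

Running this on node1 from Section 2 should list both node1 and node2, since epmd is shared per machine.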
Section 3: Remote GenServer Calls
The {Module, node} Tuple
# On node1, start an agent:
{:ok, agent} = AgentFramework.AgentServer.start_link("Worker-1")
AgentFramework.AgentServer.remember(agent, :location, "node1")
# Get the agent's state locally:
AgentFramework.AgentServer.get_state(agent)
# => %{name: "Worker-1", status: :idle, ...}
# On node2, you can call the agent on node1.
# But first, you need a way to address it.
# Option A: Use the pid directly (if you have it)
# The pid from node1 works on node2 because PIDs are cluster-global.
# Option B: If the GenServer is registered with a name:
# GenServer.call({AgentFramework.AgentServer, :node1@yourmachine}, :get_state)
# This only works if the process is registered with that exact module name.
# Our agents aren't registered with module names, so we'll use pids
# or the AgentDirectory (which we built in Session 22).
🤔 Why PIDs Work Across Nodes
# Recall from Session 15:
pid_explanation = """
Local PID: #PID<0.123.0>
Remote PID: #PID<12345.123.0>
The first number identifies the originating node.
0 means "local node".
Any other number is a remote node identifier.
When you send a message to a remote PID, the BEAM:
1. Identifies the target node from the PID
2. Serializes the message
3. Sends it over the TCP connection
4. Deserializes on the other end
5. Delivers to the process's mailbox
All transparent to your code!
"""
Section 4: :rpc.call/4 for Remote Execution
Running Code on Remote Nodes
# :rpc.call lets you execute any function on a remote node:
# :rpc.call(node, module, function, args)
# From node2, run code on node1:
:rpc.call(:node1@yourmachine, Node, :self, [])
# => :node1@yourmachine (executed ON node1, result returned to node2)
# More useful: list agents on a remote node
:rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :list_agents, [])
# => [#PID<0.234.0>]
# Get state of a remote agent
:rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :whereis, ["Worker-1"])
# => {:ok, #PID<0.234.0>}
🤔 When to Use :rpc.call vs GenServer.call
# :rpc.call(node, Module, :function, [args])
# - Runs ANY function on the remote node
# - Spawns a temporary process on the remote node
# - Returns {:badrpc, reason} on failure
# - Good for: one-off remote operations
# GenServer.call({Name, node}, message)
# - Calls a SPECIFIC GenServer on the remote node
# - Sends a message to an existing process
# - Exits the caller on timeout (5 second default)
# - Good for: calling known services
# For our agent framework:
# - Use GenServer.call for direct agent communication
# - Use :rpc.call for discovery (finding agents on remote nodes)
comparison = %{
rpc_call: %{
use_when: "Discovery, one-off operations, running arbitrary code",
failure_mode: "{:badrpc, reason}",
overhead: "Spawns a temporary process"
},
genserver_call: %{
use_when: "Communicating with a known GenServer",
failure_mode: "Raises on timeout or noproc",
overhead: "Direct message to existing process"
}
}
Error Handling with RPC
# :rpc.call returns {:badrpc, reason} on failure
# This is different from GenServer.call which raises!
# Examples of :badrpc:
errors = %{
node_down: ":rpc.call(:dead_node@host, ...) => {:badrpc, :nodedown}",
bad_module: ":rpc.call(node, NonExistent, :foo, []) => {:badrpc, {:EXIT, ...}}",
timeout: ":rpc.call(node, :timer, :sleep, [60000]) => {:badrpc, :timeout}"
}
# Always pattern match on the result:
case :rpc.call(:node1@yourmachine, Node, :self, []) do
{:badrpc, reason} ->
IO.puts("RPC failed: #{inspect(reason)}")
result ->
IO.puts("Got: #{inspect(result)}")
end
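If you make many RPC calls, it can help to normalize results into the usual ok/error tuples. SafeRpc below is a sketch, not part of agent_framework; it wraps :rpc.call/5, which takes an explicit timeout in milliseconds:

```elixir
defmodule SafeRpc do
  # Hypothetical helper: wraps :rpc.call/5 so callers pattern match
  # on {:ok, result} | {:error, reason} instead of {:badrpc, reason}.
  # Caveat: a function that legitimately returns a {:badrpc, _}
  # tuple would be misclassified as an error.
  def call(node, mod, fun, args, timeout \\ 5_000) do
    case :rpc.call(node, mod, fun, args, timeout) do
      {:badrpc, reason} -> {:error, reason}
      result -> {:ok, result}
    end
  end
end

# :rpc.call also works for the local node, so this is easy to try
# without a cluster:
SafeRpc.call(Node.self(), Kernel, :+, [1, 2])
# => {:ok, 3}
```

A call to a node that is down comes back as `{:error, :nodedown}`, which composes nicely with `with` chains in routing code.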
Section 5: Node Monitoring
:net_kernel.monitor_nodes/1
# You can subscribe to node connection events:
:net_kernel.monitor_nodes(true)
# Now your process receives:
# {:nodeup, node} - when a node connects
# {:nodedown, node} - when a node disconnects
# Example: Monitor in IEx
receive do
{:nodeup, node} -> IO.puts("Connected: #{node}")
{:nodedown, node} -> IO.puts("Disconnected: #{node}")
after
5000 -> IO.puts("No events in 5 seconds")
end
# Unsubscribe:
:net_kernel.monitor_nodes(false)
🤔 Why Monitor Nodes?
# Consider: You have agents distributed across 3 nodes.
# Node 2 crashes. What should happen to its agents?
monitoring_value = """
Without monitoring:
- Calls to node2 agents silently fail
- No one knows node2 is gone
- AgentDirectory still has stale entries
With monitoring:
- ClusterMonitor receives {:nodedown, :node2@host}
- AgentDirectory removes node2's agents
- PubSub broadcasts the event
- Other nodes can react (restart agents, rebalance, etc.)
"""
Section 6: Building a ClusterMonitor GenServer
The Implementation
# Our ClusterMonitor (in agent_api/lib/agent_api/cluster/):
defmodule ClusterMonitorExample do
use GenServer
require Logger
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def connected_nodes do
GenServer.call(__MODULE__, :connected_nodes)
end
def cluster_info do
GenServer.call(__MODULE__, :cluster_info)
end
@impl true
def init(_opts) do
# Subscribe to node events
:net_kernel.monitor_nodes(true)
state = %{
connected_nodes: Node.list(),
self: Node.self()
}
Logger.info("ClusterMonitor started on #{Node.self()}")
{:ok, state}
end
@impl true
def handle_call(:connected_nodes, _from, state) do
{:reply, state.connected_nodes, state}
end
def handle_call(:cluster_info, _from, state) do
info = %{
self: state.self,
connected: state.connected_nodes,
total: length(state.connected_nodes) + 1
}
{:reply, info, state}
end
@impl true
def handle_info({:nodeup, node}, state) do
Logger.info("Node connected: #{node}")
new_nodes = Enum.uniq([node | state.connected_nodes])
{:noreply, %{state | connected_nodes: new_nodes}}
end
def handle_info({:nodedown, node}, state) do
Logger.warning("Node disconnected: #{node}")
new_nodes = List.delete(state.connected_nodes, node)
# Clean up agents from the disconnected node
# AgentFramework.AgentDirectory.remove_node_agents(node)
{:noreply, %{state | connected_nodes: new_nodes}}
end
@impl true
def terminate(_reason, _state) do
:net_kernel.monitor_nodes(false)
:ok
end
end
🤔 ClusterMonitor Design Decisions
# Why is ClusterMonitor a GenServer and not just :net_kernel.monitor_nodes?
design_reasoning = """
1. State tracking - GenServer maintains the current node list,
so you don't have to call Node.list() every time.
2. Event handling - Can do side effects on node up/down:
- Clean up AgentDirectory entries
- Broadcast PubSub events
- Trigger agent migration
3. Supervision - Under a supervisor, restarts if it crashes.
Re-subscribes to node events on restart.
4. Query API - cluster_info() returns a formatted snapshot
of cluster state.
"""
Section 7: Starting Agents on Remote Nodes
Using DynamicSupervisor Remotely
# From node2, start an agent on node1:
:rpc.call(
:node1@yourmachine,
AgentFramework.AgentSupervisor,
:start_agent,
["Remote-Worker"]
)
# => {:ok, #PID<12345.234.0>}
# The agent is supervised on node1, but we initiated from node2!
🤔 Where Should Agents Live?
# Consider these strategies for agent placement:
placement_strategies = %{
local_only: %{
description: "Start agents only on the requesting node",
pros: ["Simple", "No network latency for local calls"],
cons: ["Uneven distribution", "No redundancy"]
},
round_robin: %{
description: "Distribute agents evenly across nodes",
pros: ["Even load distribution", "Uses all resources"],
cons: ["Network latency for some calls", "More complex routing"]
},
affinity: %{
description: "Place related agents on the same node",
pros: ["Fast communication between related agents", "Data locality"],
cons: ["May cause hotspots", "Complex placement logic"]
}
}
# For now, we start agents locally and let the router handle lookups.
# More sophisticated placement can come later.
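As a taste of the round-robin strategy, here is a minimal sketch. The Placement module is hypothetical, not part of agent_framework; it rotates through the current cluster using an index supplied by the caller:

```elixir
defmodule Placement do
  # Hypothetical round-robin node picker. The caller supplies a
  # monotonically increasing index (e.g. a count of agents started).
  # [Node.self() | Node.list()] covers the whole visible cluster.
  def pick_node(index) do
    nodes = [Node.self() | Node.list()]
    Enum.at(nodes, rem(index, length(nodes)))
  end
end

# With no connected nodes, every pick falls back to the local node:
Placement.pick_node(0)
# => Node.self()
```

Combined with Section 7, starting the i-th agent on the chosen node would look like `:rpc.call(Placement.pick_node(i), AgentFramework.AgentSupervisor, :start_agent, [name])`.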
Section 8: Interactive Exercises
Exercise 1: Two-Node Setup (Requires 2 Terminals)
# Terminal 1:
cd agent_framework
iex --sname node1 --cookie agent_secret -S mix
# Terminal 2:
cd agent_framework
iex --sname node2 --cookie agent_secret -S mix
# On node2, connect to node1:
Node.connect(:node1@yourmachine)
# Verify connection on both nodes:
Node.list()
# On node1, start an agent:
{:ok, w1} = AgentFramework.AgentSupervisor.start_agent("Worker-1")
AgentFramework.AgentServer.remember(w1, :origin, "node1")
# On node2, query the agent on node1.
# Note: w1 is only bound in node1's IEx session, so first look up
# the pid remotely:
{:ok, w1} = :rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :whereis, ["Worker-1"])
:rpc.call(:node1@yourmachine, AgentFramework.AgentServer, :recall, [w1, :origin])
# => "node1"
# On node2, send a task to the agent on node1:
:rpc.call(:node1@yourmachine, AgentFramework.AgentServer, :send_task, [w1, :search, %{query: "distributed"}])
# => :ok
# On node1, process the task:
AgentFramework.AgentServer.process_next(w1)
# => {:ok, task, {:ok, "Search results for: distributed"}}
Exercise 2: Monitor Node Events
# On node1, set up a simple monitor:
:net_kernel.monitor_nodes(true)
# On node2, disconnect:
# Node.disconnect(:node1@yourmachine)
# On node1, check for events:
receive do
{:nodedown, node} ->
IO.puts("Lost connection to #{node}")
after
10000 -> IO.puts("No events")
end
# Reconnect from node2:
# Node.connect(:node1@yourmachine)
# On node1:
receive do
{:nodeup, node} ->
IO.puts("Reconnected to #{node}")
after
10000 -> IO.puts("No events")
end
# Clean up
:net_kernel.monitor_nodes(false)
Exercise 3: Cross-Node Agent Directory
# On node1, register an agent in the directory:
AgentFramework.AgentDirectory.register("Worker-1", Node.self(), w1)
# On node2, you can read the directory
# (if you started AgentDirectory on both nodes):
AgentFramework.AgentDirectory.lookup("Worker-1")
# Note: ETS is per-node! Node2's directory doesn't have this entry.
# This is why we need the AgentRouter (Session 25) to query remote nodes.
# Question: How would you sync directories across nodes?
# Think about it before Session 25!
Key Takeaways
- Named nodes connect with shared cookies - iex --sname node1 --cookie secret
- :rpc.call/4 runs code on remote nodes - returns {:badrpc, reason} on failure
- GenServer.call works remotely - via {Name, node} tuples or remote pids
- :net_kernel.monitor_nodes/1 - subscribe to node up/down events
- ClusterMonitor tracks cluster state - a GenServer wrapping node monitoring
- ETS is per-node - AgentDirectory on node1 doesn't see node2's data without syncing
What’s Next?
In the next session, we’ll add Phoenix.PubSub for event broadcasting:
- Subscribe to agent lifecycle events
- Broadcast events across connected nodes automatically
- Build an AgentEventLogger
- PubSub works across nodes with zero configuration