
Session 23: Distributed Erlang in Practice

notebooks/23_distributed_erlang.livemd

Mix.install([])

Introduction

In Session 15, you learned the theory of distributed Erlang - nodes, cookies, and location transparency. In Session 22, you built an ETS-backed AgentDirectory for cross-node agent lookups.

Now it’s time to put it all together. This session is hands-on: you’ll start multiple BEAM nodes, connect them, make remote GenServer calls, and build a ClusterMonitor that tracks node connections.

Sources for This Session

This session synthesizes concepts from:

Learning Goals

By the end of this session, you’ll be able to:

  • Start named BEAM nodes and connect them
  • Make remote GenServer calls using {Name, node} tuples
  • Use :rpc.call/4 for arbitrary remote execution
  • Monitor node connections with :net_kernel.monitor_nodes/1
  • Build a ClusterMonitor GenServer
  • Start and communicate with agents on remote nodes

Section 1: Review - Nodes and Cookies

🤔 Quick Recall from Session 15

# Before we get hands-on, recall the basics:

recall = %{
  what_is_a_node: """
    A running BEAM instance with a name.
    Started with: iex --sname mynode --cookie mysecret
    Node.self() returns the node's name.
  """,

  what_is_a_cookie: """
    A shared secret that nodes must have in common to connect.
    Think of it as a simple password for the cluster.
  """,

  how_to_connect: """
    Node.connect(:other_node@hostname)
    Returns true if connected, false if the connection failed,
    and :ignored if the local node is not alive.
  """,

  location_transparency: """
    GenServer.call(pid, :msg)           # local
    GenServer.call({Name, node}, :msg)  # remote
    Same API, different locations. Code doesn't need to know.
  """
}
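Before connecting anything, it helps to confirm that the shell really is a distributed node. The sketch below uses only standard calls (Node.self/0, Node.alive?/0, Node.get_cookie/0). The NodeInfo module name is hypothetical and not part of agent_framework. In a plain iex started without --sname, it reports :nonode@nohost and no cookie.

```elixir
defmodule NodeInfo do
  # Hypothetical helper (not part of agent_framework): summarises whether
  # this BEAM instance is running as a distributed node.
  def describe do
    %{
      # :nonode@nohost when started without --sname/--name
      name: Node.self(),
      alive?: Node.alive?(),
      # Node.get_cookie/0 returns :nocookie when distribution is not started
      cookie_set?: Node.get_cookie() != :nocookie
    }
  end
end

NodeInfo.describe()
```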

Section 2: Starting Two Nodes with agent_framework

Hands-On: Two Terminals

This section requires two terminal windows. You’ll start two instances of the agent_framework application and connect them.

# Terminal 1:
cd agent_framework
iex --sname node1 --cookie agent_secret -S mix

# Terminal 2:
cd agent_framework
iex --sname node2 --cookie agent_secret -S mix
# On node1, check your identity:
Node.self()
# => :node1@yourmachine  (yourmachine stands for your short hostname, as shown in the iex prompt)

# On node2, connect to node1:
Node.connect(:node1@yourmachine)
# => true

# On either node, list connections:
Node.list()
# => [:node1@yourmachine] (from node2's perspective)
# => [:node2@yourmachine] (from node1's perspective)
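Node.connect/1 actually has three possible results: true, false, or :ignored when the local node is not alive (for example, iex started without --sname). A small hypothetical helper, ConnectHelper, can make that explicit when connecting to several nodes. It is a sketch, not part of agent_framework.

```elixir
defmodule ConnectHelper do
  # Hypothetical helper: tries to connect to each node and reports the outcome.
  # Node.connect/1 returns true, false, or :ignored (local node not alive).
  def connect_all(nodes) when is_list(nodes) do
    Map.new(nodes, fn node ->
      result =
        case Node.connect(node) do
          true -> :connected
          false -> :failed
          :ignored -> :not_distributed
        end

      {node, result}
    end)
  end
end

# From node2, for example:
# ConnectHelper.connect_all([:node1@yourmachine])
```

When the cookies match and node1 is running, the result on node2 should be %{:node1@yourmachine => :connected}; a plain iex without --sname reports :not_distributed for every entry.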

🤔 What Just Happened?

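# When you called Node.connect, the BEAM:
# 1. Asked epmd (Erlang Port Mapper Daemon) on node1's host which port node1 listens on
# 2. Opened a TCP connection to that port
# 3. Ran a handshake in which both sides prove they hold the same cookie
#    (the cookie itself is never sent over the wire)
# 4. Made both nodes aware of each other

# Connection is bidirectional and, by default, transitive:
# If A connects to B, and B connects to C,
# then A is also connected to C!
# (This full mesh is the default kernel connect_all behavior;
#  the -connect_all false emulator flag turns it off.)

# epmd is started automatically (if it isn't already running) when you start a named node.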
# It maps node names to ports on the local machine.
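You can query epmd yourself to see the name-to-port mapping. :net_adm.names/0 asks the epmd on the local host; EpmdInfo is a hypothetical wrapper that converts the charlist names to strings. When no named node has been started on the machine, epmd is usually not running and the call returns an error tuple instead.

```elixir
defmodule EpmdInfo do
  # Hypothetical helper around :net_adm.names/0, which asks the local epmd
  # which node names are registered and which port each one listens on.
  def names do
    case :net_adm.names() do
      {:ok, entries} ->
        # epmd reports names as charlists; convert them for readability
        {:ok, Enum.map(entries, fn {name, port} -> {List.to_string(name), port} end)}

      {:error, reason} ->
        # e.g. epmd is not running because no named node was started yet
        {:error, reason}
    end
  end
end

# With node1 and node2 running, expect something like {:ok, [{"node1", port1}, {"node2", port2}]}
EpmdInfo.names()
```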

Section 3: Remote GenServer Calls

The {Name, node} Tuple

# On node1, start an agent:
{:ok, agent} = AgentFramework.AgentServer.start_link("Worker-1")
AgentFramework.AgentServer.remember(agent, :location, "node1")

# Get the agent's state locally:
AgentFramework.AgentServer.get_state(agent)
# => %{name: "Worker-1", status: :idle, ...}
# On node2, you can call the agent on node1.
# But first, you need a way to address it.

# Option A: Use the pid directly (if you have it)
# The pid from node1 works on node2 because a PID records which node its process lives on.

# Option B: If the GenServer is registered with a name:
# GenServer.call({AgentFramework.AgentServer, :node1@yourmachine}, :get_state)
# This only works if the process is registered locally on node1 under that exact name.

# Our agents aren't registered with module names, so we'll use pids
# or the AgentDirectory (which we built in Session 22).
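To see the {Name, node} form without a second terminal, register a GenServer under its module name and address it with the current node. The Greeter module below is a hypothetical example. Swapping Node.self() for :node1@yourmachine is all it takes to make the same call from node2, once Greeter runs on node1.

```elixir
defmodule Greeter do
  # Hypothetical example server, registered locally under its module name so
  # it can be addressed as {Greeter, node} from any connected node.
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call(:whoami, _from, state), do: {:reply, {:hello_from, Node.self()}, state}
end

{:ok, _pid} = Greeter.start_link()

# Same call shape as a remote call; here the target is the local node.
GenServer.call({Greeter, Node.self()}, :whoami)

# From node2 (with Greeter running on node1):
# GenServer.call({Greeter, :node1@yourmachine}, :whoami)
```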

🤔 Why PIDs Work Across Nodes

# Recall from Session 15:

pid_explanation = """
Local PID:  #PID<0.123.0>
Remote PID: #PID<12345.123.0>

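The first number identifies the node the process lives on.
0 means "this node".
Any other number is this node's local index for a remote node, so
the same pid can print with a different first number on another node.
Use node(pid) to get the actual node name.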

When you send a message to a remote PID, the BEAM:
1. Identifies the target node from the PID
2. Serializes the message
3. Sends it over the TCP connection
4. Deserializes on the other end
5. Delivers to the process's mailbox

All transparent to your code!
"""

Section 4: :rpc.call/4 for Remote Execution

Running Code on Remote Nodes

# :rpc.call lets you execute any function on a remote node:
# :rpc.call(node, module, function, args)

# From node2, run code on node1:
:rpc.call(:node1@yourmachine, Node, :self, [])
# => :node1@yourmachine (executed ON node1, result returned to node2)

# More useful: list agents on a remote node
:rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :list_agents, [])
# => [#PID<12345.234.0>]   (a remote pid, as printed on node2)

# Look up a remote agent's pid by name
:rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :whereis, ["Worker-1"])
# => {:ok, #PID<12345.234.0>}

🤔 When to Use :rpc.call vs GenServer.call

# :rpc.call(node, Module, :function, [args])
#   - Runs ANY function on the remote node
#   - Spawns a temporary process on the remote node
#   - Returns {:badrpc, reason} on failure
#   - Waits forever by default; use :rpc.call/5 to pass a timeout
#   - Good for: one-off remote operations

# GenServer.call({Name, node}, message)
#   - Calls a SPECIFIC GenServer on the remote node
#   - Sends a message to an existing process
#   - Exits the caller on timeout (5 second default)
#   - Good for: calling known services

# For our agent framework:
# - Use GenServer.call for direct agent communication
# - Use :rpc.call for discovery (finding agents on remote nodes)

comparison = %{
  rpc_call: %{
    use_when: "Discovery, one-off operations, running arbitrary code",
    failure_mode: "{:badrpc, reason}",
    overhead: "Spawns a temporary process"
  },
  genserver_call: %{
    use_when: "Communicating with a known GenServer",
    failure_mode: "Raises on timeout or noproc",
    overhead: "Direct message to existing process"
  }
}
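The two failure modes differ in kind, not just in wording: :rpc.call hands you a value, while GenServer.call exits the calling process. A minimal sketch with a deliberately slow, hypothetical SlowServer shows how to turn that exit into a value with catch :exit.

```elixir
defmodule SlowServer do
  # Hypothetical server whose :slow call takes longer than the caller will wait.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:slow, _from, state) do
    Process.sleep(200)
    {:reply, :done, state}
  end
end

{:ok, pid} = SlowServer.start_link()

# GenServer.call does not return an error tuple on timeout; it exits the caller.
# catch :exit turns that exit into a value you can pattern match on.
result =
  try do
    GenServer.call(pid, :slow, 50)
  catch
    :exit, {:timeout, _call} -> {:error, :timeout}
  end

# result is {:error, :timeout}
```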

Error Handling with RPC

# :rpc.call returns {:badrpc, reason} on failure
# This is different from GenServer.call, which exits the calling process!

# Examples of :badrpc:
errors = %{
  node_down: ":rpc.call(:dead_node@host, ...) => {:badrpc, :nodedown}",
  bad_module: ":rpc.call(node, NonExistent, :foo, []) => {:badrpc, {:EXIT, ...}}",
  timeout: ":rpc.call(node, :timer, :sleep, [60000]) => {:badrpc, :timeout}"
}

# Always pattern match on the result:
case :rpc.call(:node1@yourmachine, Node, :self, []) do
  {:badrpc, reason} ->
    IO.puts("RPC failed: #{inspect(reason)}")

  result ->
    IO.puts("Got: #{inspect(result)}")
end
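Because :rpc.call/4 waits indefinitely, a small wrapper that always passes a timeout to :rpc.call/5 and normalizes {:badrpc, reason} into {:error, reason} can keep callers honest. SafeRpc is a hypothetical name, and the five-second default is an arbitrary choice. Targeting Node.self() keeps the example runnable without a cluster.

```elixir
defmodule SafeRpc do
  # Hypothetical wrapper (not part of agent_framework): always passes an
  # explicit timeout via :rpc.call/5 and normalises the result.
  @default_timeout 5_000

  def call(node, mod, fun, args, timeout \\ @default_timeout) do
    case :rpc.call(node, mod, fun, args, timeout) do
      {:badrpc, :nodedown} -> {:error, :nodedown}
      {:badrpc, :timeout} -> {:error, :timeout}
      {:badrpc, {:EXIT, reason}} -> {:error, {:exit, reason}}
      {:badrpc, other} -> {:error, other}
      result -> {:ok, result}
    end
  end
end

SafeRpc.call(Node.self(), String, :upcase, ["node1"])
# => {:ok, "NODE1"}

SafeRpc.call(Node.self(), :timer, :sleep, [500], 50)
# => {:error, :timeout}
```

One caveat carries over from :rpc itself: a remote function that legitimately returns {:badrpc, _} is indistinguishable from a failure.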

Section 5: Node Monitoring

:net_kernel.monitor_nodes/1

# You can subscribe to node connection events:
:net_kernel.monitor_nodes(true)

# Now your process receives:
# {:nodeup, node}   - when a node connects
# {:nodedown, node} - when a node disconnects

# Example: Monitor in IEx
receive do
  {:nodeup, node} -> IO.puts("Connected: #{node}")
  {:nodedown, node} -> IO.puts("Disconnected: #{node}")
after
  5000 -> IO.puts("No events in 5 seconds")
end

# Unsubscribe:
:net_kernel.monitor_nodes(false)
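:net_kernel.monitor_nodes/2 accepts options that attach an info list to each message, which makes events easier to debug: node_type: :all also reports hidden nodes, and nodedown_reason: true says why a connection dropped. With options, the messages become three-element tuples, so the two-element patterns above no longer match. NodeEventLog is a hypothetical sketch.

```elixir
defmodule NodeEventLog do
  # Hypothetical helper: subscribes the calling process with
  # :net_kernel.monitor_nodes/2 and drains node events from its mailbox.

  # With options, events arrive as {:nodeup, node, info} and
  # {:nodedown, node, info}; info includes node_type, and for nodedown
  # also nodedown_reason.
  def subscribe do
    :net_kernel.monitor_nodes(true, node_type: :all, nodedown_reason: true)
  end

  # Unsubscribe with the same options that were used to subscribe.
  def unsubscribe do
    :net_kernel.monitor_nodes(false, node_type: :all, nodedown_reason: true)
  end

  # Collects node events already in the mailbox, waiting at most
  # `timeout` ms for each further one, and returns them oldest first.
  def drain(timeout \\ 0, acc \\ []) do
    receive do
      {:nodeup, node, info} -> drain(timeout, [{:up, node, info} | acc])
      {:nodedown, node, info} -> drain(timeout, [{:down, node, info} | acc])
    after
      timeout -> Enum.reverse(acc)
    end
  end
end
```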

🤔 Why Monitor Nodes?

# Consider: You have agents distributed across 3 nodes.
# Node 2 crashes. What should happen to its agents?

monitoring_value = """
Without monitoring:
  - Calls to node2 agents exit with :nodedown; casts and sends are dropped silently
  - No one knows node2 is gone
  - AgentDirectory still has stale entries

With monitoring:
  - ClusterMonitor receives {:nodedown, :node2@host}
  - AgentDirectory removes node2's agents
  - PubSub broadcasts the event
  - Other nodes can react (restart agents, rebalance, etc.)
"""

Section 6: Building a ClusterMonitor GenServer

The Implementation

# Our ClusterMonitor (in agent_api/lib/agent_api/cluster/):

defmodule ClusterMonitorExample do
  use GenServer

  require Logger

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def connected_nodes do
    GenServer.call(__MODULE__, :connected_nodes)
  end

  def cluster_info do
    GenServer.call(__MODULE__, :cluster_info)
  end

  @impl true
  def init(_opts) do
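    # Subscribe to node events before reading Node.list(), so a node that
    # connects in between still arrives as a :nodeup message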
    :net_kernel.monitor_nodes(true)

    state = %{
      connected_nodes: Node.list(),
      self: Node.self()
    }

    Logger.info("ClusterMonitor started on #{Node.self()}")
    {:ok, state}
  end

  @impl true
  def handle_call(:connected_nodes, _from, state) do
    {:reply, state.connected_nodes, state}
  end

  def handle_call(:cluster_info, _from, state) do
    info = %{
      self: state.self,
      connected: state.connected_nodes,
      total: length(state.connected_nodes) + 1
    }
    {:reply, info, state}
  end

  @impl true
  def handle_info({:nodeup, node}, state) do
    Logger.info("Node connected: #{node}")
    new_nodes = Enum.uniq([node | state.connected_nodes])
    {:noreply, %{state | connected_nodes: new_nodes}}
  end

  def handle_info({:nodedown, node}, state) do
    Logger.warning("Node disconnected: #{node}")
    new_nodes = List.delete(state.connected_nodes, node)

    # Clean up agents from the disconnected node
    # AgentFramework.AgentDirectory.remove_node_agents(node)

    {:noreply, %{state | connected_nodes: new_nodes}}
  end

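  # The subscription is tied to this process and is removed automatically when
  # it exits. terminate/2 only runs on shutdown if the process traps exits,
  # so this explicit unsubscribe is a courtesy, not a requirement.
  @impl true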
  def terminate(_reason, _state) do
    :net_kernel.monitor_nodes(false)
    :ok
  end
end

🤔 ClusterMonitor Design Decisions

# Why is ClusterMonitor a GenServer and not just :net_kernel.monitor_nodes?

design_reasoning = """
1. State tracking - GenServer maintains the current node list,
   so you don't have to call Node.list() every time.

2. Event handling - Can do side effects on node up/down:
   - Clean up AgentDirectory entries
   - Broadcast PubSub events
   - Trigger agent migration

3. Supervision - Under a supervisor, it restarts if it crashes,
   and init/1 re-subscribes to node events on restart.

4. Query API - cluster_info() returns a formatted snapshot
   of cluster state.
"""

Section 7: Starting Agents on Remote Nodes

Using DynamicSupervisor Remotely

# From node2, start an agent on node1:
:rpc.call(
  :node1@yourmachine,
  AgentFramework.AgentSupervisor,
  :start_agent,
  ["Remote-Worker"]
)
# => {:ok, #PID<12345.234.0>}

# The agent is supervised on node1, but we initiated from node2!

🤔 Where Should Agents Live?

# Consider these strategies for agent placement:

placement_strategies = %{
  local_only: %{
    description: "Start agents only on the requesting node",
    pros: ["Simple", "No network latency for local calls"],
    cons: ["Uneven distribution", "No redundancy"]
  },

  round_robin: %{
    description: "Distribute agents evenly across nodes",
    pros: ["Even load distribution", "Uses all resources"],
    cons: ["Network latency for some calls", "More complex routing"]
  },

  affinity: %{
    description: "Place related agents on the same node",
    pros: ["Fast communication between related agents", "Data locality"],
    cons: ["May cause hotspots", "Complex placement logic"]
  }
}

# For now, we start agents locally and let the router handle lookups.
# More sophisticated placement can come later.
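The round_robin and affinity strategies reduce to choosing a node name before calling start_agent over :rpc.call. The Placement module below is a hypothetical sketch. Plain modulo hashing reshuffles most keys whenever the node list changes, so a production affinity scheme would likely want consistent hashing. Every node should also build the list in the same order, for example Enum.sort([Node.self() | Node.list()]).

```elixir
defmodule Placement do
  # Hypothetical helpers for the round_robin and affinity strategies above.
  # Neither is part of agent_framework; they only choose a node name.

  # Round robin: the nth agent goes to node number rem(n, length(nodes)).
  def round_robin(nodes, counter) when nodes != [] do
    Enum.at(nodes, rem(counter, length(nodes)))
  end

  # Affinity: hash a key (e.g. a team or conversation id) so related agents
  # land on the same node. :erlang.phash2/2 returns 0..range-1.
  def affinity(nodes, key) when nodes != [] do
    Enum.at(nodes, :erlang.phash2(key, length(nodes)))
  end
end

nodes = [:node1@host, :node2@host, :node3@host]

Enum.map(0..4, &Placement.round_robin(nodes, &1))
# => [:node1@host, :node2@host, :node3@host, :node1@host, :node2@host]
```

A caller could then start an agent with :rpc.call(Placement.round_robin(nodes, n), AgentFramework.AgentSupervisor, :start_agent, [name]).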

Section 8: Interactive Exercises

Exercise 1: Two-Node Setup (Requires 2 Terminals)

# Terminal 1:
cd agent_framework
iex --sname node1 --cookie agent_secret -S mix

# Terminal 2:
cd agent_framework
iex --sname node2 --cookie agent_secret -S mix
# On node2, connect to node1:
Node.connect(:node1@yourmachine)

# Verify connection on both nodes:
Node.list()

# On node1, start an agent:
{:ok, w1} = AgentFramework.AgentSupervisor.start_agent("Worker-1")
AgentFramework.AgentServer.remember(w1, :origin, "node1")

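# On node2, w1 is not bound yet (it only exists in node1's shell), so look it up first:
{:ok, w1} = :rpc.call(:node1@yourmachine, AgentFramework.AgentSupervisor, :whereis, ["Worker-1"])

# Then query the agent on node1:
:rpc.call(:node1@yourmachine, AgentFramework.AgentServer, :recall, [w1, :origin])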
# => "node1"

# On node2, send a task to the agent on node1:
:rpc.call(:node1@yourmachine, AgentFramework.AgentServer, :send_task, [w1, :search, %{query: "distributed"}])
# => :ok

# On node1, process the task:
AgentFramework.AgentServer.process_next(w1)
# => {:ok, task, {:ok, "Search results for: distributed"}}

Exercise 2: Monitor Node Events

# On node1, set up a simple monitor:
:net_kernel.monitor_nodes(true)

# On node2, disconnect:
# Node.disconnect(:node1@yourmachine)

# On node1, check for events:
receive do
  {:nodedown, node} ->
    IO.puts("Lost connection to #{node}")
after
  10000 -> IO.puts("No events")
end

# Reconnect from node2:
# Node.connect(:node1@yourmachine)

# On node1:
receive do
  {:nodeup, node} ->
    IO.puts("Reconnected to #{node}")
after
  10000 -> IO.puts("No events")
end

# Clean up
:net_kernel.monitor_nodes(false)

Exercise 3: Cross-Node Agent Directory

# On node1, register an agent in the directory:
AgentFramework.AgentDirectory.register("Worker-1", Node.self(), w1)

# On node2 (assuming AgentDirectory is started on both nodes),
# look up the same name in node2's own directory:
AgentFramework.AgentDirectory.lookup("Worker-1")
# The entry is not there: ETS is per-node, so node2's directory
# never saw node1's register call.
# This is why we need the AgentRouter (Session 25) to query remote nodes.

# Question: How would you sync directories across nodes?
# Think about it before Session 25!

Key Takeaways

  1. Named nodes connect with shared cookies - iex --sname node1 --cookie secret

  2. :rpc.call/4 runs code on remote nodes - Returns {:badrpc, reason} on failure

  3. GenServer.call works remotely - {Name, node} or remote pids

  4. :net_kernel.monitor_nodes/1 - Subscribe to node up/down events

  5. ClusterMonitor tracks cluster state - GenServer wrapping node monitoring

  6. ETS is per-node - AgentDirectory on node1 doesn’t see node2’s data without syncing


What’s Next?

In the next session, we’ll add Phoenix.PubSub for event broadcasting:

  • Subscribe to agent lifecycle events
  • Broadcast events across connected nodes automatically
  • Build an AgentEventLogger
  • PubSub works across nodes with zero configuration

Navigation

Previous: Session 22 - ETS

Next: Session 24 - Phoenix.PubSub