Clustering through a tunnel
Mix.install([{:muontrap, "~> 1.0"}, {:kino, "~> 0.19.0"}])
get_free_port = fn ->
  # Ask the OS for any unused TCP port by listening on port 0, read back the
  # number it picked, then release the socket so the caller can bind it.
  # Racy by nature: another process could take the port before it is reused.
  {:ok, listener} = :gen_tcp.listen(0, reuseaddr: true, active: false)
  {:ok, assigned_port} = :inet.port(listener)
  :ok = :gen_tcp.close(listener)
  assigned_port
end
wait_until = fn check, opts ->
  # Polls `check` up to :max times (default 30). Before each attempt it sleeps
  # :interval ms (default 1000) and prints a progress dot after :label
  # (default "Waiting"). Returns the first truthy value `check` yields, or nil
  # once every attempt came back falsy.
  attempts = Keyword.get(opts, :max, 30)
  pause_ms = Keyword.get(opts, :interval, 1000)
  banner = Keyword.get(opts, :label, "Waiting")

  attempt = fn ->
    Process.sleep(pause_ms)
    IO.write(".")
    check.()
  end

  IO.write(banner)

  outcome =
    attempt
    |> Stream.repeatedly()
    |> Stream.take(attempts)
    |> Enum.find_value(& &1)

  # Terminate the progress line before handing back the result.
  IO.puts("")
  outcome
end
The idea
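You can cluster two BEAM nodes through a tunnel by using a custom EPMD module. Instead of asking EPMD where a node lives, the local node asks our module, which points it at the local end of an SSH tunnel.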
Prerequisite: a remote machine with Elixir, accessible via SSH
You need a remote machine that’s accessible via SSH from the machine that’s running this notebook.
Also, this remote machine needs Elixir installed.
Enter your SSH user and a host that you can already reach with `ssh user@host`
using key-based authentication, and on which Elixir is on the PATH.
# Two text inputs describing the SSH target, rendered side by side.
[user_input, host_input] = Enum.map(["SSH user", "Remote host"], &Kino.Input.text/1)
Kino.Layout.grid([user_input, host_input])
Verify that the notebook can reach the remote host and find Elixir there:
user = Kino.Input.read(user_input) |> String.trim()
host = Kino.Input.read(host_input) |> String.trim()

if user == "" or host == "" do
  Kino.interrupt!(:normal, "Fill in SSH user and Remote host above, then re-run.")
else
  # `bash -ilc` runs the remote command in an interactive login shell so it picks
  # up PATH from bash's login startup files: the first of ~/.bash_profile,
  # ~/.bash_login or ~/.profile that exists. ~/.bashrc is only read if one of
  # those sources it, and zsh files such as ~/.zshrc are never read.
  {out, status} =
    System.cmd(
      "ssh",
      [
        # Fail instead of prompting: the notebook has no terminal to answer.
        "-o", "BatchMode=yes",
        "-o", "ConnectTimeout=5",
        # Trust a never-seen host key on first contact, reject changed ones.
        "-o", "StrictHostKeyChecking=accept-new",
        "#{user}@#{host}",
        "bash -ilc 'elixir --version'"
      ],
      stderr_to_stdout: true
    )

  # Without a TTY, interactive bash prints job-control warnings; drop those
  # lines so only the real output (or the real error) is shown.
  shell_noise? = &(&1 =~ "cannot set terminal process group" or &1 =~ "no job control")

  clean =
    out
    |> String.split("\n")
    |> Enum.reject(shell_noise?)
    |> Enum.join("\n")
    |> String.trim()

  if status == 0 do
    IO.puts("Ready. user=#{user} host=#{host}")
    IO.puts(clean)
  else
    IO.puts("SSH check failed (exit #{status}):")
    IO.puts(clean)
    IO.puts("")
    IO.puts("Try:")
    IO.puts(" • Does `ssh #{user}@#{host}` work manually?")
    IO.puts(" • Is your key loaded? (ssh-add -l)")
    IO.puts(" • Is Elixir on PATH in a bash login shell (~/.bash_profile or ~/.profile)?")
  end
end
Step 1: Start a node on the remote with a fixed distribution port
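The node name encodes the local port we’ll use for the tunnel; that’s what makes the proxy trick work later. The remote node also pins its distribution listener to a fixed port, so the tunnel has a known target.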
cookie = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
local_port = get_free_port.()
remote_port = 44444

# Erlang long names (--name) require a dotted host. Append `.local` if the
# user's host has none. ProxyEPMD ignores the host part anyway, so it's just a label.
node_host = if String.contains?(host, "."), do: host, else: "#{host}.local"

IO.puts("Node: remote_#{local_port}@#{node_host}")
IO.puts("Local tunnel port: #{local_port} (encoded in the node name)")
IO.puts("Remote fixed port: #{remote_port}")

# Same non-interactive behaviour as the pre-flight check: fail, never prompt.
ssh_target = ["-o", "BatchMode=yes", "#{user}@#{host}"]

# Kill any leftover `remote_*` beam from a previous run so our fixed port is free.
# SIGKILL (-9) avoids waiting on BEAM's graceful shutdown holding the port.
# The `[b]eam` regex still matches "beam", but it does not match this command's
# own text. Otherwise pkill -f would also kill the remote shell running it.
System.cmd(
  "ssh",
  ssh_target ++ ["pkill -9 -f '[b]eam.smp.*remote_' || true"],
  stderr_to_stdout: true
)

# Slide 1's command (as `elixir`), wrapped in bash -ilc (PATH) + nohup (background).
remote_cmd =
  "nohup elixir" <>
    " --name remote_#{local_port}@#{node_host}" <>
    " --cookie #{cookie}" <>
    " --erl '-kernel inet_dist_listen_min #{remote_port} inet_dist_listen_max #{remote_port}'" <>
    " -e 'receive do after 3_600_000 -> :ok end'" <>
    " > /dev/null 2>&1 < /dev/null &"

case System.cmd("ssh", ssh_target ++ ["bash -ilc \"#{remote_cmd}\""], stderr_to_stdout: true) do
  {_, 0} ->
    :ok

  {launch_out, exit_status} ->
    raise "Failed to launch remote node (exit #{exit_status}):\n#{launch_out}"
end

# Poll until something listens on the fixed port. The trailing whitespace class
# anchors the match, so e.g. port 4444 can't match a listener on 44445.
wait_until.(
  fn ->
    {out, _} =
      System.cmd(
        "ssh",
        ssh_target ++
          ["ss -tln | grep -qE ':#{remote_port}[[:space:]]' && echo yes || echo no"],
        stderr_to_stdout: true
      )

    String.trim(out) == "yes"
  end,
  label: "Waiting for remote node"
) || raise("Remote node failed to start")

IO.puts("Remote node listening on :#{remote_port}")
Step 2: Set up an SSH tunnel from a local port to the remote dist port
A standard `ssh -L` port forward, run as a MuonTrap daemon linked to the
notebook process. If this notebook crashes, the tunnel dies with it.
# Stop any tunnel from a previous run (registered name makes this idempotent).
if pid = Process.whereis(:proxy_tunnel), do: GenServer.stop(pid)

# `ssh -N -L` forwards local_port on this machine to remote_port on the remote
# host without running a remote command. The ssh OS process is owned by this
# MuonTrap daemon, so stopping the daemon stops the tunnel.
{:ok, tunnel} =
  MuonTrap.Daemon.start_link(
    "ssh",
    [
      # Match the pre-flight check: a background daemon can't answer a
      # password prompt, so fail instead of asking.
      "-o", "BatchMode=yes",
      "-o", "StrictHostKeyChecking=accept-new",
      # Exit if the forward can't be bound instead of idling without a tunnel.
      "-o", "ExitOnForwardFailure=yes",
      "-N",
      "-L", "#{local_port}:localhost:#{remote_port}",
      "#{user}@#{host}"
    ],
    name: :proxy_tunnel,
    stderr_to_stdout: true
  )

# The tunnel is usable once something is LISTENing on local_port.
wait_until.(
  fn ->
    match?(
      {_, 0},
      System.cmd("lsof", ["-iTCP:#{local_port}", "-sTCP:LISTEN"], stderr_to_stdout: true)
    )
  end,
  label: "Waiting for tunnel",
  interval: 200,
  max: 50
) || raise("Tunnel failed to bind on port #{local_port}")

IO.puts("Tunnel ready: localhost:#{local_port} -> remote:#{remote_port}")
Step 3: Write a custom EPMD module to route through the tunnel
Two pattern matches do the work: port_please pulls the port out of the
node name, and address_please returns loopback so the BEAM dials the
tunnel. The rest is boilerplate delegating to :erl_epmd.
dir = Path.join(System.tmp_dir!(), "proxy_epmd_demo")
File.mkdir_p!(dir)

{:module, ProxyEPMD, binary, _} =
  defmodule ProxyEPMD do
    @moduledoc """
    EPMD module that routes `remote_<port>@…` nodes through a local tunnel.

    For those node names the distribution port is parsed from the name and the
    address is always loopback. Every other node name, and every other EPMD
    callback, is handed to `:erl_epmd` unchanged.
    """

    # Distribution protocol version reported alongside the port.
    @dist_version 6

    # "remote_" nodes resolve to loopback: the tunnel's local end lives here.
    # Other address families (and other names) fall through to :erl_epmd.
    def address_please(~c"remote_" ++ _, _host, family) when family in [:inet, :inet6],
      do: {:ok, loopback(family)}

    def address_please(name, host, family), do: :erl_epmd.address_please(name, host, family)

    # The characters after "remote_" are the local tunnel port.
    def port_please(~c"remote_" ++ digits, _host, _timeout),
      do: {:port, List.to_integer(digits), @dist_version}

    def port_please(name, host, timeout), do: :erl_epmd.port_please(name, host, timeout)

    # Once address_please/3 answers {:ok, ip}, distribution calls port_please/2;
    # routing it through /3 keeps the "remote_" clause above in effect.
    def port_please(name, host), do: port_please(name, host, :infinity)

    # The remaining EPMD callbacks go straight to the stock implementation.
    def start_link, do: :erl_epmd.start_link()
    def register_node(name, port), do: :erl_epmd.register_node(name, port)
    def register_node(name, port, family), do: :erl_epmd.register_node(name, port, family)
    def listen_port_please(name, host), do: :erl_epmd.listen_port_please(name, host)
    def names(host_name), do: :erl_epmd.names(host_name)

    defp loopback(:inet), do: {127, 0, 0, 1}
    defp loopback(:inet6), do: {0, 0, 0, 0, 0, 0, 0, 1}
  end

# Persist the compiled module so a separately spawned node can load it via -pa.
File.write!(Path.join(dir, "Elixir.ProxyEPMD.beam"), binary)
Quick sanity check on the pattern matching:
# Both "remote_" clauses should fire: port 9999 parsed from the name, loopback address.
probe_name = ~c"remote_9999"
probe_name |> ProxyEPMD.port_please(~c"anything", 5000) |> IO.inspect(label: "remote_9999")
probe_name |> ProxyEPMD.address_please(~c"anything", :inet) |> IO.inspect(label: "address")
Connect through the tunnel
Spawn a local node that uses ProxyEPMD, connect to the remote, and call
:inet.gethostname over there to confirm the traffic actually went through.
id = :crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower)
target = "remote_#{local_port}@#{node_host}"

# Script evaluated inside a throwaway local node. Unescaped interpolations are
# filled in here, from the notebook; escaped ones (\#) run inside that node.
probe_script = """
target = :"#{target}"
IO.puts("Connecting to: \#{target}")
IO.puts("(ProxyEPMD extracts port #{local_port} from the name, resolves to 127.0.0.1)")
IO.puts("")
result = Node.connect(target)
IO.puts("Connect result: \#{result}")
if result do
{:ok, hostname} = :rpc.call(target, :inet, :gethostname, [])
IO.puts("Remote hostname (via rpc): \#{hostname}")
IO.puts("Remote node (via rpc): \#{:rpc.call(target, Node, :self, [])}")
end
System.halt(0)
"""

# -epmd_module makes the new node ask ProxyEPMD (loaded from `dir`) where
# remote_* nodes live; the shared cookie authorises the connection.
elixir_args = [
  "--erl", "-epmd_module Elixir.ProxyEPMD -pa #{dir}",
  "--name", "local_#{id}@127.0.0.1",
  "--cookie", cookie,
  "-e", probe_script
]

{output, _exit_status} = System.cmd("elixir", elixir_args, stderr_to_stdout: true)
IO.puts(output)
The clean up
Stop the tunnel (MuonTrap gracefully kills the SSH child). Then kill the
remote node by matching its name. SIGHUP isn’t enough: the node was started
under `nohup`, and the `elixir`/`erl` wrappers fork `beam.smp` and exit, so
`beam.smp` always gets reparented to init.
# Stop the local tunnel daemon first (MuonTrap kills its ssh child with it).
if pid = Process.whereis(:proxy_tunnel), do: GenServer.stop(pid)

# Kill this run's remote node by name (remote_<local_port>@<host>).
# The trailing `@` pins the exact port, so remote_4444 can't match remote_44445.
# The `[r]emote` regex doesn't match this command's own text, so pkill -f can't
# kill the remote shell that is running it.
System.cmd(
  "ssh",
  [
    "-o", "BatchMode=yes",
    "#{user}@#{host}",
    "pkill -9 -f '[r]emote_#{local_port}@' || true"
  ],
  stderr_to_stdout: true
)

IO.puts("Tunnel stopped. Remote node killed.")