# Livebook Cluster
# Identifier for this notebook; it is handed to the application environment
# below. The binding must be named `slug` (not `_slug`), because the `config:`
# option references it — otherwise the cell fails with an undefined variable.
slug = "Livebook-1"

Mix.install(
  [
    {:libcluster, "~> 3.4.0"},
    {:phoenix_pubsub, ">= 0.0.0"},
    {:kino, "~> 0.14.0"}
  ],
  # Stored under the :livebook_cluster application so it can be read back at
  # runtime with Application.get_env(:livebook_cluster, :slug).
  config: [
    livebook_cluster: [slug: slug]
  ]
)
## Cluster (uses libcluster)
defmodule Cluster do
  @moduledoc """
  Helpers for discovering other connected Livebook nodes and calling into them.

  Nodes are joined through libcluster's `LocalEpmd` strategy (see
  `libcluster_spec/0`). Each notebook identifies itself with a slug stored in
  the `:livebook_cluster` application environment; peers are looked up by that
  slug.
  """

  @cluster __MODULE__

  @typedoc "A connected node together with the slug it reported."
  @type entry :: %{node: node(), slug: term()}

  @doc """
  Returns the child spec for `Cluster.Supervisor` with a single `:local_epmd`
  topology, registered under this module's name.
  """
  def libcluster_spec do
    topologies = [
      local_epmd: [
        strategy: Elixir.Cluster.Strategy.LocalEpmd
      ]
    ]

    {Cluster.Supervisor, [topologies, [name: @cluster]]}
  end

  @doc """
  Returns the slug configured for this node, or `nil` when none is set.
  """
  def slug do
    Application.get_env(:livebook_cluster, :slug)
  end

  @doc """
  Returns `{slug, node, peers}` for the local node, where `peers` is `nodes/0`.
  """
  def node_info do
    {slug(), node(), nodes()}
  end

  @doc """
  Returns the entry of the connected node whose slug equals `slug`, or `nil`.
  """
  @spec find_node(String.t()) :: entry() | nil
  def find_node(slug) when is_binary(slug) do
    Enum.find(nodes(), &(&1.slug == slug))
  end

  @doc """
  Calls `mod.fun(args...)` on the node identified by `slug`.

  Returns `nil` when no connected node reports that slug; otherwise returns
  whatever `:rpc.call/4` returns.
  """
  def rpc(slug, {mod, fun, args}) do
    case find_node(slug) do
      nil -> nil
      dest -> :rpc.call(dest.node, mod, fun, args)
    end
  end

  @doc """
  Calls `mod.fun(args...)` on every node returned by `nodes/0` and returns the
  list of `:rpc.call/4` results in the same order.
  """
  def rpc_all({mod, fun, args}) do
    Enum.map(nodes(), &:rpc.call(&1.node, mod, fun, args))
  end

  @doc """
  Lists connected nodes whose name matches `livebook_<8>--<8>@`, each paired
  with the slug obtained by calling `slug/0` on that node.

  The slug is fetched with `:rpc.call/4`, so it holds the raw RPC result; if the
  remote call fails this is presumably a `{:badrpc, reason}` tuple.
  """
  @spec nodes() :: [entry()]
  def nodes do
    # The remote call targets this very module (__MODULE__): peers are expected
    # to have evaluated the same notebook code, which defines `Cluster.slug/0`.
    for node <- Node.list(:connected), runtime_node?(node) do
      %{node: node, slug: :rpc.call(node, __MODULE__, :slug, [])}
    end
  end

  @doc """
  Returns the first connected node whose name matches `livebook_<8>@`, tagged
  with the fixed slug `"livebook"`, or `nil` when there is none.

  The node name is returned as an atom, like the entries of `nodes/0`, so it
  can be passed to `:rpc.call/4` directly.
  """
  @spec livebook_node() :: entry() | nil
  def livebook_node do
    case Enum.find(Node.list(:connected), &livebook_node?/1) do
      nil -> nil
      node -> %{node: node, slug: "livebook"}
    end
  end

  # True when the node name has the `livebook_<8>--<8>@` shape.
  defp runtime_node?(node) do
    Regex.match?(~r[livebook_\w{8}--\w{8}@], Atom.to_string(node))
  end

  # True when the node name has the `livebook_<8>@` shape.
  defp livebook_node?(node) do
    Regex.match?(~r[livebook_\w{8}@], Atom.to_string(node))
  end
end
## Pub-Sub (using Phoenix.PubSub)
defmodule PubSub do
  @moduledoc """
  Thin wrapper around `Phoenix.PubSub` that delivers messages to subscribers on
  this node and, through `Cluster.rpc_all/1`, on every discovered peer node.
  """

  @pubsub __MODULE__

  @doc """
  Returns the child spec that starts `Phoenix.PubSub` under this module's name.
  """
  def phoenix_pubsub_spec do
    {Phoenix.PubSub, name: @pubsub}
  end

  @doc """
  Subscribes the calling process to `topic`.
  """
  def subscribe(topic) do
    Phoenix.PubSub.subscribe(@pubsub, topic)
  end

  @doc """
  Delivers `message` to local subscribers of `topic`, then asks every peer node
  to do the same for its own subscribers.

  Returns the list of results from `Cluster.rpc_all/1`.
  """
  def broadcast(topic, message) do
    local_broadcast(topic, message)
    Cluster.rpc_all({__MODULE__, :local_broadcast, [topic, message]})
  end

  @doc """
  Delivers `message` only to subscribers of `topic` on the current node.
  """
  def local_broadcast(topic, message) do
    # Must stay node-local: `broadcast/2` already fans out to peers explicitly,
    # so a cluster-wide broadcast here could deliver the same message more than
    # once whenever the PubSub adapter also reaches those peers.
    Phoenix.PubSub.local_broadcast(@pubsub, topic, message)
  end
end
## Test it!
# Supervisor options: restart children independently, register as Test.Supervisor.
opts = [strategy: :one_for_one, name: Test.Supervisor]

# The clustering supervisor and the pub-sub server run side by side.
children = [Cluster.libcluster_spec(), PubSub.phoenix_pubsub_spec()]

Supervisor.start_link(children, opts)
defmodule Test do
  @moduledoc """
  Demo process: subscribes to a shared topic and prints every ping it receives.
  """

  use GenServer

  @topic "my-livebooks-collection"

  @doc """
  Starts the process, registered under this module's name.
  """
  def start_link do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc """
  Broadcasts `{:ping, slug}` on the shared topic, where `slug` is this node's
  `Cluster.slug/0`.
  """
  def ping! do
    from = Cluster.slug()
    PubSub.broadcast(@topic, {:ping, from})
  end

  @impl true
  def init(_) do
    # Subscribe from inside the server so messages are delivered to this
    # process. A failed subscription stops start-up instead of being kept as
    # the process state.
    case PubSub.subscribe(@topic) do
      :ok -> {:ok, nil}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_info({:ping, from}, state) do
    IO.puts("#{inspect self()} Received ping from #{from}")
    {:noreply, state}
  end

  # Anything else on the mailbox is ignored rather than crashing the process
  # with a FunctionClauseError.
  def handle_info(_message, state) do
    {:noreply, state}
  end
end
# Start the subscriber process before any ping can be sent.
Test.start_link()

# A button that broadcasts a ping on every click.
ping = Kino.Control.button("Ping!")
Kino.listen(ping, fn _click -> Test.ping!() end)

# Last expression of the cell: the button itself.
ping