Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Livebook Cluster

livebooks/livebook_cluster.livemd

Livebook Cluster

_slug = "Livebook-1"

Mix.install(
  [
    {:libcluster, "~> 3.4.0"},
    {:phoenix_pubsub, ">= 0.0.0"},
    {:kino, "~> 0.14.0"}
  ],
  config: [
    livebook_cluster: [slug: slug]
  ]
)

Cluster (uses libcluster)

defmodule Cluster do
  @cluster __MODULE__
  @moduledoc """
  Documentation for `LivebookCluster`.
  """

  def libcluster_spec do
    topologies = [
      local_epmd: [
        strategy: Elixir.Cluster.Strategy.LocalEpmd
      ]
    ]
    {Cluster.Supervisor, [topologies, [name: @cluster]]}
  end

  def slug do
    Application.get_env(:livebook_cluster, :slug)
  end

  def node_info do
    {slug(), node(), nodes()}
  end

  def find_node(slug) when is_binary(slug) do
    Enum.find(nodes(), & &1.slug == slug)
  end

  def rpc(slug, {mod, fun, args}) do
    with dest <- find_node(slug) do
      dest &amp;&amp; :rpc.call(dest.node, mod, fun, args)
    end
  end

  def rpc_all({mod, fun, args}) do
    Enum.map(nodes(), &amp; :rpc.call(&amp;1.node, mod, fun, args))
  end

  def nodes do
    Node.list(:connected)
    |> Enum.map(&amp; Atom.to_string(&amp;1))
    |> Enum.filter(&amp; Regex.match?(~r[livebook_\w{8}--\w{8}@], &amp;1))
    |> Enum.map(&amp; String.to_atom(&amp;1))
    |> Enum.map(fn node ->
      %{node: node, slug: :rpc.call(node, LivebookCluster, :slug, [])}
    end)
  end

  def livebook_node do
    node = Node.list(:connected)
    |> Enum.map(&amp; Atom.to_string(&amp;1))
    |> Enum.find(&amp; Regex.match?(~r[livebook_\w{8}@], &amp;1))

    if node do
      %{node: node, slug: "livebook"}
    end
  end
end

Pub-Sub (using Phoenix.PubSub)

defmodule PubSub do
  @pubsub __MODULE__

  def phoenix_pubsub_spec do
    {Phoenix.PubSub, name: @pubsub}
  end

  def subscribe(topic) do
    Phoenix.PubSub.subscribe(@pubsub, topic)
  end

  def broadcast(topic, message) do
    local_broadcast(topic, message)
    Cluster.rpc_all({PubSub, :local_broadcast, [topic, message]})
  end

  def local_broadcast(topic, message) do
    Phoenix.PubSub.broadcast!(@pubsub, topic, message)
  end
end

Test it!

children = [
  Cluster.libcluster_spec(),
  PubSub.phoenix_pubsub_spec()
]
opts = [strategy: :one_for_one, name: Test.Supervisor]
Supervisor.start_link(children, opts)
defmodule Test do
  @topic "my-livebooks-collection"
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_) do
    {:ok, PubSub.subscribe(@topic)}
  end

  def ping! do
    from = Cluster.slug()
    PubSub.broadcast(@topic, {:ping, from})
  end

  def handle_info({:ping, from}, _) do
    IO.puts("#{inspect self()} Received ping from #{from}")
    {:noreply, nil}
  end
end

Test.start_link()
ping = Kino.Control.button("Ping!")
Kino.listen(ping, fn _event ->
  Test.ping!
end)

ping