Powered by AppSignal & Oban Pro

Orchestrator

Orchestrator.livemd

Orchestrator

# Notebook setup: Kino provides the input widgets and audio players used below.
Mix.install([{:kino, "~> 0.19.0"}])

Connect to Workers

# Livebook runtimes are separate Erlang nodes. We need to discover and connect to them.
# All runtimes started by this Livebook share the same cookie (LIVEBOOK_COOKIE),
# but they aren't connected by default. We connect via Node.connect/1.

defmodule Orchestrator do
  @moduledoc """
  Helpers that connect this notebook's runtime to its sibling Livebook runtimes
  and call the Flamingo and SAM workers registered there.

  Workers are addressed by `:global` name (`:flamingo_worker`, `:sam_worker`),
  so they only become reachable once this node is connected to the node that
  registered them.
  """

  @doc """
  Connects to every sibling runtime node registered with the local EPMD.

  A sibling is any EPMD-registered node whose full name starts with this node's
  name up to the first `"--"` (e.g. `"livebook_epwxvykz"`). Candidate node names
  are completed with this node's own host part rather than a fixed
  `127.0.0.1`, so long-name and short-name distribution both work.

  Prints one line per connection attempt. Returns `:ok`, or `{:error, reason}`
  when EPMD cannot be queried.
  """
  @spec connect_siblings() :: :ok | {:error, term()}
  def connect_siblings do
    case :net_adm.names() do
      {:ok, names} ->
        names
        |> sibling_nodes(node())
        |> Enum.each(&connect_and_report/1)

      {:error, reason} = error ->
        # e.g. EPMD not running; report instead of crashing with a MatchError.
        IO.puts("Could not list nodes via EPMD: #{inspect(reason)}")
        error
    end
  end

  @doc false
  # Pure helper: maps EPMD `{name, port}` entries to node atoms on `self_node`'s
  # host, keeps those sharing `self_node`'s prefix, and drops `self_node`.
  # Atoms are built from EPMD-registered names only (a small, local set).
  @spec sibling_nodes([{charlist() | String.t(), non_neg_integer()}], node()) :: [node()]
  def sibling_nodes(names, self_node) do
    self_name = Atom.to_string(self_node)
    prefix = self_name |> String.split("--") |> hd()
    host = self_name |> String.split("@") |> List.last()

    names
    |> Enum.map(fn {name, _port} -> :"#{name}@#{host}" end)
    |> Enum.reject(&(&1 == self_node))
    |> Enum.filter(fn n -> n |> Atom.to_string() |> String.starts_with?(prefix) end)
  end

  # Attempts one connection and prints the outcome. Node.connect/1 returns
  # :ignored when the local node is not alive (not started in distributed mode).
  defp connect_and_report(node_name) do
    case Node.connect(node_name) do
      true -> IO.puts("Connected to #{node_name}")
      false -> IO.puts("Failed to connect to #{node_name}")
      :ignored -> IO.puts("Failed to connect to #{node_name} (local node is not distributed)")
    end
  end

  @doc """
  Looks up the process registered globally as `name` and returns its node.

  When the name is not (yet) registered, retries up to `retries` more times,
  sleeping `delay_ms` milliseconds between attempts. This gives registrations
  from just-connected nodes time to propagate.

  Returns `{:ok, node}` or `{:error, message}`.
  """
  @spec find_worker(atom(), non_neg_integer(), non_neg_integer()) ::
          {:ok, node()} | {:error, String.t()}
  def find_worker(name, retries \\ 5, delay_ms \\ 1000) do
    case :global.whereis_name(name) do
      :undefined when retries > 0 ->
        Process.sleep(delay_ms)
        find_worker(name, retries - 1, delay_ms)

      :undefined ->
        {:error, "Worker #{name} not found. Is the notebook running and registered?"}

      pid ->
        {:ok, node(pid)}
    end
  end

  @doc """
  Sends `{:ask, audio_path, prompt, opts}` to the globally registered
  `:flamingo_worker` and blocks (no timeout) until it replies.

  The first argument is ignored: the call is routed by global name, so the node
  is not needed. It is kept so existing call sites keep working. Returns the
  worker's reply as-is (presumably the text description — confirm against the
  worker).
  """
  @spec ask_flamingo(term(), Path.t(), String.t(), keyword()) :: term()
  def ask_flamingo(_node, audio_path, prompt, opts \\ []) do
    GenServer.call({:global, :flamingo_worker}, {:ask, audio_path, prompt, opts}, :infinity)
  end

  @doc """
  Sends `{:separate, audio_path, description, opts}` to the globally registered
  `:sam_worker` and blocks (no timeout) until it replies.

  The first argument is ignored for the same reason as in `ask_flamingo/4`.
  Returns the worker's reply as-is.
  """
  @spec separate_sam(term(), Path.t(), String.t(), keyword()) :: term()
  def separate_sam(_node, audio_path, description, opts \\ []) do
    GenServer.call({:global, :sam_worker}, {:separate, audio_path, description, opts}, :infinity)
  end
end

# Join the sibling runtimes, then let :global converge so that names
# registered on those nodes become visible from this one.
Orchestrator.connect_siblings()
:global.sync()
Process.sleep(1000)

# Both workers are required: a missing one fails here with a MatchError that
# carries the {:error, message} tuple from find_worker/1.
{:ok, flamingo_node} = Orchestrator.find_worker(:flamingo_worker)
{:ok, sam_node} = Orchestrator.find_worker(:sam_worker)

Enum.each(
  ["\nConnected to Flamingo on: #{flamingo_node}", "Connected to SAM on: #{sam_node}"],
  &IO.puts/1
)

Upload Audio

# Livebook only displays a cell's last expression, so the upload widget must be
# rendered explicitly or the user never sees it. Upload a file, then
# re-evaluate this cell.
audio_input = Kino.Input.file("Upload audio file (WAV, max 35s)")
Kino.render(audio_input)

# The input reads as nil until a file has been uploaded; stop with a hint
# instead of failing with a MatchError.
%{file_ref: file_ref, client_name: client_name} =
  Kino.Input.read(audio_input) ||
    Kino.interrupt!(:normal, "Upload an audio file above, then re-evaluate this cell.")

contents = file_ref |> Kino.Input.file_path() |> File.read!()

# Save to the shared filesystem under a fixed base name. The client-supplied
# name is not used as a path: it could escape work_dir (e.g. "../x"), or equal
# "input.wav" and make ffmpeg read and write the same file.
work_dir = "/tmp/orchestrator"
File.mkdir_p!(work_dir)
audio_path = Path.join(work_dir, "upload" <> Path.extname(client_name))
File.write!(audio_path, contents)

# Always re-encode to 16 kHz mono WAV and keep at most the first 30 seconds.
# NOTE(review): the upload label says "max 35s" while only 30s are kept —
# confirm which limit the workers actually need.
wav_path = Path.join(work_dir, "input.wav")

ffmpeg_args = ["-y", "-i", audio_path, "-t", "30", "-ar", "16000", "-ac", "1", wav_path]

case System.cmd("ffmpeg", ffmpeg_args, stderr_to_stdout: true) do
  {_output, 0} -> :ok
  {output, status} -> raise "ffmpeg exited with status #{status}:\n#{output}"
end

IO.puts("Audio ready at: #{wav_path}")

Step 1: Describe the Raw Audio

IO.puts("Asking Flamingo to describe the raw audio...")

# Open-ended prompt: have Flamingo enumerate every sound source in the
# normalized input before any separation happens.
raw_description =
  Orchestrator.ask_flamingo(
    flamingo_node,
    wav_path,
    "Describe all the sounds you hear in this audio. List each distinct sound source."
  )

Enum.each(["Raw audio description:", raw_description], &IO.puts/1)

Step 2: Separate with SAM

# Ask the user what to separate. The text box must be rendered explicitly:
# an undisplayed input can never be edited and always reads as its default.
# Edit it, then re-evaluate this cell.
separation_target = Kino.Input.text("What sound to separate?", default: "speech")
Kino.render(separation_target)

target_description = separation_target |> Kino.Input.read() |> String.trim()

if target_description == "" do
  Kino.interrupt!(:normal, "Enter a sound to separate, then re-evaluate this cell.")
end

IO.puts("Separating: #{target_description}")

# NOTE(review): assumes the SAM worker replies with a map holding :target and
# :residual file paths — confirm against the worker.
result = Orchestrator.separate_sam(sam_node, wav_path, target_description)

IO.puts("Target saved to: #{result.target}")
IO.puts("Residual saved to: #{result.residual}")

Step 3: Listen to Separated Audio

# Livebook displays only a cell's last expression, so the target player must be
# rendered explicitly; otherwise only the residual player would appear.
IO.puts("Target (#{target_description}):")
result.target |> File.read!() |> Kino.Audio.new(:wav) |> Kino.render()
IO.puts("Residual (everything else):")
result.residual |> File.read!() |> Kino.Audio.new(:wav)

Step 4: Describe the Residual

IO.puts("Asking Flamingo to describe what's left after separation...")

# Same open-ended prompt as for the raw audio, plus a pointer to what was removed.
residual_prompt =
  "Describe all the sounds you hear in this audio. What remains after removing #{target_description}?"

residual_description = Orchestrator.ask_flamingo(flamingo_node, result.residual, residual_prompt)

Enum.each(["Residual description:", residual_description], &IO.puts/1)