# Orchestrator
Mix.install([
{:kino, "~> 0.19.0"}
])
## Connect to Workers
# Livebook runtimes are separate Erlang nodes. We need to discover and connect to them.
# All runtimes started by this Livebook share the same cookie (LIVEBOOK_COOKIE),
# but they aren't connected by default. We connect via Node.connect/1.
defmodule Orchestrator do
  @moduledoc """
  Client-side helpers for driving the Flamingo and SAM worker notebooks.

  Workers are addressed through the `:global` name registry
  (`:flamingo_worker` and `:sam_worker`). This runtime must therefore be
  connected to the sibling runtimes that host them first (see
  `connect_siblings/0`).
  """

  @doc """
  Connects to all sibling Livebook runtime nodes.

  A sibling is any name registered with the local EPMD whose full node name
  starts with this node's prefix. The prefix is everything before the first
  `"--"` in this node's name. Sibling node names are built with this node's
  own host part.

  Prints one line per connection attempt and returns `:ok`. If EPMD cannot
  be queried, it prints the reason and returns `:ok` without connecting.
  """
  @spec connect_siblings() :: :ok
  def connect_siblings do
    self_name = Atom.to_string(node())
    # e.g. "livebook_epwxvykz" for "livebook_epwxvykz--abc@127.0.0.1"
    prefix = self_name |> String.split("--") |> hd()
    host = host_part(self_name)

    case :net_adm.names() do
      {:ok, names} ->
        names
        |> Enum.map(fn {name, _port} -> "#{name}@#{host}" end)
        # Filter on strings first, so atoms are only created for real siblings.
        |> Enum.filter(&(&1 != self_name and String.starts_with?(&1, prefix)))
        |> Enum.each(&connect_node/1)

      {:error, reason} ->
        IO.puts("Could not list nodes via EPMD: #{inspect(reason)}")
    end
  end

  @doc """
  Finds the node hosting the process registered globally as `name`.

  Retries up to `retries` more times, sleeping one second between attempts.
  Returns `{:ok, node}` or `{:error, message}`.
  """
  @spec find_worker(term(), non_neg_integer()) :: {:ok, node()} | {:error, String.t()}
  def find_worker(name, retries \\ 5) do
    case :global.whereis_name(name) do
      :undefined when retries > 0 ->
        Process.sleep(1000)
        find_worker(name, retries - 1)

      :undefined ->
        {:error, "Worker #{name} not found. Is the notebook running and registered?"}

      pid ->
        {:ok, node(pid)}
    end
  end

  @doc """
  Asks the globally registered `:flamingo_worker` to describe `audio_path`.

  Sends `{:ask, audio_path, prompt, opts}` and waits with no timeout. The
  first argument is ignored, because the worker is addressed by its global
  name. Returns the worker's reply unchanged.
  """
  def ask_flamingo(_node, audio_path, prompt, opts \\ []) do
    GenServer.call({:global, :flamingo_worker}, {:ask, audio_path, prompt, opts}, :infinity)
  end

  @doc """
  Asks the globally registered `:sam_worker` to separate `description` from
  `audio_path`.

  Sends `{:separate, audio_path, description, opts}` and waits with no
  timeout. The first argument is ignored, because the worker is addressed by
  its global name. Returns the worker's reply unchanged.
  """
  def separate_sam(_node, audio_path, description, opts \\ []) do
    GenServer.call({:global, :sam_worker}, {:separate, audio_path, description, opts}, :infinity)
  end

  # Host part of a node name ("a@host" -> "host"); falls back to 127.0.0.1
  # when the name has no "@".
  defp host_part(node_name) do
    case String.split(node_name, "@", parts: 2) do
      [_name, host] -> host
      [_name] -> "127.0.0.1"
    end
  end

  # Node.connect/1 returns false or :ignored on failure; both are reported.
  # EPMD names are local and already filtered by prefix, so the atom set stays small.
  defp connect_node(name) do
    node_name = String.to_atom(name)

    case Node.connect(node_name) do
      true -> IO.puts("Connected to #{node_name}")
      _ -> IO.puts("Failed to connect to #{node_name}")
    end
  end
end
# Join every sibling runtime, then have :global sync its name table, so that
# workers registered on those nodes become visible from this one.
Orchestrator.connect_siblings()
:global.sync()
Process.sleep(1_000)

# Resolve both workers by global name. find_worker/1 retries on its own;
# a worker that never appears fails the match with its {:error, _} tuple.
{:ok, flamingo_node} = Orchestrator.find_worker(:flamingo_worker)
{:ok, sam_node} = Orchestrator.find_worker(:sam_worker)

[
  "\nConnected to Flamingo on: #{flamingo_node}",
  "Connected to SAM on: #{sam_node}"
]
|> Enum.each(&IO.puts/1)
## Upload Audio
audio_input = Kino.Input.file("Upload audio file (WAV, max 35s)")
%{file_ref: file_ref, client_name: client_name} = Kino.Input.read(audio_input)

# The workers receive file paths rather than bytes, so the audio must be on disk.
work_dir = "/tmp/orchestrator"
File.mkdir_p!(work_dir)

# The browser-supplied name is untrusted: keep only its basename, so it cannot
# escape work_dir. The prefix means an upload named "input.wav" cannot collide
# with the ffmpeg output below (ffmpeg refuses to edit a file in place).
audio_path = Path.join(work_dir, "upload_" <> Path.basename(client_name))
File.cp!(Kino.Input.file_path(file_ref), audio_path)

# Always re-encode: keep at most the first 30 s, as 16 kHz mono WAV.
wav_path = Path.join(work_dir, "input.wav")

ffmpeg_args = [
  "-y", "-i", audio_path,
  "-t", "30",
  "-ar", "16000",
  "-ac", "1",
  wav_path
]

# Capture stderr so that a failed conversion reports ffmpeg's own message.
case System.cmd("ffmpeg", ffmpeg_args, stderr_to_stdout: true) do
  {_output, 0} -> :ok
  {output, status} -> raise "ffmpeg exited with status #{status}:\n#{output}"
end

IO.puts("Audio ready at: #{wav_path}")
## Step 1: Describe the Raw Audio
IO.puts("Asking Flamingo to describe the raw audio...")

# Prompt for an inventory of every sound source in the converted clip.
raw_prompt =
  "Describe all the sounds you hear in this audio. List each distinct sound source."

raw_description = Orchestrator.ask_flamingo(flamingo_node, wav_path, raw_prompt)

Enum.each(["Raw audio description:", raw_description], &IO.puts/1)
## Step 2: Separate with SAM
# Ask the user which sound SAM should isolate (prefilled with "speech").
separation_target = Kino.Input.text("What sound to separate?", default: "speech")

# Drop surrounding whitespace. A blank answer falls back to "speech" instead
# of sending an empty description to SAM.
target_description =
  case separation_target |> Kino.Input.read() |> String.trim() do
    "" -> "speech"
    text -> text
  end

IO.puts("Separating: #{target_description}")

# NOTE(review): the reply is assumed to be a map with :target and :residual
# output paths. Confirm against the SAM worker.
result = Orchestrator.separate_sam(sam_node, wav_path, target_description)
IO.puts("Target saved to: #{result.target}")
IO.puts("Residual saved to: #{result.residual}")
## Step 3: Listen to Separated Audio
# Livebook only renders a cell's final expression automatically. The target
# player is therefore rendered explicitly; otherwise only the residual player
# would appear.
# NOTE(review): assumes these lines run in a single cell.
IO.puts("Target (#{target_description}):")
result.target |> File.read!() |> Kino.Audio.new(:wav) |> Kino.render()

IO.puts("Residual (everything else):")
result.residual |> File.read!() |> Kino.Audio.new(:wav)
## Step 4: Describe the Residual
IO.puts("Asking Flamingo to describe what's left after separation...")

# Same inventory prompt as Step 1, plus an explicit question about what
# remains once the target sound has been removed.
residual_prompt =
  "Describe all the sounds you hear in this audio. What remains after removing #{target_description}?"

residual_description =
  Orchestrator.ask_flamingo(flamingo_node, result.residual, residual_prompt)

Enum.each(["Residual description:", residual_description], &IO.puts/1)