Boombox AI examples

Logger.configure(level: :info)

# In case of problems installing Nx/EXLA/Bumblebee,
# you can remove them and the Nx backend config below.
# Examples that don't mention them should still work.

# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}

# This livebook uses Boombox from the master branch. If any examples don't work,
# the latest stable version of this livebook can be found at
# https://hexdocs.pm/boombox/ai.html or in the latest GitHub release.
# MIX_INSTALL_CONFIG_END

Mix.install([
  boombox,
  :kino,
  :nx,
  :exla,
  :bumblebee,
  :websockex,
  :membrane_simple_rtsp_server,
  {:coerce, ">= 1.0.2"}
])

Nx.global_default_backend(EXLA.Backend)

# HTTP server for assets
data_dir = "/tmp/boombox_examples_data"
File.mkdir_p!(data_dir)

# match in case a dependency already started :inets
case :inets.start() do
  :ok -> :ok
  {:error, {:already_started, :inets}} -> :ok
  err -> raise "Unexpected value returned by :inets.start/0: #{inspect(err)}"
end

case :inets.start(:httpd,
  bind_address: ~c"localhost",
  port: 1234,
  document_root: ~c"#{data_dir}",
  server_name: ~c"assets_server",
  server_root: ~c"/tmp",
  erl_script_nocache: true
) do
  {:ok, _server} -> :ok
  # port already in use — server likely started from another livebook
  {:error, _} -> :ok
end

Setup

👋 Here are some AI examples of using Boombox, covering LLM voice interfaces, speech transcription, and ML-driven video processing.

The setup cell above started an HTTP server on port 1234 that serves static HTML pages for sending and receiving streams in the browser. The cell below downloads these pages so that the server can serve them.

samples_url = "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples"

assets_url =
  "https://raw.githubusercontent.com/membraneframework/boombox/master/examples/data"

for asset <- ["webrtc_from_browser", "webrtc_to_browser", "talk_to_llm"],
    path = "#{data_dir}/#{asset}.html",
    not File.exists?(path) do
  %{status: 200, body: data} = Req.get!("#{assets_url}/#{asset}.html")
  File.write!(path, data)
end
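
As a quick sanity check (an optional addition, not part of the original example), you can confirm that the server from the setup cell serves the downloaded pages:

# Optional: the asset server should respond with 200 for each downloaded page
%{status: 200} = Req.get!("http://localhost:1234/webrtc_from_browser.html")
:ok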

Not hot dog

Inspired by Silicon Valley’s Not Hotdog app and Evadne Wu’s talk.

To send the stream, visit http://localhost:1234/webrtc_from_browser.html.

model_name = "google/vit-base-patch16-224"
{:ok, model} = Bumblebee.load_model({:hf, model_name})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, model_name})
serving = Bumblebee.Vision.image_classification(model, featurizer)
:ok
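
Before streaming from the browser, you can optionally run the serving once on a synthetic image; the first run triggers EXLA compilation, so later frames classify faster. This warm-up is an addition, not part of the original example.

# Optional: run the serving once on a plain gray 224x224 image
test_image = Nx.broadcast(Nx.tensor(128, type: :u8), {224, 224, 3})
Nx.Serving.run(serving, test_image).predictions |> hd()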
frame = Kino.Frame.new()
Kino.render(frame)

Boombox.run(
  input: {:webrtc, "ws://localhost:8829"},
  output: {:stream, video: :image, audio: false}
)
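# Classify every 10th frame to keep the inference load manageable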
|> Stream.take_every(10)
|> Stream.map(fn packet ->
  tensor =
    packet.payload
    |> Image.thumbnail!(224)
    |> Image.embed!(224, 224)
    |> Image.to_nx!()

  hot_dog =
    Nx.Serving.run(serving, tensor).predictions
    |> Enum.find(fn p -> String.contains?(p.label, "hotdog") end)

  if hot_dog do
    "## ✅ Hotdog"
  else
    "## ❌ Not hotdog"
  end
end)
|> Enum.each(fn text -> Kino.Frame.render(frame, Kino.Markdown.new(text)) end)

Read speech audio from MP4 chunk-by-chunk, generate transcription

{:ok, whisper} = Bumblebee.load_model({:hf, "openai/whisper-tiny"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"})

serving =
  Bumblebee.Audio.speech_to_text_whisper(
    whisper,
    featurizer,
    tokenizer,
    generation_config,
    defn_options: [compiler: EXLA]
  )
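
Optionally, you can warm the serving up on a second of silence so that the first real chunk isn't delayed by compilation (this warm-up is an addition, not part of the original example):

# Optional: one second of 16 kHz silence triggers compilation up front
Nx.Serving.run(serving, Nx.broadcast(0.0, {16_000}))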

Boombox.run(
  input: "#{samples_url}/sherlock_librivox.mp4",
  output:
    {:stream,
     video: false, audio: :binary, audio_rate: 16_000, audio_channels: 1, audio_format: :f32le}
)
|> Stream.map(&Nx.from_binary(&1.payload, :f32))
|> Stream.chunk_every(200)
|> Enum.each(fn chunk ->
  batch = Nx.concatenate(chunk)

  Nx.Serving.run(serving, batch).chunks
  |> Enum.map_join(& &1.text)
  |> IO.puts()
end)

Receive speech audio via WebRTC, generate live transcription

To send the stream, visit http://localhost:1234/webrtc_from_browser.html.

{:ok, whisper} = Bumblebee.load_model({:hf, "openai/whisper-tiny"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"})

serving =
  Bumblebee.Audio.speech_to_text_whisper(
    whisper,
    featurizer,
    tokenizer,
    generation_config,
    defn_options: [compiler: EXLA]
  )

Boombox.run(
  input: {:webrtc, "ws://localhost:8829"},
  output:
    {:stream,
     video: false, audio: :binary, audio_rate: 16_000, audio_channels: 1, audio_format: :f32le}
)
|> Stream.map(&Nx.from_binary(&1.payload, :f32))
|> Stream.chunk_every(200)
|> Enum.each(fn chunk ->
  batch = Nx.concatenate(chunk)

  Nx.Serving.run(serving, batch).chunks
  |> Enum.map_join(& &1.text)
  |> IO.puts()
end)

Talk to ChatGPT

This example lets you have a natural voice conversation with ChatGPT. Boombox delivers the audio between the browser and the server over WebRTC, which is probably the best option for this case.

The module below is a simple interface to the OpenAI realtime audio API. It accepts PCM audio (1 channel, 24 kHz, s16le) and responds in the same format. Thanks to that, we need neither speech-to-text nor text-to-speech, which results in very low latency and simple logic.

If you prefer open-source solutions, there’s Ultravox, but while it accepts audio, it outputs text for now, so you’d need TTS. If there’s anything else we should link here, please open a PR.

defmodule OpenAIWebSocket do
  use WebSockex
  require Logger

  def start_link(opts) do
    # OpenAI API docs: https://platform.openai.com/docs/guides/realtime
    WebSockex.start_link(
      "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
      __MODULE__,
      %{response: <<>>},
      extra_headers: [
        {"Authorization", "Bearer " <> opts[:token]},
        {"OpenAI-Beta", "realtime=v1"}
      ]
    )
  end

  def send_audio(ws, audio) do
    audio = Base.encode64(audio)
    frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
    WebSockex.send_frame(ws, {:text, frame})
  end

  def get_response_chunk(ws, chunk_byte_size) do
    # There's no 'call' in WebSockex, so we just send and receive
    send(ws, {:get_response_chunk, chunk_byte_size, self()})

    receive do
      {:response_chunk, chunk} -> chunk
    end
  end

  @impl true
  def handle_frame({:text, frame}, state) do
    case Jason.decode!(frame) do
      %{"type" => "response.audio.delta", "delta" => delta} ->
        audio_payload = Base.decode64!(delta)
        # Buffer the response audio
        response = state.response <> audio_payload
        {:ok, %{state | response: response}}

      %{"type" => "input_audio_buffer.speech_started"} ->
        # If the user speaks, they may interrupt the current response,
        # so we drop it and wait for a new one.
        {:ok, %{state | response: <<>>}}

      %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
        Logger.info("AI transcription: #{transcript}")
        {:ok, state}

      %{} = _event ->
        {:ok, state}
    end
  end

  @impl true
  def handle_frame(_frame, state), do: {:ok, state}

  @impl true
  def handle_info({:get_response_chunk, size, pid}, state) do
    case state.response do
      <<chunk::binary-size(size), rest::binary>> ->
        # If we have enough data, send it back
        send(pid, {:response_chunk, chunk})
        {:ok, %{state | response: rest}}

      chunk ->
        # Otherwise, send what we have, padded with silence
        silence = <<0::size(size - byte_size(chunk))-unit(8)>>
        send(pid, {:response_chunk, chunk <> silence})
        {:ok, %{state | response: <<>>}}
    end
  end
end

In the cell below, we receive the stream from the browser via WebRTC, feed it to the API, receive the response, and send it back to the browser. For this to work, you need to add your OpenAI API token as an OPEN_AI_TOKEN secret in Livebook. To connect via WebRTC, visit http://localhost:1234/talk_to_llm.html after running this cell.

{:ok, ws} = OpenAIWebSocket.start_link(token: System.fetch_env!("LB_OPEN_AI_TOKEN"))

# Ingress part
Task.start_link(fn ->
  Boombox.run(
    # Connect to the browser via WebRTC, using WebSocket for session establishment
    input: {:webrtc, "ws://localhost:8829"},
    output: {
      :stream,
      # Audio format that the OpenAI API expects
      video: false, audio: :binary, audio_format: :s16le, audio_channels: 1, audio_rate: 24_000
    }
  )
  |> Enum.each(fn packet -> OpenAIWebSocket.send_audio(ws, packet.payload) end)
end)

# Egress part

# We send 20 millisecond chunks to Boombox
chunk_duration_ms = 20
# Samples per second * bytes per sample * chunk duration in seconds
chunk_byte_size = trunc(24_000 * 2 * chunk_duration_ms / 1_000)

Stream.interval(chunk_duration_ms)
# Stream.interval emits consecutive integers (0, 1, 2, ...) every 20 ms;
# multiplying by the chunk duration turns them into timestamps in milliseconds (0, 20, 40, ...)
|> Stream.map(&(&1 * chunk_duration_ms))
|> Stream.map(fn time ->
  response_chunk = OpenAIWebSocket.get_response_chunk(ws, chunk_byte_size)

  %Boombox.Packet{
    payload: response_chunk,
    kind: :audio,
    pts: Membrane.Time.milliseconds(time),
    # Audio format that the OpenAI API outputs
    format: %{audio_format: :s16le, audio_channels: 1, audio_rate: 24_000}
  }
end)
|> Boombox.run(
  input: {:stream, audio: :binary, video: false},
  # Connect to the browser via WebRTC, using WebSocket for session establishment
  output: {:webrtc, "ws://localhost:8830"}
)