Powered by AppSignal & Oban Pro

Boombox stream processing examples

examples/stream_processing.livemd

Boombox stream processing examples

# Keep Livebook output readable: drop debug-level log noise from the deps.
Logger.configure(level: :info)

# In case of problems installing Nx/EXLA/Bumblebee,
# you can remove them and the Nx backend config below.
# Examples that don't mention them should still work.

# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}

# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
# can be found on https://hexdocs.pm/boombox/stream_processing.html or in the latest github release.
# MIX_INSTALL_CONFIG_END

# Install every dependency used by the example cells below.
Mix.install([
  boombox,
  :kino,
  :nx,
  :exla,
  :bumblebee,
  :websockex,
  :membrane_simple_rtsp_server,
  {:coerce, ">= 1.0.2"}
])

# Run all Nx tensor operations on the EXLA (XLA-compiled) backend.
Nx.global_default_backend(EXLA.Backend)

# HTTP server for assets
# Directory layout for example assets, served to the browser over HTTP.
data_dir = "/tmp/boombox_examples_data"
input_dir = "#{data_dir}/input"
File.mkdir_p!(input_dir)
out_dir = "#{data_dir}/output"
File.mkdir_p!(out_dir)

# :inets may already be running if some dependency started it before us;
# both outcomes are fine, anything else is a hard error.
inets_result = :inets.start()

if inets_result not in [:ok, {:error, {:already_started, :inets}}] do
  raise "Unexpected value returned by :inets.start/0: #{inspect(inets_result)}"
end

httpd_config = [
  bind_address: ~c"localhost",
  port: 1234,
  document_root: ~c"#{data_dir}",
  server_name: ~c"assets_server",
  server_root: ~c"/tmp",
  erl_script_nocache: true
]

case :inets.start(:httpd, httpd_config) do
  {:ok, _server} -> :ok
  # Port 1234 already taken — the server was most likely started by another livebook.
  {:error, _reason} -> :ok
end

Setup

👋 Here are some stream processing examples of using Boombox, showing how to generate custom packet streams, read and manipulate individual frames, and integrate Boombox within larger Membrane pipelines.

The cell below downloads assets to be used in the examples. The setup cell started an HTTP server on port 1234 that will serve static HTML files for sending/receiving the stream in the browser.

samples_url = "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples"

# Download each sample MP4 into input_dir, skipping files already cached.
# Fix: the destination path must interpolate `filename` (the generator
# binding) — the previous `#(unknown)` was a corrupted interpolation that
# made every download target the same, nonexistent-looking path.
for {filename, remote} <- [
      {"bun.mp4", "big-buck-bunny/bun33s.mp4"},
      {"ffmpeg-testsrc.mp4", "ffmpeg-testsrc-480x270.mp4"}
    ],
    path = "#{input_dir}/#{filename}",
    not File.exists?(path) do
  %{status: 200, body: data} = Req.get!("#{samples_url}/#{remote}")
  File.write!(path, data)
end

assets_url =
  "https://raw.githubusercontent.com/membraneframework/boombox/master/examples/data"

# Fetch the static HTML pages (HLS player, WebRTC receiver) unless cached.
["hls", "webrtc_to_browser"]
|> Enum.map(fn asset -> {asset, "#{data_dir}/#{asset}.html"} end)
|> Enum.reject(fn {_asset, path} -> File.exists?(path) end)
|> Enum.each(fn {asset, path} ->
  %{status: 200, body: data} = Req.get!("#{assets_url}/#{asset}.html")
  File.write!(path, data)
end)

Generate a bouncing logo video, stream it via WebRTC

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html

# Fetch the Membrane logo, trim its borders and shrink it to a 100px-wide overlay.
overlay =
  Req.get!("https://avatars.githubusercontent.com/u/25247695?s=200&v=4").body
  |> Vix.Vips.Image.new_from_buffer()
  |> then(fn {:ok, img} -> img end)
  |> Image.trim!()
  |> Image.thumbnail!(100)

# Plain background and the coordinate range the overlay may occupy inside it.
bg = Image.new!(640, 480, color: :light_gray)
max_x = Image.width(bg) - Image.width(overlay)
max_y = Image.height(bg) - Image.height(overlay)

fps = 60
frame_duration = div(Membrane.Time.seconds(1), fps)

# One animation step: reflect the velocity on wall contact, advance
# position and bump the presentation timestamp by one frame.
advance = fn {x, y, dx, dy, pts} ->
  dx = if (x + dx) in 0..max_x, do: dx, else: -dx
  dy = if (y + dy) in 0..max_y, do: dy, else: -dy
  {x + dx, y + dy, dx, dy, pts + frame_duration}
end

# Infinite lazy stream of frames, starting at (300, 0) with velocity (1, 2).
{300, 0, 1, 2, 0}
|> Stream.iterate(advance)
|> Stream.map(fn {x, y, _dx, _dy, pts} ->
  %Boombox.Packet{
    kind: :video,
    payload: Image.compose!(bg, overlay, x: x, y: y),
    pts: pts
  }
end)
|> Boombox.run(
  input: {:stream, video: :image, audio: false},
  output: {:webrtc, "ws://localhost:8830"}
)

Compose two streams side by side, broadcast via HLS

To receive the stream, visit http://localhost:1234/hls.html after running the cells below

The first cell uses :reader and :writer endpoints to communicate with boombox. In this configuration the process calling Boombox.read/1 controls when packets are being provided.

# Paths of the two source MP4 files and the HLS playlist to produce.
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

# Each :reader boombox decodes its MP4 into video frames (images) that are
# pulled on demand with Boombox.read/1; audio is discarded.
reader1 = Boombox.run(input: input1, output: {:reader, video: :image, audio: false})

reader2 = Boombox.run(input: input2, output: {:reader, video: :image, audio: false})

# The :writer boombox accepts image packets via Boombox.write/2 and encodes
# them into the HLS output.
writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: output)

# Pull one frame from each reader, join them side by side and push the
# result to the writer. As soon as either input runs out of frames the
# loop yields :eos, which Enum.find/2 uses to stop the repetition.
# Fix: `&(&1 == :eos)` — the capture operator had been HTML-escaped
# to `&amp;` in the published source, which does not compile.
Stream.repeatedly(fn ->
  case {Boombox.read(reader1), Boombox.read(reader2)} do
    {{:ok, packet1}, {:ok, packet2}} ->
      joined_image =
        Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)

      # Use the later of the two timestamps so playback stays monotonic.
      packet = %Boombox.Packet{
        pts: max(packet1.pts, packet2.pts),
        payload: joined_image,
        kind: :video
      }

      Boombox.write(writer, packet)

    _finished ->
      :eos
  end
end)
|> Enum.find(&(&1 == :eos))

# Flush and release all three boomboxes.
Boombox.close(writer)
Boombox.close(reader1)
Boombox.close(reader2)

The second cell uses :message endpoints, meaning that the server communicates with boomboxes by exchanging messages. A consequence of this is that the inputting boomboxes will control the pace of providing the packets to the server, which can be useful in some circumstances:

defmodule MyServer do
  @moduledoc """
  GenServer that consumes frames pushed by two `:message`-mode boomboxes,
  joins them side by side and writes the result to an output boombox.

  With `:message` endpoints the boomboxes deliver `{:boombox_packet, pid, packet}`
  and `{:boombox_finished, pid}` messages to this process, so the inputs —
  not the consumer — control the pacing.

  Fix: the capture operator in the two `Enum.all?/2` calls had been
  HTML-escaped to `&amp;` in the published source, which does not compile;
  restored to `&`.
  """
  use GenServer

  def start(args) do
    GenServer.start(__MODULE__, args)
  end

  @impl true
  def init(args) do
    # Both inputs emit decoded video frames (images) as messages; audio is dropped.
    boombox1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false})
    boombox2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false})

    output_writer =
      Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output)

    {:ok,
     %{
       # Per-input bookkeeping: the most recent frame and an end-of-stream flag.
       boombox_states: %{
         boombox1: %{last_packet: nil, eos: false},
         boombox2: %{last_packet: nil, eos: false}
       },
       # Reverse lookup: boombox pid -> logical id (:boombox1 / :boombox2).
       boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2},
       output_writer: output_writer
     }}
  end

  @impl true
  def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do
    boombox_id = state.boomboxes[bb]
    state = put_in(state.boombox_states[boombox_id].last_packet, packet)

    # Only emit once at least one frame has arrived from each input.
    if Enum.all?(Map.values(state.boombox_states), &(&1.last_packet != nil)) do
      joined_image =
        Vix.Vips.Operation.join!(
          state.boombox_states.boombox1.last_packet.payload,
          state.boombox_states.boombox2.last_packet.payload,
          :VIPS_DIRECTION_HORIZONTAL
        )

      # Use the later timestamp so the output pts stays monotonic.
      pts =
        max(
          state.boombox_states.boombox1.last_packet.pts,
          state.boombox_states.boombox2.last_packet.pts
        )

      packet = %Boombox.Packet{packet | payload: joined_image, pts: pts}

      Boombox.write(state.output_writer, packet)
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:boombox_finished, bb}, state) do
    boombox_id = state.boomboxes[bb]
    state = put_in(state.boombox_states[boombox_id].eos, true)

    # Once both inputs reached end-of-stream, close the writer and stop.
    if Enum.all?(Map.values(state.boombox_states), & &1.eos) do
      Boombox.close(state.output_writer)
      {:stop, :normal, state}
    else
      {:noreply, state}
    end
  end
end

# Source files and HLS output for the message-based composition example.
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

{:ok, server} = MyServer.start(%{input1: input1, input2: input2, output: output})
monitor = Process.monitor(server)

# Block the cell until the server terminates (it stops itself once both
# inputs reach end-of-stream). `_reason` fixes the unused-variable warning
# the bare `reason` binding produced.
receive do
  {:DOWN, ^monitor, :process, ^server, _reason} ->
    :ok
end

Use Boombox.Bin to discard video from MP4 and to stream audio via WebRTC at the same time

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.

defmodule VideoDiscardingPipeline do
  @moduledoc """
  Membrane pipeline that wraps two `Boombox.Bin` instances: the audio track
  of the input is forwarded to the output boombox, while the video track is
  routed to a fake sink and thus discarded.
  """
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    # Two branches: audio flows input -> output boombox; video is dropped.
    spec = [
      child(:input_boombox, %Boombox.Bin{
        input: opts[:boombox_input]
      })
      |> via_out(:output, options: [kind: :audio])
      |> via_in(:input, options: [kind: :audio])
      |> child(:output_boombox, %Boombox.Bin{
        output: opts[:boombox_output]
      }),
      # The video pad must still be linked, so terminate it in a fake sink.
      get_child(:input_boombox)
      |> via_out(:output, options: [kind: :video])
      |> child(Membrane.Fake.Sink)
    ]

    {[spec: spec], %{}}
  end
end

# Start the pipeline: read the MP4, drop video, stream audio via WebRTC.
{:ok, pipeline_supervisor, _pipeline_pid} =
  Membrane.Pipeline.start_link(VideoDiscardingPipeline,
    boombox_input: "#{input_dir}/bun.mp4",
    boombox_output: {:webrtc, "ws://localhost:8830"}
  )

# Keep the cell alive until the pipeline's supervisor terminates.
supervisor_ref = Process.monitor(pipeline_supervisor)

receive do
  {:DOWN, ^supervisor_ref, :process, _pid, _reason} -> :ok
end