Boombox stream processing examples
# Set the Logger level to :info.
Logger.configure(level: :info)
# In case of problems installing Nx/EXLA/Bumblebee,
# you can remove them and the Nx backend config below.
# Examples that don't mention them should still work.
# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}
# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
# can be found on https://hexdocs.pm/boombox/stream_processing.html or in the latest github release.
# MIX_INSTALL_CONFIG_END
# Dependencies for this livebook; `boombox` is the dependency tuple bound above.
Mix.install([
boombox,
:kino,
:nx,
:exla,
:bumblebee,
:websockex,
:membrane_simple_rtsp_server,
{:coerce, ">= 1.0.2"}
])
# Make EXLA the global default Nx backend.
Nx.global_default_backend(EXLA.Backend)
# Static asset server: creates the data directories and serves `data_dir`
# over HTTP on localhost:1234.
data_dir = "/tmp/boombox_examples_data"
input_dir = "#{data_dir}/input"
out_dir = "#{data_dir}/output"
Enum.each([input_dir, out_dir], &File.mkdir_p!/1)

# A dependency may already have started :inets, so that outcome is accepted too.
inets_result = :inets.start()

if inets_result not in [:ok, {:error, {:already_started, :inets}}] do
  raise "Unexpected value returned by :inets.start/0: #{inspect(inets_result)}"
end

httpd_config = [
  bind_address: ~c"localhost",
  port: 1234,
  document_root: ~c"#{data_dir}",
  server_name: ~c"assets_server",
  server_root: ~c"/tmp",
  erl_script_nocache: true
]

# Errors are tolerated on purpose: the port is most likely already in use
# because the server was started from another livebook.
case :inets.start(:httpd, httpd_config) do
  {:error, _} -> :ok
  {:ok, _server} -> :ok
end
Setup
👋 Here are some examples of stream processing with Boombox, showing how to generate custom packet streams, read and manipulate individual frames, and integrate Boombox within larger Membrane pipelines.
The cell below downloads assets to be used in the examples. The setup cell started an HTTP server on port 1234 that will serve static HTML files for sending/receiving the stream in the browser.
# Downloads `url` into `path` unless a file already exists there.
# Returns a (possibly empty) list so results can be gathered with Enum.flat_map/2.
fetch_missing = fn url, path ->
  if File.exists?(path) do
    []
  else
    %{status: 200, body: data} = Req.get!(url)
    [File.write!(path, data)]
  end
end

# Sample media files, stored under `input_dir`.
samples_url = "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples"

sample_files = [
  {"bun.mp4", "big-buck-bunny/bun33s.mp4"},
  {"ffmpeg-testsrc.mp4", "ffmpeg-testsrc-480x270.mp4"}
]

Enum.flat_map(sample_files, fn {filename, remote} ->
  fetch_missing.("#{samples_url}/#{remote}", "#{input_dir}/#{filename}")
end)

# HTML pages, stored directly under `data_dir`.
assets_url =
  "https://raw.githubusercontent.com/membraneframework/boombox/master/examples/data"

Enum.flat_map(["hls", "webrtc_to_browser"], fn asset ->
  fetch_missing.("#{assets_url}/#{asset}.html", "#{data_dir}/#{asset}.html")
end)
Generate a bouncing logo video, stream it via WebRTC
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
# Download the logo, then trim it and shrink it with Image.thumbnail!/2.
logo_response = Req.get!("https://avatars.githubusercontent.com/u/25247695?s=200&v=4")

overlay =
  logo_response.body
  |> Vix.Vips.Image.new_from_buffer()
  |> then(fn {:ok, img} -> img end)
  |> Image.trim!()
  |> Image.thumbnail!(100)

bg = Image.new!(640, 480, color: :light_gray)

# Largest top-left coordinates at which the overlay still fits on the background.
max_x = Image.width(bg) - Image.width(overlay)
max_y = Image.height(bg) - Image.height(overlay)

# Time between two consecutive frames at 60 fps.
frame_duration = div(Membrane.Time.seconds(1), _fps = 60)

# Keeps the velocity while the next position stays within 0..limit, otherwise flips it.
bounce = fn pos, delta, limit ->
  if (pos + delta) in 0..limit, do: delta, else: -delta
end

# Computes the next {x, y, dx, dy, pts} state of the animation.
advance = fn {x, y, dx, dy, pts} ->
  next_dx = bounce.(x, dx, max_x)
  next_dy = bounce.(y, dy, max_y)
  {x + next_dx, y + next_dy, next_dx, next_dy, pts + frame_duration}
end

# Renders one animation state into a video packet.
to_packet = fn {x, y, _dx, _dy, pts} ->
  frame = Image.compose!(bg, overlay, x: x, y: y)
  %Boombox.Packet{kind: :video, payload: frame, pts: pts}
end

{_x = 300, _y = 0, _dx = 1, _dy = 2, _pts = 0}
|> Stream.iterate(advance)
|> Stream.map(to_packet)
|> Boombox.run(
  input: {:stream, video: :image, audio: false},
  output: {:webrtc, "ws://localhost:8830"}
)
Compose two streams side by side, broadcast via HLS
To receive the stream, visit http://localhost:1234/hls.html after running the cells below
The first cell uses :reader and :writer endpoints to communicate with boombox. In this
configuration the process calling Boombox.read/1 controls when packets are being provided.
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

# Both readers and the writer exchange video as images and carry no audio.
image_only = [video: :image, audio: false]

reader1 = Boombox.run(input: input1, output: {:reader, image_only})
reader2 = Boombox.run(input: input2, output: {:reader, image_only})
writer = Boombox.run(input: {:writer, image_only}, output: output)

# Reads one packet from each reader and writes the horizontally joined frame.
# Returns :eos as soon as either read yields anything other than {:ok, packet}.
compose_next = fn ->
  with {{:ok, left}, {:ok, right}} <- {Boombox.read(reader1), Boombox.read(reader2)} do
    side_by_side =
      Vix.Vips.Operation.join!(left.payload, right.payload, :VIPS_DIRECTION_HORIZONTAL)

    frame = %Boombox.Packet{
      kind: :video,
      payload: side_by_side,
      pts: max(left.pts, right.pts)
    }

    Boombox.write(writer, frame)
  else
    _finished -> :eos
  end
end

# Keep composing until the first :eos shows up.
compose_next
|> Stream.repeatedly()
|> Stream.take_while(&(&1 != :eos))
|> Stream.run()

Boombox.close(writer)
Boombox.close(reader1)
Boombox.close(reader2)
The second cell uses :message endpoints, meaning that the server communicates with the boomboxes by
exchanging messages. As a consequence, the input boomboxes control the
pace at which packets are delivered to the server, which can be useful in some circumstances:
defmodule MyServer do
  @moduledoc """
  GenServer that joins the video of two Boombox inputs side by side.

  Both inputs run with `:message` outputs, so their packets arrive here as
  `{:boombox_packet, boombox, packet}` messages. Once each input has delivered
  at least one packet, every new packet triggers writing a frame made of the
  latest images from both inputs to a `:writer` Boombox. The server closes the
  writer and stops after both inputs have sent `{:boombox_finished, boombox}`.
  """
  use GenServer

  # Starts the server (unlinked). `args` must provide `:input1`, `:input2` and `:output`.
  def start(args), do: GenServer.start(__MODULE__, args)

  @impl true
  def init(args) do
    message_output = {:message, video: :image, audio: false}
    boombox1 = Boombox.run(input: args.input1, output: message_output)
    boombox2 = Boombox.run(input: args.input2, output: message_output)

    output_writer =
      Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output)

    state = %{
      # Latest packet and end-of-stream flag, tracked per input.
      boombox_states: %{
        boombox1: %{last_packet: nil, eos: false},
        boombox2: %{last_packet: nil, eos: false}
      },
      # Maps the value returned by Boombox.run/1 to the input's id.
      boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2},
      output_writer: output_writer
    }

    {:ok, state}
  end

  @impl true
  def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do
    boombox_id = state.boomboxes[bb]
    state = put_in(state.boombox_states[boombox_id].last_packet, packet)

    %{boombox1: %{last_packet: left}, boombox2: %{last_packet: right}} = state.boombox_states

    # Nothing can be written until both inputs have produced a packet.
    if left != nil and right != nil do
      side_by_side =
        Vix.Vips.Operation.join!(left.payload, right.payload, :VIPS_DIRECTION_HORIZONTAL)

      joined_packet = %Boombox.Packet{
        packet
        | payload: side_by_side,
          pts: max(left.pts, right.pts)
      }

      Boombox.write(state.output_writer, joined_packet)
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:boombox_finished, bb}, state) do
    boombox_id = state.boomboxes[bb]
    state = put_in(state.boombox_states[boombox_id].eos, true)

    case Enum.reject(Map.values(state.boombox_states), & &1.eos) do
      [] ->
        # Every input has finished: close the writer and shut down.
        Boombox.close(state.output_writer)
        {:stop, :normal, state}

      _still_running ->
        {:noreply, state}
    end
  end
end
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

{:ok, server} = MyServer.start(%{input1: input1, input2: input2, output: output})
monitor = Process.monitor(server)

# Block this cell until the server process terminates.
receive do
  # The exit reason is not inspected, so it is underscored to avoid an
  # "unused variable" compiler warning.
  {:DOWN, ^monitor, :process, ^server, _reason} ->
    :ok
end
Use Boombox.Bin to discard video from MP4 and to stream audio via WebRTC at the same time
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.
defmodule VideoDiscardingPipeline do
  @moduledoc """
  Pipeline connecting two `Boombox.Bin`s.

  The audio output of the input bin is linked to the output bin, while the
  video output of the input bin is linked to a `Membrane.Fake.Sink`.

  Options: `:boombox_input` and `:boombox_output`, passed to the respective bins.
  """
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    source_bin = %Boombox.Bin{input: opts[:boombox_input]}
    sink_bin = %Boombox.Bin{output: opts[:boombox_output]}

    # Audio goes from the input bin straight into the output bin.
    audio_link =
      child(:input_boombox, source_bin)
      |> via_out(:output, options: [kind: :audio])
      |> via_in(:input, options: [kind: :audio])
      |> child(:output_boombox, sink_bin)

    # Video from the same input bin is linked to a fake sink.
    video_link =
      get_child(:input_boombox)
      |> via_out(:output, options: [kind: :video])
      |> child(Membrane.Fake.Sink)

    {[spec: [audio_link, video_link]], %{}}
  end
end
pipeline_opts = [
  boombox_input: "#{input_dir}/bun.mp4",
  boombox_output: {:webrtc, "ws://localhost:8830"}
]

{:ok, supervisor, _pipeline} =
  Membrane.Pipeline.start_link(VideoDiscardingPipeline, pipeline_opts)

# Block this cell until the pipeline's supervisor goes down.
monitor_ref = Process.monitor(supervisor)

receive do
  {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
    :ok
end