MPEG-DASH Live Streaming with Elixir
Mix.install([
{:plug, "~> 1.16"},
{:ex_cmd, "~> 0.12.0"},
{:file_system, "~> 1.0"},
{:plug_crypto, "~> 2.1"},
{:bandit, "~> 1.5"},
{:kino, "~> 0.13.2"},
{:corsica, "~> 2.1"}
])
Introduction
In this Livebook, we illustrate how to run MPEG-DASH live streaming. DASH is an HTTP-based protocol, so any web server can be used.
We simply capture the feed of the built-in webcam with the MediaRecorder API and send the chunks in binary form (as ArrayBuffer) to the server.
We then use a "keep-alive" FFmpeg process to build the DASH files.
We set up a GenServer to monitor the file system and detect when FFmpeg creates the manifest file. This uses FileSystem. When it detects the event, it sends a message to the Kino.JS.Live process, which relays it to the browser via its WebSocket connection.
This triggers the JavaScript library dash.js: it sends GET requests to the web server, which delivers the manifest and the segments, and the player is then continuously refreshed with new segments.
The main difficulty is which codecs MediaRecorder can (or cannot) handle for the containers we may use: MP4 or WEBM.
# Directory where FFmpeg will write the manifest and the segments
File.mkdir_p("./dash")
FFmpeg process
We run one FFmpeg process as "keep-alive" with ExCmd.
It will build the DASH segments and the manifest file.
defmodule FFmpegProcessor do
  @moduledoc false

  @ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

  # Starts a long-lived FFmpeg process that reads the recorded chunks from
  # stdin and writes the DASH manifest and segments into `dash_path`.
  def start(dash_path, mimetype) do
    manifest = Path.join(dash_path, "manifest.mpd")

    build_cmd_webm =
      ~w(#{@ffmpeg} -i pipe:0
          -map 0
          -codec: copy
          -f dash
          -use_timeline 1
          -use_template 1
          -init_seg_name init_$RepresentationID$.webm
          -media_seg_name chunk_$RepresentationID$_$Number$.webm
          #{manifest}
        )

    # The larger probing window helps FFmpeg detect the streams when reading
    # fragmented MP4 from a pipe.
    build_cmd_mp4 =
      ~w(#{@ffmpeg}
          -analyzeduration 100M -probesize 50M
          -i pipe:0
          -map 0
          -codec: copy
          -f dash
          -use_timeline 1
          -use_template 1
          -init_seg_name init_$RepresentationID$.m4s
          -media_seg_name chunk_$RepresentationID$_$Number$.m4s
          #{manifest}
        )

    case mimetype do
      "video/mp4" ->
        {:ok, _pid_build} = ExCmd.Process.start_link(build_cmd_mp4, log: true)

      "video/webm" ->
        {:ok, _pid_build} = ExCmd.Process.start_link(build_cmd_webm, log: true)
    end
  end

  def send(pid, data) do
    ExCmd.Process.write(pid, data)
  end
end
##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################
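For a rough mental model of how this processor is driven, here is a hand-driven sketch of the keep-alive lifecycle (the chunk writes are commented out because, in this Livebook, the chunks come from the browser):

# Illustrative lifecycle; assumes the ./dash directory exists.
{:ok, ffmpeg} = FFmpegProcessor.start("./dash", "video/webm")

# Each MediaRecorder chunk arrives as a binary and is written to FFmpeg's stdin:
# :ok = FFmpegProcessor.send(ffmpeg, chunk)

# Closing stdin lets FFmpeg flush the last segment, finalize the manifest and exit.
ExCmd.Process.close_stdin(ffmpeg)
ExCmd.Process.await_exit(ffmpeg, 1_000)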
Watch for the manifest file creation
This module monitors changes in the file system in the given directory.
We want to know when FFmpeg has built the manifest.mpd file located in the "./dash" directory (we decided to put it there, as set in the previous module).
When this event is detected, we send a message to the caller.
The "manifest.mpd" file is an XML document: it begins with the <?xml version="1.0" encoding="utf-8"?> declaration and describes the available representations and the segment templates that dash.js will request.
The FileWatcher module:
defmodule FileWatcher do
  use GenServer
  require Logger

  @impl true
  def init([ws_pid, dash_path]) do
    IO.puts("Started FileSystem watching #{inspect(dash_path)}")
    {:ok, watcher_pid} = FileSystem.start_link(dirs: [dash_path])
    FileSystem.subscribe(watcher_pid)
    {:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
  end

  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, _events}},
        %{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
      ) do
    Logger.debug("File event: #{path}")
    # Notify the caller as soon as the manifest appears
    if Path.extname(path) == ".mpd", do: send(ws_pid, :playlist_created)
    {:noreply, state}
  end

  def handle_info({:file_event, watcher_pid, :stop}, %{watcher_pid: watcher_pid} = state) do
    {:noreply, state}
  end
end
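Here is a minimal, hypothetical way to exercise the watcher on its own, with the caller being the current process (in this Livebook the caller will be the Kino.JS.Live process):

{:ok, watcher} = GenServer.start(FileWatcher, [self(), "./dash"])

# Once FFmpeg writes ./dash/manifest.mpd, the caller receives :playlist_created.
receive do
  :playlist_created -> IO.puts("manifest detected")
after
  30_000 -> IO.puts("no manifest yet")
end

GenServer.stop(watcher)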
Webserver to serve the DASH files
In our configuration, we serve the DASH files (the segments and the manifest file) locally when the browser asks for them. In a real deployment you would of course push them to a CDN.
We use Bandit to run the Plug Router below.
> Note that we need to set CORS headers: the Livebook page and this file server run on different origins, so the browser would otherwise block the dash.js requests.
defmodule FileServer do
  use Plug.Router

  plug Corsica, origins: "*"
  plug :match
  plug :dispatch

  get "/dash/:file" do
    %{"file" => file} = conn.params
    [{:dash_path, dash_path}] = :ets.lookup(:dash, :dash_path)

    if File.exists?(Path.join(dash_path, file)) do
      serve_dash_file(conn, dash_path, file)
    else
      send_resp(conn, 404, "not found")
    end
  end

  defp serve_dash_file(conn, dash_path, file) do
    case Path.extname(file) do
      ".mpd" ->
        conn
        |> put_resp_content_type("application/dash+xml")
        |> send_file(200, Path.join(dash_path, "manifest.mpd"))

      ".webm" ->
        conn
        |> put_resp_content_type("video/webm")
        |> send_file(200, Path.join(dash_path, file))

      ".m4s" ->
        conn
        |> put_resp_content_type("video/iso.segment")
        |> send_file(200, Path.join(dash_path, file))

      _ ->
        send_resp(conn, 415, "Unsupported media type")
    end
  end
end
Bandit.start_link(plug: FileServer, port: 4001)
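Once the ETS table below is set and the first manifest has been generated, the route can be sanity-checked from Elixir with :httpc (a sketch, not part of the streaming flow):

:inets.start()

{:ok, {{_, status, _}, _headers, _body}} =
  :httpc.request(:get, {~c"http://localhost:4001/dash/manifest.mpd", []}, [], [])

IO.puts("GET /dash/manifest.mpd -> #{status}")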
ETS table to declare your "dash" files directory
:ets.new(:dash, [:named_table, :public])
dash_path = "./dash"
:ets.insert(:dash, {:dash_path, dash_path})
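Any process can now read the path back from the table; for example, the Plug router above and the Kino.JS.Live process below do exactly this:

# Returns [{:dash_path, "./dash"}]
:ets.lookup(:dash, :dash_path)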
The main process: a Kino.JS.Live
We use Kino.JS.Live. It runs a GenServer to handle the messages between the browser and the backend.
The API is close to a LiveView and a Channel. Instead of a socket, we have a ctx (context) object.
In the browser, we send a message with ctx.pushEvent. In the backend, the corresponding callback is handle_event.
We send a message from the backend with broadcast_event. In the browser, the listener is ctx.handleEvent.
With Kino.JS, you load the HTML by passing the HTML string to ctx.root.innerHTML.
You load external libraries with ctx.importJS.
> We send binary payloads from the browser to the process.
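Before the full module, here is a minimal round-trip sketch of this API; the PingDemo name and the "ping"/"pong" events are illustrative only and not part of this app:

defmodule PingDemo do
  use Kino.JS
  use Kino.JS.Live

  def new(), do: Kino.JS.Live.new(__MODULE__, "click me")

  asset "main.js" do
    """
    export function init(ctx, html) {
      ctx.root.innerHTML = `<button id="btn">${html}</button>`;
      // browser -> server
      document.getElementById("btn").onclick = () => ctx.pushEvent("ping", {});
      // server -> browser
      ctx.handleEvent("pong", (payload) => console.log("got", payload));
    }
    """
  end

  @impl true
  def init(html, ctx), do: {:ok, assign(ctx, html: html)}

  @impl true
  def handle_connect(ctx), do: {:ok, ctx.assigns.html, ctx}

  @impl true
  def handle_event("ping", _payload, ctx) do
    broadcast_event(ctx, "pong", %{msg: "pong"})
    {:noreply, ctx}
  end
end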
defmodule DashLive do
  use Kino.JS
  use Kino.JS.Live

  # Markup reconstructed to match the element ids referenced in main.js below.
  @html """
  <button id="file-proc">START</button>
  <button id="stop">STOP</button>
  <p>Source</p>
  <video id="source" width="640" height="480" muted autoplay playsinline></video>
  <svg id="spinner" width="24" height="24" viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
    <style>.spinner_ajPY{transform-origin:center;animation:spinner_AtaB .75s infinite linear}@keyframes spinner_AtaB{100%{transform:rotate(360deg)}}</style>
    <circle class="spinner_ajPY" cx="12" cy="12" r="10" fill="none" stroke="currentColor" stroke-width="3" stroke-dasharray="40 20"/>
  </svg>
  <p>Modified image</p>
  <video id="output" width="640" height="480" muted autoplay playsinline></video>
  """

  def run(), do: Kino.JS.Live.new(__MODULE__, @html)
asset "main.js" do
"""
export function init(ctx, html) {
ctx.importJS("https://cdn.dashjs.org/latest/dash.all.min.js")
ctx.root.innerHTML = html;
console.log("loaded")
let video1 = document.getElementById("source"),
video2 = document.getElementById("output"),
spinner = document.getElementById("spinner"),
fileProc = document.getElementById("file-proc"),
stop = document.getElementById("stop"),
isReady = false;
navigator.mediaDevices.getUserMedia({
video: { width: 640, height: 480 },
audio: false,
}). then((stream) => {
video1.srcObject = stream;
spinner.style.visibility = "hidden";
video2.style.visibility = "hidden";
let mediaRecorder, mimetype;
const types = [
"video/mp4; codecs=avc1.42E01E, mp4a.40.2",
"video/mp4; codecs=avc1.64001F, mp4a.40.2",
"video/mp4; codec=avc1.4D401F, mp4a.40.2",
"video/webm;codecs=vp8, opus",
"video/webm;codecs=vp9, opus",
];
for (let type of types) {
if (MediaRecorder.isTypeSupported(type)) {
mimetype = type.split(";")[0];
mediaRecorder = new MediaRecorder(stream, { mimeType: type });
}
}
ctx.pushEvent("mimetype", {mimetype: mimetype})
mediaRecorder.ondataavailable = async ({ data }) => {
if (!isReady) return;
if (data.size > 0) {
console.log(data.size);
const buffer = await data.arrayBuffer();
ctx.pushEvent("chunk", [{}, buffer])
}
};
fileProc.onclick = () => {
isReady = true;
if (mediaRecorder.state == "inactive") mediaRecorder.start(1_000);
spinner.style.visibility = "visible";
};
stop.onclick = () => {
mediaRecorder.stop();
ctx.pushEvent("stop", {})
};
ctx.handleEvent("playlist_ready", handleHls)
function handleHls() {
spinner.style.visibility = "hidden";
video2.style.visibility = "visible";
let url = "http://localhost:4001/dash/manifest.mpd";
let player = dashjs.MediaPlayer().create();
player.initialize(video2, url, true);
}
})
}
"""
end
  @impl true
  def init(html, ctx) do
    [{:dash_path, dash_path}] = :ets.lookup(:dash, :dash_path)
    {:ok, pid_watcher} = GenServer.start(FileWatcher, [self(), dash_path])

    ctx =
      assign(ctx, %{
        html: html,
        pid_watcher: pid_watcher,
        dash_path: dash_path,
        ref: nil,
        init: true
      })

    {:ok, ctx}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # received from the browser -------------
  @impl true
  def handle_event("mimetype", %{"mimetype" => mimetype}, ctx) do
    {:ok, pid_build} = FFmpegProcessor.start(ctx.assigns.dash_path, mimetype)
    {:noreply, assign(ctx, %{pid_build: pid_build})}
  end

  def handle_event("stop", _, ctx) do
    %{pid_build: pid_build, pid_watcher: pid_watcher} = ctx.assigns
    GenServer.stop(pid_watcher)
    # close FFmpeg's stdin so it can flush the last segment and exit
    ExCmd.Process.close_stdin(pid_build)
    ExCmd.Process.await_exit(pid_build, 100)
    {:noreply, ctx}
  end

  def handle_event("chunk", {:binary, _info, buffer}, ctx) do
    %{pid_build: pid_build} = ctx.assigns
    IO.puts("#{byte_size(buffer)}")
    IO.puts("received data ---------------")
    # Write the received binary data to the FFmpeg capture process
    :ok = FFmpegProcessor.send(pid_build, buffer)
    {:noreply, ctx}
  end

  # received from the FileWatcher --------------
  @impl true
  def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do
    IO.puts("PLAYLIST CREATED")
    broadcast_event(ctx, "playlist_ready", %{})
    {:noreply, assign(ctx, init: false)}
  end

  def handle_info(msg, ctx) do
    IO.puts("#{inspect(msg)}")
    {:noreply, ctx}
  end

  @impl true
  def terminate(_reason, _ctx) do
    IO.puts("TERMINATE")
    :ok
  end
end
The output
You will see your webcam feed displayed.
Click on "START".
After a few seconds, a second video element should appear below, playing back your feed as a DASH stream.
DashLive.run()