Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

MPEG-Dash Live Streaming with Elixir

lib/dash-demo.livemd

MPEG-Dash Live Streaming with Elixir

Mix.install([
  {:plug, "~> 1.16"},
  {:ex_cmd, "~> 0.12.0"},
  {:file_system, "~> 1.0"},
  {:plug_crypto, "~> 2.1"},
  {:bandit, "~> 1.5"},
  {:kino, "~> 0.13.2"},
  {:corsica, "~> 2.1"}
])

Introduction

In this Livebook, we illustrate how to run MPEG-DASH Live Streaming. It is an HTTP based protocole thus any server can be used.

We simply capture the feed of the built-in webcam with the MediaRecorder API, and send the chunks in binary form (as ArrayBuffer) to the server.

We then use a “keep alive” FFmpeg process to build the DASH files.

We set a GenServer to monitor the file system to detect when FFmpeg will create the manifest file. This uses FileSystem. When it detects the event, we send a message to the Kino.Js.Live process which will relay it to the process via its WebSocket connection.

This will trigger the Javascript library dash.js to start and work: send a GET request to the webserver and deliver the files. The DASH library will then continuously refresh the player with new segments.

The main difficulty is how MediRecorder can handle (or not) different codecs used by the different containers we may use: MP4 or WEBM.

File.mkdir_p("./dash")

FFmpeg process

We run one FFmpeg process as “keep alive” with ExCmd.

It will build DASH segments and the manifest file.


defmodule FFmpegProcessor do
  @moduledoc false

  @ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

  def start(dash_path, mimetype) do
    manifest = Path.join(dash_path, "manifest.mpd")
    
    build_cmd_webm = 
      ~w(#{@ffmpeg} -i pipe:0 
        -map 0
        -codec: copy
        -f dash
        -use_timeline 1
        -use_template 1
        -init_seg_name init_$RepresentationID$.webm
        -media_seg_name chunk_$RepresentationID$_$Number$.webm
        #{manifest}
      )

      build_cmd_mp4 = 
        ~w(#{@ffmpeg }
        -analyzeduration 100M -probesize 50M
        -i pipe:0 
        -map 0
        -codec: copy
        -f dash
        -use_timeline 1
        -use_template 1
        -init_seg_name init_$RepresentationID$.m4s
        -media_seg_name chunk_$RepresentationID$_$Number$.m4s
        #{manifest}
      )
    
    case mimetype do
      "video/mp4" -> 
        {:ok, _pid_build} = 
          ExCmd.Process.start_link(build_cmd_mp4, log: true)
      "video/webm" -> 
        {:ok, _pid_build} = 
          ExCmd.Process.start_link(build_cmd_webm, log: true)
    end
  end

  def send(pid, data) do
    ExCmd.Process.write(pid, data)
  end
end

##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################

Watch for the manifest file creation

This module will monitor changes in the file system in the given directory.

We want to know when FFmpeg has built the manifest.mpd file located in the directory “./priv/dash/“ (we decided to put it there, as set in the previous module).

When this event is detected, we send a message to the caller.

The “manifest.mpd” file looks like:

1.0utf-8

    
    
    
    
    
        
            
                
                    
                        
                        
                        
                        
                        
                        
                    
                
            
        
    

The FileWatcher module:

defmodule FileWatcher do
  use GenServer

  require Logger

  @impl true
  def init([ws_pid, dash_path]) do
    IO.puts "Started FileSystem watching #{inspect(dash_path)}"
    {:ok, watcher_pid} = FileSystem.start_link(dirs: [dash_path])
    FileSystem.subscribe(watcher_pid)

    {:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
  end

  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, _}},
        %{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
      ) do
    Logger.debug("File created: #{path}")
    if Path.extname(path) == ".mpd", do: send(ws_pid, :playlist_created)
    {:noreply, state}
  end
end

Webserver to serve the DASH files

In our configuration, we will serve the DASH files (the segments and the manifest file) locally when the browser asks for them. The idea is of course to save them in a CDN.

We use Bandit to run the Plug Router below.

> Note that we need to set “CORS”.

defmodule FileServer do
  use Plug.Router

  plug Corsica, origins: "*"
  plug :match
  plug :dispatch

  get "/dash/:file" do
    %{"file" => file} = conn.params
    [{:dash_path, dash_path}] = :ets.lookup(:dash, :dash_path)
    if File.exists?(Path.join(dash_path, file)) do
      serve_dash_file(conn, dash_path, file)
    else
      send_resp(conn, 404, "not found")
    end
  end

  defp serve_dash_file(conn, dash_path, file) do
    case Path.extname(file) do
      ".mpd" ->
        conn
        |> put_resp_content_type("application/dash+xml")
        |> send_file(200, Path.join(dash_path, "manifest.mpd"))
      ".webm" ->
        conn
        |> put_resp_content_type("video/iso.segment")
        |> send_file(200, Path.join(dash_path, file))
       ".m4s" ->
        conn
        |> put_resp_content_type("video/mp4")
        |> send_file(200, Path.join(dash_path, file))
      _ ->
        conn
        |> send_resp(415, "Unsupported media type")
    end
  end
end

Bandit.start_link(plug: FileServer, port: 4001)

Ets table to declare your “dash” files directory

:ets.new(:dash, [:named_table, :public])
dash_path = "./dash"
:ets.insert(:dash, {:dash_path, dash_path})

The main process: a Kino.JS.Live

We use Kino.JS.Live. It runs a GenServer to handle the messages between the browser and the backend.

The API is close to a LiveView and Channel. Instead of a socket, we have a context object.

In the browser, we send a message with ctx.pushEvent. In the backend, the corresponding callback is a handle_event.

We send a message from the backend with broadcast_event. In the browser, the listener is ctx.handleEvent.

With Kino.JS, you load the HTML by passing the HTML string to ctx.root.innerHTML.

You load external libraries with ctx.importJS.

> We send binary payloads from the browser to the process.

defmodule DashLive do
  use Kino.JS
  use Kino.JS.Live

  @html """
  
    
      START
      
STOP
Source

.spinner_ajPY{transform-origin:center;animation:spinner_AtaB .75s infinite linear}@keyframes spinner_AtaB{100%{transform:rotate(360deg)}}

Modified image """
def run(), do: Kino.JS.Live.new(__MODULE__, @html) asset "main.js" do """ export function init(ctx, html) { ctx.importJS("https://cdn.dashjs.org/latest/dash.all.min.js") ctx.root.innerHTML = html; console.log("loaded") let video1 = document.getElementById("source"), video2 = document.getElementById("output"), spinner = document.getElementById("spinner"), fileProc = document.getElementById("file-proc"), stop = document.getElementById("stop"), isReady = false; navigator.mediaDevices.getUserMedia({ video: { width: 640, height: 480 }, audio: false, }). then((stream) => { video1.srcObject = stream; spinner.style.visibility = "hidden"; video2.style.visibility = "hidden"; let mediaRecorder, mimetype; const types = [ "video/mp4; codecs=avc1.42E01E, mp4a.40.2", "video/mp4; codecs=avc1.64001F, mp4a.40.2", "video/mp4; codec=avc1.4D401F, mp4a.40.2", "video/webm;codecs=vp8, opus", "video/webm;codecs=vp9, opus", ]; for (let type of types) { if (MediaRecorder.isTypeSupported(type)) { mimetype = type.split(";")[0]; mediaRecorder = new MediaRecorder(stream, { mimeType: type }); } } ctx.pushEvent("mimetype", {mimetype: mimetype}) mediaRecorder.ondataavailable = async ({ data }) => { if (!isReady) return; if (data.size > 0) { console.log(data.size); const buffer = await data.arrayBuffer(); ctx.pushEvent("chunk", [{}, buffer]) } }; fileProc.onclick = () => { isReady = true; if (mediaRecorder.state == "inactive") mediaRecorder.start(1_000); spinner.style.visibility = "visible"; }; stop.onclick = () => { mediaRecorder.stop(); ctx.pushEvent("stop", {}) }; ctx.handleEvent("playlist_ready", handleHls) function handleHls() { spinner.style.visibility = "hidden"; video2.style.visibility = "visible"; let url = "http://localhost:4001/dash/manifest.mpd"; let player = dashjs.MediaPlayer().create(); player.initialize(video2, url, true); } }) } """ end @impl true def init(html, ctx) do [{:dash_path, dash_path}] = :ets.lookup(:dash, :dash_path) {:ok, pid_watcher} = GenServer.start(FileWatcher, [self(), dash_path]) ctx = ctx |> assign(%{ html: html, pid_watcher: pid_watcher, dash_path: dash_path, ref: nil, init: true }) {:ok, ctx} end @impl true def handle_connect(ctx) do {:ok, ctx.assigns.html, ctx} end # received from the browser------------- @impl true def handle_event("mimetype", %{"mimetype" => mimetype}, ctx) do {:ok, pid_build} = FFmpegProcessor.start(ctx.assigns.dash_path, mimetype) {:noreply, assign(ctx, %{pid_build: pid_build})} end def handle_event("stop", _, ctx) do %{pid_build: pid_build, pid_watcher: pid_watcher} = ctx.assigns GenServer.stop(pid_watcher) ExCmd.Process.close_stdin(pid_build) ExCmd.Process.await_exit(pid_build, 100) {:noreply, ctx} end def handle_event("chunk", {:binary,_, buffer}, ctx) do %{pid_build: pid_build} = ctx.assigns IO.puts("#{byte_size(buffer)}") IO.puts("received data ---------------") # Write the received binary data to the FFmpeg capture process :ok = FFmpegProcessor.send(pid_build, buffer) {:noreply, ctx} end # received from the server-------------- @impl true def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do IO.puts("PLAYLIST CREATED") broadcast_event(ctx, "playlist_ready", %{}) {:noreply, assign(ctx, init: false)} end def handle_info(msg, ctx) do IO.puts "#{inspect(msg)}" {:noreply, ctx} end @impl true def terminate(_, _) do IO.puts "TERMINATE" {:stop, :shutdown, :normal} end end

The output

We will see your webcam displayed.

Click on “start”.

After a few secons, you should see below a second video element which broadcast back your feed.

DashLive.run()