Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Working with ExCmd.Process/Exile.Process

demo/ffmpeg-buffer-excmd-process.livemd

Working with ExCmd.Process/Exile.Process

Mix.install([
  {:ex_cmd, "~> 0.12.0"},
  {:corsica, "~> 2.1"},
  {:kino, "~> 0.13.1"},
  {:bandit, "~> 1.5"},
  {:exile, "~> 0.10.0"}
])

FFmpeg accepting buffer input with ExCmd.Process/Exile.Process

Context

We want to live-stream ourselves from the webcam using HTTP Live Streaming (HLS). We use FFmpeg to transform the stream we receive from the browser into HLS files (a playlist plus video segments).

Since FFmpeg can consume buffered data from stdin, we want to keep the FFmpeg (OS) process alive between chunks.

You can use ExCmd.Process or Exile.Process for this.

FFmpeg GenServer

The module below is a GenServer. It starts an ExCmd.Process to run FFmpeg. The command argument passed to FFmpeg is tailored to produce these HLS type files from stdin.

The GenServer starts the FFmpeg process with ExCmd.Process.start_link/1, to which we pass the command that launches the OS process.

FFmpeg accepts input from stdin with the “pipe:0” argument. When you receive data from the browser, you write it with ExCmd.Process.write/2: this sends it to the stdin of the OS process.

defmodule FFmpeger do
  @moduledoc """
  GenServer that keeps one long-running FFmpeg OS process alive and feeds it
  video chunks through stdin.

  FFmpeg is started with `-i pipe:0`, so it reads its input from stdin, and it
  writes an HLS playlist (`stream.m3u8`) and segments (`segment_%03d.ts`) into
  the system temporary directory.

  The OS process is run through either `ExCmd.Process` or `Exile.Process`,
  selected by the argument given to `start_link/1` (`:excmd` or `:exile`).
  The server state is `{os_process_pid, runner}`.
  """

  use GenServer

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts the server, registered under the module name.

  `opt` selects the process runner and must be `:excmd` or `:exile`; any other
  value makes `init/1` crash.
  """
  def start_link(opt) do
    GenServer.start_link(__MODULE__, opt, name: __MODULE__)
  end

  @doc """
  Reads the file at `path` and writes its content to FFmpeg's stdin.

  Returns `:processed`. Raises inside the server if the file cannot be read.
  """
  def enqueue(path) do
    GenServer.call(__MODULE__, {:process, path})
  end

  @doc """
  Returns the pid of the GenServer itself (not of the FFmpeg runner process).
  """
  def pid, do: GenServer.call(__MODULE__, :pid)

  @doc """
  Closes FFmpeg's stdin, waits for FFmpeg to exit with status 0 and stops the
  server.

  Returns `:stopped`.
  """
  def stop(), do: GenServer.call(__MODULE__, :stop)

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(opt) do
    dir = System.tmp_dir!()
    playlist_path = Path.join(dir, "stream.m3u8")
    segment_path = Path.join(dir, "segment_%03d.ts")
    ffmpeg = System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

    # Only the fixed flags go through ~w. The executable and the output paths are
    # added as whole list elements so that a path containing whitespace is not
    # split into several arguments.
    flags =
      ~w(-loglevel debug -hide_banner -i pipe:0 -r 20 -c:v libx264 -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+append_list -hls_playlist_type event -hls_segment_filename)

    cmd = [ffmpeg | flags] ++ [segment_path, playlist_path]

    case opt do
      :exile ->
        {:ok, pid} = Exile.Process.start_link(cmd)
        {:ok, {pid, :exile}}

      :excmd ->
        {:ok, pid} = ExCmd.Process.start_link(cmd)
        {:ok, {pid, :excmd}}
    end
  end

  @impl true
  def handle_call(:pid, _from, state), do: {:reply, self(), state}

  def handle_call({:process, path}, _from, {pid, :excmd}) do
    data = File.read!(path)
    ExCmd.Process.write(pid, data)
    IO.puts("processed-----------------#{byte_size(data)}")
    {:reply, :processed, {pid, :excmd}}
  end

  def handle_call({:process, path}, _from, {pid, :exile}) do
    data = File.read!(path)
    Exile.Process.write(pid, data)
    IO.puts("processed-----------------#{byte_size(data)}")
    {:reply, :processed, {pid, :exile}}
  end

  def handle_call(:stop, _from, {pid, :exile}) do
    # Closing stdin signals end-of-input; then wait for a clean exit (status 0).
    :ok = Exile.Process.close_stdin(pid)
    {:ok, 0} = Exile.Process.await_exit(pid)
    IO.puts("stopped-----------------")
    # The stop tuple must be the last expression, and the 4-element form is used
    # so the caller receives a reply instead of exiting with the server.
    {:stop, :shutdown, :stopped, {pid, :exile}}
  end

  def handle_call(:stop, _from, {pid, :excmd}) do
    :ok = ExCmd.Process.close_stdin(pid)
    # Expect end-of-output from FFmpeg before waiting for its exit status.
    :eof = ExCmd.Process.read(pid)
    {:ok, 0} = ExCmd.Process.await_exit(pid)
    IO.puts("stopped-----------------")
    {:stop, :shutdown, :stopped, {pid, :excmd}}
  end
end

Webserver to handle POST requests

We will run a (Bandit) WebServer to listen on port 4002 to handle the data sent by the video stream. We run a multipart HTTP POST request every second to send the binary data packaged into a file to the endpoint http://localhost:4002/upload.

 defmodule PostRouter do
  @moduledoc """
  Plug router that receives the video chunks posted by the browser.

  * `GET /exile` and `GET /excmd` are plain-text endpoints used to check that
    the server is up.
  * `POST /upload` expects a multipart body with a `"file"` part and hands the
    uploaded file's path to `FFmpeger.enqueue/1`.

  Any other request gets a 404 response.
  """

  use Plug.Router

  plug Corsica, origins: "http://localhost:4000", allow_methods: ["GET", "POST"]
  plug(Plug.Parsers,
    parsers: [:urlencoded, :multipart],
    pass: ["*/*"]
  )

  plug(:match)
  plug(:dispatch)

  get "/exile" do
    send_resp(conn, 200, "hello from Bandit with exile")
  end

  get "/excmd" do
    send_resp(conn, 200, "hello from Bandit with excmd")
  end

  post "/upload" do
    case conn.params do
      %{"file" => %Plug.Upload{path: path}} ->
        :processed = FFmpeger.enqueue(path)
        send_resp(conn, 201, "uploaded")

      _ ->
        # A request without a "file" upload is a client error, not a server crash.
        send_resp(conn, 400, "missing \"file\" upload")
    end
  end

  # Catch-all so that unknown routes return 404 instead of raising.
  match _ do
    send_resp(conn, 404, "not found")
  end
end

Start the webserver and the FFmpeg runner module. You can choose the process runner by passing :exile or :excmd to the GenServer FFmpeger.

# Bandit serves PostRouter over plain HTTP on port 4002.
webserver = {Bandit, plug: PostRouter, scheme: :http, port: 4002}

# Supervise the webserver together with the FFmpeg GenServer (ExCmd runner).
children = [webserver, {FFmpeger, :excmd}]
Supervisor.start_link(children, strategy: :one_for_one, name: MySup)

# List the children started under the supervisor.
Supervisor.which_children(MySup)

We test that the Bandit webserver is serving port 4002 by running a cURL test with ExCmd.stream! and Exile.stream!.

# Run curl against the health endpoint and collect its output into one binary.
~w(curl  http://localhost:4002/excmd)
|> ExCmd.stream!()
|> Enum.into("")

# Exile variant of the same check:
# ~w(curl  http://localhost:4002/exile) |> Exile.stream!() |> Enum.into("")

Launch the video stream

This module runs a Kino.JS.Live. It will run the webcam and run an HTTP POST request.

You can watch the logs below where the size of the data is displayed.

You can stop the process with the “Stop streaming” button. The selected runner (ExCmd or Exile) will then gracefully stop FFmpeg.

defmodule VideoLive do
  @moduledoc """
  `Kino.JS.Live` widget that captures the webcam in the browser and uploads the
  recording to the local webserver.

  The browser records the webcam with `MediaRecorder`, which is started with a
  1000 ms timeslice, and sends each non-empty chunk as a multipart `POST` to
  `http://localhost:4002/upload`. Clicking the stop button ends the uploads and
  pushes a `"stop"` event to the server, which calls `FFmpeger.stop/0`.
  """

  use Kino.JS
  use Kino.JS.Live

  # Markup injected into the widget root. The ids are the ones looked up by
  # main.js ("video", "stop") and styled by main.css ("#elt").
  # NOTE(review): the element attributes were reconstructed from how the script
  # uses them; confirm against the original notebook.
  @html """
    <div id="elt">
      <video id="video" width="400" height="400" autoplay muted playsinline></video>
      <button id="stop">Stop streaming</button>
    </div>
    """

  @doc """
  Creates the live widget, passing the HTML markup as its initial data.
  """
  def new() do
    Kino.JS.Live.new(__MODULE__, @html)
  end

  asset "main.css" do
    """
    #elt {
      display: flex;
      flex-direction: column;
      align-items: center
    }
    button {
      margin-top: 1em;
      padding: 1em;
      background-color: bisque;
    }
    """
  end

  asset "main.js" do
    """
    export function init(ctx, html) {
      ctx.importCSS("main.css")
      ctx.root.innerHTML = html

      function run() {
        navigator.mediaDevices.getUserMedia({video: {width: 400, height: 400}, audio: false})
        .then((stream)=> {
          let video = document.getElementById("video"),
            send = true;

          video.srcObject = stream
          let mediaRecorder = new MediaRecorder(stream);

          document.getElementById("stop").onclick = () => {
              // Ignore repeated clicks: the stream has already been stopped.
              if (!send) return;
              send = false;
              // Stop recording and release the webcam.
              if (mediaRecorder.state !== "inactive") mediaRecorder.stop()
              stream.getTracks().forEach((track) => track.stop())
              ctx.pushEvent("stop", {})
          }

          mediaRecorder.ondataavailable = ({data}) => {
            if (!send) return;
            if (data.size > 0) {
              console.log(data.size)
              const file = new File([data], "chunk.webm", {
                type: "video/webm",
              });
              const formData = new FormData();
              formData.append("file", file);
              fetch(`http://localhost:4002/upload`, {method: "POST",body: formData})
              .then((res) => res.text())
              .then(console.log)
              .catch(console.error)
            }
          }
          // Emit a chunk every 1000 ms.
          mediaRecorder.start(1000)
        })
        .catch((err) => console.error("Unable to start the webcam stream", err))
      }

      run()
    }
    """
  end

  @impl true
  def init(html, ctx) do
    {:ok, assign(ctx, html: html)}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  @impl true
  def handle_event("stop", _, ctx) do
    FFmpeger.stop()
    {:noreply, ctx}
  end
end
# Instantiate the live widget (presumably rendered by Livebook as the cell result).
VideoLive.new()