Working with ExCmd.Process/Exile.Process
# Notebook dependencies: the two OS-process runners (ExCmd, Exile), the Bandit
# web server with Corsica for CORS, and Kino for the in-notebook widget.
deps = [
  {:ex_cmd, "~> 0.12.0"},
  {:corsica, "~> 2.1"},
  {:kino, "~> 0.13.1"},
  {:bandit, "~> 1.5"},
  {:exile, "~> 0.10.0"}
]

Mix.install(deps)
FFmpeg accepting buffer input with ExCmd.Process/Exile.Process
Context
We want to live-stream ourselves from the webcam using HTTP Live Streaming (HLS). We use FFmpeg
to transform the streams we receive from our browser into HLS files (a playlist plus video segments).
Since FFmpeg
can handle buffered data on its standard input, we want to keep the FFmpeg
(OS) process alive between chunks.
You can use ExCmd.Process
or Exile.Process
for this.
FFmpeg GenServer
The module below is a GenServer. It starts an ExCmd.Process
to run FFmpeg
. The command argument passed to FFmpeg
is tailored to produce these HLS type files from stdin
.
The GenServer starts the FFmpeg
process with ExCmd.Process.start_link/1
(or Exile.Process.start_link/1), to which we pass the command that launches the OS process.
FFmpeg
accepts input from stdin with the “pipe:0” argument. When you receive data from the browser, you write the data with ExCmd.Process.write/2
(or Exile.Process.write/2): it will send it to the stdin of the OS process.
defmodule FFmpeger do
  @moduledoc """
  GenServer that owns one long-running FFmpeg OS process and feeds it data
  through stdin.

  The process is started in `init/1` with either `ExCmd.Process` or
  `Exile.Process`, depending on the option given to `start_link/1`
  (`:excmd` or `:exile`). The server state is `{process_pid, runner}`.
  """

  use GenServer

  # -- Client API -----------------------------------------------------------

  @doc """
  Starts the server, registered under the module name.

  `opt` selects the process runner and must be `:exile` or `:excmd`.
  """
  def start_link(opt) do
    GenServer.start_link(__MODULE__, opt, name: __MODULE__)
  end

  @doc """
  Reads the file at `path` and writes its content to FFmpeg's stdin.

  Returns `:processed`. Raises in the server if the file cannot be read.
  """
  def enqueue(path) do
    GenServer.call(__MODULE__, {:process, path})
  end

  @doc "Returns the pid of this GenServer (not of the FFmpeg process wrapper)."
  def pid, do: GenServer.call(__MODULE__, :pid)

  @doc """
  Closes FFmpeg's stdin, waits for it to exit with status 0, then stops the
  server with reason `:shutdown`. Returns `:ok`.
  """
  def stop(), do: GenServer.call(__MODULE__, :stop)

  # -- Server callbacks -----------------------------------------------------

  @impl true
  def init(opt) do
    # HLS output (playlist + numbered segments) goes to the system temp dir.
    dir = System.tmp_dir()
    playlist_path = Path.join(dir, "stream.m3u8")
    segment_path = Path.join(dir, "segment_%03d.ts")
    ffmpeg = System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

    # `-i pipe:0` makes FFmpeg read its input from stdin.
    cmd =
      ~w(#{ffmpeg} -loglevel debug -hide_banner -i pipe:0 -r 20 -c:v libx264 -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+append_list -hls_playlist_type event -hls_segment_filename #{segment_path} #{playlist_path})

    case opt do
      :exile ->
        {:ok, pid} = Exile.Process.start_link(cmd)
        {:ok, {pid, :exile}}

      :excmd ->
        {:ok, pid} = ExCmd.Process.start_link(cmd)
        {:ok, {pid, :excmd}}
    end
  end

  @impl true
  def handle_call(:pid, _from, state), do: {:reply, self(), state}

  def handle_call({:process, path}, _from, {pid, runner} = state) do
    data = File.read!(path)

    # NOTE(review): the result of the write is not checked, so a failed write
    # is still reported to the caller as :processed.
    write(runner, pid, data)
    IO.puts("processed-----------------#{byte_size(data)}")
    {:reply, :processed, state}
  end

  def handle_call(:stop, _from, {pid, :exile} = state) do
    :ok = Exile.Process.close_stdin(pid)
    {:ok, 0} = Exile.Process.await_exit(pid)
    IO.puts("stopped-----------------")
    # The :stop tuple must be the last expression (it is the callback's return
    # value), and the 4-tuple form replies :ok so the caller does not exit.
    {:stop, :shutdown, :ok, state}
  end

  def handle_call(:stop, _from, {pid, :excmd} = state) do
    :ok = ExCmd.Process.close_stdin(pid)
    # Expect end-of-output on stdout before waiting for the exit status.
    :eof = ExCmd.Process.read(pid)
    {:ok, 0} = ExCmd.Process.await_exit(pid)
    IO.puts("stopped-----------------")
    {:stop, :shutdown, :ok, state}
  end

  # Dispatches the stdin write to the runner that started the process.
  defp write(:excmd, pid, data), do: ExCmd.Process.write(pid, data)
  defp write(:exile, pid, data), do: Exile.Process.write(pid, data)
end
Webserver to handle POST requests
We will run a (Bandit) WebServer to listen on port 4002 to handle the data sent by the video stream. We run a multipart HTTP POST request every second to send the binary data packaged into a file to the endpoint http://localhost:4002/upload.
defmodule PostRouter do
  @moduledoc """
  Plug router receiving the video chunks posted by the browser.

  Routes:

    * `GET /exile` and `GET /excmd` - plain-text responses used to check that
      the server is up.
    * `POST /upload` - expects a multipart body with a `"file"` part and hands
      the uploaded file's path to `FFmpeger.enqueue/1`.

  Any other request gets a 404.
  """

  use Plug.Router

  # CORS: only the origin below is allowed to call this server from a browser.
  plug(Corsica, origins: "http://localhost:4000", allow_methods: ["GET", "POST"])

  plug(Plug.Parsers,
    parsers: [:urlencoded, :multipart],
    pass: ["*/*"]
  )

  plug(:match)
  plug(:dispatch)

  get "/exile" do
    send_resp(conn, 200, "hello from Bandit with exile")
  end

  get "/excmd" do
    send_resp(conn, 200, "hello from Bandit with excmd")
  end

  post "/upload" do
    case conn.params do
      %{"file" => %Plug.Upload{path: path}} ->
        :processed = FFmpeger.enqueue(path)
        send_resp(conn, 201, "uploaded")

      _other ->
        # No uploaded "file" part: answer with a client error instead of
        # raising a MatchError.
        send_resp(conn, 400, "missing multipart field \"file\"")
    end
  end

  # Catch-all so unknown paths return 404 rather than raising in the router.
  match _ do
    send_resp(conn, 404, "not found")
  end
end
Start the webserver and the FFmpeg runner module. You can choose the process runner by passing :exile
or :excmd
to the GenServer FFmpeger.
# Supervise the Bandit HTTP server (port 4002) together with the FFmpeg
# GenServer; the second element of the FFmpeger child spec selects the runner.
webserver = {Bandit, plug: PostRouter, scheme: :http, port: 4002}
children = [webserver, {FFmpeger, :excmd}]

Supervisor.start_link(children, strategy: :one_for_one, name: MySup)

# List the children to confirm that both were started.
Supervisor.which_children(MySup)
We test that the Bandit webserver is serving port 4002 by running a cURL
test with ExCmd.stream!
and Exile.stream!
.
# Smoke test: run curl against the server and collect its output into one binary.
~w(curl http://localhost:4002/excmd)
|> ExCmd.stream!()
|> Enum.into("")

# Same check through Exile:
# ~w(curl http://localhost:4002/exile) |> Exile.stream!() |> Enum.into("")
Launch the video stream
This module runs a Kino.JS.Live
. It will run the webcam and send an HTTP POST request for each recorded chunk.
You can watch the logs below where the size of the data is displayed.
You can stop the process with the button “stop”. The selected runner (ExCmd
or Exile) will gracefully stop FFmpeg
.
defmodule VideoLive do
  @moduledoc """
  Kino live widget that shows the webcam and uploads recorded chunks.

  The browser side captures the webcam with `getUserMedia`, records it with a
  `MediaRecorder` started with a 1000 ms timeslice, and POSTs every non-empty
  chunk as multipart form data to `http://localhost:4002/upload`. Clicking the
  button pushes a `"stop"` event to the server side, which calls
  `FFmpeger.stop/0`.
  """

  use Kino.JS
  use Kino.JS.Live

  # The script looks up the elements with ids "video" and "stop" and the
  # stylesheet targets "#elt" and "button", so the markup must provide them.
  # NOTE(review): element attributes were reconstructed from those references -
  # confirm against the original notebook.
  @html """
  <div id="elt">
    <video id="video" width="400" height="400" autoplay muted playsinline></video>
    <button id="stop">Stop streaming</button>
  </div>
  """

  @doc "Creates the widget, passing the HTML markup as its initial data."
  def new() do
    Kino.JS.Live.new(__MODULE__, @html)
  end

  asset "main.css" do
    """
    #elt {
      display: flex;
      flex-direction: column;
      align-items: center
    }
    button {
      margin-top: 1em;
      padding: 1em;
      background-color: bisque;
    }
    """
  end

  asset "main.js" do
    """
    export function init(ctx, html) {
      ctx.importCSS("main.css")
      ctx.root.innerHTML = html

      function run() {
        navigator.mediaDevices.getUserMedia({video: {width: 400, height: 400}, audio: false})
          .then((stream) => {
            let video = document.getElementById("video"),
              send = true;

            let mediaRecorder = new MediaRecorder(stream);

            document.getElementById("stop").onclick = () => {
              send = false;
              // Stop recording and release the camera; guard against a second click.
              if (mediaRecorder.state !== "inactive") mediaRecorder.stop();
              stream.getTracks().forEach((track) => track.stop());
              ctx.pushEvent("stop", {})
            }

            video.srcObject = stream

            mediaRecorder.ondataavailable = ({data}) => {
              if (!send) return;
              if (data.size > 0) {
                console.log(data.size)
                const file = new File([data], "chunk.webm", {
                  type: "video/webm",
                });
                const formData = new FormData();
                formData.append("file", file);
                fetch(`http://localhost:4002/upload`, {method: "POST",body: formData})
                  .then((res) => res.text())
                  .then(console.log)
                  .catch(console.error)
              }
            }

            // Emit a chunk every 1000 ms.
            mediaRecorder.start(1000)
          })
          // Surface a denied permission or missing camera in the console.
          .catch(console.error)
      }

      run()
    }
    """
  end

  @impl true
  def init(html, ctx) do
    {:ok, assign(ctx, html: html)}
  end

  # Sends the stored markup to each connecting client; it arrives as the
  # second argument of the JS `init` function.
  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # The "stop" button was clicked in the browser.
  @impl true
  def handle_event("stop", _, ctx) do
    FFmpeger.stop()
    {:noreply, ctx}
  end
end
# Create the live widget; as the last expression of the cell its result is
# what the notebook renders.
VideoLive.new()