HLS with Elixir
Mix.install([
{:plug, "~> 1.16"},
{:ex_cmd, "~> 0.12.0"},
{:evision, "~> 0.2.7"},
{:file_system, "~> 1.0"},
{:plug_crypto, "~> 2.1"},
{:bandit, "~> 1.5"},
{:kino, "~> 0.13.2"},
{:corsica, "~> 2.1"},
{:req, "~> 0.5.2"}
])
Create a WebServer listening on port 4001
We need to serve the segments and the playlist for the browser.
> Note that we need to set “CORS” on this server.
defmodule WebServer do
use Plug.Router
plug Corsica, origins: "*"
plug :match
plug :dispatch
# http://localhost:58331
get "/hls/:file" do
IO.puts "endpoint reached----"
%{"file" => file} = conn.params
data = File.read!("./priv/hls/"<>file)
Plug.Conn.send_resp(conn, 200, data)
end
end
Bandit.start_link(plug: WebServer, port: 4001)
FFmpeg processes
We run two FFmpeg
processes as “keep alive” with ExCmd
.
The first one will extract all the frames from the received video chunk.
The second one will build HLS segments and the playlist.
defmodule FFmpegProcessor do
@moduledoc false
@ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"
def start(frame_rate, resolution, duration) do
frame_pattern = "./priv/input/test_%05d.jpg"
build_frames =
~w(#{@ffmpeg} -loglevel debug
-i pipe:0 -framerate #{frame_rate}
-video_size #{resolution}
-thread_type slice
#{frame_pattern}
)
{:ok, pid_capture} =
ExCmd.Process.start_link(build_frames)
playlist = Path.join("./priv/hls", "playlist.m3u8")
segment = Path.join("./priv/hls", "segment_%03d.ts")
ffmpeg_rebuild_cmd =
~w(#{@ffmpeg} -loglevel info
-f image2pipe -framerate #{frame_rate}
-i pipe:0 -c:v libx264
-preset veryfast
-f hls
-hls_time #{duration}
-hls_list_size 4
-hls_playlist_type event
-hls_flags append_list
-hls_segment_filename #{segment}
#{playlist}
)
{:ok, pid_segment} =
ExCmd.Process.start_link(ffmpeg_rebuild_cmd)
{pid_capture, pid_segment}
end
end
##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################
File watcher
This module will monitor changes in the file system in the given directory.
We want to know when FFmpeg
has built the “playlist.m3u8” file located in the directory “.priv/hls/“ (we decided to put it there, as set in the previous module).
When this event is detected, we send a message to the caller.
defmodule FileWatcher do
use GenServer
require Logger
@impl true
def init(ws_pid) do
{:ok, watcher_pid} = FileSystem.start_link(dirs: ["./priv/hls"])
FileSystem.subscribe(watcher_pid)
{:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
end
@impl true
def handle_info(
{:file_event, watcher_pid, {path, _}},
%{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
) do
Logger.debug("File created: #{path}")
if Path.extname(path) == ".m3u8", do: send(ws_pid, :playlist_created)
{:noreply, state}
end
end
Evision running the Haar Cascade face detection
We transform each frame by adding a rectangle around the ROI, if any.
We read the files and build new files.
> you will see false positives.
defmodule ImageProcessor do
def load_haar_cascade do
haar_path =
Path.join(
:code.priv_dir(:evision),
"share/opencv4/haarcascades/haarcascade_frontalface_default.xml"
)
Evision.CascadeClassifier.cascadeClassifier(haar_path)
end
def detect_and_draw_faces(file, face_detector) do
input_path = Path.join("./priv/input", file)
output_path = Path.join("./priv/output", file)
frame =
Evision.imread(input_path)
# convert to grey-scale
grey_img =
Evision.cvtColor(frame, Evision.ImreadModes.cv_IMREAD_GRAYSCALE())
# detect faces
faces =
Evision.CascadeClassifier.detectMultiScale(face_detector, grey_img)
# draw rectangles found on the original frame
result_frame =
Enum.reduce(faces, frame, fn {x, y, w, h}, mat ->
Evision.rectangle(mat, {x, y}, {x + w, y + h}, {0, 255, 0}, thickness: 2)
end)
Evision.imwrite(output_path, result_frame)
:ok = File.rm!(input_path)
end
end
The main process: Kino.JS.Live
We use Kino.JS.Live
. It runs a GenServer to handle the messages between the browser and the backend.
The API is close to a LiveView and Channel. Instead of a socket
, we have a context
object.
In the browser, we send a message with ctx.pushEvent
. In the backend, the corresponding callback is a handle_event
.
We send a message from the backend with broadcast_event
. In the browser, the listener is ctx.handleEvent
.
With Kino.JS
, you load the HTML by passing the HTML string to ctx.root.innerHTML
.
You load external libraries with ctx.importJS
.
> We send binary payloads from the browser to the process.
A helper module
defmodule Assets do
def fetch_js do
github_js_url = "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.js"
Req.get!(github_js_url).body
end
def fetch_html do
github_html_url = "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.html"
Req.get!(github_html_url).body
end
end
defmodule HlsLive do
use Kino.JS
use Kino.JS.Live
@duration 5
@frame_rate 30
@resolution "640x480"
@html Assets.fetch_html()
def run(), do: Kino.JS.Live.new(__MODULE__, @html)
asset "main.js" do
Assets.fetch_js()
end
@impl true
def init(html, ctx) do
["./priv/input", "./priv/output", "./priv/hls"] |> Enum.each(&File.mkdir_p/1)
{:ok, watcher_pid} =
GenServer.start(FileWatcher, self())
{pid_capture, pid_segment} =
FFmpegProcessor.start(@frame_rate, @resolution, @duration)
ctx =
ctx
|> assign(%{
html: html,
face_detector: ImageProcessor.load_haar_cascade(),
pid_capture: pid_capture,
pid_segment: pid_segment,
pid_watcher: watcher_pid,
map_list: MapSet.new(),
queue: :queue.new(),
frame_rate: @frame_rate,
chunk_id: 1,
ref: nil,
init: true
})
{:ok, ctx}
end
@impl true
def handle_connect(ctx) do
{:ok, ctx.assigns.html, ctx}
end
# received from the browser-------------
@impl true
def handle_event("stop", _, ctx) do
{:noreply, ctx}
end
def handle_event("chunk", {:binary,_, buffer}, ctx) do
%{pid_capture: pid_capture, chunk_id: chunk_id} = ctx.assigns
IO.puts("received data ---------------#{ctx.assigns.chunk_id}")
# Write the received binary data to the FFmpeg capture process
:ok = ExCmd.Process.write(pid_capture, buffer)
send(self(), :ffmpeg_process)
ctx = assign(ctx, chunk_id: chunk_id + 1)
{:noreply, ctx}
end
# received from the server--------------
@impl true
def handle_info(:ffmpeg_process, ctx) do
%{queue: queue, map_list: map_list} = ctx.assigns
case File.ls!("./priv/input") do
[] ->
{:noreply, ctx}
files ->
new_files =
MapSet.difference(MapSet.new(files), map_list)
MapSet.size(new_files) |> IO.inspect(label: "NEW FILES")
new_queue = :queue.in(MapSet.to_list(new_files), queue)
map_list = MapSet.union(new_files, map_list)
MapSet.size(map_list) |> IO.inspect(label: "MAP LIST")
send(self(), :process_queue)
ctx = ctx |> assign(queue: new_queue) |> assign(map_list: map_list)
{:noreply, ctx}
end
end
def handle_info(:process_queue, ctx) do
%{queue: queue, face_detector: face_detector} = ctx.assigns
case :queue.out(queue) do
{{:value, files}, new_queue} ->
:ok =
Task.async_stream(
files,
fn file ->
:ok = ImageProcessor.detect_and_draw_faces(file, face_detector)
end,
max_concurreny: System.schedulers_online(),
ordered: false
)
|> Stream.run()
send(self(), :process_queue)
{:noreply, assign(ctx, queue: new_queue)}
{:empty, _} ->
send(self(), :ffmpeg_rebuild)
{:noreply, assign(ctx, queue: :queue.new())}
end
end
def handle_info(:ffmpeg_rebuild, %{assigns: %{chunk_id: @duration}} = ctx) do
IO.puts("REBUILD---")
%{map_list: map_list, pid_segment: pid_segment} = ctx.assigns
list =
MapSet.to_list(map_list)
|> Enum.sort()
%{ref: ref} =
Task.async(fn ->
for file <- list do
ExCmd.Process.write(pid_segment, File.read!(Path.join("./priv/output", file)))
end
Enum.each(list, &File.rm(Path.join("./priv/output", &1)))
end)
ctx = ctx |> assign(map_list: MapSet.new()) |> assign(chunk_id: 0) |> assign(ref: ref)
{:noreply, ctx}
end
def handle_info(:ffmpeg_rebuild, ctx) do
{:noreply, ctx}
end
def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do
IO.puts("PLAYLIST CREATED")
broadcast_event(ctx, "playlist_ready", %{})
{:noreply, assign(ctx, init: false)}
end
def handle_info(msg, ctx) do
{:noreply, ctx}
end
@impl true
def terminate(_, _) do
{:stop, :shutdown, :normal}
end
end
The output
We will see your webcam displayed.
Click on “start”.
After 15s, you should see below a second video element which streams the transformed feed of the webcam with face detection. Et voilà!.
HlsLive.run()