Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

HLS with Elixir

lib/hls-demo.livemd

HLS with Elixir

Mix.install([
  {:plug, "~> 1.16"},
  {:ex_cmd, "~> 0.12.0"},
  {:evision, "~> 0.2.7"},
  {:file_system, "~> 1.0"},
  {:plug_crypto, "~> 2.1"},
  {:bandit, "~> 1.5"},
  {:kino, "~> 0.13.2"},
  {:corsica, "~> 2.1"},
  {:req, "~> 0.5.2"}
])

Create a WebServer listening on port 4001

We need to serve the segments and the playlist for the browser.

> Note that we need to set “CORS” on this server.

defmodule WebServer do
  use Plug.Router

  plug Corsica, origins: "*"
  plug :match
  plug :dispatch

  # http://localhost:58331

  get "/hls/:file" do
    IO.puts "endpoint reached----"
    %{"file" => file} = conn.params
    data = File.read!("./priv/hls/"<>file)
    Plug.Conn.send_resp(conn, 200, data)
  end
end

Bandit.start_link(plug: WebServer, port: 4001)

FFmpeg processes

We run two FFmpeg processes as “keep alive” with ExCmd.

The first one will extract all the frames from the received video chunk.

The second one will build HLS segments and the playlist.


defmodule FFmpegProcessor do
  @moduledoc false

  @ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

  def start(frame_rate, resolution, duration) do
    frame_pattern = "./priv/input/test_%05d.jpg"

    build_frames =
      ~w(#{@ffmpeg}  -loglevel debug
      -i pipe:0 -framerate #{frame_rate}
      -video_size #{resolution}
      -thread_type slice
      #{frame_pattern}
      )


    {:ok, pid_capture} =
      ExCmd.Process.start_link(build_frames)

    playlist = Path.join("./priv/hls", "playlist.m3u8")
    segment = Path.join("./priv/hls", "segment_%03d.ts")
    ffmpeg_rebuild_cmd =
      ~w(#{@ffmpeg} -loglevel info
			-f image2pipe -framerate #{frame_rate}
			-i pipe:0 -c:v libx264
			-preset veryfast
			-f hls
			-hls_time #{duration}
			-hls_list_size 4
			-hls_playlist_type event
			-hls_flags append_list
			-hls_segment_filename #{segment}
			#{playlist}
  		)

    {:ok, pid_segment} =
      ExCmd.Process.start_link(ffmpeg_rebuild_cmd)

    {pid_capture, pid_segment}
  end
end

##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################

File watcher

This module will monitor changes in the file system in the given directory.

We want to know when FFmpeg has built the “playlist.m3u8” file located in the directory “.priv/hls/“ (we decided to put it there, as set in the previous module).

When this event is detected, we send a message to the caller.

defmodule FileWatcher do
  use GenServer

  require Logger

  @impl true
  def init(ws_pid) do
    {:ok, watcher_pid} = FileSystem.start_link(dirs: ["./priv/hls"])
    FileSystem.subscribe(watcher_pid)

    {:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
  end

  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, _}},
        %{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
      ) do
    Logger.debug("File created: #{path}")
    if Path.extname(path) == ".m3u8", do: send(ws_pid, :playlist_created)
    {:noreply, state}
  end
end

Evision running the Haar Cascade face detection

We transform each frame by adding a rectangle around the ROI, if any.

We read the files and build new files.

> you will see false positives.

defmodule ImageProcessor do

  def load_haar_cascade do
    haar_path =
      Path.join(
        :code.priv_dir(:evision),
        "share/opencv4/haarcascades/haarcascade_frontalface_default.xml"
      )

    Evision.CascadeClassifier.cascadeClassifier(haar_path)
  end

  def detect_and_draw_faces(file, face_detector) do
    input_path = Path.join("./priv/input", file)
    output_path = Path.join("./priv/output", file)

    frame =
      Evision.imread(input_path)

    # convert to grey-scale
    grey_img =
      Evision.cvtColor(frame, Evision.ImreadModes.cv_IMREAD_GRAYSCALE())

    # detect faces
    faces =
      Evision.CascadeClassifier.detectMultiScale(face_detector, grey_img)

    # draw rectangles found on the original frame
    result_frame =
      Enum.reduce(faces, frame, fn {x, y, w, h}, mat ->
        Evision.rectangle(mat, {x, y}, {x + w, y + h}, {0, 255, 0}, thickness: 2)
      end)

    Evision.imwrite(output_path, result_frame)
    :ok = File.rm!(input_path)
  end
end

The main process: Kino.JS.Live

We use Kino.JS.Live. It runs a GenServer to handle the messages between the browser and the backend.

The API is close to a LiveView and Channel. Instead of a socket, we have a context object.

In the browser, we send a message with ctx.pushEvent. In the backend, the corresponding callback is a handle_event.

We send a message from the backend with broadcast_event. In the browser, the listener is ctx.handleEvent.

With Kino.JS, you load the HTML by passing the HTML string to ctx.root.innerHTML.

You load external libraries with ctx.importJS.

> We send binary payloads from the browser to the process.

A helper module

defmodule Assets do
  def fetch_js do
    github_js_url = "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.js"
    Req.get!(github_js_url).body
  end

  def fetch_html do
    github_html_url = "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.html"
    Req.get!(github_html_url).body
  end
end
defmodule HlsLive do
  use Kino.JS
  use Kino.JS.Live

  @duration 5
  @frame_rate 30
  @resolution "640x480"

  @html Assets.fetch_html()

  def run(), do: Kino.JS.Live.new(__MODULE__, @html)

  asset "main.js" do
    Assets.fetch_js()
  end

  @impl true
  def init(html, ctx) do
    ["./priv/input", "./priv/output", "./priv/hls"] |> Enum.each(&amp;File.mkdir_p/1)

    {:ok, watcher_pid} =
      GenServer.start(FileWatcher, self())

    {pid_capture, pid_segment} =
      FFmpegProcessor.start(@frame_rate, @resolution, @duration)

    ctx =
      ctx
      |> assign(%{
        html: html,
        face_detector: ImageProcessor.load_haar_cascade(),
        pid_capture: pid_capture,
        pid_segment: pid_segment,
        pid_watcher: watcher_pid,
        map_list: MapSet.new(),
        queue: :queue.new(),
        frame_rate: @frame_rate,
        chunk_id: 1,
        ref: nil,
        init: true
      })
    {:ok, ctx}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # received from the browser-------------
  @impl true
  def handle_event("stop", _, ctx) do
    {:noreply, ctx}
  end

  def handle_event("chunk", {:binary,_, buffer}, ctx) do
    %{pid_capture: pid_capture, chunk_id: chunk_id} = ctx.assigns
    IO.puts("received data ---------------#{ctx.assigns.chunk_id}")

    # Write the received binary data to the FFmpeg capture process
    :ok = ExCmd.Process.write(pid_capture, buffer)

    send(self(), :ffmpeg_process)
    ctx = assign(ctx, chunk_id: chunk_id + 1)
    {:noreply, ctx}
  end

  # received from the server--------------

  @impl true
  def handle_info(:ffmpeg_process, ctx) do
    %{queue: queue, map_list: map_list} = ctx.assigns

    case File.ls!("./priv/input") do
      [] ->
        {:noreply, ctx}

      files ->
        new_files =
          MapSet.difference(MapSet.new(files), map_list)

        MapSet.size(new_files) |> IO.inspect(label: "NEW FILES")

        new_queue = :queue.in(MapSet.to_list(new_files), queue)
        map_list = MapSet.union(new_files, map_list)
        MapSet.size(map_list) |> IO.inspect(label: "MAP LIST")
        send(self(), :process_queue)
        ctx = ctx |> assign(queue: new_queue) |> assign(map_list: map_list)
        {:noreply, ctx}
    end
  end

  def handle_info(:process_queue, ctx) do
    %{queue: queue, face_detector: face_detector} = ctx.assigns

    case :queue.out(queue) do
      {{:value, files}, new_queue} ->
        :ok =
          Task.async_stream(
            files,
            fn file ->
              :ok = ImageProcessor.detect_and_draw_faces(file, face_detector)
            end,
            max_concurreny: System.schedulers_online(),
            ordered: false
          )
          |> Stream.run()

        send(self(), :process_queue)

        {:noreply, assign(ctx, queue: new_queue)}

      {:empty, _} ->
        send(self(), :ffmpeg_rebuild)
        {:noreply, assign(ctx, queue: :queue.new())}
    end
  end

  def handle_info(:ffmpeg_rebuild, %{assigns: %{chunk_id: @duration}} = ctx) do
    IO.puts("REBUILD---")
    %{map_list: map_list, pid_segment: pid_segment} = ctx.assigns

    list =
      MapSet.to_list(map_list)
      |> Enum.sort()

    %{ref: ref} =
      Task.async(fn ->
        for file <- list do
          ExCmd.Process.write(pid_segment, File.read!(Path.join("./priv/output", file)))
        end

        Enum.each(list, &amp;File.rm(Path.join("./priv/output", &amp;1)))
      end)

    ctx = ctx |> assign(map_list: MapSet.new()) |> assign(chunk_id: 0) |> assign(ref: ref)
    {:noreply, ctx}
  end

  def handle_info(:ffmpeg_rebuild, ctx) do
    {:noreply, ctx}
  end

  def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do
    IO.puts("PLAYLIST CREATED")
    broadcast_event(ctx, "playlist_ready", %{})
    {:noreply, assign(ctx, init: false)}
  end

  def handle_info(msg, ctx) do
    {:noreply, ctx}
  end

  @impl true
  def terminate(_, _) do
    {:stop, :shutdown, :normal}
  end
end

The output

We will see your webcam displayed.

Click on “start”.

After 15s, you should see below a second video element which streams the transformed feed of the webcam with face detection. Et voilà!.

HlsLive.run()