Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Echo server with SFU ExWebRTC

demo/echo-exrtc.livemd

Echo server with SFU ExWebRTC

Mix.install([
  {:kino, "~> 0.13.1"},
  {:ex_webrtc, "~> 0.3.0"},
  {:ex_cmd, "~> 0.12.0"}
])

SFU: ExWebRTC

The webcam is captured and displayed in a first `<video>` element.

The browser and the Elixir SFU will establish a WebRTC PeerConnection via a signaling channel: the Livebook WebSocket.

They will exchange SDP and ICE candidates.

Once connected, the Elixir SFU will receive the streams from the browser via a UDP connection. In this Echo configuration, it will send the streams back to the connected peer via UDP, and the browser will display them in the other `<video>` element.

Information flow

  • The signaling process (instantiate the PeerConnection):
graph LR;
  B1[Browser<br/>PeerConnection]-- ws -->L[Livebook];
  L-- ws -->Ex[Elixir SFU<br/>PeerConnection];
  Ex -- ws --> L;
  L -- ws --> B1;
  • The active connection:
graph LR;
  B3[video 1]-- "stream<br/>UDP" -->ExSFU[Elixir SFU];
  ExSFU -- "stream<br/>UDP" -->B4[video 2];

Server-side WebRTC module

defmodule RtcServer do
  @moduledoc """
  Server-side WebRTC peer of the echo demo.

  The server owns one `ExWebRTC.PeerConnection`. It answers the SDP offer
  received over the signaling channel, exchanges ICE candidates with the
  remote peer and sends every RTP packet of the incoming video track back on
  its own outgoing video track. Incoming packets are also fed to a VP8
  depayloader; one reassembled frame out of every #{30} is logged and written
  to an FFmpeg process.

  Signaling messages are plain maps with string keys (`"type"`, `"sdp"`,
  `"candidate"`). Replies are sent to the `sender` pid as
  `{:signaling, map}` messages.
  """

  use GenServer

  alias ExWebRTC.{
    ICECandidate,
    MediaStreamTrack,
    PeerConnection,
    RTPCodecParameters,
    SessionDescription
  }

  alias ExWebRTC.RTP.VP8.Depayloader

  require Logger

  # Number of reassembled frames between two log lines / FFmpeg writes.
  @frames_per_sample 30

  # Time (ms) the offer handler waits for ICE gathering to complete.
  @gathering_timeout 1000

  # Client API ---------------------------------------------------------------

  @doc """
  Starts the server and registers it under the module name.
  """
  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @doc """
  Creates the server-side `PeerConnection` and its outgoing video track.

  Returns `:connected`.
  """
  def connect, do: GenServer.call(__MODULE__, :connect)

  @doc """
  Forwards a signaling message to the server.

  Takes a `{msg, sender}` tuple where `msg` is the decoded signaling map and
  `sender` is the pid that will receive the `{:signaling, map}` replies.
  """
  def receive_signaling_msg({msg, sender}) do
    GenServer.cast(__MODULE__, {:receive_signaling_msg, msg, sender})
  end

  @doc """
  Returns the current server state (for inspection from the notebook).
  """
  def peek_state, do: GenServer.call(__MODULE__, :peek_state)

  # Server callbacks ---------------------------------------------------------

  @impl true
  def init(_args) do
    Logger.info("ExRtc PeerConnection started")

    state = %{
      sender: nil,
      pc: nil,
      client_video_track: nil,
      # Filled in by the :connect call; declared here so the state shape is
      # visible in one place.
      serv_video_track: nil,
      video_depayloader: Depayloader.new(),
      # Frame counter and timestamp of the last sample, used for the fps log.
      i: 1,
      t: System.monotonic_time(:microsecond),
      ffmpeg_pid: setup_ffmpeg()
    }

    {:ok, state}
  end

  # Livebook calls
  @impl true
  def handle_call(:peek_state, _from, state) do
    {:reply, state, state}
  end

  def handle_call(:connect, _from, state) do
    {:ok, pc_pid} =
      PeerConnection.start_link(ice_servers: ice_servers(), video_codecs: video_codecs())

    state =
      state
      |> Map.put(:pc, pc_pid)
      |> Map.merge(setup_transceivers(pc_pid))

    {:reply, :connected, state}
  end

  # Messages received from the client
  @impl true
  def handle_cast({:receive_signaling_msg, %{"type" => "offer"} = msg, sender}, state) do
    desc = SessionDescription.from_json(msg)

    with :ok <- PeerConnection.set_remote_description(state.pc, desc),
         {:ok, answer} <- PeerConnection.create_answer(state.pc),
         :ok <- PeerConnection.set_local_description(state.pc, answer),
         :ok <- gather_candidates(state.pc) do
      # The answer is a struct, which the JS client cannot read: send a plain map.
      answer = %{"type" => "answer", "sdp" => answer.sdp}
      send(sender, {:signaling, answer})
      Logger.warning("--> Server sends Answer to remote")
      {:noreply, %{state | sender: sender}}
    else
      error ->
        Logger.error("Server: Error creating answer: #{inspect(error)}")
        {:stop, :shutdown, state}
    end
  end

  def handle_cast({:receive_signaling_msg, %{"type" => "ice"} = msg, _sender}, state) do
    candidate = ICECandidate.from_json(msg["candidate"])
    :ok = PeerConnection.add_ice_candidate(state.pc, candidate)
    Logger.debug("--> Server processes remote ICE")
    {:noreply, state}
  end

  # Catch-all for any other signaling message. The pattern mirrors the
  # 3-tuple built by receive_signaling_msg/1 so this clause is reachable.
  def handle_cast({:receive_signaling_msg, msg, _sender}, state) do
    Logger.warning("Server: unexpected msg: #{inspect(msg)}")
    {:stop, :shutdown, state}
  end

  # Messages emitted by the PeerConnection process
  @impl true
  def handle_info({:ex_webrtc, _pc, {:track, %{kind: :video} = client_video_track}}, state) do
    {:noreply, %{state | client_video_track: client_video_track}}
  end

  def handle_info({:ex_webrtc, _pc, {:ice_candidate, candidate}}, state) do
    candidate = ICECandidate.to_json(candidate)
    send(state.sender, {:signaling, %{"type" => "ice", "candidate" => candidate}})
    Logger.debug("--> Server sends ICE to remote")
    {:noreply, state}
  end

  # `client_track_id` appears in both arguments: the clause only matches RTP
  # packets that belong to the video track announced by the client.
  def handle_info(
        {:ex_webrtc, pc, {:rtp, client_track_id, _rid, packet}},
        %{client_video_track: %{id: client_track_id, kind: :video}} = state
      ) do
    # Echo: send the packet back on the server's own video track.
    PeerConnection.send_rtp(pc, state.serv_video_track.id, packet)

    {:noreply, handle_packet(packet, state)}
  end

  def handle_info({:ex_webrtc, pc, {:connection_state_change, :connected}}, state) do
    video_transceiver =
      pc
      |> PeerConnection.get_transceivers()
      |> Enum.find(&(&1.kind == :video))

    case video_transceiver do
      %{receiver: receiver} ->
        Logger.warning(
          "PeerConnection successfully connected, using #{inspect(receiver.codec.mime_type)}"
        )

      nil ->
        Logger.warning("PeerConnection successfully connected, no video transceiver found")
    end

    {:noreply, state}
  end

  def handle_info({:ex_webrtc, _pc, _msg}, state) do
    {:noreply, state}
  end

  # Private helpers ----------------------------------------------------------

  defp ice_servers, do: [%{urls: "stun:stun.l.google.com:19302"}]

  defp video_codecs do
    [
      %RTPCodecParameters{
        payload_type: 96,
        mime_type: "video/VP8",
        clock_rate: 90_000
      }
    ]
  end

  # Adds the outgoing (echo) video track to the PeerConnection and returns the
  # state entries to merge.
  defp setup_transceivers(pc) do
    media_stream_id = MediaStreamTrack.generate_stream_id()
    video_track = MediaStreamTrack.new(:video, [media_stream_id])
    {:ok, _sender} = PeerConnection.add_track(pc, video_track)
    %{serv_video_track: video_track}
  end

  # Starts the FFmpeg process that receives the sampled frames on its stdin.
  #
  # NOTE(review): the binary path is machine-specific (Homebrew); the original
  # author noted FFmpeg could not be found from Livebook otherwise.
  # NOTE(review): FFmpeg is fed bare depayloaded VP8 frames on pipe:0 — confirm
  # it can actually probe that input.
  defp setup_ffmpeg do
    ffmpeg = "/opt/homebrew/Cellar/ffmpeg/7.0-with-options_1/bin/ffmpeg"
    # ffmpeg_cmd = ~w(#{ffmpeg} -i pipe:0 -f vpx -c:v copy -f webm pipe:1 )
    # The output muxer is "ivf" (matching demo/output.ivf), not "ifv".
    ffmpeg_cmd = ~w(#{ffmpeg} -i pipe:0 -f rawvideo -vcodec vp8 -f ivf -y demo/output.ivf)

    {:ok, pid} = ExCmd.Process.start_link(ffmpeg_cmd, log: true)
    pid
  end

  # Feeds one RTP packet to the VP8 depayloader and returns the updated state.
  # When a full frame is available the frame counter is bumped; every
  # @frames_per_sample frames the frame size (kbit) and measured fps are logged
  # and that frame is written to FFmpeg.
  #
  # NOTE(review): only the sampled frame is written to FFmpeg, as in the
  # original code — confirm whether every frame should be written instead.
  defp handle_packet(packet, state) do
    case Depayloader.write(state.video_depayloader, packet) do
      {:ok, depayloader} ->
        %{state | video_depayloader: depayloader}

      {:ok, frame, depayloader} ->
        # we measure around 30 fps
        count = state.i + 1
        state = %{state | video_depayloader: depayloader, i: count}

        if Integer.mod(count, @frames_per_sample) == 0 do
          now = System.monotonic_time(:microsecond)

          Logger.debug(%{
            size: Float.round(byte_size(frame) * 8 / 1000, 1),
            fps: round(@frames_per_sample * 1_000_000 / (now - state.t))
          })

          :ok = ExCmd.Process.write(state.ffmpeg_pid, frame)

          %{state | t: now}
        else
          state
        end
    end
  end

  # Blocks the server until the PeerConnection reports that ICE gathering is
  # complete, or returns {:error, :timeout} after @gathering_timeout ms.
  defp gather_candidates(pc) do
    receive do
      {:ex_webrtc, ^pc, {:ice_gathering_state_change, :complete}} -> :ok
    after
      @gathering_timeout -> {:error, :timeout}
    end
  end
end

Start the GenServer

# Start RtcServer under a one_for_one supervisor registered as MySup.
Supervisor.start_link([RtcServer], strategy: :one_for_one, name: MySup)

and instantiate a new PeerConnection:

# Ask the server to create its PeerConnection; the match asserts the :connected reply.
:connected = RtcServer.connect()

This will instantiate an ExWebRTC.PeerConnection server-side. It waits for a peer to connect and send its offer. It will then respond with an “answer”, process the peer’s ICE candidates, and send its own ICE candidates to the peer.

# Return the GenServer state map for inspection.
RtcServer.peek_state()

Client-side WebRTC module

On connection, a new PeerConnection is created. Since the server is already connected, the client will send an “offer” via the signaling channel. The client expects an “answer” back. It will also send “Ice” candidates via the signaling channel, and expects to receive Ice candidates from the server.

Once the connection is established, the client will receive RTP packets and render them into a `<video>` element.

defmodule VideoLive do
  # Kino.JS.Live widget: the GenServer that relays signaling messages between
  # the browser (over the Livebook WebSocket) and RtcServer.

  use Kino.JS
  use Kino.JS.Live

  require Logger

  # NOTE(review): the original markup was lost in extraction. It is rebuilt
  # here from the ids used by main.js (#video-in, #echo), the #elt rule in
  # main.css and the surviving captions — confirm against the notebook.
  @html """
  <div id="elt">
    <figure>
      <video id="video-in" width="300" height="300" autoplay muted playsinline></video>
      <figcaption>Local webcam</figcaption>
    </figure>
    <figure>
      <video id="echo" width="300" height="300" autoplay muted playsinline></video>
      <figcaption>Echo webcam</figcaption>
    </figure>
    <figure>
      <figcaption>Some frames</figcaption>
    </figure>
  </div>
  """

  # Builds the widget; the HTML is handed to init/2 and then to the browser.
  def new() do
    Kino.JS.Live.new(__MODULE__, @html)
  end

  asset "main.css" do
    """
    #elt {
      display: flex;
      flex-direction: column;
      align-items: center
    }

    button {
      margin-top: 1em;
      margin-bottom: 1em;
      padding: 1em;
      background-color: bisque;
    }
    """
  end

  asset "main.js" do
    """
    export function init(ctx, html) {
      ctx.importCSS("main.css");
      ctx.root.innerHTML = html;

      const iceConf = {
        iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
      };

      function run() {
        window.navigator.mediaDevices
          .getUserMedia({ video: { width: 300, height: 300 }, audio: false })
          .then((stream) => {
            // display the webcam in a <video> tag
            const videoIn = document.getElementById("video-in");
            videoIn.srcObject = stream;

            const pc = new RTCPeerConnection(iceConf);

            // capture local MediaStream (from the webcam)
            const tracks = stream.getTracks();
            tracks.forEach((track) => pc.addTrack(track, stream));

            // send local ICE candidates to the peer on the signaling channel
            pc.onicecandidate = ({ candidate }) => {
              if (candidate === null) {
                return;
              }
              ctx.pushEvent("ice", { candidate: candidate.toJSON(), type: "ice" });
            };

            // send offer to any peer connected on the signaling channel
            pc.onnegotiationneeded = async () => {
              const offer = await pc.createOffer();
              await pc.setLocalDescription(offer);
              console.log("--> Offer created and sent");
              ctx.pushEvent("offer", { sdp: offer });
            };

            // received from the remote peer (Elixir SFU server here) via UDP
            pc.ontrack = ({ streams }) => {
              console.log("--> Received remote track");
              const echo = document.querySelector("#echo");
              echo.srcObject = streams[0];
            };

            // received from the remote peer via signaling channel (Elixir server)
            ctx.handleEvent("ice", async ({ candidate }) => {
              await pc.addIceCandidate(candidate);
            });

            ctx.handleEvent("answer", async (msg) => {
              console.log("--> handled Answer");
              await pc.setRemoteDescription(msg);
            });

            // internal WebRTC listener, for information or other action...
            pc.onconnectionstatechange = () => {
              console.log("~~> Connection state: ", pc.connectionState);
            };
          });
      }

      run();
    }
    """
  end

  @impl true
  def init(html, ctx) do
    {:ok, assign(ctx, html: html)}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # Received from the browser via the signaling WebSocket: forward to RtcServer.
  # The "sdp" key carries the whole offer object ({type, sdp}) built by the browser.
  @impl true
  def handle_event("offer", %{"sdp" => sdp}, ctx) do
    RtcServer.receive_signaling_msg({sdp, self()})
    {:noreply, ctx}
  end

  def handle_event("ice", %{"candidate" => candidate}, ctx) do
    RtcServer.receive_signaling_msg({%{"type" => "ice", "candidate" => candidate}, self()})
    {:noreply, ctx}
  end

  # Received from the server: push to the browser via the signaling WebSocket.
  @impl true
  def handle_info({:signaling, %{"type" => "answer"} = msg}, ctx) do
    broadcast_event(ctx, "answer", msg)
    {:noreply, ctx}
  end

  def handle_info({:signaling, %{"type" => "ice"} = msg}, ctx) do
    # Skip messages whose candidate is nil.
    if msg["candidate"], do: broadcast_event(ctx, "ice", msg)
    {:noreply, ctx}
  end
end
# Build the Kino.JS.Live widget; as the cell result it is rendered in the notebook output.
VideoLive.new()

The PeerConnection server-side is now populated:

# Return the GenServer state map for inspection.
RtcServer.peek_state()

Dockerfile Livebook

Some packages are missing when you run the official image on Ubuntu.

# Official Livebook image plus the system packages the notebook needs.
FROM ghcr.io/livebook-dev/livebook:latest

# apt-get is used instead of apt, whose command-line interface is not stable
# for scripted use. The package lists are removed in the same layer to keep
# the image small.
# NOTE(review): ffmpeg is not installed here although RtcServer spawns it
# (from a Homebrew path) — confirm how it is meant to be provided.
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y pkg-config libssl-dev libsrtp2-dev && \
    apt-get clean -y && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /data
ENV LIVEBOOK_HOME=/data
ENV HOME=/home/livebook
# Listen on all interfaces.
ENV LIVEBOOK_IP="::"


# Probe the health endpoint on LIVEBOOK_PORT, falling back to 8080 when unset.
HEALTHCHECK CMD wget --no-verbose --tries=1 --spider http://localhost:${LIVEBOOK_PORT-8080}/public/health || exit 1

CMD ["/app/bin/server"]
# Build the image from ./demo/Dockerfile, using the current directory as build context.
docker build . -t lvb:latest -f ./demo/Dockerfile
# Run the image, publishing ports 8080 and 8181; the container is removed on exit.
# NOTE(review): the Dockerfile does not set LIVEBOOK_IFRAME_PORT, and Livebook's
# documented default iframe port is 8081 — confirm that 8181 is intended.
docker run --rm -p 8080:8080 -p 8181:8181 lvb:latest