Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

C: Live Processing

C_live_processing.livemd

C: Live Processing

# Only emit log messages of level :info and above.
Logger.configure(level: :info)

# All necessary dependencies are installed by installing the package below.
# The package is loaded from a local path, two directories above this notebook.
Mix.install([
  {:workshop_elixir_conf_us_2024, path: Path.join(__DIR__, "../..")}
])

Get node

The value returned by Node.self() can be used to fetch the metrics from the running pipelines.

Go to livebooks/metrics.livemd to visualize them.

# Evaluates to the name of the node this code is running on.
Node.self()

WebRTCPipeline

The pipeline below gets the audio and video from the browser and resends it back.

If you use a remote machine instead of your local one, change the port numbers in the example below from 8829 to 8929 and from 8831 to 8931.

defmodule WebRTCPipeline do
  @moduledoc """
  Pipeline that receives audio and video from the browser over WebRTC and
  sends both tracks straight back.

  The pipeline terminates once end of stream has been reported on at least
  two pads of the `:webrtc_sink` child.
  """

  use Membrane.Pipeline

  alias Membrane.WebRTC

  @impl true
  def handle_init(_ctx, _opts) do
    # Video goes directly from the source to the sink.
    video_branch =
      child(:webrtc_source, %WebRTC.Source{
        signaling: {:websocket, port: 8829},
        video_codec: :vp8
      })
      |> via_out(Pad.ref(:output, :video_track), options: [kind: :video])
      |> via_in(Pad.ref(:input, :video_track), options: [kind: :video])
      |> child(:webrtc_sink, %WebRTC.Sink{
        signaling: {:websocket, port: 8831},
        video_codec: :vp8
      })

    # Audio reuses the same source and sink, passing through an Opus parser.
    audio_branch =
      get_child(:webrtc_source)
      |> via_out(Pad.ref(:output, :audio_track), options: [kind: :audio])
      |> child(Membrane.Opus.Parser)
      |> via_in(Pad.ref(:input, :audio_track), options: [kind: :audio])
      |> get_child(:webrtc_sink)

    initial_state = %{webrtc_sink_pads_with_eos: []}

    {[spec: [video_branch, audio_branch]], initial_state}
  end

  @impl true
  def handle_element_end_of_stream(pad, :webrtc_sink, _ctx, state) do
    # Remember every sink pad that has reported end of stream so far.
    finished_pads = [pad | state.webrtc_sink_pads_with_eos]
    state = %{state | webrtc_sink_pads_with_eos: finished_pads}

    case finished_pads do
      # Two or more pads are done: shut the pipeline down.
      [_, _ | _] -> {[terminate: :normal], state}
      _ -> {[], state}
    end
  end

  # End of stream on any other child requires no action.
  def handle_element_end_of_stream(_pad, _child, _ctx, state) do
    {[], state}
  end
end

Run both cells below and open http://localhost:8000/index.html in your browser.

If you use a remote machine, use its address instead of localhost:8000.

# Start the pipeline and monitor its supervisor, so that we get notified when
# the pipeline terminates.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(WebRTCPipeline, [])
ref = Process.monitor(supervisor)

# Make sure the :inets application is running. The result is ignored on
# purpose: it may already have been started by an earlier evaluation.
:inets.start()

# Options of the HTTP server that serves the browser-side assets.
# All paths and names are charlists, as expected by the Erlang httpd service.
httpd_opts = [
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/../../assets/browser_to_browser",
  server_name: ~c"webrtc",
  server_root: ~c"/tmp"
]

# Keep the pid of the started server, so that exactly this server can be
# stopped later. A failed start (for example when port 8000 is still held by a
# server from an earlier evaluation) is reported but does not abort the cell.
httpd =
  case :inets.start(:httpd, httpd_opts) do
    {:ok, pid} ->
      pid

    {:error, reason} ->
      IO.puts(:stderr, "HTTP server on port 8000 was not started: #{inspect(reason)}")
      nil
  end

# Block until the pipeline supervisor goes down.
receive do
  {:DOWN, ^ref, _process, _pid, _reason} -> :ok
end

# Stop the HTTP server started above. `ref` is only the monitor reference and
# cannot be used for that; the server is identified by its pid.
if is_pid(httpd), do: :inets.stop(:httpd, httpd)

Exercise C1: Style Transfer Live

Use your StyleTransferFilter from the previous exercises and put it in the pipeline to get a pipeline that performs style transfer on live video.

Hint: WebRTC elements expect and return H264 video with output_alignment: :nalu, while the H264 Decoder and Encoder work on H264 with output_alignment: :au. Add child(%Membrane.H264.Parser{output_alignment: :au}) (or :nalu) to the pipeline to switch between these two types of alignment.

defmodule StyleTransferFilter do
  @moduledoc """
  Skeleton of the style transfer filter for Exercise C1.

  Only `handle_init/2` and `handle_setup/2` are provided; the remaining
  callbacks are left to be filled in.
  """

  use Membrane.Filter

  @impl true
  def handle_init(_ctx, _opts) do
    # No actions; the filter starts with an empty map as its state.
    initial_state = %{}
    {[], initial_state}
  end

  @impl true
  def handle_setup(_ctx, state) do
    # setup the element
    actions = []
    {actions, state}
  end

  # more callbacks
end
defmodule LiveStyleTransferPipeline do
  @moduledoc """
  Starting point for Exercise C1: receives audio and H264 video from the
  browser over WebRTC and sends both tracks back.

  The pipeline terminates once end of stream has been reported on at least
  two pads of the `:webrtc_sink` child.
  """

  use Membrane.Pipeline

  alias Membrane.WebRTC

  @impl true
  def handle_init(_ctx, _opts) do
    source = %WebRTC.Source{signaling: {:websocket, port: 8829}, video_codec: :h264}
    sink = %WebRTC.Sink{signaling: {:websocket, port: 8831}, video_codec: :h264}

    # Video goes directly from the source to the sink.
    video_branch =
      child(:webrtc_source, source)
      |> via_out(Pad.ref(:output, :video_track), options: [kind: :video])
      |> via_in(Pad.ref(:input, :video_track), options: [kind: :video])
      |> child(:webrtc_sink, sink)

    # Audio reuses the same source and sink, passing through an Opus parser.
    audio_branch =
      get_child(:webrtc_source)
      |> via_out(Pad.ref(:output, :audio_track), options: [kind: :audio])
      |> child(Membrane.Opus.Parser)
      |> via_in(Pad.ref(:input, :audio_track), options: [kind: :audio])
      |> get_child(:webrtc_sink)

    {[spec: [video_branch, audio_branch]], %{webrtc_sink_pads_with_eos: []}}
  end

  @impl true
  def handle_element_end_of_stream(pad, :webrtc_sink, _ctx, state) do
    # Remember every sink pad that has reported end of stream so far.
    finished_pads = [pad | state.webrtc_sink_pads_with_eos]

    {actions_after_eos(finished_pads), %{state | webrtc_sink_pads_with_eos: finished_pads}}
  end

  # End of stream on any other child requires no action.
  def handle_element_end_of_stream(_pad, _child, _ctx, state) do
    {[], state}
  end

  # Terminate once at least two sink pads have finished; otherwise do nothing.
  defp actions_after_eos([_, _ | _]), do: [terminate: :normal]
  defp actions_after_eos(_finished_pads), do: []
end
# Start the pipeline and monitor its supervisor, so that we get notified when
# the pipeline terminates.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(LiveStyleTransferPipeline, [])
ref = Process.monitor(supervisor)

# Make sure the :inets application is running. The result is ignored on
# purpose: it may already have been started by an earlier evaluation.
:inets.start()

# Options of the HTTP server that serves the browser-side assets.
# All paths and names are charlists, as expected by the Erlang httpd service.
httpd_opts = [
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/../../assets/browser_to_browser",
  server_name: ~c"webrtc",
  server_root: ~c"/tmp"
]

# Keep the pid of the started server, so that exactly this server can be
# stopped later. A failed start (for example when port 8000 is still held by a
# server from an earlier evaluation) is reported but does not abort the cell.
httpd =
  case :inets.start(:httpd, httpd_opts) do
    {:ok, pid} ->
      pid

    {:error, reason} ->
      IO.puts(:stderr, "HTTP server on port 8000 was not started: #{inspect(reason)}")
      nil
  end

# Block until the pipeline supervisor goes down.
receive do
  {:DOWN, ^ref, _process, _pid, _reason} -> :ok
end

# Stop the HTTP server started above. `ref` is only the monitor reference and
# cannot be used for that; the server is identified by its pid.
if is_pid(httpd), do: :inets.stop(:httpd, httpd)