C: Live Processing
Logger.configure(level: :info)
# All necessary dependencies are installed by installing the package below
Mix.install([
{:workshop_elixir_conf_us_2024, path: Path.join(__DIR__, "../..")}
])
Get node
Value returned by Node.self()
can be used to get the metrics from the running pipelines.
Go to livebooks/metrics.livemd
go visualize them.
Node.self()
WebRTCPipeline
The pipeline below gets the audio and video from the browser and resends it back.
If you use remote machine instead of your local one, change port numbers in the example below from 8829
on 8929
and from 8831
on 8931
.
defmodule WebRTCPipeline do
use Membrane.Pipeline
alias Membrane.WebRTC
@impl true
def handle_init(_ctx, _opts) do
spec = [
child(:webrtc_source, %WebRTC.Source{
signaling: {:websocket, port: 8829},
video_codec: :vp8
})
|> via_out(Pad.ref(:output, :video_track), options: [kind: :video])
|> via_in(Pad.ref(:input, :video_track), options: [kind: :video])
|> child(:webrtc_sink, %WebRTC.Sink{
signaling: {:websocket, port: 8831},
video_codec: :vp8
}),
get_child(:webrtc_source)
|> via_out(Pad.ref(:output, :audio_track), options: [kind: :audio])
|> child(Membrane.Opus.Parser)
|> via_in(Pad.ref(:input, :audio_track), options: [kind: :audio])
|> get_child(:webrtc_sink)
]
{[spec: spec], %{webrtc_sink_pads_with_eos: []}}
end
@impl true
def handle_element_end_of_stream(pad, :webrtc_sink, _ctx, state) do
state = Map.update!(state, :webrtc_sink_pads_with_eos, &[pad | &1])
if length(state.webrtc_sink_pads_with_eos) >= 2 do
{[terminate: :normal], state}
else
{[], state}
end
end
def handle_element_end_of_stream(_pad, _child, _ctx, state), do: {[], state}
end
Run both cells below and enter http://localhost:8000/index.html
url using your browser.
If you use remote machine, use url of it instead of localhost:8000
.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(WebRTCPipeline, [])
ref = Process.monitor(supervisor)
:inets.start()
:inets.start(:httpd,
bind_address: ~c"localhost",
port: 8000,
document_root: ~c"#{__DIR__}/../../assets/browser_to_browser",
server_name: ~c"webrtc",
server_root: "/tmp"
)
receive do
{:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
:inets.terminate(:httpd, ref)
Exercise C1: Style Transfer Live
Use your StyleTranferFilter
from the previous exercises and put it in the pipeline, to have a pipeline that performs style transfer on live video.
Hint: Webrtc elements expect and return video in H264 with output_alignment: :nalu
, while H264 Decoder and Encoder work on H264 with output_alignment: :au
. Add child(%Membrane.H264.Parser{output_alignment: :au})
(or :nalu
) to the pipeline, to switch between these two types of alignment.
defmodule StyleTransferFilter do
use Membrane.Filter
@impl true
def handle_init(_ctx, _opts), do: {[], %{}}
@impl true
def handle_setup(_ctx, state) do
# setup the element
{[], state}
end
# more callbacks
end
defmodule LiveStyleTransferPipeline do
use Membrane.Pipeline
alias Membrane.WebRTC
@impl true
def handle_init(_ctx, _opts) do
spec = [
child(:webrtc_source, %WebRTC.Source{
signaling: {:websocket, port: 8829},
video_codec: :h264
})
|> via_out(Pad.ref(:output, :video_track), options: [kind: :video])
|> via_in(Pad.ref(:input, :video_track), options: [kind: :video])
|> child(:webrtc_sink, %WebRTC.Sink{
signaling: {:websocket, port: 8831},
video_codec: :h264
}),
get_child(:webrtc_source)
|> via_out(Pad.ref(:output, :audio_track), options: [kind: :audio])
|> child(Membrane.Opus.Parser)
|> via_in(Pad.ref(:input, :audio_track), options: [kind: :audio])
|> get_child(:webrtc_sink)
]
{[spec: spec], %{webrtc_sink_pads_with_eos: []}}
end
@impl true
def handle_element_end_of_stream(pad, :webrtc_sink, _ctx, state) do
state = Map.update!(state, :webrtc_sink_pads_with_eos, &[pad | &1])
if length(state.webrtc_sink_pads_with_eos) >= 2 do
{[terminate: :normal], state}
else
{[], state}
end
end
def handle_element_end_of_stream(_pad, _child, _ctx, state), do: {[], state}
end
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(LiveStyleTransferPipeline, [])
ref = Process.monitor(supervisor)
:inets.start()
:inets.start(:httpd,
bind_address: ~c"localhost",
port: 8000,
document_root: ~c"#{__DIR__}/../../assets/browser_to_browser",
server_name: ~c"webrtc",
server_root: "/tmp"
)
receive do
{:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
:inets.terminate(:httpd, ref)