A: intro
Logger.configure(level: :info)
# All necessary dependencies are installed along with the package below
Mix.install([
  {:workshop_elixir_conf_us_2024, path: Path.join(__DIR__, "../..")}
])
Set up the Livebook
If you run Livebook locally, get the value of your PATH env by running $ echo $PATH. Then go to Livebook Settings (from the Livebook home page), then to Environment variables, and set PATH to the value you got from the command line.
Get node
The value returned by Node.self() can be used to get the metrics from the running pipelines. Go to livebooks/metrics.livemd to visualize them.
Node.self()
Introduction
A Pipeline is an Elixir process that manages Membrane Elements so that they cooperate to process multimedia data.
To make a module a Pipeline, add the line use Membrane.Pipeline to it.
Pipelines implement Membrane callbacks. Each callback receives the following arguments:
- The pipeline state - which can be modified (as in GenServer).
- The context - which is generated by the framework and contains various information, including information about the Pipeline's Elements.
- Sometimes also other arguments, depending on the callback.
Each callback must return a two-element tuple {actions, state}, where:
- actions is the list of actions the framework will execute (docs).
- state is the (possibly modified) state of the element, which will be passed to the next invocation of any callback.
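For illustration, here is a minimal sketch of a Pipeline callback returning the {actions, state} tuple; the module name and file path are made up for this example and are not part of the workshop code.
defmodule MinimalPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _options) do
    # :spec is an action that tells the framework to spawn this child
    # (the file path below is illustrative)
    spec = child(:source, %Membrane.File.Source{location: "path/to/some_file"})

    # actions on the left, (possibly updated) state on the right
    {[spec: spec], %{}}
  end
end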
Elements handle the actual data processing.
For a module to become an Element, it must use Membrane.Filter, Membrane.Source, Membrane.Sink or Membrane.Endpoint.
Just like Pipelines, Elements can implement callbacks, which must return a tuple {actions, state}.
Elements have output and input pads and can be linked with each other using them: if an element has an output pad, it can be linked to another element that has an input pad.
Elements send buffers with data via output pads using actions. When an element sends a buffer via an output pad, the linked element receives this buffer on the respective input pad.
Elements can have zero, one, or multiple input and output pads (take a look at the pictures below).
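As a sketch, a minimal pass-through Filter could forward each incoming buffer with the :buffer action like below; the module name and pad setup are illustrative, assuming Membrane 1.x.
defmodule PassThroughFilter do
  # Illustrative element, not part of the workshop code
  use Membrane.Filter

  # Accept any stream format on both pads
  def_input_pad(:input, accepted_format: _any)
  def_output_pad(:output, accepted_format: _any)

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    # Send the received buffer through the :output pad;
    # the element linked to :output receives it on its input pad.
    {[buffer: {:output, buffer}], state}
  end
end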
Before sending the first data buffer, Elements must send a stream format using the :stream_format action (docs).
The stream format describes the data that will be contained in the buffers. For example, it may specify that each buffer will contain one RGB video frame with dimensions 1920 x 1080 pixels.
The stream format must match the accepted_format field in the definition of the pad through which it was sent.
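Continuing the sketch style above, a Source could declare the stream format in the handle_playing callback before sending any buffers; the Membrane.RawVideo field values below are illustrative.
# Sketch of a callback inside a Source element (illustrative values)
@impl true
def handle_playing(_ctx, state) do
  # Each upcoming buffer will carry one raw RGB 1920x1080 frame
  stream_format = %Membrane.RawVideo{
    width: 1920,
    height: 1080,
    framerate: {30, 1},
    pixel_format: :RGB,
    aligned: true
  }

  {[stream_format: {:output, stream_format}], state}
end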
Example
Here is an example of a Membrane pipeline that transcodes and transmuxes video: it reads an MP4 container with H264-encoded video and writes an MKV container with VP9-encoded video.
defmodule TransmuxingPipeline do
  use Membrane.Pipeline

  # `use Membrane.Pipeline` provides default implementations of Membrane callbacks,
  # which can be overridden with custom implementations, as in this example.
  # The default implementations almost always do nothing.

  # This callback is invoked when the Pipeline starts
  @impl true
  def handle_init(_ctx, _options) do
    priv = "#{__DIR__}/../../priv/" |> Path.expand()
    mp4_path = Path.join(priv, "fixtures/bunny_without_sound.mp4")
    mkv_path = Path.join(priv, "outputs/bunny_without_sound.mkv")

    spec = [
      child(:source, %Membrane.File.Source{location: mp4_path})
      |> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM)
      |> via_out(:output, options: [kind: :video])
      |> child(:h264_parser, %Membrane.H264.Parser{output_stream_structure: :annexb})
      |> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
      |> child(:vp9_encoder, Membrane.VP9.Encoder)
      |> child(:matroska_muxer, Membrane.Matroska.Muxer)
      |> child(:file_sink, %Membrane.File.Sink{location: mkv_path})
    ]

    {[spec: spec], %{}}
  end

  # This callback is invoked when `:file_sink` receives `end of stream` on its input pad
  @impl true
  def handle_element_end_of_stream(:file_sink, _input, _ctx, state) do
    {[terminate: :normal], state}
  end

  # Because we overrode the default implementation of the `handle_element_end_of_stream`
  # callback and the clause above matches only one of the Elements spawned by the
  # Pipeline, we have to add an implementation for the rest of the elements.
  # Otherwise, the missing clause would lead to an error.
  @impl true
  def handle_element_end_of_stream(_element, _input, _ctx, state), do: {[], state}
end
Run the Example
The cell below runs the pipeline and waits until it finishes processing.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(TransmuxingPipeline)
ref = Process.monitor(supervisor)
receive do
  {:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
If execution of the cell above takes too long, change the source file name from bunny_without_sound.mp4 to bunny_without_sound_short.mp4. You can do this in the rest of the exercises as well.
If you don't run Livebook on your machine, download the resulting video with this button:
Kino.Download.new(fn -> File.read!("#{__DIR__}/../../priv/outputs/bunny_without_sound.mkv") end,
  label: "Download the video",
  filename: "bunny_without_sound.mkv"
)
Now you can play the output file with the following command:
$ ffplay workshop_elixir_conf_us_2024/priv/outputs/bunny_without_sound.mkv
Exercise A1: Scaling of a video
Add Membrane.FFmpeg.SWScale.Converter to the pipeline above and set the video width to 640 pixels. The documentation might be helpful.
> How long did the pipeline run before you added the element reducing the video size? What might be causing that?
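As a hint, the new child could sit between the decoder and the VP9 encoder, roughly as sketched below. The output_width option name is an assumption here, so check the Membrane.FFmpeg.SWScale.Converter docs for the exact option.
# Sketch of the relevant part of the spec (the option name is an assumption - verify it in the docs)
child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
|> child(:scaler, %Membrane.FFmpeg.SWScale.Converter{output_width: 640})
|> child(:vp9_encoder, Membrane.VP9.Encoder)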
Exercise A2: Add sound
priv/fixtures/bunny_without_sound.mp4 is an MP4 container that has only one video track. priv/fixtures/bunny_with_sound.mp4 is an MP4 container that has two tracks: one audio and one video.
Modify the pipeline in the following way:
- use priv/fixtures/bunny_with_sound.mp4 as the source file instead of priv/fixtures/bunny_without_sound.mp4,
- save the output of your pipeline in priv/outputs/bunny_with_sound.mkv,
- modify the children specification returned by the handle_init/2 callback to support the audio track as well, so that the output file contains the audio from the source file (a structural sketch follows this list).
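Here is a structural sketch (not a full solution) of what the spec inside handle_init/2 could look like with two branches. get_child/1 references a child declared earlier in the spec; the kind: :audio pad option is assumed by analogy with the video branch, and SomeAudioElement under the :audio_processing name is a placeholder for whatever parsing or transcoding the audio track actually needs.
# Structural sketch only - placeholders and assumptions are marked in the comments
spec = [
  # the existing video branch, unchanged:
  child(:source, %Membrane.File.Source{location: mp4_path})
  |> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM)
  |> via_out(:output, options: [kind: :video])
  |> child(:h264_parser, %Membrane.H264.Parser{output_stream_structure: :annexb})
  |> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
  |> child(:vp9_encoder, Membrane.VP9.Encoder)
  |> child(:matroska_muxer, Membrane.Matroska.Muxer)
  |> child(:file_sink, %Membrane.File.Sink{location: mkv_path}),
  # a new audio branch; `kind: :audio` is assumed by analogy with the video branch,
  # and :audio_processing is a placeholder for the elements your audio track needs
  get_child(:mp4_demuxer)
  |> via_out(:output, options: [kind: :audio])
  |> child(:audio_processing, SomeAudioElement)
  |> get_child(:matroska_muxer)
]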
Exercise A3*: Revert colors
Write your own element reverting colors and add it to the pipeline. Adding Membrane.FFmpeg.SWScale.Converter before and after your custom filter might be helpful. Use the element template below; one possible shape of the per-buffer callback is sketched right after it.
defmodule ColorReverter do
  use Membrane.Filter

  alias Membrane.RawVideo

  def_input_pad(:input, accepted_format: %RawVideo{pixel_format: :RGB})
  def_output_pad(:output, accepted_format: %RawVideo{pixel_format: :RGB})

  # callbacks implementation
end
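One possible shape of the buffer-handling callback, assuming Membrane 1.x (where the per-buffer Filter callback is handle_buffer/4) and raw RGB payloads, where reverting a color means replacing each byte b with 255 - b:
# Sketch only - meant to go inside ColorReverter in place of the comment above
@impl true
def handle_buffer(:input, buffer, _ctx, state) do
  # Invert every byte of the raw RGB payload (255 - value for each color component)
  inverted = for <<byte <- buffer.payload>>, into: <<>>, do: <<255 - byte>>

  # Send the modified buffer through the :output pad
  {[buffer: {:output, %{buffer | payload: inverted}}], state}
end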
defmodule ColorRevertingPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _options) do
    priv = "#{__DIR__}/../../priv/" |> Path.expand()
    mp4_path = Path.join(priv, "fixtures/bunny_with_sound.mp4")
    mkv_path = Path.join(priv, "outputs/bunny_with_reverted_colors.mkv")

    spec = [
      child(:source, %Membrane.File.Source{location: mp4_path})
      |> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM)
      |> via_out(:output, options: [kind: :video])
      |> child(:h264_parser_1, %Membrane.H264.Parser{output_stream_structure: :annexb})
      |> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
      # |> ... your ColorReverter goes here, possibly surrounded by converters
      |> child(:h264_encoder, %Membrane.H264.FFmpeg.Encoder{preset: :fast})
      |> child(:h264_parser_2, %Membrane.H264.Parser{output_stream_structure: :avc1})
      |> child(:matroska_muxer, Membrane.Matroska.Muxer)
      |> child(:file_sink, %Membrane.File.Sink{location: mkv_path})
    ]

    {[spec: spec], %{}}
  end

  @impl true
  def handle_element_end_of_stream(:file_sink, _input, _ctx, state) do
    {[terminate: :normal], state}
  end

  @impl true
  def handle_element_end_of_stream(_element, _input, _ctx, state), do: {[], state}
end
Now run the pipeline:
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(ColorRevertingPipeline)
ref = Process.monitor(supervisor)
receive do
  {:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
Kino.Download.new(
  fn -> File.read!("#{__DIR__}/../../priv/outputs/bunny_with_reverted_colors.mkv") end,
  label: "Download the video",
  filename: "bunny_with_reverted_colors.mkv"
)
Play the output file with the command below:
$ ffplay workshop_elixir_conf_us_2024/priv/outputs/bunny_with_reverted_colors.mkv