A: intro
Logger.configure(level: :info)
# All necessary dependencies are installed by installing the package below
Mix.install([
{:workshop_elixir_conf_us_2024, path: Path.join(__DIR__, "../..")}
])
Setup the Livebook
If you run Livebook locally, get the value of your PATH env by running $ echo $PATH. Then, go to Livebook Settings (from the Livebook home page) and then to Enviroment variables and set PATH to the value you got from the command line.
Get node
Value returned by Node.self() can be used to get the metrics from the running pipelines.
Go to livebooks/metrics.livemd to visualize them.
Node.self()
Introduction
Pipeline is an Elixir process that manages Membrane Elements so that they cooperate to the process multimedia data.
To make the module a Pipeline, add the line use Membrane.Pipeline to it.
Pipelines implement Membrane callbacks. Each callback receives the following arguments:
- The pipeline state - which can be modified (as in GenServer).
- The context - which is generated by the framework and contains various information, including about Pipeline’s Elements.
Sometimes also some other arguments, depending on the callback.
Each callback must return a two-element tuple {actions, state}, where
-
actionsis the list of actions the framework will execute (docs). -
stateis the (possibly modified) state of the element, which will be passed to the next invocation of any callback.
Elements handle the actual data processing.
For a module to become an Element, it must use Membrane.Filter, Membrane.Source, Membrane.Sink or Membrane.Endpoint.
Just like Pipelines, Elements can implement callbacks, which must return a tuple {actions, state}.
Elements have output and input pads and can be linked with each other using them: if an element has an output pad, it can be linked with another element that has an input pad.
Elements send buffers with data via output pads using actions. When an element sends a buffer via an output pad, the linked element receives this buffer from the respective input pad.
Elements can have zero, one, or multiple input and output pads (take a look at the pictures below)
Before sending the first data buffer, Elements must send a stream format using the :stream_format action (docs).
stream format describes the data that will be contained in the buffers. For example, it may specify that each buffer will contain one RGB video frame with dimensions 1920p x 1080p.
stream format must match the accepted_format field in the definition of the pad through which it was sent.
Example
Here we have an example of a Membrane pipeline that transcodes and transmuxes video from an MP4 container with video encoded with the H264 codec to an MKV container with video encoded with the VP9 codec.
defmodule TransmuxingPipeline do
use Membrane.Pipeline
# `use Membrane.Pipeline` provides default implementations of Membrane callbacks,
# that can be overriden by writing custom implementation, as in this example.
# The default implementations almost always don't do anything.
# This callback is invoked on the Pipeline start
@impl true
def handle_init(_ctx, _options) do
priv = "#{__DIR__}/../../priv/" |> Path.expand()
mp4_path = Path.join(priv, "fixtures/bunny_without_sound.mp4")
mkv_path = Path.join(priv, "outputs/bunny_without_sound.mkv")
spec = [
child(:source, %Membrane.File.Source{location: mp4_path})
|> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(:output, options: [kind: :video])
|> child(:h264_parser, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
|> child(:vp9_encoder, Membrane.VP9.Encoder)
|> child(:matroska_muxer, Membrane.Matroska.Muxer)
|> child(:file_sink, %Membrane.File.Sink{location: mkv_path})
]
{[spec: spec], %{}}
end
# This callback is invoked, when `:file_sink` receives `end of stream` on the input pad
@impl true
def handle_element_end_of_stream(:file_sink, _input, _ctx, state) do
{[terminate: :normal], state}
end
# Because we overrode the default implementation of callback
# `handle_element_end_of_stream` and the definition above matches only one of the
# Elements spawned by the Pipeline, we heve to add implementation for the rest
# of the elements. Otherwise, lack of it would lead to error.
@impl true
def handle_element_end_of_stream(_element, _input, _ctx, state), do: {[], state}
end
Run the Example
Cell below runs the pipeline and waits until it finishes its processing.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(TransmuxingPipeline)
ref = Process.monitor(supervisor)
receive do
{:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
If execution of the cell above takes too long, change the source file name from bunny_without_sound.mp4 on bunny_without_sound_short.mp4. You can do it in the rest of the exercises as well.
If you don’t run livebook on your machine, download the result video with this button:
Kino.Download.new(fn -> File.read!("#{__DIR__}/../../priv/outputs/bunny_without_sound.mkv") end,
label: "Download the video",
filename: "bunny_without_sound.mkv"
)
Now, you can run the output file with the following command:
$ ffplay workshop_elixir_conf_us_2024/priv/outputs/bunny_without_sound.mkv
Exercise A1: Scaling of a video
Add Membrane.FFmpeg.SWScale.Converter to the pipeline above and set the video width to 640 pixels. Documentation might be helpful.
> How long did the pipeline work before adding an element reducing the video size? What might be causing it?
Exercise A2: Add sound
priv/fixtures/bunny_without_sound.mp4 is an MP4 container that has only one video track. priv/fixtures/bunny_with_sound.mp4 is an MP4 container that has two tracks: one audio and one video.
Modify the pipeline in the following way:
-
use
priv/fixtures/bunny_with_sound.mp4as a source file instead ofpriv/fixtures/bunny_without_sound.mp4 -
save the output of your pipeline in
priv/outputs/bunny_with_sound.mkv -
modify the children specification returned by the
handle_init/2callback to support audio track as well, so that the output file contains audio from the source file.
Exercise A3*: Revert colors
Write your own element reverting colors and add it to the pipeline. Adding Membrane.FFmpeg.SWScale.Converter before and after your custom filter might be helpful. Use the element template below:
defmodule ColorReverter do
use Membrane.Filter
alias Membrane.RawVideo
def_input_pad(:input, accepted_format: %RawVideo{pixel_format: :RGB})
def_output_pad(:output, accepted_format: %RawVideo{pixel_format: :RGB})
# callbacks implementation
end
defmodule ColorRevertingPipeline do
use Membrane.Pipeline
@impl true
def handle_init(_ctx, _options) do
priv = "#{__DIR__}/../../priv/" |> Path.expand()
mp4_path = Path.join(priv, "fixtures/bunny_with_sound.mp4")
mkv_path = Path.join(priv, "outputs/bunny_with_reverted_colors.mkv")
spec = [
child(:source, %Membrane.File.Source{location: mp4_path})
|> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(:output, options: [kind: :video])
|> child(:h264_parser_1, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
# |> ...
|> child(:h264_encoder, %Membrane.H264.FFmpeg.Encoder{preset: :fast})
|> child(:h264_parser_2, %Membrane.H264.Parser{output_stream_structure: :avc1})
|> child(:matroska_muxer, Membrane.Matroska.Muxer)
|> child(:file_sink, %Membrane.File.Sink{location: mkv_path})
]
{[spec: spec], %{}}
end
@impl true
def handle_element_end_of_stream(:file_sink, _input, _ctx, state) do
{[terminate: :normal], state}
end
@impl true
def handle_element_end_of_stream(_element, _input, _ctx, state), do: {[], state}
end
Now run the pipeline:
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(ColorRevertingPipeline)
ref = Process.monitor(supervisor)
receive do
{:DOWN, ^ref, _process, _pid, _reason} -> :ok
end
Kino.Download.new(
fn -> File.read!("#{__DIR__}/../../priv/outputs/bunny_with_reverted_colors.mkv") end,
label: "Download the video",
filename: "bunny_with_reverted_colors.mkv"
)
Play the output file with the command below:
$ ffplay workshop_elixir_conf_us_2024/priv/outputs/bunny_with_reverted_colors.mkv