RTMP Sender
File.cd(__DIR__)
Logger.configure(level: :error)
Mix.install([
{:membrane_core, "~> 1.0"},
{:membrane_realtimer_plugin, "~> 0.9.0"},
{:membrane_hackney_plugin, "~> 0.11.0"},
{:membrane_rtmp_plugin, "~> 0.23.3"}
])
Description
Defines a pipeline downloading Big Buck Bunny trailer video and audio from Membranes’ asset samples page using the Hackney plugin, and sending it via RTMP to the other Livebook.
Pipeline definition
To download media from the internet, we use Hackney. We then convert the raw data to the appropriate format, such as H264 or AAC. We also regulate the download speed to avoid overloading the system and ensure that the media is added to the MP4 container to comply with RTMP.Sink requirements. Finally, we transmit both video and audio using RTMP to the other livebook. Once the entire stream has been sent, the pipeline will automatically terminate.
defmodule RTMP.Sender.Pipeline do
use Membrane.Pipeline
@samples_url "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/"
@video_url @samples_url <> "bun33s_480x270.h264"
@audio_url @samples_url <> "bun33s.aac"
@impl true
def handle_init(_ctx, destination: destination) do
video_source =
child(:video_source, %Membrane.Hackney.Source{
location: @video_url,
hackney_opts: [follow_redirect: true]
})
|> child(:video_parser, %Membrane.H264.Parser{
output_alignment: :au,
skip_until_keyframe: true,
generate_best_effort_timestamps: %{framerate: {25, 1}}
})
|> child(:video_realtimer, Membrane.Realtimer)
|> child(:video_payloader, %Membrane.H264.Parser{output_stream_structure: :avc1})
audio_source =
child(:audio_source, %Membrane.Hackney.Source{
location: @audio_url,
hackney_opts: [follow_redirect: true]
})
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
|> child(:audio_realtimer, Membrane.Realtimer)
rtmp_sink =
child(:rtmp_sink, %Membrane.RTMP.Sink{
rtmp_url: destination,
max_attempts: :infinity
})
spec = [
video_source
|> via_in(Pad.ref(:video, 0))
|> get_child(:rtmp_sink),
audio_source
|> via_in(Pad.ref(:audio, 0))
|> get_child(:rtmp_sink),
rtmp_sink
]
{[spec: spec], %{streams_to_end: 2}}
end
# The rest of the example module is only used for self-termination of the pipeline after processing finishes
@impl true
def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, %{streams_to_end: 1} = state) do
Membrane.Pipeline.terminate(self())
{[], %{state | streams_to_end: 0}}
end
@impl true
def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, state) do
{[], %{state | streams_to_end: 1}}
end
@impl true
def handle_element_end_of_stream(_child, _pad, _ctx, state) do
{[], state}
end
end
:ok
Sender
RTMP protocol requires a client-server communication, where the TCP server receives the data and the client sends it.
defmodule RTMP.Sender do
def run(port: port) do
destination_url = "rtmp://localhost:#{port}"
{:ok, pipeline} = start_tcp_client(destination_url)
await_termination(pipeline)
end
defp start_tcp_client(destination_url) do
{:ok, _supervisor, pipeline} =
Membrane.Pipeline.start(RTMP.Sender.Pipeline, destination: destination_url)
{:ok, pipeline}
end
defp await_termination(pipeline) do
monitor_ref = Process.monitor(pipeline)
receive do
{:DOWN, ^monitor_ref, :process, _pid, _reason} ->
:ok
end
end
end
:ok
port = 1942
RTMP.Sender.run(port: port)