Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RTMP Sender

livebooks/rtmp/rtmp_sender.livemd

RTMP Sender

# Make the notebook's own directory the current working directory.
File.cd(__DIR__)
# Only log messages of level :error to keep the notebook output quiet.
Logger.configure(level: :error)

# Install the dependencies used by the code cells below.
Mix.install([
  {:membrane_core, "~> 1.0"},
  {:membrane_realtimer_plugin, "~> 0.9.0"},
  {:membrane_hackney_plugin, "~> 0.11.0"},
  {:membrane_rtmp_plugin, "~> 0.21.0"}
])

Description

Defines a pipeline that downloads the Big Buck Bunny trailer video and audio from Membrane's sample assets page using the Hackney plugin and sends them via RTMP to the other Livebook.

Pipeline definition

To download the media from the internet, we use Hackney. We then parse the raw data into the appropriate formats: H264 for video and AAC for audio. We also use realtimers to regulate the pace of the stream and avoid overloading the system, and we repackage the H264 stream into the AVC1 structure (the form used in MP4 containers) to comply with the RTMP.Sink requirements. Finally, we transmit both video and audio using RTMP to the other Livebook. Once the entire stream has been sent, the pipeline terminates automatically.

defmodule RTMP.Sender.Pipeline do
  @moduledoc """
  Pipeline that downloads the Big Buck Bunny sample video (H264) and audio (AAC)
  over HTTP and sends both tracks to an RTMP server.

  It is started with the options `[destination: rtmp_url]` and terminates itself
  once the RTMP sink has reported end of stream for both tracks.
  """

  use Membrane.Pipeline

  @samples_url "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/"
  @video_url @samples_url <> "bun33s_480x270.h264"
  @audio_url @samples_url <> "bun33s.aac"

  # Builds the video branch, the audio branch and the RTMP sink, and links both
  # branches to the sink. The state counts the sink input streams that still
  # have to finish before the pipeline may terminate.
  @impl true
  def handle_init(_ctx, destination: destination) do
    video_source =
      child(:video_source, %Membrane.Hackney.Source{
        location: @video_url,
        hackney_opts: [follow_redirect: true]
      })
      |> child(:video_parser, %Membrane.H264.Parser{
        output_alignment: :au,
        skip_until_keyframe: true,
        # The raw H264 file carries no timestamps, so they are generated
        # assuming a constant 25 fps framerate.
        generate_best_effort_timestamps: %{framerate: {25, 1}}
      })
      # Regulates the pace at which the stream flows downstream.
      |> child(:video_realtimer, Membrane.Realtimer)
      # A second parser instance is used only to switch the stream structure to :avc1.
      |> child(:video_payloader, %Membrane.H264.Parser{output_stream_structure: :avc1})

    audio_source =
      child(:audio_source, %Membrane.Hackney.Source{
        location: @audio_url,
        hackney_opts: [follow_redirect: true]
      })
      |> child(:audio_parser, %Membrane.AAC.Parser{
        out_encapsulation: :ADTS
      })
      |> child(:audio_realtimer, Membrane.Realtimer)

    rtmp_sink =
      child(:rtmp_sink, %Membrane.RTMP.Sink{
        rtmp_url: destination,
        # Keep retrying the connection until the RTMP server becomes available.
        max_attempts: :infinity
      })

    spec = [
      video_source
      |> via_in(Pad.ref(:video, 0))
      |> get_child(:rtmp_sink),
      audio_source
      |> via_in(Pad.ref(:audio, 0))
      |> get_child(:rtmp_sink),
      rtmp_sink
    ]

    {[spec: spec], %{streams_to_end: 2}}
  end

  # The rest of the example module is only used for self-termination of the pipeline after processing finishes

  # Last remaining stream has ended on the sink: terminate the pipeline.
  #
  # The `:terminate` action is returned instead of calling
  # `Membrane.Pipeline.terminate(self())`. That function is blocking by default
  # and waits for the pipeline process to exit, which cannot happen while this
  # callback, running in that very process, has not returned yet.
  @impl true
  def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, %{streams_to_end: 1} = state) do
    {[terminate: :normal], %{state | streams_to_end: 0}}
  end

  # One of the sink's streams has ended, but others are still running.
  def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, state) do
    {[], %{state | streams_to_end: state.streams_to_end - 1}}
  end

  # End of stream on any other child does not affect termination.
  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end

:ok

Sender

The RTMP protocol relies on client-server communication: a TCP server receives the data and a client sends it. This notebook acts as the client.

defmodule RTMP.Sender do
  @moduledoc """
  Entry point of the sender: starts `RTMP.Sender.Pipeline` pointed at a local
  RTMP server and waits for the pipeline to finish.
  """

  @doc """
  Starts the sending pipeline targeting `rtmp://localhost:<port>` and blocks
  until the pipeline process goes down. Returns `:ok`.
  """
  def run(port: port) do
    {:ok, pipeline_pid} = start_tcp_client("rtmp://localhost:#{port}")
    await_termination(pipeline_pid)
  end

  # Starts the pipeline and returns only the pipeline pid;
  # the supervisor pid is not needed here.
  defp start_tcp_client(destination_url) do
    pipeline_opts = [destination: destination_url]
    {:ok, _supervisor, pipeline_pid} = Membrane.Pipeline.start(RTMP.Sender.Pipeline, pipeline_opts)

    {:ok, pipeline_pid}
  end

  # Blocks the caller until the monitored pipeline process exits, whatever the reason.
  defp await_termination(pipeline_pid) do
    ref = Process.monitor(pipeline_pid)

    receive do
      {:DOWN, ^ref, :process, _pid, _reason} -> :ok
    end
  end
end

:ok
# Port on localhost that the sender connects to (used to build the RTMP URL in `RTMP.Sender.run/1`).
# NOTE(review): presumably must match the port the receiving Livebook listens on — confirm.
port = 1942

# Blocks until the sending pipeline process goes down.
RTMP.Sender.run(port: port)