Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RTMP Receiver

livebooks/rtmp/rtmp_receiver.livemd

RTMP Receiver

File.cd(__DIR__)
Logger.configure(level: :error)

Mix.install([
  {:membrane_core, "~> 1.0"},
  {:membrane_realtimer_plugin, "~> 0.9.0"},
  {:membrane_rtmp_plugin, "~> 0.21.0"},
  {:membrane_kino_plugin, github: "membraneframework-labs/membrane_kino_plugin", tag: "v0.3.1"}
])

Description

Defines a server that receives a media stream from the RTMP source and plays it directly in the notebook.

Pipeline definition

Here’s the definition of the pipeline:

  1. The RTMP source provides video and audio data.
  2. The data is then parsed into suitable H264 and AAC formats.
  3. Finally, the media is pushed to Kino.Player.
defmodule RTMP.Receiver.Pipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, socket: socket, kino: kino) do
    source =
      child(:source, %Membrane.RTMP.SourceBin{
        socket: socket
      })

    playing_audio =
      get_child(:source)
      |> via_out(:audio)
      |> child(:audio_parser, %Membrane.AAC.Parser{
        out_encapsulation: :ADTS
      })
      |> via_in(:audio)
      |> get_child(:player)

    playing_video =
      get_child(:source)
      |> via_out(:video)
      |> child(:video_parser, %Membrane.H264.Parser{
        generate_best_effort_timestamps: %{framerate: {25, 1}},
        output_stream_structure: :annexb
      })
      |> via_in(:video)
      |> get_child(:player)

    player = child(:player, %Membrane.Kino.Player.Sink{kino: kino})

    spec = [source, playing_audio, playing_video, player]
    {[spec: spec], %{}}
  end

  # Once the source initializes, we grant it the control over the tcp socket
  @impl true
  def handle_child_notification(
        {:socket_control_needed, _socket, _source} = notification,
        :source,
        _ctx,
        state
      ) do
    send(self(), notification)

    {[], state}
  end

  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end

  @impl true
  def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do
    case Membrane.RTMP.SourceBin.pass_control(socket, source) do
      :ok ->
        :ok

      {:error, :not_owner} ->
        Process.send_after(self(), notification, 200)
    end

    {[], state}
  end

  # The rest of the module is used for self-termination of the pipeline after processing finishes
  @impl true
  def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
    Membrane.Pipeline.terminate(self())
    {[], state}
  end

  @impl true
  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end

:ok

Server

Receiving an RTMP stream requires creating a TCP server. After the connection is established, a pipeline is created using the TCP socket.

defmodule RTMP.Receiver do
  @server_ip {127, 0, 0, 1}

  def run(port: port, kino: kino) do
    parent = self()

    server_options = %Membrane.RTMP.Source.TcpServer{
      port: port,
      listen_options: [
        :binary,
        packet: :raw,
        active: false,
        ip: @server_ip
      ],
      socket_handler: fn socket ->
        # On new connection a pipeline is started
        {:ok, _supervisor, pipeline} =
          Membrane.Pipeline.start(RTMP.Receiver.Pipeline, socket: socket, kino: kino)

        send(parent, {:pipeline_spawned, pipeline})
        {:ok, pipeline}
      end
    }

    {:ok, pipeline} = start_server(server_options)

    await_termination(pipeline)
  end

  defp start_server(server_options) do
    {:ok, _server_pid} = Membrane.RTMP.Source.TcpServer.start_link(server_options)

    receive do
      {:pipeline_spawned, pipeline} ->
        {:ok, pipeline}
    end
  end

  defp await_termination(pipeline) do
    monitor_ref = Process.monitor(pipeline)

    receive do
      {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
        :ok
    end
  end
end

:ok
port = 1942

kino = Membrane.Kino.Player.new(video: true, audio: true)
Kino.render(kino)
RTMP.Receiver.run(port: port, kino: kino)