RTMP Receiver
# Notebook setup: switch to this file's directory, reduce logging to errors only,
# and install the dependencies used by the cells below.
# Note: the return value of File.cd/1 is not checked here.
File.cd(__DIR__)
Logger.configure(level: :error)
# The Kino plugin is fetched from GitHub at a pinned tag; the other packages
# are resolved from version requirements.
Mix.install([
{:membrane_core, "~> 1.0"},
{:membrane_realtimer_plugin, "~> 0.9.0"},
{:membrane_rtmp_plugin, "~> 0.21.0"},
{:membrane_kino_plugin, github: "membraneframework-labs/membrane_kino_plugin", tag: "v0.3.1"}
])
Description
This notebook defines a server that receives a media stream from an RTMP source and plays it directly in the notebook.
Pipeline definition
Here’s the definition of the pipeline:
- The RTMP source provides video and audio data.
- The data is then parsed into suitable H264 and AAC formats.
- Finally, the media is pushed to Kino.Player.
defmodule RTMP.Receiver.Pipeline do
  @moduledoc """
  Pipeline that reads an RTMP stream from an already accepted TCP socket,
  parses its audio (AAC) and video (H264) tracks and pushes them to a
  `Membrane.Kino.Player.Sink`.

  The pipeline is started with a keyword list containing:

    * `:socket` - the TCP socket handed to `Membrane.RTMP.SourceBin`
    * `:kino` - the player passed to `Membrane.Kino.Player.Sink`

  The pipeline terminates itself once the `:player` child has reported end of
  stream on both of its linked input pads.
  """

  use Membrane.Pipeline

  # Names of the `:player` input pads linked in `handle_init/2`. The pipeline
  # terminates only after every one of them has received end of stream.
  @player_pads [:audio, :video]

  @impl true
  def handle_init(_ctx, opts) do
    # Fetch by key so the options may be given in any order; a missing key
    # raises immediately instead of failing with a function clause error.
    socket = Keyword.fetch!(opts, :socket)
    kino = Keyword.fetch!(opts, :kino)

    source = child(:source, %Membrane.RTMP.SourceBin{socket: socket})

    playing_audio =
      get_child(:source)
      |> via_out(:audio)
      |> child(:audio_parser, %Membrane.AAC.Parser{out_encapsulation: :ADTS})
      |> via_in(:audio)
      |> get_child(:player)

    playing_video =
      get_child(:source)
      |> via_out(:video)
      |> child(:video_parser, %Membrane.H264.Parser{
        generate_best_effort_timestamps: %{framerate: {25, 1}},
        output_stream_structure: :annexb
      })
      |> via_in(:video)
      |> get_child(:player)

    player = child(:player, %Membrane.Kino.Player.Sink{kino: kino})

    spec = [source, playing_audio, playing_video, player]
    {[spec: spec], %{finished_pads: MapSet.new()}}
  end

  # Once the source initializes, we grant it the control over the TCP socket.
  # The request is re-sent to the pipeline itself and handled in `handle_info/3`,
  # which is also where retries are scheduled.
  @impl true
  def handle_child_notification(
        {:socket_control_needed, _socket, _source} = notification,
        :source,
        _ctx,
        state
      ) do
    send(self(), notification)
    {[], state}
  end

  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end

  @impl true
  def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do
    case Membrane.RTMP.SourceBin.pass_control(socket, source) do
      :ok ->
        :ok

      {:error, :not_owner} ->
        # The socket cannot be handed over yet; try again in 200 ms.
        Process.send_after(self(), notification, 200)
    end

    {[], state}
  end

  # The rest of the module is used for self-termination of the pipeline after
  # processing finishes. The sink child is named `:player`, so that is the child
  # whose end of stream is tracked. Termination is requested with the
  # `:terminate` action rather than by calling `Membrane.Pipeline.terminate/2`
  # on `self()` from inside a callback.
  @impl true
  def handle_element_end_of_stream(:player, pad, _ctx, state) do
    finished_pads = MapSet.put(state.finished_pads, Membrane.Pad.name_by_ref(pad))
    state = %{state | finished_pads: finished_pads}

    if Enum.all?(@player_pads, &MapSet.member?(finished_pads, &1)) do
      {[terminate: :normal], state}
    else
      {[], state}
    end
  end

  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end
:ok
Server
Receiving an RTMP stream requires creating a TCP server. After the connection is established, a pipeline is created using the TCP socket.
defmodule RTMP.Receiver do
  @moduledoc """
  Starts a TCP server on the loopback interface and, for an incoming
  connection, spawns an `RTMP.Receiver.Pipeline` that plays the stream.
  """

  alias Membrane.RTMP.Source.TcpServer

  @server_ip {127, 0, 0, 1}

  @doc """
  Runs the receiver and blocks until the first spawned pipeline goes down.

  ## Options

    * `:port` - TCP port to listen on (required)
    * `:kino` - player passed on to the pipeline (required)

  The options may be given in any order. Raises `KeyError` when a required
  option is missing. Returns `:ok`.

  The TCP server is stopped before this function returns, so the port is
  released and the function can be called again.
  """
  @spec run(keyword()) :: :ok
  def run(opts) do
    port = Keyword.fetch!(opts, :port)
    kino = Keyword.fetch!(opts, :kino)
    parent = self()

    server_options = %TcpServer{
      port: port,
      listen_options: [
        :binary,
        packet: :raw,
        active: false,
        ip: @server_ip
      ],
      socket_handler: fn socket ->
        # On new connection a pipeline is started
        {:ok, _supervisor, pipeline} =
          Membrane.Pipeline.start(RTMP.Receiver.Pipeline, socket: socket, kino: kino)

        send(parent, {:pipeline_spawned, pipeline})
        {:ok, pipeline}
      end
    }

    {:ok, server} = TcpServer.start_link(server_options)

    try do
      await_pipeline() |> await_termination()
    after
      stop_server(server)
    end
  end

  # Blocks until the socket handler reports a spawned pipeline.
  defp await_pipeline do
    receive do
      {:pipeline_spawned, pipeline} -> pipeline
    end
  end

  # Blocks until the given pipeline process is down.
  defp await_termination(pipeline) do
    monitor_ref = Process.monitor(pipeline)

    receive do
      {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
        :ok
    end
  end

  # The server is linked to the caller, so it is unlinked first to keep the
  # exit signal from propagating back to this process.
  defp stop_server(server) do
    Process.unlink(server)
    Process.exit(server, :shutdown)
    :ok
  end
end
:ok
# TCP port the receiver listens on (bound to 127.0.0.1 by RTMP.Receiver).
port = 1942
# Player configured with both video and audio enabled.
kino = Membrane.Kino.Player.new(video: true, audio: true)
# Render the player before the blocking call below.
Kino.render(kino)
# Blocks until the pipeline spawned for the first incoming connection goes down.
RTMP.Receiver.run(port: port, kino: kino)