Messages source and sink
# Notebook setup: runs once before any of the code below.
# Switch the working directory to the directory containing this file.
File.cd(__DIR__)
# Only log messages at :error level.
Logger.configure(level: :error)
# Install the single dependency used below: Membrane Core 1.x.
Mix.install([
{:membrane_core, "~> 1.0"}
])
Erlang message-driven source
defmodule MessageSource do
  @moduledoc """
  Source element that turns Erlang messages into buffers.

  On initialization the element's process registers itself under the name given
  in the `register_name` option. Every `{:message, payload}` message it receives
  is emitted on the `:output` pad as a `Membrane.Buffer` carrying `payload`.

  Messages that arrive before the element is playing are queued and flushed, in
  arrival order, right after the stream format is sent in `handle_playing/2`.
  Any other message is logged as a warning and ignored.
  """
  use Membrane.Source

  require Membrane.Logger

  def_output_pad :output,
    flow_control: :push,
    accepted_format: _any

  def_options register_name: [
                description: "The name under which the element's process will be registered",
                spec: atom()
              ]

  @impl true
  def handle_init(_ctx, opts) do
    Process.register(self(), opts.register_name)
    # `buffered` holds payloads not yet emitted, newest first.
    {[], %{buffered: []}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {buffer_actions, state} = send_buffers(state)
    stream_format = %Membrane.RemoteStream{type: :bytestream}

    # The stream format action must precede the queued buffers.
    {[{:stream_format, {:output, stream_format}} | buffer_actions], state}
  end

  @impl true
  def handle_info({:message, message}, ctx, state) do
    # Prepend (O(1)) instead of appending; `send_buffers/1` restores arrival order.
    state = %{state | buffered: [message | state.buffered]}

    case ctx.playback do
      :playing -> send_buffers(state)
      _not_playing -> {[], state}
    end
  end

  @impl true
  def handle_info(msg, _ctx, state) do
    Membrane.Logger.warning("Unknown message received: #{inspect(msg)}")
    {[], state}
  end

  # Builds one `:buffer` action per queued payload (oldest first) and empties the queue.
  defp send_buffers(state) do
    actions =
      state.buffered
      |> Enum.reverse()
      |> Enum.map(&{:buffer, {:output, %Membrane.Buffer{payload: &1}}})

    {actions, %{state | buffered: []}}
  end
end
Erlang message-driven sink
defmodule MessageSink do
  @moduledoc """
  Sink element that relays incoming buffers to another process as Erlang messages.

  For every buffer arriving on the `:input` pad, the process given in the
  `receiver` option is sent `{:message, sink_pid, payload}`, where `sink_pid` is
  the PID of this element's process and `payload` is the buffer's payload.
  """
  use Membrane.Sink

  def_input_pad :input,
    flow_control: :push,
    accepted_format: _any

  def_options receiver: [
                description: "PID of the process that will receive messages from the sink",
                spec: pid()
              ]

  @impl true
  def handle_init(_ctx, opts), do: {[], %{receiver: opts.receiver}}

  @impl true
  def handle_buffer(:input, buffer, _ctx, %{receiver: receiver} = state) do
    # Tag the payload with this element's PID so the receiver knows its origin.
    forwarded = {:message, self(), buffer.payload}
    send(receiver, forwarded)

    {[], state}
  end
end
Pipeline definition and startup
alias Membrane.RCPipeline
import Membrane.ChildrenSpec
defmodule MyPipeline do
  @moduledoc """
  Pipeline that links a `MessageSource` (registered as `:messages_source`)
  directly to a `MessageSink`.

  Options are a keyword list that must contain `:receiver`; its value is handed
  to the sink as its `receiver` option. `handle_init/2` raises `KeyError` when
  `:receiver` is missing.
  """
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    # Fail at startup when :receiver is missing rather than handing `nil` to the sink.
    receiver = Keyword.fetch!(opts, :receiver)

    spec =
      child(:source, %MessageSource{register_name: :messages_source})
      |> child(:sink, %MessageSink{receiver: receiver})

    {[spec: spec], nil}
  end
end
# Start the pipeline; this process is passed as the receiver of the sink's messages.
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start(MyPipeline, receiver: self())

payloads = 1..10

# Send the payloads to the source from a separate, linked process. Nothing is
# awaited here, so use Task.start_link/1 instead of Task.async/1, which would
# leave its never-awaited reply sitting in this process's mailbox.
{:ok, _sender} =
  Task.start_link(fn ->
    Enum.each(payloads, &send(:messages_source, {:message, &1}))
  end)

:ok
Printing the received messages and terminating the pipeline
# Wait for one forwarded message per payload that was sent, printing each one.
# Iterating over `payloads` keeps the expected count in sync with the sender.
for _payload <- payloads do
  receive do
    {:message, _pid, _value} = msg -> IO.inspect(msg)
  end
end

# The pipeline was started with Membrane.Pipeline.start/2, so it is terminated
# through the same module.
Membrane.Pipeline.terminate(pipeline)