Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Messages source and sink

messages_source_and_sink.livemd

Messages source and sink

# Notebook setup: run from this file's directory, quiet the logs and fetch dependencies.

# Make the directory containing this file the current working directory.
File.cd(__DIR__)
# Only log messages of level :error and above.
Logger.configure(level: :error)

# Install Membrane Core, the only dependency used by the code below.
Mix.install([
  {:membrane_core, "~> 1.0"}
])

Erlang message-driven source

defmodule MessageSource do
  @moduledoc """
  Source element that turns Erlang messages into Membrane buffers.

  On initialization the element registers its own process under the name given
  in the `register_name` option. Every `{:message, payload}` message it receives
  is emitted on the `:output` pad as a `Membrane.Buffer` carrying `payload`.

  Messages received before the element is playing are queued and flushed, in
  arrival order, right after the stream format is sent in `handle_playing/2`.
  Any other message is logged as a warning and ignored.
  """
  use Membrane.Source

  require Membrane.Logger

  def_output_pad :output,
    flow_control: :push,
    accepted_format: _any

  def_options register_name: [
                description: "The name under which the element's process will be registered",
                spec: atom()
              ]

  @impl true
  def handle_init(_ctx, opts) do
    Process.register(self(), opts.register_name)
    # `buffered` holds the queued payloads newest-first (see `enqueue/2`).
    {[], %{buffered: []}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {buffer_actions, state} = send_buffers(state)
    stream_format = %Membrane.RemoteStream{type: :bytestream}

    # The stream format has to precede any buffer sent on the pad.
    {[stream_format: {:output, stream_format}] ++ buffer_actions, state}
  end

  @impl true
  def handle_info({:message, message}, %{playback: :playing}, state) do
    # Already playing: emit the message (and anything still queued) right away.
    state
    |> enqueue(message)
    |> send_buffers()
  end

  def handle_info({:message, message}, _ctx, state) do
    # Not playing yet: keep the message until `handle_playing/2` flushes the queue.
    {[], enqueue(state, message)}
  end

  def handle_info(msg, _ctx, state) do
    Membrane.Logger.warning("Unknown message received: #{inspect(msg)}")
    {[], state}
  end

  # Queues a payload. Prepending is O(1); the order is restored once, on flush.
  defp enqueue(state, message) do
    %{state | buffered: [message | state.buffered]}
  end

  # Builds one `:buffer` action per queued payload, oldest first, and empties the queue.
  defp send_buffers(state) do
    actions =
      state.buffered
      |> Enum.reverse()
      |> Enum.map(&{:buffer, {:output, %Membrane.Buffer{payload: &1}}})

    {actions, %{state | buffered: []}}
  end
end

Erlang message-driven sink

defmodule MessageSink do
  @moduledoc """
  Sink element that forwards the payload of every buffer arriving on the
  `:input` pad to the process given in the `receiver` option.

  Each payload is delivered as a `{:message, sink_pid, payload}` Erlang message,
  where `sink_pid` is the PID of this element's process.
  """
  use Membrane.Sink

  def_input_pad :input,
    flow_control: :push,
    accepted_format: _any

  def_options receiver: [
                description: "PID of the process that will receive messages from the sink",
                spec: pid()
              ]

  @impl true
  def handle_init(_ctx, %{receiver: receiver}) do
    initial_state = %{receiver: receiver}
    {[], initial_state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, %{receiver: receiver} = state) do
    # Tag the payload with this element's PID so the receiver can tell sinks apart.
    notification = {:message, self(), buffer.payload}
    send(receiver, notification)

    {[], state}
  end
end

Pipeline definition and startup

alias Membrane.RCPipeline
import Membrane.ChildrenSpec

defmodule MyPipeline do
  @moduledoc """
  Pipeline linking a `MessageSource` directly to a `MessageSink`.

  ## Options

    * `:receiver` (required) - PID of the process the sink sends its messages to.

  The source registers itself under the `:messages_source` name.
  """
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    # Fail at startup with a clear error when `:receiver` is missing, instead of
    # handing `nil` to the sink and crashing later on the first buffer.
    receiver = Keyword.fetch!(opts, :receiver)

    spec =
      child(:source, %MessageSource{register_name: :messages_source})
      |> child(:sink, %MessageSink{receiver: receiver})

    {[spec: spec], nil}
  end
end

# Start the pipeline with the current process as the receiver of the sink's messages.
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start(MyPipeline, receiver: self())
payloads = 1..10

# Send the payloads to the registered source from a separate process.
sender =
  Task.async(fn ->
    Enum.each(
      payloads,
      &send(:messages_source, {:message, &1})
    )
  end)

# A task started with `Task.async/1` always sends a reply; await it so the reply
# does not linger in this process's mailbox.
Task.await(sender)

:ok

Printing of the messages received and pipeline termination

# Print one message from the sink per payload sent. Stop waiting after the first
# timeout so that a lost message cannot block this cell forever.
Enum.reduce_while(payloads, :ok, fn _payload, _acc ->
  receive do
    {:message, _pid, _value} = msg ->
      IO.inspect(msg)
      {:cont, :ok}
  after
    5_000 ->
      IO.puts("Timed out waiting for a message from the sink")
      {:halt, :timeout}
  end
end)

# Terminate through the same module that was used to start the pipeline.
Membrane.Pipeline.terminate(pipeline)