Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Membrane and Kino

articles/membrane-and-kino.livemd

Membrane and Kino

Mix.install([
  {:kino, "~> 0.5.2"},
  {:membrane_h264_ffmpeg_plugin, "~> 0.15.0"},
  {:membrane_hackney_plugin, "~> 0.6.0"},
  {:turbojpeg, github: "chitacan/elixir-turbojpeg", branch: "update-deps"}
])

Intro

membrane_h264_ffmpeg_plugin, membrane_hackney_plugin 을 사용해 h264 포맷의 파일을 디코딩하는 membrane pipeline 을 만듭니다. pipeline 의 sink 는 디코딩된 I420 포맷의 프레임을 jpeg 로 변경해서 kino frame 으로 렌더링합니다.

graph LR;
  a[("Video")] -->|http| b["Source(Hackney)"];
  b --> c["FFmpeg Parser"];
  c --> d["FFmpeg Decoder"];
  d --> e["Sink(I420 ➡️ jpeg)"];
  e -->|Kino| f(("LivebookKino.Frame"));

Setup

turbojpeg 가 사용하는 unifex, bundlex 의존성의 버전과 membrane_h264_ffmpeg_plugin 이 사용하는 버전이 달라 turbojpeg, membrane_h264_ffmpeg_plugin 를 함께 설치하는 것이 불가능합니다.

낮은 버전을 사용하는 turbojpegunifex, bundlex 의 버전을 올리고 약간의 패치를 추가했습니다.

Result

jpeg 로 변환된 프레임을 렌더링할 Kino Frame 을 선언합니다.

frame_widget = Kino.Frame.new()

pipeline 을 컨트롤할 UI 를 선언합니다. Play == true 상태가 되면 새로운 pipeline 프로세스를 생성하고 재생을 요청합니다. 재생이 시작되면, 직전 셀에서 선언한 frame_widget 에 비디오 프레임이 표시됩니다. Play == false 가 되면 pipeline 을 종료합니다.

control_widget =
  Kino.Control.form(
    [
      play: Kino.Input.checkbox("Play")
    ],
    report_changes: true
  )

Pipeline

membrane sink element module 을 선언합니다.

대부분의 코드는 elixir-turbojpeg 의 sink 구현과 비슷합니다. handle_write/4 콜백에서 Kino Frame 으로 렌더링하도록 변경했습니다.

defmodule KinoSink do
  use Membrane.Sink
  alias Membrane.{Buffer, Time}
  alias Membrane.Caps.Video.Raw

  def_input_pad(:input, caps: Raw, demand_unit: :buffers)

  def_options(
    kino_widget: [spec: any(), description: ""],
    quality: [type: :integer, description: "Jpeg encoding quality"]
  )

  @impl true
  def handle_init(options) do
    state = %{
      quality: options.quality,
      height: nil,
      width: nil,
      format: nil,
      kino_widget: options.kino_widget,
      timer_started?: false
    }

    {:ok, state}
  end

  @impl true
  def handle_start_of_stream(:input, ctx, state) do
    use Ratio
    {nom, denom} = ctx.pads.input.caps.framerate
    timer = {:demand_timer, Time.seconds(denom) <|> nom}

    {{:ok, demand: :input, start_timer: timer}, %{state | timer_started?: true}}
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, state) do
    {{:ok, stop_timer: :demand_timer}, %{state | timer_started?: false}}
  end

  @impl true
  def handle_caps(:input, caps, ctx, state) do
    %{input: input} = ctx.pads

    if !input.caps || caps == input.caps do
      {:ok, %{state | width: caps.width, height: caps.height, format: caps.format}}
    else
      raise "Caps have changed while playing. This is not supported."
    end
  end

  @impl true
  def handle_write(:input, %Buffer{payload: payload}, _ctx, state) do
    with {:ok, data} <-
           Turbojpeg.yuv_to_jpeg(payload, state.width, state.height, state.quality, state.format),
         Kino.Frame.render(state.kino_widget, Kino.Image.new(data, "image/jpeg")) do
      {:ok, state}
    else
      {:error, _} = error ->
        {error, state}
    end
  end

  @impl true
  def handle_tick(:demand_timer, _ctx, state) do
    {{:ok, demand: :input}, state}
  end
end

membrane pipeline 을 선언합니다.

https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264 경로의 비디오 파일을 Hackney 모듈로 가져와 I420 포맷으로 디코딩합니다. 디코딩 된 프레임은 turbojpeg 를 사용해 jpeg 로 변환하고 25fps 의 속도로 Livebook 에 선언된 Kino Frame 을 업데이트 합니다.

defmodule Pipeline do
  use Membrane.Pipeline

  alias Membrane.{H264.FFmpeg, Hackney}

  @impl true
  def handle_init(frame) do
    children = [
      hackney: %Hackney.Source{
        location:
          "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264"
      },
      parser: %FFmpeg.Parser{framerate: {25, 1}},
      decoder: FFmpeg.Decoder,
      sink: %KinoSink{kino_widget: frame, quality: 100}
    ]

    links = [
      link(:hackney)
      |> to(:parser)
      |> to(:decoder)
      |> to(:sink)
    ]

    {{:ok, spec: %ParentSpec{children: children, links: links}}, %{}}
  end
end

pipeline 프로세스를 관리할 Controller 프로세스를 선언합니다. control_widget 의 입력은 handle_info/2 콜백에서 처리합니다.

defmodule Controller do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def init({frame_widget, control_widget}) do
    Kino.Control.subscribe(control_widget, :control)
    {:ok, %{frame_widget: frame_widget, pipeline: nil}}
  end

  def handle_info({:control, %{data: %{play: false}}}, state) do
    if not is_nil(state.pipeline) and Process.alive?(state.pipeline) do
      Pipeline.stop_and_terminate(state.pipeline)
    end

    {:noreply, state}
  end

  def handle_info({:control, %{data: %{play: true}}}, state) do
    {:ok, pid} = Pipeline.start_link(state.frame_widget)
    Pipeline.play(pid)
    {:noreply, %{state | pipeline: pid}}
  end
end
Kino.start_child({Controller, {frame_widget, control_widget}})