Membrane and Kino
Mix.install([
{:kino, "~> 0.5.2"},
{:membrane_h264_ffmpeg_plugin, "~> 0.15.0"},
{:membrane_hackney_plugin, "~> 0.6.0"},
{:turbojpeg, github: "chitacan/elixir-turbojpeg", branch: "update-deps"}
])
Intro
membrane_h264_ffmpeg_plugin
, membrane_hackney_plugin
을 사용해 h264 포맷의 파일을 디코딩하는 membrane pipeline 을 만듭니다.
pipeline 의 sink 는 디코딩된 I420 포맷의 프레임을 jpeg 로 변경해서 kino frame 으로 렌더링합니다.
graph LR;
a[("Video")] -->|http| b["Source(Hackney)"];
b --> c["FFmpeg Parser"];
c --> d["FFmpeg Decoder"];
d --> e["Sink(I420 ➡️ jpeg)"];
e -->|Kino| f(("LivebookKino.Frame"));
Setup
turbojpeg
가 사용하는 unifex
, bundlex
의존성의 버전과 membrane_h264_ffmpeg_plugin
이 사용하는 버전이 달라 turbojpeg
, membrane_h264_ffmpeg_plugin
를 함께 설치하는 것이 불가능합니다.
낮은 버전을 사용하는 turbojpeg
의 unifex
, bundlex
의 버전을 올리고 약간의 패치를 추가했습니다.
Result
jpeg 로 변환된 프레임을 렌더링할 Kino Frame 을 선언합니다.
frame_widget = Kino.Frame.new()
pipeline 을 컨트롤할 UI 를 선언합니다. Play == true
상태가 되면 새로운 pipeline 프로세스를 생성하고 재생을 요청합니다. 재생이 시작되면, 직전 셀에서 선언한 frame_widget
에 비디오 프레임이 표시됩니다.
Play == false
가 되면 pipeline 을 종료합니다.
control_widget =
Kino.Control.form(
[
play: Kino.Input.checkbox("Play")
],
report_changes: true
)
Pipeline
membrane sink element module 을 선언합니다.
대부분의 코드는 elixir-turbojpeg 의 sink 구현과 비슷합니다. handle_write/4
콜백에서 Kino Frame 으로 렌더링하도록 변경했습니다.
defmodule KinoSink do
use Membrane.Sink
alias Membrane.{Buffer, Time}
alias Membrane.Caps.Video.Raw
def_input_pad(:input, caps: Raw, demand_unit: :buffers)
def_options(
kino_widget: [spec: any(), description: ""],
quality: [type: :integer, description: "Jpeg encoding quality"]
)
@impl true
def handle_init(options) do
state = %{
quality: options.quality,
height: nil,
width: nil,
format: nil,
kino_widget: options.kino_widget,
timer_started?: false
}
{:ok, state}
end
@impl true
def handle_start_of_stream(:input, ctx, state) do
use Ratio
{nom, denom} = ctx.pads.input.caps.framerate
timer = {:demand_timer, Time.seconds(denom) <|> nom}
{{:ok, demand: :input, start_timer: timer}, %{state | timer_started?: true}}
end
@impl true
def handle_end_of_stream(:input, _ctx, state) do
{{:ok, stop_timer: :demand_timer}, %{state | timer_started?: false}}
end
@impl true
def handle_caps(:input, caps, ctx, state) do
%{input: input} = ctx.pads
if !input.caps || caps == input.caps do
{:ok, %{state | width: caps.width, height: caps.height, format: caps.format}}
else
raise "Caps have changed while playing. This is not supported."
end
end
@impl true
def handle_write(:input, %Buffer{payload: payload}, _ctx, state) do
with {:ok, data} <-
Turbojpeg.yuv_to_jpeg(payload, state.width, state.height, state.quality, state.format),
Kino.Frame.render(state.kino_widget, Kino.Image.new(data, "image/jpeg")) do
{:ok, state}
else
{:error, _} = error ->
{error, state}
end
end
@impl true
def handle_tick(:demand_timer, _ctx, state) do
{{:ok, demand: :input}, state}
end
end
membrane pipeline 을 선언합니다.
https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264 경로의 비디오 파일을 Hackney 모듈로 가져와 I420 포맷으로 디코딩합니다. 디코딩 된 프레임은 turbojpeg
를 사용해 jpeg 로 변환하고 25fps 의 속도로 Livebook 에 선언된 Kino Frame 을 업데이트 합니다.
defmodule Pipeline do
use Membrane.Pipeline
alias Membrane.{H264.FFmpeg, Hackney}
@impl true
def handle_init(frame) do
children = [
hackney: %Hackney.Source{
location:
"https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264"
},
parser: %FFmpeg.Parser{framerate: {25, 1}},
decoder: FFmpeg.Decoder,
sink: %KinoSink{kino_widget: frame, quality: 100}
]
links = [
link(:hackney)
|> to(:parser)
|> to(:decoder)
|> to(:sink)
]
{{:ok, spec: %ParentSpec{children: children, links: links}}, %{}}
end
end
pipeline 프로세스를 관리할 Controller
프로세스를 선언합니다. control_widget
의 입력은 handle_info/2
콜백에서 처리합니다.
defmodule Controller do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
def init({frame_widget, control_widget}) do
Kino.Control.subscribe(control_widget, :control)
{:ok, %{frame_widget: frame_widget, pipeline: nil}}
end
def handle_info({:control, %{data: %{play: false}}}, state) do
if not is_nil(state.pipeline) and Process.alive?(state.pipeline) do
Pipeline.stop_and_terminate(state.pipeline)
end
{:noreply, state}
end
def handle_info({:control, %{data: %{play: true}}}, state) do
{:ok, pid} = Pipeline.start_link(state.frame_widget)
Pipeline.play(pid)
{:noreply, %{state | pipeline: pid}}
end
end
Kino.start_child({Controller, {frame_widget, control_widget}})