Process FFmpeg result with stream
Introduction
ffmpex 와 ffmpex with kino 에서 http 경로에 저장된 영상을 ffmpex 로 다루는 코드들을 구현해 보았습니다.
주로 영상의 프레임을 jpeg 로 출력하는 것이었는데요, 이 코드들에는 한 가지 문제가 있습니다. ffmpex 구현의 특성상 ffmpeg 의 stdout 을 한 번에 읽어들여 하나의 변수로 처리해야 하는 것입니다.
FFmpex.new_command()
|> add_input_file(@video)
|> add_file_option(option_ss(10))
|> to_stdout()
|> add_stream_specifier(stream_type: :video)
|> add_stream_option(option_frames(10))
|> add_stream_option(option_c("mjpeg"))
|> add_file_option(option_f("image2pipe"))
|> execute()
|> then(fn {:ok, image} ->
# ffmpeg 실행이 끝난 후 images 에 10초 이후 10개의 jpeg 프레임이 포함된 데이터가 한 반에 넘어옵니다.
end)
ffmpeg 이 처리하는 데이터가 늘어날수록, stdout 으로 출력되는 데이터가 늘어나고, 이를 처리하기 위해 Elixir 노드는 더 많은 메모리를 사용하게 됩니다.
또한 ffmpeg 이 처리한 데이터가 stdout 으로 모두 출력된 이후에 Elixir 가 데이터를 처리할 수 있다는 점도 문제입니다. ffmpeg 이 stdout 으로 출력한 데이터를 일부만 읽어서 먼저 처리하고 싶어도, 항상 모든 데이터가 stdout 으로 출력될때까지 기다려야 합니다.
Solution
이 문제를 해결하는 방법은 stdout 을 Stream 으로 처리하는 것입니다. 안타깝게도 ffmpex 는 stdout 을 Stream 으로 처리하는 옵션이 없습니다. ffmpex 가 ffmpeg 를 호출할 때 사용하는 rambo 에는 옵션이 존재하지만, ffmpex 에서는 해당 옵션을 사용할 수 없도록 작성되어 있습니다.
다행히 rambo 라이브러리에 다른 대안들에 대한 정보가 잘 정리되어 있었고, Stream 처리 기능을 가지고 있는 Exile 을 찾을 수 있었습니다.
Setup
Exile
은 아직 https://hex.pm 에 배포되지 않았기 때문에 git 의존성으로 설치합니다. (비교를 위해서 ffmpex 도 같이 설치합니다.)
Mix.install([
{:exile, git: "https://github.com/akash-akya/exile"},
{:ffmpex, "~> 0.10.0"},
{:kino, "~> 0.5"}
])
source = "https://storage.googleapis.com/momenti-staging-public-2/content/aespa.mp4"
Exile
을 사용해 source
로 부터 하나의 프레임을 jpeg 로 가져오는 코드를 작성해 보았습니다.
ffmpeg 이 처리한 데이터를 stdout 으로 출력하도록 ffmpeg 명령어의 output 을 pipe:1
로 선언합니다.
Exile.stream!/2
은 명령어를 실행하고, stdout 을 읽어 Stream 처리가 가능한 데이터로 변환합니다. Stream 의 여러 함수들을 사용해 stdout 데이터를 처리할 수 있습니다.
여기서는 Stream.into/3
으로 데이터를 모두 파일로 기록하였습니다.
Exile.stream!(
~w(ffmpeg -ss 65 -i #{source} -f image2pipe -vf scale=iw/2:ih/2 -c:v mjpeg -q:v 31 -frames:v 1 pipe:1)
)
|> Stream.into(File.stream!("./data/exile_frame.jpeg"))
|> Stream.run()
|> then(fn _ ->
content = File.read!("./data/exile_frame.jpeg")
Kino.Image.new(content, "image/png")
end)
Process multiple frames
Exile
을 사용해서 하나 이상의 프레임을 처리하는 코드를 작성해 봅시다.
우선 stdout 으로 부터 읽어들인 데이터를 처리하는 방법을 고민해야 합니다. 프레임이 하나인 경우에는 모든 데이터를 jpeg 파일 하나로 모아 기록하면 그만이었습니다. 하지만 여러 프레임을 요청하면 ffmpeg 은 별도의 구분 없이 모든 데이터를 stdout 으로 출력합니다. 다행히도 jpeg 는 SOI
(start of image), EOI
(end of image) 로 이미지의 시작과 끝을 표시하기 때문에, 이 값을 확인해서 데이터를 여러 장의 jpeg 이미지로 변환 할 수 있습니다.
Stream.transform/3
을 사용해 이 과정을 구현했습니다.
defmodule Feature.Exile do
@soi <<255, 216>>
@eoi <<255, 217>>
defp split(binary, acc) do
case :binary.split(binary, <<@eoi, @soi>>, [:global, :trim_all]) do
[_] ->
{[nil], acc <> binary}
[head | rest] ->
[tail | part] = rest |> Enum.reverse()
middle = part |> Enum.reverse() |> Enum.map(&(@soi <> &1 <> @eoi))
{[acc <> head <> @eoi] ++ middle, @soi <> tail}
end
end
defp split(binary, _acc, :both) do
case :binary.split(binary, <<@eoi, @soi>>, [:global, :trim_all]) do
[_] ->
{[binary], <<>>}
_ ->
{:binary.split(binary, @soi, [:global, :trim_all]) |> Enum.map(&(@soi <> &1)), <<>>}
end
end
defp split(binary, acc, :hd) do
case :binary.split(binary, <<@eoi, @soi>>, [:global, :trim_all]) do
[_] ->
{[nil], binary}
[head | rest] ->
[tail | part] = rest |> Enum.reverse()
middle = part |> Enum.reverse() |> Enum.map(&(@soi <> &1 <> @eoi))
{[acc <> head <> @eoi] ++ middle, @soi <> tail}
end
end
defp split(binary, acc, :tl) do
case :binary.split(binary, <<@eoi, @soi>>, [:global, :trim_all]) do
[_] ->
{[acc <> binary], <<>>}
[head | rest] ->
[tail | part] = rest |> Enum.reverse()
middle = part |> Enum.reverse() |> Enum.map(&(@soi <> &1 <> @eoi))
{[acc <> head <> @eoi] ++ middle ++ [tail], <<>>}
end
end
def run(widget, opts) do
start_time = System.monotonic_time(:millisecond)
source = Keyword.get(opts, :source)
start = Keyword.get(opts, :start)
frames = Keyword.get(opts, :frames)
Exile.stream!(
~w(ffmpeg -ss #{start} -i #{source} -f image2pipe -vf scale=iw/2:ih/2 -c:v mjpeg -q:v 31 -frames:v #{frames} pipe:1)
)
|> Stream.transform(<<>>, fn stdout, acc ->
size = byte_size(stdout)
{head, part, tail} = {size - 2, size - 4, size - 2}
case stdout do
<<@soi, _::binary-size(part), @eoi>> ->
split(stdout, acc, :both)
<<@soi, _::binary-size(tail)>> ->
split(stdout, acc, :hd)
<<_::binary-size(head), @eoi>> ->
split(stdout, acc, :tl)
_ ->
split(stdout, acc)
end
end)
|> Stream.reject(&is_nil/1)
|> Stream.scan(:first, fn
image, :first ->
IO.inspect(System.monotonic_time(:millisecond) - start_time,
label: "Time for 1st frame (in ms)"
)
image
image, _ ->
image
end)
|> Stream.each(fn image ->
image_frame = Kino.Image.new(image, "image/png")
Kino.Frame.render(widget, image_frame)
end)
|> Stream.run()
end
end
exile_frame_widget = Kino.Frame.new()
영상의 시작 시점 (:start
) 과 프레임 수 (:frames
) 를 변경해가면서 셀을 실행해보면, 시작 시점과 프레임 수에 관계 없이 출력되는 Time for 1st frame
의 값이 크게 변하지 않는다는 것을 알 수 있습니다. 프레임 수를 100, 200 심지어 1,000 으로 늘리더라도 Time for 1st frame
는 1,500 ~ 3,000 사이를 유지합니다.
Exile
덕분에 ffmpeg 실행이 끝날때까지 기다리지 않고, 출력된 stdout 을 필요한 만큼 Stream
읽고 jpeg 이미지로 변환하기 때문입니다.
비슷한 로직을 ffmpex 로 구현해둔 아래 섹션을 실행해 보면, 차이를 더 명확하게 확인할 수 있습니다.
Feature.Exile.run(exile_frame_widget, source: source, start: 65, frames: 200)
Process multiple frames (with FFmpex)
defmodule Feature.Ffmpex do
import FFmpex
use FFmpex.Options
@soi <<255, 216>>
def run(widget, opts) do
start_time = System.monotonic_time(:millisecond)
source = Keyword.get(opts, :source)
start = Keyword.get(opts, :start)
frames = Keyword.get(opts, :frames)
FFmpex.new_command()
|> add_input_file(source)
|> add_file_option(option_ss(start))
|> to_stdout()
|> add_stream_specifier(stream_type: :video)
|> add_stream_option(option_frames(frames))
|> add_stream_option(option_c("mjpeg"))
|> add_stream_option(option_vf("scale=iw/2:ih/2"))
|> add_stream_option(option_q("31"))
|> add_file_option(option_f("image2pipe"))
|> execute()
|> then(fn
{:ok, images} ->
IO.inspect(System.monotonic_time(:millisecond) - start_time,
label: "Time for 1st frame (in ms)"
)
images
{:error, err} ->
err
end)
|> :binary.split(@soi, [:global, :trim_all])
|> Enum.map(&(@soi <> &1))
|> Enum.each(fn image ->
image_frame = Kino.Image.new(image, "image/png")
Kino.Frame.render(widget, image_frame)
end)
end
end
ffmpex_frame_widget = Kino.Frame.new()
Feature.Ffmpex.run(ffmpex_frame_widget, source: source, start: 65, frames: 200)