Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Exile

exile.livemd

Exile

Mix.install(
  [
    {:kino, "~> 0.12.3"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"},
    {:req, "~> 0.4.14"},
    {:multipart, "~> 0.4.0"}
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)

Setup Model

{:ok, roberta_model_info} =
  Bumblebee.load_model({:hf, "cardiffnlp/twitter-roberta-base-offensive"})

{:ok, roberta_tokenizer} = Bumblebee.load_tokenizer({:hf, "FacebookAI/roberta-base"})

roberta_serving =
  Bumblebee.Text.text_classification(roberta_model_info, roberta_tokenizer,
    top_k: 1,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )
defmodule Exile do
  @asset_path __ENV__.file |> Path.dirname() |> Path.join("assets")
  @beep_sounds (for file <- @asset_path |> File.ls!(),
                    match?(<<"beep_", _duration::binary>>, file) do
                  <<"beep_", duration::binary>> = Path.basename(file)
                  {duration, ".wav"} = duration |> String.split_at(-String.length(".wav"))
                  {duration, ""} = duration |> Float.parse()
                  %{file: Path.join(@asset_path, file), duration: duration}
                end)
               |> Enum.sort_by(&amp;Map.get(&amp;1, :duration))

  def censor_audio(input_file, output_file, words) do
    timestamps = extract_offensive_timestamps(words)
    input_duration = get_audio_duration(input_file)
    {inputs, filter_complex} = build_filter_complex(timestamps, input_duration)

    ffmpeg_command =
      ["-y", "-i", input_file] ++
        inputs ++ ["-filter_complex", filter_complex, "-map", "[outa]", output_file]

    {output, status} = System.cmd("ffmpeg", ffmpeg_command, stderr_to_stdout: true)

    {output, status}
  end

  defp extract_offensive_timestamps(words) do
    Enum.filter(words, fn word -> word["label"] == "offensive" end)
    |> Enum.map(fn word -> {word["start"], word["end"]} end)
  end

  defp select_and_adjust_beep_sound(duration) do
    Enum.find(@beep_sounds, fn %{duration: d} -> d >= duration end) || List.last(@beep_sounds)
  end

  defp get_audio_duration(file_path) do
    {output, _} =
      System.cmd("ffprobe", [
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        file_path
      ])

    String.trim(output) |> String.to_float()
  end

  defp build_filter_complex(timestamps, input_duration) do
    beep_inputs =
      Enum.map(timestamps, fn {start, end_time} ->
        beep = select_and_adjust_beep_sound(end_time - start)
        ["-i", beep.file]
      end)

    filter_segments = build_audio_filter_segments(timestamps, input_duration)

    final_concat =
      0..(length(timestamps) - 1)
      |> Enum.map(&amp;"[seg#{&amp;1}]")
      |> Enum.join()

    filter_complex =
      Enum.join(
        filter_segments ++ ["#{final_concat}concat=n=#{length(timestamps)}:v=0:a=1[outa]"],
        "; "
      )

    {Enum.flat_map(beep_inputs, fn input -> input end), filter_complex}
  end

  defp build_audio_filter_segments(timestamps, input_duration) do
    timestamps
    |> Enum.with_index()
    |> Enum.flat_map(fn {{start, end_time}, i} ->
      beep_index = i + 1

      next_start =
        if i == length(timestamps) - 1,
          do: input_duration,
          else: Enum.at(timestamps, i + 1) |> elem(0)

      if i == 0 do
        [
          "[0:a]atrim=start=0:end=#{start}[seg#{i}_pre]",
          "[#{beep_index}:a]asetpts=PTS-STARTPTS[beep#{beep_index}]",
          "[0:a]atrim=start=#{end_time}:end=#{next_start}[seg#{i}_post]",
          "[seg#{i}_pre][beep#{beep_index}][seg#{i}_post]concat=n=3:v=0:a=1[seg#{i}]"
        ]
      else
        [
          "[#{beep_index}:a]asetpts=PTS-STARTPTS[beep#{beep_index}]",
          "[0:a]atrim=start=#{end_time}:end=#{next_start}[seg#{i}_post]",
          "[beep#{beep_index}][seg#{i}_post]concat=n=2:v=0:a=1[seg#{i}]"
        ]
      end
    end)
  end
end
audio_input = Kino.Input.audio("Auto Censor", sampling_rate: 16_000, format: :wav)
form = Kino.Control.form([audio: audio_input], submit: "Run")
download = Kino.Frame.new(placeholder: false)
accepted_exts = ~w(.flac .mp3 .mp4 .mpeg .mpga .m4a .ogg .wav .webm)

Kino.listen(form, fn %{data: %{audio: audio}} ->
  if audio do
    file_path = Kino.Input.file_path(audio.file_ref)

    filename =
      if Path.extname(file_path) == "" do
        Path.basename(file_path) <> ".wav"
      else
        file_path
      end

    {:ok, file_contents} = File.read(file_path)

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("whisper-1", "model"))
      |> Multipart.add_part(
        Multipart.Part.file_content_field(filename, file_contents, :file, filename: filename)
      )
      |> Multipart.add_part(Multipart.Part.text_field("word", "timestamp_granularities[]"))
      |> Multipart.add_part(Multipart.Part.text_field("verbose_json", "response_format"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"authorization", "Bearer #{System.fetch_env!("LB_OPENAI_TOKEN")}"},
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    transcription =
      Req.post!(
        "https://api.openai.com/v1/audio/transcriptions",
        headers: headers,
        body: Multipart.body_stream(multipart)
      )
      |> Map.get(:body)

    words =
      Map.get(transcription, "words")
      |> Enum.map(fn %{"word" => word} = entry ->
        [prediction] = Nx.Serving.run(roberta_serving, word).predictions

        Map.put(entry, "label", prediction.label)
      end)

    output_file_path = System.tmp_dir!() |> Path.join("censored_#{filename}")

    case Exile.censor_audio(file_path, output_file_path, words) do
      {_output, 0} ->
        Kino.Frame.render(
          download,
          Kino.Shorts.download(fn -> File.read!(output_file_path) end, filename: filename)
        )

      _ ->
        Kino.Frame.render(download, Kino.Text.new("Error!"))
    end
  end
end)

Kino.Layout.grid([form, download], boxed: true, gap: 16)