Exile
Mix.install(
  [
    {:kino, "~> 0.12.3"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:exla, ">= 0.0.0"},
    {:req, "~> 0.4.14"},
    {:multipart, "~> 0.4.0"}
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)
Setup Model
# Load the offensive-language classifier and a compatible RoBERTa tokenizer,
# then build a text-classification serving compiled with EXLA.
{:ok, roberta_model_info} =
  Bumblebee.load_model({:hf, "cardiffnlp/twitter-roberta-base-offensive"})

{:ok, roberta_tokenizer} = Bumblebee.load_tokenizer({:hf, "FacebookAI/roberta-base"})

roberta_serving =
  Bumblebee.Text.text_classification(roberta_model_info, roberta_tokenizer,
    top_k: 1,
    compile: [batch_size: 1, sequence_length: 100],
    defn_options: [compiler: EXLA]
  )
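As a quick optional check, the serving can be run directly on a string; the sample text below is only a placeholder:
Nx.Serving.run(roberta_serving, "a perfectly harmless sentence")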
defmodule Exile do
  @moduledoc """
  Replaces offensive spans of an audio file with beep sounds via ffmpeg.
  """

  # Beep recordings live in the ./assets directory next to this file, named
  # "beep_<duration>.wav" (duration parsed as a float, e.g. "beep_0.5.wav").
  # Parse each duration and keep the beeps sorted from shortest to longest.
  @asset_path __ENV__.file |> Path.dirname() |> Path.join("assets")

  @beep_sounds (for file <- File.ls!(@asset_path),
                    match?(<<"beep_", _duration::binary>>, file) do
                  <<"beep_", duration::binary>> = Path.basename(file)
                  {duration, ".wav"} = String.split_at(duration, -String.length(".wav"))
                  {duration, ""} = Float.parse(duration)
                  %{file: Path.join(@asset_path, file), duration: duration}
                end)
                |> Enum.sort_by(&Map.get(&1, :duration))
  @doc """
  Overwrites each word labelled "offensive" in `input_file` with a beep and
  writes the result to `output_file`. `words` is a list of maps with "start",
  "end" and "label" keys. Returns ffmpeg's `{output, exit_status}`.
  """
  def censor_audio(input_file, output_file, words) do
    timestamps = extract_offensive_timestamps(words)
    input_duration = get_audio_duration(input_file)
    {inputs, filter_complex} = build_filter_complex(timestamps, input_duration)

    ffmpeg_command =
      ["-y", "-i", input_file] ++
        inputs ++ ["-filter_complex", filter_complex, "-map", "[outa]", output_file]

    System.cmd("ffmpeg", ffmpeg_command, stderr_to_stdout: true)
  end
  # Keep only the words the classifier flagged as offensive and return their
  # {start, end} timestamps in seconds.
  defp extract_offensive_timestamps(words) do
    words
    |> Enum.filter(fn word -> word["label"] == "offensive" end)
    |> Enum.map(fn word -> {word["start"], word["end"]} end)
  end
  # Pick the shortest beep that is at least as long as the span to censor,
  # falling back to the longest available beep.
  defp select_and_adjust_beep_sound(duration) do
    Enum.find(@beep_sounds, fn %{duration: d} -> d >= duration end) || List.last(@beep_sounds)
  end
  # Ask ffprobe for the total duration of the file, in seconds.
  defp get_audio_duration(file_path) do
    {output, _status} =
      System.cmd("ffprobe", [
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        file_path
      ])

    output |> String.trim() |> String.to_float()
  end
  # Builds the extra "-i" beep inputs plus the ffmpeg filter_complex graph that
  # trims the original audio around each offensive span and splices a beep in,
  # concatenating all segments into the final [outa] stream.
  defp build_filter_complex(timestamps, input_duration) do
    beep_inputs =
      Enum.map(timestamps, fn {start, end_time} ->
        beep = select_and_adjust_beep_sound(end_time - start)
        ["-i", beep.file]
      end)

    filter_segments = build_audio_filter_segments(timestamps, input_duration)

    final_concat =
      0..(length(timestamps) - 1)
      |> Enum.map(&"[seg#{&1}]")
      |> Enum.join()

    filter_complex =
      Enum.join(
        filter_segments ++ ["#{final_concat}concat=n=#{length(timestamps)}:v=0:a=1[outa]"],
        "; "
      )

    {Enum.concat(beep_inputs), filter_complex}
  end
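  # Illustration only (not executed): for a single flagged word spanning roughly
  # 3.2-3.7s in a 10.0s file, the extra inputs are ["-i", <beep file>] and the
  # generated filter graph looks along the lines of:
  #
  #   [0:a]atrim=start=0:end=3.2[seg0_pre]; [1:a]asetpts=PTS-STARTPTS[beep1];
  #   [0:a]atrim=start=3.7:end=10.0[seg0_post];
  #   [seg0_pre][beep1][seg0_post]concat=n=3:v=0:a=1[seg0];
  #   [seg0]concat=n=1:v=0:a=1[outa]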
  # For each censored span, trims the original audio around it and concatenates
  # the leading audio (first span only), the beep, and the audio up to the next
  # span (or the end of the file) into a labelled segment [seg<i>].
  defp build_audio_filter_segments(timestamps, input_duration) do
    timestamps
    |> Enum.with_index()
    |> Enum.flat_map(fn {{start, end_time}, i} ->
      # Beep files start at ffmpeg input index 1; input 0 is the original audio.
      beep_index = i + 1

      next_start =
        if i == length(timestamps) - 1,
          do: input_duration,
          else: Enum.at(timestamps, i + 1) |> elem(0)

      if i == 0 do
        [
          "[0:a]atrim=start=0:end=#{start}[seg#{i}_pre]",
          "[#{beep_index}:a]asetpts=PTS-STARTPTS[beep#{beep_index}]",
          "[0:a]atrim=start=#{end_time}:end=#{next_start}[seg#{i}_post]",
          "[seg#{i}_pre][beep#{beep_index}][seg#{i}_post]concat=n=3:v=0:a=1[seg#{i}]"
        ]
      else
        [
          "[#{beep_index}:a]asetpts=PTS-STARTPTS[beep#{beep_index}]",
          "[0:a]atrim=start=#{end_time}:end=#{next_start}[seg#{i}_post]",
          "[beep#{beep_index}][seg#{i}_post]concat=n=2:v=0:a=1[seg#{i}]"
        ]
      end
    end)
  end
end
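For reference, Exile.censor_audio/3 takes the Whisper word maps (with their "start" and "end" times) once the classifier's "label" has been added, as done in the cell below; the call shape is roughly as follows, where the file names are placeholders:
# Illustration only:
#
#   words = [%{"word" => "example", "start" => 1.2, "end" => 1.7, "label" => "offensive"}]
#   Exile.censor_audio("input.wav", "censored.wav", words)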
# The upload form and a frame that will later hold the download link.
audio_input = Kino.Input.audio("Auto Censor", sampling_rate: 16_000, format: :wav)
form = Kino.Control.form([audio: audio_input], submit: "Run")
download = Kino.Frame.new(placeholder: false)
accepted_exts = ~w(.flac .mp3 .mp4 .mpeg .mpga .m4a .ogg .wav .webm)
# On submit: transcribe the audio with word-level timestamps, classify each
# word, beep out the offensive ones, and offer the censored file for download.
Kino.listen(form, fn %{data: %{audio: audio}} ->
  if audio do
    file_path = Kino.Input.file_path(audio.file_ref)

    # The uploaded file's path may have no extension; make sure the name sent
    # to the API (and reused for the download) ends in one.
    filename =
      if Path.extname(file_path) == "" do
        Path.basename(file_path) <> ".wav"
      else
        Path.basename(file_path)
      end

    {:ok, file_contents} = File.read(file_path)
    # Build the multipart request for OpenAI's transcription endpoint, asking
    # for word-level timestamps in a verbose JSON response.
    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("whisper-1", "model"))
      |> Multipart.add_part(
        Multipart.Part.file_content_field(filename, file_contents, :file, filename: filename)
      )
      |> Multipart.add_part(Multipart.Part.text_field("word", "timestamp_granularities[]"))
      |> Multipart.add_part(Multipart.Part.text_field("verbose_json", "response_format"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    # The API key comes from the OPENAI_TOKEN Livebook secret (exposed as the
    # LB_OPENAI_TOKEN environment variable).
    headers = [
      {"authorization", "Bearer #{System.fetch_env!("LB_OPENAI_TOKEN")}"},
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]
    # Transcribe the audio, then run every word through the classifier and tag
    # it with the predicted label (the downstream filter looks for "offensive").
    transcription =
      Req.post!(
        "https://api.openai.com/v1/audio/transcriptions",
        headers: headers,
        body: Multipart.body_stream(multipart)
      )
      |> Map.get(:body)

    words =
      transcription
      |> Map.get("words")
      |> Enum.map(fn %{"word" => word} = entry ->
        [prediction] = Nx.Serving.run(roberta_serving, word).predictions
        Map.put(entry, "label", prediction.label)
      end)
    output_file_path = System.tmp_dir!() |> Path.join("censored_#{filename}")

    # Render a download button on success, or an error message if ffmpeg failed.
    case Exile.censor_audio(file_path, output_file_path, words) do
      {_output, 0} ->
        Kino.Frame.render(
          download,
          Kino.Shorts.download(fn -> File.read!(output_file_path) end, filename: filename)
        )

      _ ->
        Kino.Frame.render(download, Kino.Text.new("Error!"))
    end
  end
end)
Kino.Layout.grid([form, download], boxed: true, gap: 16)