Speech to text
Logger.configure(level: :info)
Mix.install(
[
{:bumblebee, "~> 0.5.3"},
{:exla, "~> 0.7.1"},
{:membrane_core, "~> 1.0"},
{:membrane_portaudio_plugin, "~> 0.19.2"}
],
config: [
nx: [default_backend: EXLA.Backend]
]
)
Introduction
This livebook example shows how to perform real-time speech-to-text conversion using the Membrane Framework and Bumblebee.
You will see how to fetch audio from your microphone, perform preprocessing, and create your own Membrane element that runs an AI speech-to-text model.
Element performing speech to text conversion
We need to write a custom filter that will perform speech-to-text conversion and forward the resulting transcription. Let's call this element SpeechToText.
To perform the transcription, the element will use OpenAI's Whisper model, which can be easily loaded and used with a little help from Bumblebee.
The Whisper model requires the input audio samples to be in the f32le format, meaning each sample is a 32-bit little-endian floating-point number. The required input sample rate is 16 kHz, and only single-channel (mono) audio is allowed.
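As a quick illustration of this format (a standalone sketch, not part of the pipeline code): one second of 16 kHz mono f32le audio occupies 16_000 * 4 bytes, and such a binary can be turned into a tensor with Nx.from_binary/2, which is what the element below does before running the model.
# Illustrative only: one second of 16 kHz mono f32le silence.
samples_per_second = 16_000
bytes_per_sample = 4
silence = :binary.copy(<<0.0::float-32-little>>, samples_per_second)
byte_size(silence) == samples_per_second * bytes_per_sample
# => true
Nx.from_binary(silence, :f32) |> Nx.shape()
# => {16000}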
Initialization
In the initialization process, we load the Whisper model along with its featurizer, tokenizer, and generation configuration. Then we create a speech-to-text serving and indicate that we want to use the EXLA backend for tensor operations.
Buffers handling
Once a buffer arrives, we check whether it contains enough audio; if not, we accumulate it in the state. Then we perform basic VAD (Voice Activity Detection) to remove silent chunks, which improves the model's behaviour. Finally, we convert the audio to an Nx tensor, feed it to the model, and parse the output, which is then sent via the output pad.
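For reference, the serving result is a map with a :chunks list, and the element only relies on the :text field of each chunk. The value below is merely an illustration of that shape:
# Illustrative shape of the Nx.Serving.run/2 result consumed below;
# only the :text field of each chunk is used.
result = %{chunks: [%{text: " Hello"}, %{text: " world."}]}
Enum.map_join(result.chunks, & &1.text)
# => " Hello world."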
defmodule SpeechToText do
use Membrane.Filter
alias Membrane.RawAudio
require Membrane.Logger
@vad_chunk_duration Membrane.Time.milliseconds(500)
def_input_pad(:input,
accepted_format: %RawAudio{sample_format: :f32le, channels: 1, sample_rate: 16_000}
)
def_output_pad(:output, accepted_format: Membrane.RemoteStream)
def_options(
chunk_duration: [
spec: Membrane.Time.t(),
default: Membrane.Time.seconds(5),
default_inspector: &Membrane.Time.pretty_duration/1,
description: """
The duration of chunks feeding the model.
Must be at least 5 seconds. The longer the chunks,
the better transcription accuracy, but bigger latency.
"""
],
vad_threshold: [
spec: float,
default: 0.03,
description: """
Volume threshold below which the input is considered to be silence.
Used for optimizing alignment of chunks provided to the model
and filtering out the silence to prevent hallucinations.
"""
]
)
@impl true
def handle_setup(_ctx, options) do
{:ok, whisper} = Bumblebee.load_model({:hf, "openai/whisper-tiny"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"})
serving =
Bumblebee.Audio.speech_to_text_whisper(whisper, featurizer, tokenizer, generation_config,
defn_options: [compiler: EXLA]
)
Membrane.Logger.info("Whisper model ready")
state =
Map.merge(options, %{
serving: serving,
speech: <<>>,
queue: <<>>,
chunk_size: nil,
vad_chunk_size: nil
})
{[], state}
end
@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
state = %{
state
| chunk_size: RawAudio.time_to_bytes(state.chunk_duration, stream_format),
vad_chunk_size: RawAudio.time_to_bytes(@vad_chunk_duration, stream_format)
}
{[stream_format: {:output, %Membrane.RemoteStream{}}], state}
end
@impl true
def handle_buffer(:input, buffer, _ctx, state) do
input = state.queue <> buffer.payload
if byte_size(input) > state.vad_chunk_size do
process_data(input, %{state | queue: <<>>})
else
{[], %{state | queue: input}}
end
end
defp process_data(data, state) do
# Here we filter out the silence at the beginning of each chunk.
# This way we can fit as much speech in a single chunk as possible
# and potentially remove whole silent chunks, which cause
# model hallucinations. If after removing the silence the chunk
# is not empty but too small to process, we store it in the state
# and prepend it to the subsequent chunk.
speech =
if state.speech == <<>> do
filter_silence(data, state)
else
state.speech <> data
end
if byte_size(speech) < state.chunk_size do
{[], %{state | speech: speech}}
else
model_input = Nx.from_binary(speech, :f32)
result = Nx.Serving.run(state.serving, model_input)
transcription = Enum.map_join(result.chunks, & &1.text)
buffer = %Membrane.Buffer{payload: transcription}
{[buffer: {:output, buffer}], %{state | speech: <<>>}}
end
end
defp filter_silence(samples, state) do
samples
|> generate_chunks(state.vad_chunk_size)
|> Enum.drop_while(&(calc_volume(&1) < state.vad_threshold))
|> Enum.join()
end
defp generate_chunks(samples, chunk_size) when byte_size(samples) >= 2 * chunk_size do
<<chunk::binary-size(chunk_size), rest::binary>> = samples
[chunk | generate_chunks(rest, chunk_size)]
end
defp generate_chunks(samples, _chunk_size) do
[samples]
end
# Calculates audio volume based on standard deviation
# of the samples
defp calc_volume(chunk) do
samples = for <<sample::float-32-little <- chunk>>, do: sample
samples_cnt = Enum.count(samples)
samples_avg = Enum.sum(samples) / samples_cnt
sum_mean_square = samples |> Enum.map(&((&1 - samples_avg) ** 2)) |> Enum.sum()
:math.sqrt(sum_mean_square / samples_cnt)
end
end
Pipeline
The pipeline consists of the following elements:
- The Membrane.PortAudio.Source, responsible for fetching the audio input from your microphone
- The SpeechToText filter we have previously created
- The Membrane.Debug.Sink that will print the transcriptions to the standard output
Running the cell below will start the pipeline. It may ask you for permission to use your microphone. Try saying something in English and the transcription of your words should appear below the cell.
import Membrane.ChildrenSpec
alias Membrane.RCPipeline
spec =
child(%Membrane.PortAudio.Source{channels: 1, sample_format: :f32le, sample_rate: 16_000})
|> child(SpeechToText)
|> child(%Membrane.Debug.Sink{handle_buffer: &IO.puts(&1.payload)})
pipeline = RCPipeline.start_link!()
RCPipeline.exec_actions(pipeline, spec: spec)
You can terminate the pipeline with the following code:
RCPipeline.terminate(pipeline)
Problems and potential improvements
The main problem with our approach is that we feed the model with small chunks, so it lacks the context of what comes before and after. If a chunk starts or ends in the middle of a word, it's very hard for the model to recognize it properly. The solution is to make the chunks overlap and then apply a clever merging algorithm. This is almost implemented in Bumblebee - see https://github.com/elixir-nx/bumblebee/issues/261. Merging could also be done outside of Bumblebee, but it's hard to say how well it would perform, since it would have to rely on the output text or timestamps instead of tokens. Alternatively, we could apply an algorithm that decides where to split the chunks based on silent breaks.
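To give a rough idea of the overlap approach, here is a hypothetical sketch (not part of this demo) that carries a tail of each chunk over to the next one; merging the resulting transcriptions, which is the hard part, is not shown:
# Hypothetical sketch: keep the tail of each processed chunk and prepend it
# to the next one, so words cut at a chunk boundary appear in both chunks.
defmodule OverlapSketch do
  def split_with_overlap(previous_tail, new_data, overlap_bytes) do
    chunk = previous_tail <> new_data
    tail_size = min(overlap_bytes, byte_size(chunk))
    tail = binary_part(chunk, byte_size(chunk) - tail_size, tail_size)
    # The chunk is fed to the model; the tail is carried over to the next call.
    {chunk, tail}
  end
end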
Another issue is that Whisper seems to hallucinate when given noise or silence. We partially address that by removing silence whenever possible. Silence detection and removal is not a trivial task and can likely be improved; applying noise reduction could also help.
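Meanwhile, the vad_threshold and chunk_duration options of the SpeechToText element can be tuned when building the pipeline spec, for example (illustrative values) to make it less sensitive in a noisy environment:
# Illustrative values only: a higher threshold discards more low-volume audio,
# while longer chunks trade latency for accuracy.
child(%SpeechToText{
  vad_threshold: 0.05,
  chunk_duration: Membrane.Time.seconds(10)
})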
We welcome ideas and contributions addressing these problems or bringing other improvements.