Filtering
Mix.install([
{:nx_signal, "~> 0.2"},
{:vega_lite, "~> 0.1"},
{:kino_vega_lite, "~> 0.1"}
])
Prepare the data
fs = 16.0e3
window_duration_seconds = 100.0e-3
window_length = 2 ** ceil(:math.log2(fs * window_duration_seconds))
signal_duration_seconds = 3
signal_length = ceil(signal_duration_seconds * fs)
sin = fn freq, n ->
Nx.sin(Nx.multiply(2 * :math.pi() * freq / fs, n))
end
half_n = Nx.iota({div(signal_length, 2)})
sin220 = sin.(220, half_n)
sin440 = sin.(440, half_n)
sin1000 = sin.(440 * 5 / 2, half_n)
sin3000 = sin.(220 * 4 / 3 * 4, half_n)
n = Nx.iota({signal_length})
t = Nx.divide(n, fs)
data = Nx.concatenate([Nx.add(sin440, sin1000), Nx.add(sin220, sin3000)])
# Data for plotting
slice = (signal_length - 3000)..(signal_length - 2750)
plot_data = %{y: Nx.to_flat_list(data[[slice]]), x: Nx.to_flat_list(t[[slice]])}
VegaLite.new(width: 600, height: 400, title: "Signal sample")
|> VegaLite.data_from_values(plot_data, only: ["x", "y"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "x", type: :quantitative)
|> VegaLite.encode_field(:y, "y", type: :quantitative)
Preparing the Filter
# The filter used is a simple ideal filter specified through sinc
# Cutoff frequency in Hz
fc = 600
filter_indices = Nx.iota({window_length}) |> Nx.subtract(div(window_length, 2))
h_ideal =
Nx.multiply(2 * fc / fs, NxSignal.Filters.sinc(Nx.multiply(2 * fc / fs, filter_indices)))
window = NxSignal.Windows.hann(n: window_length)
h = Nx.multiply(h_ideal, window)
hfft =
h
|> Nx.fft(length: window_length)
|> Nx.abs()
|> Nx.add(1.0e-10)
hfft_power =
hfft
|> Nx.log()
|> Nx.divide(Nx.log(10))
|> Nx.multiply(20)
f_idx = Nx.iota({window_length}) |> Nx.subtract(div(window_length, 2))
f = Nx.multiply(f_idx, fs / window_length)
plot_data = %{
n: Enum.to_list(0..(window_length - 1)),
h: Nx.to_flat_list(h),
hfft:
Nx.to_flat_list(
Nx.take_along_axis(
hfft_power,
Nx.select(Nx.less(f_idx, 0), Nx.add(f_idx, window_length), f_idx)
)
),
f: Nx.to_flat_list(f)
}
VegaLite.new(
width: 600,
height: 400,
title: "600 Hz Low-Pass filter (Hann window); L = 2048"
)
|> VegaLite.data_from_values(plot_data, only: ["n", "h"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "n", type: :quantitative)
|> VegaLite.encode_field(:y, "h", type: :quantitative)
VegaLite.new(
width: 600,
height: 400,
title: "FFT - 1200 Hz Low-Pass filter (Hann window); L = 1764"
)
|> VegaLite.data_from_values(plot_data, only: ["f", "hfft"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "f", type: :quantitative)
|> VegaLite.encode_field(:y, "hfft", type: :quantitative)
Filtering the data
# Now that we have our filter, instead of convolving the filter
# with the time representation of the signal, we can multiply each
# STFT frame by the DFT of the filter (represented by hfft)
{z, t, f} =
NxSignal.stft(data, window, fft_length: window_length, sampling_rate: fs, scaling: :spectrum)
# Filter
z_filtered = Nx.multiply(z, hfft)
max_f =
Nx.select(Nx.greater_equal(f, fs / 2), Nx.iota(f.shape), Nx.size(f) + 1)
|> Nx.argmin()
|> Nx.to_number()
spectrogram = z |> Nx.slice([0, 0], [Nx.size(t), max_f]) |> Nx.abs() |> Nx.pow(2)
filtered_spectrogram =
z_filtered |> Nx.slice([0, 0], [Nx.size(t), max_f]) |> Nx.abs() |> Nx.pow(2)
# Reconstruct the time signal
data_out =
z_filtered
|> NxSignal.istft(window, fft_length: window_length, scaling: :spectrum, sampling_rate: fs)
|> Nx.as_type(data.type)
plot_data =
for t_idx <- 0..(Nx.size(t) - 1),
f_idx <- 0..max_f,
Nx.to_number(f[[f_idx]]) <= 4000,
reduce: %{"t" => [], "f" => [], "s" => [], "filtered_s" => []} do
%{"t" => t_acc, "f" => f_acc, "s" => s_acc, "filtered_s" => filtered_s_acc} ->
%{
"t" => [Nx.to_number(t[[t_idx]]) | t_acc],
"f" => [Float.round(Nx.to_number(f[[f_idx]]), 3) | f_acc],
"s" => [Nx.to_number(spectrogram[[t_idx, f_idx]]) | s_acc],
"filtered_s" => [Nx.to_number(filtered_spectrogram[[t_idx, f_idx]]) | filtered_s_acc]
}
end
defmodule Spectrogram do
alias VegaLite, as: Vl
def plot(title, dataset) do
Vl.new(title: title, width: 500, height: 500)
|> Vl.mark(:rect)
|> Vl.data_from_values(dataset)
|> Vl.encode_field(:x, "t",
type: :quantitative,
title: "Time (seconds)",
axis: [tick_min_step: 0.1],
grid: false
)
|> Vl.encode_field(:y, "f",
type: :quantitative,
sort: "-x",
title: "Frequency (Hz)",
axis: [tick_count: 25],
grid: false
)
|> Vl.encode_field(:color, "s",
aggregate: :max,
type: :quantitative,
scale: [scheme: "viridis"],
legend: [title: "dBFS"]
)
|> Vl.config(view: [stroke: nil])
end
end
Spectrogram.plot("Spectrogram", Map.take(plot_data, ["t", "f", "s"]))
Spectrogram.plot(
"Filtered Spectrogram",
Map.take(plot_data, ["t", "f"]) |> Map.put("s", plot_data["filtered_s"])
)
plot_data = %{
y: Nx.to_flat_list(data_out[[slice]]),
x: Nx.to_flat_list(Nx.divide(n, fs)[[slice]])
}
VegaLite.new(width: 600, height: 400, title: "Reconstructed Signal")
|> VegaLite.data_from_values(plot_data, only: ["x", "y"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "x", type: :quantitative)
|> VegaLite.encode_field(:y, "y", type: :quantitative)
As we can see, the same original slice is now a pure 220Hz sine wave, because we got rid of the higher frequency harmonic.