Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Filtering

books/nx_signal/filtering.livemd

Filtering

Mix.install([
  {:nx_signal, "~> 0.2"},
  {:vega_lite, "~> 0.1"},
  {:kino_vega_lite, "~> 0.1"}
])

Prepare the data

fs = 16.0e3
window_duration_seconds = 100.0e-3
window_length = 2 ** ceil(:math.log2(fs * window_duration_seconds))
signal_duration_seconds = 3
signal_length = ceil(signal_duration_seconds * fs)

sin = fn freq, n ->
  Nx.sin(Nx.multiply(2 * :math.pi() * freq / fs, n))
end

half_n = Nx.iota({div(signal_length, 2)})

sin220 = sin.(220, half_n)
sin440 = sin.(440, half_n)
sin1000 = sin.(440 * 5 / 2, half_n)
sin3000 = sin.(220 * 4 / 3 * 4, half_n)

n = Nx.iota({signal_length})
t = Nx.divide(n, fs)
data = Nx.concatenate([Nx.add(sin440, sin1000), Nx.add(sin220, sin3000)])

# Data for plotting

slice = (signal_length - 3000)..(signal_length - 2750)
plot_data = %{y: Nx.to_flat_list(data[[slice]]), x: Nx.to_flat_list(t[[slice]])}
VegaLite.new(width: 600, height: 400, title: "Signal sample")
|> VegaLite.data_from_values(plot_data, only: ["x", "y"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "x", type: :quantitative)
|> VegaLite.encode_field(:y, "y", type: :quantitative)

Preparing the Filter

# The filter used is a simple ideal filter specified through sinc

# Cutoff frequency in Hz
fc = 600
filter_indices = Nx.iota({window_length}) |> Nx.subtract(div(window_length, 2))

h_ideal =
  Nx.multiply(2 * fc / fs, NxSignal.Filters.sinc(Nx.multiply(2 * fc / fs, filter_indices)))

window = NxSignal.Windows.hann(n: window_length)

h = Nx.multiply(h_ideal, window)

hfft =
  h
  |> Nx.fft(length: window_length)
  |> Nx.abs()
  |> Nx.add(1.0e-10)

hfft_power =
  hfft
  |> Nx.log()
  |> Nx.divide(Nx.log(10))
  |> Nx.multiply(20)

f_idx = Nx.iota({window_length}) |> Nx.subtract(div(window_length, 2))

f = Nx.multiply(f_idx, fs / window_length)

plot_data = %{
  n: Enum.to_list(0..(window_length - 1)),
  h: Nx.to_flat_list(h),
  hfft:
    Nx.to_flat_list(
      Nx.take_along_axis(
        hfft_power,
        Nx.select(Nx.less(f_idx, 0), Nx.add(f_idx, window_length), f_idx)
      )
    ),
  f: Nx.to_flat_list(f)
}
VegaLite.new(
  width: 600,
  height: 400,
  title: "600 Hz Low-Pass filter (Hann window); L = 2048"
)
|> VegaLite.data_from_values(plot_data, only: ["n", "h"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "n", type: :quantitative)
|> VegaLite.encode_field(:y, "h", type: :quantitative)
VegaLite.new(
  width: 600,
  height: 400,
  title: "FFT - 1200 Hz Low-Pass filter (Hann window); L = 1764"
)
|> VegaLite.data_from_values(plot_data, only: ["f", "hfft"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "f", type: :quantitative)
|> VegaLite.encode_field(:y, "hfft", type: :quantitative)

Filtering the data

# Now that we have our filter, instead of convolving the filter
# with the time representation of the signal, we can multiply each
# STFT frame by the DFT of the filter (represented by hfft)

{z, t, f} =
  NxSignal.stft(data, window, fft_length: window_length, sampling_rate: fs, scaling: :spectrum)
# Filter
z_filtered = Nx.multiply(z, hfft)

max_f =
  Nx.select(Nx.greater_equal(f, fs / 2), Nx.iota(f.shape), Nx.size(f) + 1)
  |> Nx.argmin()
  |> Nx.to_number()

spectrogram = z |> Nx.slice([0, 0], [Nx.size(t), max_f]) |> Nx.abs() |> Nx.pow(2)

filtered_spectrogram =
  z_filtered |> Nx.slice([0, 0], [Nx.size(t), max_f]) |> Nx.abs() |> Nx.pow(2)

# Reconstruct the time signal
data_out =
  z_filtered
  |> NxSignal.istft(window, fft_length: window_length, scaling: :spectrum, sampling_rate: fs)
  |> Nx.as_type(data.type)
plot_data =
  for t_idx <- 0..(Nx.size(t) - 1),
      f_idx <- 0..max_f,
      Nx.to_number(f[[f_idx]]) <= 4000,
      reduce: %{"t" => [], "f" => [], "s" => [], "filtered_s" => []} do
    %{"t" => t_acc, "f" => f_acc, "s" => s_acc, "filtered_s" => filtered_s_acc} ->
      %{
        "t" => [Nx.to_number(t[[t_idx]]) | t_acc],
        "f" => [Float.round(Nx.to_number(f[[f_idx]]), 3) | f_acc],
        "s" => [Nx.to_number(spectrogram[[t_idx, f_idx]]) | s_acc],
        "filtered_s" => [Nx.to_number(filtered_spectrogram[[t_idx, f_idx]]) | filtered_s_acc]
      }
  end
defmodule Spectrogram do
  alias VegaLite, as: Vl

  def plot(title, dataset) do
    Vl.new(title: title, width: 500, height: 500)
    |> Vl.mark(:rect)
    |> Vl.data_from_values(dataset)
    |> Vl.encode_field(:x, "t",
      type: :quantitative,
      title: "Time (seconds)",
      axis: [tick_min_step: 0.1],
      grid: false
    )
    |> Vl.encode_field(:y, "f",
      type: :quantitative,
      sort: "-x",
      title: "Frequency (Hz)",
      axis: [tick_count: 25],
      grid: false
    )
    |> Vl.encode_field(:color, "s",
      aggregate: :max,
      type: :quantitative,
      scale: [scheme: "viridis"],
      legend: [title: "dBFS"]
    )
    |> Vl.config(view: [stroke: nil])
  end
end
Spectrogram.plot("Spectrogram", Map.take(plot_data, ["t", "f", "s"]))
Spectrogram.plot(
  "Filtered Spectrogram",
  Map.take(plot_data, ["t", "f"]) |> Map.put("s", plot_data["filtered_s"])
)
plot_data = %{
  y: Nx.to_flat_list(data_out[[slice]]),
  x: Nx.to_flat_list(Nx.divide(n, fs)[[slice]])
}
VegaLite.new(width: 600, height: 400, title: "Reconstructed Signal")
|> VegaLite.data_from_values(plot_data, only: ["x", "y"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "x", type: :quantitative)
|> VegaLite.encode_field(:y, "y", type: :quantitative)

As we can see, the same original slice is now a pure 220Hz sine wave, because we got rid of the higher frequency harmonic.