Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Spectrogram Plotting

books/nx_signal/spectrogram.livemd

Spectrogram Plotting

Mix.install([
  {:nx_signal, "~> 0.2"},
  {:vega_lite, "~> 0.1.4"},
  {:kino_vega_lite, "~> 0.1.1"}
])

Generating the audio data

# You can load an audio file here.  For this example,
# we're producing 3 seconds of 220Hz, 440Hz, 1kHz and 3kHz sine waves

fs = 44.1e3
t_max = 3

full_n = ceil(fs * t_max)
half_n = div(full_n, 2)

# samples/sec * sec = samples
n = Nx.iota({half_n})

sin = fn freq, n ->
  Nx.sin(Nx.multiply(2 * :math.pi() * freq / fs, n))
end

sin220 = sin.(220, n)
sin440 = sin.(440, n)
sin1000 = sin.(1000, n)
sin3000 = sin.(3000, n)

data = Nx.concatenate([Nx.add(sin440, sin1000), Nx.add(sin220, sin3000)])
n = Nx.iota({full_n})

d = %{data: Nx.to_flat_list(data[[1000..1250]]), n: Nx.to_flat_list(n[[1000..1250]])}
VegaLite.new(width: 600, height: 600, title: "Audio Sample")
|> VegaLite.data_from_values(d, only: ["n", "data"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "n", type: :ordinal)
|> VegaLite.encode_field(:y, "data", type: :quantitative)
defmodule Spectrogram do
  alias VegaLite, as: Vl
  import Nx.Defn

  def calculate_stft_and_plot_spectrogram(
        input,
        fs,
        window_duration_ms,
        plot_cutoff_frequency \\ 4000
      ) do
    n_window = ceil(fs * window_duration_ms)
    {spectrogram, f, t, max_f} = stft(input, fs: fs, n_window: n_window)

    max_f = Nx.to_number(max_f)
    spectrogram = Nx.slice(spectrogram, [0, 0], [Nx.size(t), max_f])
    f = Nx.slice(f, [0], [max_f])

    spectrogram
    |> to_plot_data(f, t, plot_cutoff_frequency)
    |> plot()
  end

  defn stft(input, opts) do
    fs = opts[:fs]
    n_window = opts[:n_window]

    # ms to samples
    window = NxSignal.Windows.hann(n: n_window, is_periodic: true)

    # use the default overlap of 50%
    {s, t, f} = NxSignal.stft(input, window, sampling_rate: fs, fft_length: 1024)

    max_f =
      Nx.select(f >= fs / 2, Nx.iota(f.shape), Nx.size(f) + 1)
      |> Nx.argmin()

    spectrogram = Nx.abs(s)
    # to dBFS
    spectrogram = 20 * Nx.log(spectrogram / Nx.reduce_max(spectrogram)) / Nx.log(10)

    {spectrogram, f, t, max_f}
  end

  defp to_plot_data(s, f, t, plot_cutoff_frequency) do
    for t_idx <- 0..(Nx.size(t) - 1),
        f_idx <- 0..(Nx.size(f) - 1),
        Nx.to_number(f[[f_idx]]) <= plot_cutoff_frequency,
        reduce: %{"t" => [], "f" => [], "s" => []} do
      %{"t" => t_acc, "f" => f_acc, "s" => s_acc} ->
        %{
          "t" => [Nx.to_number(t[[t_idx]]) | t_acc],
          "f" => [Float.round(Nx.to_number(f[[f_idx]]), 3) | f_acc],
          "s" => [Nx.to_number(s[[t_idx, f_idx]]) | s_acc]
        }
    end
  end

  defp plot(dataset) do
    Vl.new(title: "Spectrogram", width: 500, height: 500)
    |> Vl.mark(:rect)
    |> Vl.data_from_values(dataset)
    |> Vl.encode_field(:x, "t",
      type: :quantitative,
      title: "Time (seconds)",
      axis: [tick_min_step: 0.1],
      grid: false
    )
    |> Vl.encode_field(:y, "f",
      type: :quantitative,
      sort: "-x",
      title: "Frequency (Hz)",
      axis: [tick_count: 25],
      grid: false
    )
    |> Vl.encode_field(:color, "s",
      aggregate: :max,
      type: :quantitative,
      scale: [scheme: "viridis"],
      legend: [title: "dBFS"]
    )
    |> Vl.config(view: [stroke: nil])
  end
end
Spectrogram.calculate_stft_and_plot_spectrogram(data, fs, 50.0e-3)

Notice how the first half of the spectrogram looks cleaner than the second one. This is due to the window length, that also interferes in how we can observe both our time and frequency resolutions.

Below we can see what happens if we use different window durations (150ms, 100ms and 25ms respectively).

Spectrogram.calculate_stft_and_plot_spectrogram(data, fs, 150.0e-3)
Spectrogram.calculate_stft_and_plot_spectrogram(data, fs, 100.0e-3)
Spectrogram.calculate_stft_and_plot_spectrogram(data, fs, 25.0e-3)