Powered by AppSignal & Oban Pro

Peak Finding

guides/peak_finding.livemd

Peak Finding

Mix.install([
  {:nx_signal, path: __DIR__ |> Path.join("..") |> Path.expand()},
  {:kino_vega_lite, "~> 0.1"},
  {:tucan, "~> 0.5"}
])

Introduction

A local maximum is a sample that is strictly greater than all of its neighbours within a given window. NxSignal.PeakFinding provides three functions for finding such extrema:

  • argrelmax(signal, order) - local maxima
  • argrelmin(signal, order) - local minima
  • argrelextrema(signal, comparator, order) - generalised version

The order parameter controls the half-width of the comparison window. A point must beat all neighbours within order samples on each side to qualify. Small order → sensitive (finds every little wiggle); large order → selective (finds only broad peaks).

All three functions return %{indices: tensor, valid_indices: scalar}. The first valid_indices entries of indices are the detected positions; the remainder are padded with −1.

# Helper: extract valid peak indices as a flat Elixir list
defmodule Peaks do
  def indices(result) do
    n = Nx.to_number(result.valid_indices)
    if n == 0, do: [], else: result.indices |> Nx.slice([0, 0], [n, 1]) |> Nx.to_flat_list()
  end
end

Basic usage: effect of the order parameter

A noisy signal consisting of two sinusoids illustrates how order controls which peaks are returned.

fs  = 200
dur = 2.0
t   = Nx.linspace(0, dur, n: trunc(fs * dur), endpoint: false, type: :f32)

# Signal: 5 Hz + 13 Hz + noise
key = Nx.Random.key(0)
{noise, _} = Nx.Random.normal(key, shape: {trunc(fs * dur)}, type: :f32)

signal =
  Nx.sin(Nx.multiply(t, 2 * :math.pi() * 5))
  |> Nx.add(Nx.multiply(0.4, Nx.sin(Nx.multiply(t, 2 * :math.pi() * 13))))
  |> Nx.add(Nx.multiply(0.15, noise))

t_list  = Nx.to_flat_list(t)
s_list  = Nx.to_flat_list(signal)
# Show how the detected peaks change with order
for order <- [3, 10, 20] do
  peak_idx = NxSignal.PeakFinding.argrelmax(signal, order: order) |> Peaks.indices()

  signal_data = Enum.zip(t_list, s_list) |> Enum.map(fn {ti, v} -> %{time: ti, amplitude: v} end)
  peak_data   = Enum.map(peak_idx, fn i -> %{time: Enum.at(t_list, i), amplitude: Enum.at(s_list, i)} end)

  chart =
    VegaLite.new(width: 640, height: 160,
      title: "argrelmax (order=#{order}): #{length(peak_idx)} peaks found"
    )
    |> VegaLite.layers([
      VegaLite.new()
      |> VegaLite.data_from_values(signal_data)
      |> VegaLite.mark(:line, color: "steelblue")
      |> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
      |> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
      VegaLite.new()
      |> VegaLite.data_from_values(peak_data)
      |> VegaLite.mark(:point, color: "tomato", size: 80, filled: true)
      |> VegaLite.encode_field(:x, "time", type: :quantitative)
      |> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
    ])

  Kino.render(chart)
end

:ok

With order = 3 every small ripple is detected. With order = 20, roughly half the period of the 5 Hz component, only the broad structural peaks survive.

Rule of thumb: set order to roughly half the period of the feature you want to detect, measured in samples.

Spectral peak detection

One of the most important applications is finding the dominant frequency components of a signal automatically from its FFT magnitude.

fs_spec  = 1_000
dur_spec = 2.0
n_spec   = trunc(fs_spec * dur_spec)
t_spec   = Nx.linspace(0, dur_spec, n: n_spec, endpoint: false, type: :f32)

# Three tones with different amplitudes
x_spec =
  Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 50))
  |> Nx.add(Nx.multiply(0.6, Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 130))))
  |> Nx.add(Nx.multiply(0.3, Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 210))))

# FFT magnitude (positive frequencies only)
half_spec = div(n_spec, 2)
freqs_spec = NxSignal.fft_frequencies(fs_spec, fft_length: n_spec)[0..half_spec] |> Nx.to_flat_list()

mag =
  x_spec
  |> Nx.as_type({:c, 64})
  |> Nx.fft()
  |> Nx.abs()
  |> Nx.slice([0], [half_spec + 1])

# Find spectral peaks: order=3 works well for FFT magnitudes
peak_result  = NxSignal.PeakFinding.argrelmax(mag, order: 3)
peak_indices = Peaks.indices(peak_result)
mag_list     = Nx.to_flat_list(mag)

# Sort by amplitude, keep top 5
peak_freqs =
  peak_indices
  |> Enum.map(fn i -> {Enum.at(freqs_spec, i), Enum.at(mag_list, i)} end)
  |> Enum.sort_by(fn {_, a} -> -a end)
  |> Enum.take(5)

IO.inspect(Enum.map(peak_freqs, fn {f, _} -> Float.round(f, 1) end), label: "Detected frequencies (Hz)")
spec_data  = Enum.zip(freqs_spec, mag_list) |> Enum.map(fn {f, a} -> %{frequency: f, amplitude: a} end)
peak_pts   = Enum.map(peak_indices, fn i -> %{frequency: Enum.at(freqs_spec, i), amplitude: Enum.at(mag_list, i)} end)

VegaLite.new(width: 680, height: 240, title: "Spectral peak detection: 50, 130, 210 Hz")
|> VegaLite.layers([
  VegaLite.new()
  |> VegaLite.data_from_values(spec_data)
  |> VegaLite.mark(:line, color: "steelblue")
  |> VegaLite.encode_field(:x, "frequency", type: :quantitative, title: "Frequency (Hz)")
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
  VegaLite.new()
  |> VegaLite.data_from_values(peak_pts)
  |> VegaLite.mark(:point, color: "tomato", size: 100, filled: true)
  |> VegaLite.encode_field(:x, "frequency", type: :quantitative)
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])

Envelope detection

An amplitude-modulated (AM) signal’s positive envelope is traced by its local maxima. Finding those peaks recovers the modulating waveform.

fs_am  = 2_000
dur_am = 1.0
n_am   = trunc(fs_am * dur_am)
t_am   = Nx.linspace(0, dur_am, n: n_am, endpoint: false, type: :f32)

# Carrier: 200 Hz; modulation: 5 Hz (one envelope cycle every 200 ms)
carrier    = Nx.sin(Nx.multiply(t_am, 2 * :math.pi() * 200))
modulation = Nx.add(0.5, Nx.multiply(0.5, Nx.sin(Nx.multiply(t_am, 2 * :math.pi() * 5))))
am_signal  = Nx.multiply(carrier, modulation)

# Order ≈ half the carrier period in samples: 2000 / 200 / 2 = 5
env_result  = NxSignal.PeakFinding.argrelmax(am_signal, order: 5)
env_indices = Peaks.indices(env_result)

t_am_list  = Nx.to_flat_list(t_am)
am_list    = Nx.to_flat_list(am_signal)

am_data   = Enum.zip(t_am_list, am_list) |> Enum.map(fn {ti, v} -> %{time: ti, amplitude: v} end)
env_data  = Enum.map(env_indices, fn i -> %{time: Enum.at(t_am_list, i), amplitude: Enum.at(am_list, i)} end)

VegaLite.new(width: 680, height: 220,
  title: "Envelope detection: AM signal (carrier 200 Hz, modulation 5 Hz)"
)
|> VegaLite.layers([
  VegaLite.new()
  |> VegaLite.data_from_values(am_data)
  |> VegaLite.mark(:line, color: "lightsteelblue", stroke_width: 0.8)
  |> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
  VegaLite.new()
  |> VegaLite.data_from_values(env_data)
  |> VegaLite.mark(:point, color: "tomato", size: 30, filled: true)
  |> VegaLite.encode_field(:x, "time", type: :quantitative)
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])

The detected peaks (red) trace the sinusoidal envelope at 5 Hz. Connecting them would reproduce the modulating signal.

Local minima

argrelmin finds troughs, the negative envelope of the same signal:

min_result  = NxSignal.PeakFinding.argrelmin(am_signal, order: 5)
min_indices = Peaks.indices(min_result)
min_data    = Enum.map(min_indices, fn i -> %{time: Enum.at(t_am_list, i), amplitude: Enum.at(am_list, i)} end)

VegaLite.new(width: 680, height: 220, title: "Local minima: negative envelope")
|> VegaLite.layers([
  VegaLite.new()
  |> VegaLite.data_from_values(am_data)
  |> VegaLite.mark(:line, color: "lightsteelblue", stroke_width: 0.8)
  |> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
  VegaLite.new()
  |> VegaLite.data_from_values(min_data)
  |> VegaLite.mark(:point, color: "darkorange", size: 30, filled: true)
  |> VegaLite.encode_field(:x, "time", type: :quantitative)
  |> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])

Summary

Function Returns Typical use
argrelmax(signal, order) Local maxima Peak frequency identification, positive envelope, ridge detection
argrelmin(signal, order) Local minima Trough detection, negative envelope
argrelextrema(signal, comparator, order) Custom extrema Any comparison function (e.g. >= for plateaus)

All return %{indices: tensor, valid_indices: scalar}. Always use valid_indices to slice off the padding before processing results.