Peak Finding
Mix.install([
{:nx_signal, path: __DIR__ |> Path.join("..") |> Path.expand()},
{:kino_vega_lite, "~> 0.1"},
{:tucan, "~> 0.5"}
])
Introduction
A local maximum is a sample that is strictly greater than all of its
neighbours within a given window. NxSignal.PeakFinding provides three
functions for finding such extrema:
-
argrelmax(signal, order)- local maxima -
argrelmin(signal, order)- local minima -
argrelextrema(signal, comparator, order)- generalised version
The order parameter controls the half-width of the comparison window.
A point must beat all neighbours within order samples on each side to
qualify. Small order → sensitive (finds every little wiggle); large
order → selective (finds only broad peaks).
All three functions return %{indices: tensor, valid_indices: scalar}. The
first valid_indices entries of indices are the detected positions; the
remainder are padded with −1.
# Helper: extract valid peak indices as a flat Elixir list
defmodule Peaks do
def indices(result) do
n = Nx.to_number(result.valid_indices)
if n == 0, do: [], else: result.indices |> Nx.slice([0, 0], [n, 1]) |> Nx.to_flat_list()
end
end
Basic usage: effect of the order parameter
A noisy signal consisting of two sinusoids illustrates how order controls
which peaks are returned.
fs = 200
dur = 2.0
t = Nx.linspace(0, dur, n: trunc(fs * dur), endpoint: false, type: :f32)
# Signal: 5 Hz + 13 Hz + noise
key = Nx.Random.key(0)
{noise, _} = Nx.Random.normal(key, shape: {trunc(fs * dur)}, type: :f32)
signal =
Nx.sin(Nx.multiply(t, 2 * :math.pi() * 5))
|> Nx.add(Nx.multiply(0.4, Nx.sin(Nx.multiply(t, 2 * :math.pi() * 13))))
|> Nx.add(Nx.multiply(0.15, noise))
t_list = Nx.to_flat_list(t)
s_list = Nx.to_flat_list(signal)
# Show how the detected peaks change with order
for order <- [3, 10, 20] do
peak_idx = NxSignal.PeakFinding.argrelmax(signal, order: order) |> Peaks.indices()
signal_data = Enum.zip(t_list, s_list) |> Enum.map(fn {ti, v} -> %{time: ti, amplitude: v} end)
peak_data = Enum.map(peak_idx, fn i -> %{time: Enum.at(t_list, i), amplitude: Enum.at(s_list, i)} end)
chart =
VegaLite.new(width: 640, height: 160,
title: "argrelmax (order=#{order}): #{length(peak_idx)} peaks found"
)
|> VegaLite.layers([
VegaLite.new()
|> VegaLite.data_from_values(signal_data)
|> VegaLite.mark(:line, color: "steelblue")
|> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
VegaLite.new()
|> VegaLite.data_from_values(peak_data)
|> VegaLite.mark(:point, color: "tomato", size: 80, filled: true)
|> VegaLite.encode_field(:x, "time", type: :quantitative)
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])
Kino.render(chart)
end
:ok
With order = 3 every small ripple is detected. With order = 20, roughly
half the period of the 5 Hz component, only the broad structural peaks
survive.
Rule of thumb: set order to roughly half the period of the feature you
want to detect, measured in samples.
Spectral peak detection
One of the most important applications is finding the dominant frequency components of a signal automatically from its FFT magnitude.
fs_spec = 1_000
dur_spec = 2.0
n_spec = trunc(fs_spec * dur_spec)
t_spec = Nx.linspace(0, dur_spec, n: n_spec, endpoint: false, type: :f32)
# Three tones with different amplitudes
x_spec =
Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 50))
|> Nx.add(Nx.multiply(0.6, Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 130))))
|> Nx.add(Nx.multiply(0.3, Nx.sin(Nx.multiply(t_spec, 2 * :math.pi() * 210))))
# FFT magnitude (positive frequencies only)
half_spec = div(n_spec, 2)
freqs_spec = NxSignal.fft_frequencies(fs_spec, fft_length: n_spec)[0..half_spec] |> Nx.to_flat_list()
mag =
x_spec
|> Nx.as_type({:c, 64})
|> Nx.fft()
|> Nx.abs()
|> Nx.slice([0], [half_spec + 1])
# Find spectral peaks: order=3 works well for FFT magnitudes
peak_result = NxSignal.PeakFinding.argrelmax(mag, order: 3)
peak_indices = Peaks.indices(peak_result)
mag_list = Nx.to_flat_list(mag)
# Sort by amplitude, keep top 5
peak_freqs =
peak_indices
|> Enum.map(fn i -> {Enum.at(freqs_spec, i), Enum.at(mag_list, i)} end)
|> Enum.sort_by(fn {_, a} -> -a end)
|> Enum.take(5)
IO.inspect(Enum.map(peak_freqs, fn {f, _} -> Float.round(f, 1) end), label: "Detected frequencies (Hz)")
spec_data = Enum.zip(freqs_spec, mag_list) |> Enum.map(fn {f, a} -> %{frequency: f, amplitude: a} end)
peak_pts = Enum.map(peak_indices, fn i -> %{frequency: Enum.at(freqs_spec, i), amplitude: Enum.at(mag_list, i)} end)
VegaLite.new(width: 680, height: 240, title: "Spectral peak detection: 50, 130, 210 Hz")
|> VegaLite.layers([
VegaLite.new()
|> VegaLite.data_from_values(spec_data)
|> VegaLite.mark(:line, color: "steelblue")
|> VegaLite.encode_field(:x, "frequency", type: :quantitative, title: "Frequency (Hz)")
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
VegaLite.new()
|> VegaLite.data_from_values(peak_pts)
|> VegaLite.mark(:point, color: "tomato", size: 100, filled: true)
|> VegaLite.encode_field(:x, "frequency", type: :quantitative)
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])
Envelope detection
An amplitude-modulated (AM) signal’s positive envelope is traced by its local maxima. Finding those peaks recovers the modulating waveform.
fs_am = 2_000
dur_am = 1.0
n_am = trunc(fs_am * dur_am)
t_am = Nx.linspace(0, dur_am, n: n_am, endpoint: false, type: :f32)
# Carrier: 200 Hz; modulation: 5 Hz (one envelope cycle every 200 ms)
carrier = Nx.sin(Nx.multiply(t_am, 2 * :math.pi() * 200))
modulation = Nx.add(0.5, Nx.multiply(0.5, Nx.sin(Nx.multiply(t_am, 2 * :math.pi() * 5))))
am_signal = Nx.multiply(carrier, modulation)
# Order ≈ half the carrier period in samples: 2000 / 200 / 2 = 5
env_result = NxSignal.PeakFinding.argrelmax(am_signal, order: 5)
env_indices = Peaks.indices(env_result)
t_am_list = Nx.to_flat_list(t_am)
am_list = Nx.to_flat_list(am_signal)
am_data = Enum.zip(t_am_list, am_list) |> Enum.map(fn {ti, v} -> %{time: ti, amplitude: v} end)
env_data = Enum.map(env_indices, fn i -> %{time: Enum.at(t_am_list, i), amplitude: Enum.at(am_list, i)} end)
VegaLite.new(width: 680, height: 220,
title: "Envelope detection: AM signal (carrier 200 Hz, modulation 5 Hz)"
)
|> VegaLite.layers([
VegaLite.new()
|> VegaLite.data_from_values(am_data)
|> VegaLite.mark(:line, color: "lightsteelblue", stroke_width: 0.8)
|> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
VegaLite.new()
|> VegaLite.data_from_values(env_data)
|> VegaLite.mark(:point, color: "tomato", size: 30, filled: true)
|> VegaLite.encode_field(:x, "time", type: :quantitative)
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])
The detected peaks (red) trace the sinusoidal envelope at 5 Hz. Connecting them would reproduce the modulating signal.
Local minima
argrelmin finds troughs, the negative envelope of the same signal:
min_result = NxSignal.PeakFinding.argrelmin(am_signal, order: 5)
min_indices = Peaks.indices(min_result)
min_data = Enum.map(min_indices, fn i -> %{time: Enum.at(t_am_list, i), amplitude: Enum.at(am_list, i)} end)
VegaLite.new(width: 680, height: 220, title: "Local minima: negative envelope")
|> VegaLite.layers([
VegaLite.new()
|> VegaLite.data_from_values(am_data)
|> VegaLite.mark(:line, color: "lightsteelblue", stroke_width: 0.8)
|> VegaLite.encode_field(:x, "time", type: :quantitative, title: "Time (s)")
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative, title: "Amplitude"),
VegaLite.new()
|> VegaLite.data_from_values(min_data)
|> VegaLite.mark(:point, color: "darkorange", size: 30, filled: true)
|> VegaLite.encode_field(:x, "time", type: :quantitative)
|> VegaLite.encode_field(:y, "amplitude", type: :quantitative)
])
Summary
| Function | Returns | Typical use |
|---|---|---|
argrelmax(signal, order) |
Local maxima | Peak frequency identification, positive envelope, ridge detection |
argrelmin(signal, order) |
Local minima | Trough detection, negative envelope |
argrelextrema(signal, comparator, order) |
Custom extrema |
Any comparison function (e.g. >= for plateaus) |
All return %{indices: tensor, valid_indices: scalar}. Always use
valid_indices to slice off the padding before processing results.