Powered by AppSignal & Oban Pro

Drift detection — CUSUM

livebooks/03_drift_cusum.livemd

Drift detection — CUSUM

Section

Mix.install([
  {:mobius_smarts, path: Path.expand("..", __DIR__)},
  {:kino, "~> 0.14"},
  {:kino_vega_lite, "~> 0.1"}
])
alias VegaLite, as: Vl
alias MobiusSmarts.Detect.Drift

The question: a slow leak, and “since when?”

A device’s memory usage is creeping up. Not a spike — a leak. Each window it sits a hair higher than the last, maybe a quarter of a noise-width per window. (A “noise-width” is one standard deviation, written sigma: the typical wobble of a healthy metric around its normal level. The mean is just that normal level — the average.)

A leak that small is invisible in any single window. The window-to-window noise is several times larger than the per-window creep, so eyeballing one reading — or even a tripwire that flags any window too far from normal — tells you nothing. The leak hides inside the noise.

Worse, by the time it is obvious, you have a second question that is often the harder one: since when? “Memory has been leaking since Tuesday 03:00” is a far better page than “alarm fired Friday”. CUSUM answers both, and the second one for free.

The mental model for this whole notebook: a bucket that collects suspicion. Every window slightly above normal adds a few drops; a drain empties it as fast as honest noise fills it, so on healthy data the bucket hovers near empty forever. A real leak fills slightly faster than the drain empties, so the level ratchets up until it reaches an alarm line — the alarm fires the moment the level touches the line. Rewind to when the bucket was last empty and you have the onset.

Spot the leak

Two memory-usage series, same normal level (70%) and same noise. One is pure noise. The other has a tiny upward leak — +0.05 sigma per window, starting at window 80. Can you tell which is which by eye?

:rand.seed(:exsss, {9, 9, 9})

target = 70.0
sigma = 2.0
n = 200
drift_start = 80
slope = 0.05

noise = fn -> :rand.normal() * sigma end

pure = for _ <- 1..n, do: target + noise.()

leaking =
  for i <- 0..(n - 1) do
    creep = if i >= drift_start, do: (i - drift_start) * slope * sigma, else: 0.0
    target + creep + noise.()
  end

raw_chart = fn data, title ->
  Vl.new(width: 330, height: 220, title: title)
  |> Vl.data_from_values(Enum.with_index(data, fn v, i -> %{"window" => i, "value" => v} end))
  |> Vl.mark(:line, opacity: 0.8)
  |> Vl.encode_field(:x, "window", type: :quantitative)
  |> Vl.encode_field(:y, "value", type: :quantitative, scale: [domain: [62, 84]])
end

Kino.Layout.grid(
  [raw_chart.(pure, "Series A"), raw_chart.(leaking, "Series B")],
  columns: 2
)

Look at the right edge of each chart. Both ramble around 70 for most of their length; Series B sits a touch higher near the end, but noise that size drifts up and down by chance all the time. It is genuinely ambiguous — which is the point. By the last window the leak has added about 6 sigma in total ((199 - 80) * 0.05), spread so thinly across 120 windows that no single reading looks wrong.

The reveal: Series A is pure noise, Series B is leaking. And CUSUM calls it without hesitation.

result_pure = Drift.scan(pure, target: target, sigma: sigma)
result_leak = Drift.scan(leaking, target: target, sigma: sigma)

Kino.DataTable.new([
  %{series: "A (pure)", alarm: result_pure.upper_alarm, onset: result_pure.upper_onset},
  %{series: "B (leak)", alarm: result_leak.upper_alarm, onset: result_leak.upper_onset}
])

Series A: no alarm. Series B: alarm at window 96, and an estimated onset of window 84 — within four windows of the true start (80) we injected. We never told it where to look. Now let’s open the box.

(We pass target:/sigma: explicitly because this notebook invents them. On real Mobius data, hand the whole map from MobiusSmarts.Detect.Jump.baseline/3 straight to Drift.scan(avgs, baseline: baseline) — it picks the right noise scale, sigma_avg, itself.)

The bucket, built by hand

First, standardize. We don’t care about “70.2% vs 70.0%”, we care about how many noise-widths above normal a window is:

> y = (x − target) / sigma

That turns every window into a pure number: y = 1.0 means “one sigma above normal”, y = -0.5 means “half a sigma below”. On healthy data the y values are just standard noise centered on zero.

Now the drain. We subtract a fixed allowance k from every window before adding it to the bucket. With k = 0.5, a window has to be more than half a sigma above normal to add anything net-positive; anything below that drains the bucket. Here is y − k for the healthy series:

k = 0.5
ys_pure = Enum.map(pure, fn x -> (x - target) / sigma end)

drops_chart =
  Vl.new(width: 700, height: 200, title: "y − k per window (healthy noise)")
  |> Vl.data_from_values(Enum.with_index(ys_pure, fn y, i -> %{"window" => i, "drop" => y - k} end))
  |> Vl.layers([
    Vl.new()
    |> Vl.mark(:bar)
    |> Vl.encode_field(:x, "window", type: :quantitative)
    |> Vl.encode_field(:y, "drop", type: :quantitative),
    Vl.new()
    |> Vl.data_from_values([%{"zero" => 0}])
    |> Vl.mark(:rule, color: "black")
    |> Vl.encode_field(:y, "zero", type: :quantitative)
  ])

drops_chart

Most bars sit below zero: honest noise, after the allowance, mostly drains the bucket. A few windows poke above, but they’re outnumbered. That balance is the whole trick — k is tuned so noise drains about as fast as it fills.

The bucket itself is one rule applied window after window: add y − k, but never let it go below empty.

> bucket = max(0, previous_bucket + y − k)

The max(0, …) is the floor of the bucket — suspicion can’t go negative, you just can’t be “less than not-suspicious”. Enum.scan carries the running level forward:

bucket_pure = Enum.scan(ys_pure, 0.0, fn y, prev -> max(0.0, prev + y - k) end)

bucket_chart = fn bucket, title ->
  Vl.new(width: 700, height: 220, title: title)
  |> Vl.data_from_values(Enum.with_index(bucket, fn v, i -> %{"window" => i, "bucket" => v} end))
  |> Vl.mark(:area, opacity: 0.5, line: true)
  |> Vl.encode_field(:x, "window", type: :quantitative)
  |> Vl.encode_field(:y, "bucket", type: :quantitative, scale: [domain: [0, 8]])
end

bucket_chart.(bucket_pure, "Suspicion bucket — healthy series")

The bucket scrapes along near empty the whole way, with brief shallow rises when noise clusters high, each one draining straight back down. It tops out around 4.3, never reaching an alarm line of 5. This is what “in control” looks like: the bucket can’t fill on noise alone.

The drift case: the bucket reaches the alarm line

Same construction on the leaking series. Now we let the official Drift.scan draw the alarm line h = 5 and the points, and we overlay the true drift start so we can judge the onset estimate.

h = 5.0
ys_leak = Enum.map(leaking, fn x -> (x - target) / sigma end)
bucket_leak = Enum.scan(ys_leak, 0.0, fn y, prev -> max(0.0, prev + y - k) end)

bucket_points = Enum.with_index(bucket_leak, fn v, i -> %{"window" => i, "bucket" => v} end)

Vl.new(width: 700, height: 300, title: "Suspicion bucket — leaking series")
|> Vl.layers([
  Vl.new()
  |> Vl.data_from_values(bucket_points)
  |> Vl.mark(:area, opacity: 0.5, line: true)
  |> Vl.encode_field(:x, "window", type: :quantitative)
  |> Vl.encode_field(:y, "bucket", type: :quantitative, scale: [domain: [0, 16]]),
  Vl.new()
  |> Vl.data_from_values([%{"h" => h}])
  |> Vl.mark(:rule, color: "firebrick", stroke_dash: [6, 4])
  |> Vl.encode_field(:y, "h", type: :quantitative),
  Vl.new()
  |> Vl.data_from_values([%{"x" => drift_start, "label" => "true onset"}])
  |> Vl.mark(:rule, color: "gray", stroke_dash: [2, 2])
  |> Vl.encode_field(:x, "x", type: :quantitative),
  Vl.new()
  |> Vl.data_from_values([%{"x" => result_leak.upper_onset, "y" => 0, "label" => "detected onset"}])
  |> Vl.mark(:point, color: "seagreen", size: 140, filled: true)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative),
  Vl.new()
  |> Vl.data_from_values([%{"x" => result_leak.upper_alarm, "y" => h, "label" => "alarm"}])
  |> Vl.mark(:point, color: "firebrick", size: 140, filled: true)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
])

Watch the bucket: flat and near-empty until the leak begins, then a steady ratchet upward — each window adds a little more than the drain removes — until it reaches the dashed red line. The red dot is the alarm (window 96); the green dot at the bottom is the detected onset (window 84); the gray dashed line is the true start (window 80).

The payoff in one sentence: leaking since window 84 (onset) — not “alarm at window 96” (alarm). The onset rewinds the alarm back to roughly when the trouble actually began.

Onset accuracy

The onset is just the last window where the bucket was empty before the alarm. Before the leak, the bucket keeps touching zero; once the leak takes hold it stops draining all the way down, so the last empty moment marks the leak’s foot.

Kino.DataTable.new([
  %{quantity: "true drift start (injected)", window: drift_start},
  %{quantity: "detected onset", window: result_leak.upper_onset},
  %{quantity: "alarm raised", window: result_leak.upper_alarm},
  %{quantity: "onset error (windows)", window: result_leak.upper_onset - drift_start}
])

Detected onset 84 vs true 80 — off by four windows — while the alarm itself didn’t fire until window 96. The estimate is biased a little late (the leak has to creep a bit before the bucket stops touching zero), but it lands far closer to the real start than the alarm does.

Tuning k: the drain rate

The rule of thumb: k is half the drift size you care about. The default 0.5 targets one-sigma drifts. Set it too low and noise leaks through the drain; set it too high and a real drift drains away before it can accumulate. Here is the same leaking series at three drains.

buckets_at_k =
  for kk <- [0.25, 0.5, 1.0] do
    r = Drift.scan(leaking, target: target, sigma: sigma, k: kk)
    series = Nx.to_flat_list(r.upper)

    chart =
      Vl.new(width: 330, height: 200, title: "k = #{kk}  (alarm: #{inspect(r.upper_alarm)})")
      |> Vl.data_from_values(Enum.with_index(series, fn v, i -> %{"window" => i, "bucket" => v} end))
      |> Vl.layers([
        Vl.new()
        |> Vl.mark(:area, opacity: 0.5, line: true)
        |> Vl.encode_field(:x, "window", type: :quantitative)
        |> Vl.encode_field(:y, "bucket", type: :quantitative, scale: [domain: [0, 20]]),
        Vl.new()
        |> Vl.data_from_values([%{"h" => h}])
        |> Vl.mark(:rule, color: "firebrick", stroke_dash: [6, 4])
        |> Vl.encode_field(:y, "h", type: :quantitative)
      ])

    chart
  end

Kino.Layout.grid(buckets_at_k, columns: 3)

k = 0.25 (left): the drain is weak, the bucket is twitchy and rises on noise alone — run the same k on the healthy series and it false-alarms at window 137. It catches the leak early but its onset estimate (window 30) lands wildly before the real start, because it was already half-full on noise. k = 0.5 (middle): clean, the bucket stays empty on noise and ratchets only on the real leak. k = 1.0 (right): the drain is so strong it eats most of the small creep, so detection slips to window 113 and the onset to 100 — the drift nearly drains away. Middle is the Goldilocks zone for a one-sigma-class leak.

Tuning h: false alarms vs delay

The alarm line h trades two costs against each other: lower h catches drifts sooner but false-alarms more often on healthy data; higher h is quieter but slower. The key fact is how that trade scales. Let’s measure it directly — run many healthy series and see how long, on average, a bucket survives before a false alarm.

This is a Monte Carlo estimate: simulate a process many times and average the outcomes. Keep it modest (200 runs, up to 3000 windows each) so it finishes in a few seconds; the numbers below are rough, but the shape is the point.

:rand.seed(:exsss, {42, 42, 42})

mc_runs = 200
mc_len = 3000

mc_results =
  for hh <- [3.0, 4.0, 5.0, 6.0] do
    times =
      for _ <- 1..mc_runs do
        series = for _ <- 1..mc_len, do: target + :rand.normal() * sigma
        r = Drift.scan(series, target: target, sigma: sigma, h: hh)

        [r.upper_alarm, r.lower_alarm]
        |> Enum.reject(&amp;is_nil/1)
        |> case do
          [] -> mc_len
          list -> Enum.min(list)
        end
      end

    %{h: hh, avg_windows_to_false_alarm: round(Enum.sum(times) / length(times))}
  end

Kino.DataTable.new(mc_results)
Vl.new(width: 700, height: 260, title: "Avg windows until a false alarm vs h (log scale)")
|> Vl.data_from_values(mc_results)
|> Vl.mark(:line, point: true)
|> Vl.encode_field(:x, "h", type: :quantitative)
|> Vl.encode_field(:y, "avg_windows_to_false_alarm",
  type: :quantitative,
  scale: [type: :log]
)

The line is roughly straight on a log y-axis, which means time-to- false-alarm grows roughly exponentially in h. In our run: about 61 windows at h = 3, 168 at h = 4, 439 at h = 5, and 1247 at h = 6 (the last is an undercount — some runs never false-alarmed within the 3000-window cap). Those counts are for either bucket firing; per side they’re roughly double. So h = 5 gives on the order of ~800+ healthy windows per side between false alarms — in the same ballpark as the docs’ “~650 windows per side at the defaults”. Bump h to 6 and quiet stretches stretch several times longer, at the cost of detecting real drifts a little later. Size h to your monitoring horizon.

Streaming form: one update per window

scan is for batch analysis of a stored series. On-device you get one window at a time, so you keep a tiny state and step it. The state is O(1) floats per metric — two bucket levels and the config — no history retained. After an alarm we reset to start watching for the next drift.

state0 = Drift.new(target: target, sigma: sigma)

{alarm_step, onset_at_alarm, _final} =
  leaking
  |> Enum.with_index()
  |> Enum.reduce_while({nil, nil, state0}, fn {x, i}, {_, _, st} ->
    case Drift.step(st, x) do
      {:upper_alarm, st2} -> {:halt, {i, st2.upper_onset, Drift.reset(st2)}}
      {_status, st2} -> {:cont, {nil, nil, st2}}
    end
  end)

IO.puts("streaming alarm at window #{alarm_step}, onset #{onset_at_alarm}")

Streaming reports alarm at window 96 and onset 84 — identical to the batch scan result above. Same bucket, same alarm, fed one drop at a time.

For the curious: how scan skips the loop

The hand-built bucket used Enum.scan — a genuine left-to-right recursion, each window depending on the one before. Drift.scan produces the exact same numbers without any per-element loop, using a reflection identity:

> S_t = C_t − min(0, running minimum of C), > where C is the cumulative sum of (y − k).

That’s two vectorized passes (one cumulative sum, one cumulative minimum) instead of a loop — which is what lets it run cheaply over a whole stored series on any Nx backend. The proof is that the two agree:

scan_bucket = Nx.to_flat_list(Drift.scan(leaking, target: target, sigma: sigma).upper)

max_diff =
  Enum.zip(bucket_leak, scan_bucket)
  |> Enum.map(fn {a, b} -> abs(a - b) end)
  |> Enum.max()

IO.puts("largest difference, Enum.scan bucket vs scan/2 bucket: #{max_diff}")

The largest disagreement is around 1.0e-13 — floating-point dust. They are the same computation.

Blind spots

CUSUM is tuned for the small and slow. It is deliberately patient, which makes it the wrong tool for the big and sudden. A sharp jump fills the bucket quickly, but the bucket still has to fill — so a dedicated per-window tripwire beats it on speed.

:rand.seed(:exsss, {31, 31, 31})

jump_target = 70.0
jump_n = 160
jump_at = 100
jump_size = 4.0

jump_series =
  for i <- 0..(jump_n - 1) do
    shift = if i >= jump_at, do: jump_size * sigma, else: 0.0
    jump_target + shift + :rand.normal() * sigma
  end

jump_drift = Drift.scan(jump_series, target: jump_target, sigma: sigma)

# The Shewhart tripwire (see 01_jump_shewhart_charts.livemd) flags any
# single window past 3 sigma — no history, no waiting.
tripwire_window =
  jump_series
  |> Enum.with_index()
  |> Enum.find_value(fn {x, i} -> if abs((x - jump_target) / sigma) > 3.0, do: i end)

IO.puts("jump landed at window #{jump_at}")
IO.puts("Shewhart tripwire (>3 sigma) fires at window #{tripwire_window}")
IO.puts("CUSUM bucket hits the alarm line at window #{jump_drift.upper_alarm}")

The tripwire fires at window 100, the instant the jump lands; the bucket needs a window to fill and hits the line at 101. One window is small here, but for a step you don’t want to wait for — that’s MobiusSmarts.Detect.Jump (Shewhart charts, 01_jump_shewhart_charts.livemd).

Two other things the bucket can’t see, by construction: a change in spread rather than level — a metric that gets shakier while its average holds — is Jump‘s wobble result (also in the 01 notebook); and a change in distribution shape under a steady level belongs to MobiusSmarts.Detect.Shape (06_shape_distribution_distances.livemd). Run all three in parallel and each covers the others’ blind spot.