Powered by AppSignal & Oban Pro

A Hundred Engines, One Model

notebooks/16_turbofan_fleet.livemd

A Hundred Engines, One Model

Hierarchical Bayesian RUL Estimation on NASA C-MAPSS

Mix.install([
  {:exmc, path: Path.expand("../", __DIR__)},
  {:exla, "~> 0.10"},
  {:jose, "~> 1.11"},
  {:jason, "~> 1.4"},
  {:kino, "~> 0.14"},
  {:kino_vega_lite, "~> 0.1"}
])

I. The Fleet

NASA’s Commercial Modular Aero-Propulsion System Simulation (C-MAPSS) models a fleet of turbofan engines, each running from healthy to failure. FD001 contains 100 engines under one operating condition with one fault mode (HPC degradation). Twenty-one sensors record temperatures, pressures, speeds, and flow rates every cycle.

The question fleet operators face: given the sensor readings from a running engine, how many cycles remain before failure? And critically — can engines with short histories borrow information from engines that have already failed?

# Load preprocessed data
data_path = Path.expand("../data/cmapss/cmapss_fd001.json", __DIR__)
raw = Jason.decode!(File.read!(data_path))

n_engines = raw["n_engines"]
sensors = raw["sensor_names"]
engines = raw["engines"]

# Compute lifetimes
lifetimes =
  engines
  |> Enum.map(fn {id, cycles} -> {String.to_integer(id), length(cycles)} end)
  |> Enum.sort_by(&elem(&1, 0))

lifetime_vals = Enum.map(lifetimes, &elem(&1, 1))

IO.puts("C-MAPSS FD001: #{n_engines} engines, #{length(sensors)} sensors")
IO.puts("Lifetime range: #{Enum.min(lifetime_vals)}-#{Enum.max(lifetime_vals)} cycles")
IO.puts("Mean: #{Float.round(Enum.sum(lifetime_vals) / n_engines, 0)} cycles")
IO.puts("Median: #{Enum.sort(lifetime_vals) |> Enum.at(div(n_engines, 2))} cycles")
alias VegaLite, as: Vl

# Lifetime distribution
life_hist = lifetime_vals
  |> Enum.frequencies()
  |> Enum.map(fn {life, count} -> %{"lifetime" => life, "count" => count} end)

Vl.new(width: 700, height: 200, title: "Fleet Lifetime Distribution (cycles to failure)")
|> Vl.data_from_values(life_hist)
|> Vl.mark(:bar, color: "steelblue")
|> Vl.encode_field(:x, "lifetime", type: :quantitative, bin: %{step: 20}, title: "Cycles")
|> Vl.encode_field(:y, "count", type: :quantitative, title: "Engines")

II. The Health Indicator

Raw sensor readings are noisy. We construct a scalar health indicator (HI) per engine per cycle by normalizing sensors and computing a weighted combination that correlates with remaining useful life. The simplest approach: sensors that trend monotonically with degradation.

# Find sensors that trend with degradation
# Use engine 1 as reference — compute correlation of each sensor with cycle number
engine_1 = engines["1"]
n_cycles_1 = length(engine_1)
cycle_nums = Enum.map(engine_1, & &1["cycle"])

sensor_trends =
  for s <- sensors do
    vals = Enum.map(engine_1, &amp; &amp;1[s])
    # Simple correlation: does it go up or down with cycle?
    first_20 = Enum.take(vals, 20) |> then(&amp;(Enum.sum(&amp;1) / 20))
    last_20 = Enum.take(vals, -20) |> then(&amp;(Enum.sum(&amp;1) / 20))
    delta = last_20 - first_20
    mean = Enum.sum(vals) / n_cycles_1
    range = Enum.max(vals) - Enum.min(vals)
    rel_change = if mean != 0, do: abs(delta / mean), else: 0
    {s, delta, rel_change}
  end
  |> Enum.sort_by(fn {_, _, rc} -> -rc end)

IO.puts("Sensor trends (most degradation-correlated):")
for {s, delta, rc} <- Enum.take(sensor_trends, 8) do
  dir = if delta > 0, do: "UP", else: "DN"
  IO.puts("  #{String.pad_trailing(s, 4)} #{dir} relative_change=#{Float.round(rc * 100, 1)}%")
end

# Select top trending sensors for health indicator
hi_sensors = sensor_trends
  |> Enum.filter(fn {_, _, rc} -> rc > 0.01 end)
  |> Enum.map(&amp;elem(&amp;1, 0))

IO.puts("\nUsing #{length(hi_sensors)} sensors for health indicator: #{inspect(hi_sensors)}")
# Compute health indicator per engine: normalized sensor average
# Normalize using fleet-wide min/max from early cycles (healthy state)
defmodule HealthIndicator do
  def compute(engines, hi_sensors) do
    # Compute normalization from first 20 cycles of all engines
    all_early =
      for {_id, cycles} <- engines,
          c <- Enum.take(cycles, 20),
          s <- hi_sensors do
        {s, c[s]}
      end

    stats = all_early
      |> Enum.group_by(&amp;elem(&amp;1, 0), &amp;elem(&amp;1, 1))
      |> Map.new(fn {s, vals} ->
        {s, {Enum.min(vals), Enum.max(vals)}}
      end)

    # For each engine, compute HI per cycle
    for {id, cycles} <- engines, into: %{} do
      hi_series = Enum.map(cycles, fn c ->
        normalized = for s <- hi_sensors do
          {min_v, max_v} = stats[s]
          range = max(max_v - min_v, 1.0e-10)
          (c[s] - min_v) / range
        end
        # Average of normalized sensors (higher = more degraded)
        Enum.sum(normalized) / max(length(normalized), 1)
      end)
      {id, hi_series}
    end
  end
end

health = HealthIndicator.compute(engines, hi_sensors)

# Verify: plot HI for a few engines
sample_engines = ["1", "10", "50", "99"]
hi_plot_data =
  for eid <- sample_engines,
      {hi, i} <- Enum.with_index(health[eid]) do
    lifetime = length(health[eid])
    %{"cycle" => i + 1, "hi" => Float.round(hi, 4), "engine" => "E#{eid}",
      "rul" => lifetime - i - 1}
  end

Vl.new(width: 700, height: 250, title: "Health Indicator: Sample Engines")
|> Vl.data_from_values(hi_plot_data)
|> Vl.mark(:line, stroke_width: 1.5, opacity: 0.8)
|> Vl.encode_field(:x, "cycle", type: :quantitative, title: "Cycle")
|> Vl.encode_field(:y, "hi", type: :quantitative, title: "Health Indicator (0=healthy, 1=degraded)")
|> Vl.encode_field(:color, "engine", type: :nominal)

The health indicator rises from near zero (healthy) toward one (failed). Each engine follows its own trajectory — some degrade fast, others slow. The hierarchical model captures this variation while sharing strength across the fleet.

III. Fleet-Level Hierarchical Model

The key insight: every engine is different, but they are all turbofans degrading through the same failure mode. A hierarchical model lets each engine have its own degradation rate while the fleet prior constrains engines with little data.

We model the final health indicator value at failure as:

Fleet mean lifetime:     mu ~ Normal(200, 50)
Fleet variation:         sigma_fleet ~ HalfNormal(50)
Engine lifetime:         L_i ~ Normal(mu, sigma_fleet)  [for each engine]
Observation noise:       sigma_obs ~ HalfNormal(0.1)

This is simpler than a full trajectory model but captures the essential fleet structure: engines that fail early pull down the fleet mean slightly, and an engine with only 50 cycles of data gets its RUL estimate pulled toward the fleet average.

alias Exmc.{Builder, Sampler}
alias Exmc.Dist.{Normal, HalfNormal, Custom}

defmodule Fleet do
  def t(v), do: Nx.tensor(v, type: :f64)
end
# Prepare fleet data: observed lifetimes
# Use a subset of engines (first 30) for tractability
n_fleet = 30
fleet_lifetimes =
  lifetimes
  |> Enum.take(n_fleet)
  |> Enum.map(fn {_id, life} -> life * 1.0 end)

y = Nx.tensor(fleet_lifetimes, type: :f64)
IO.puts("Fleet: #{n_fleet} engines, lifetimes #{Enum.min(fleet_lifetimes)}-#{Enum.max(fleet_lifetimes)} cycles")

# Build hierarchical model
ir =
  Builder.new_ir()
  |> Builder.data(y)
  |> Builder.rv("mu", Normal, %{mu: Fleet.t(200.0), sigma: Fleet.t(50.0)})
  |> Builder.rv("sigma_fleet", HalfNormal, %{sigma: Fleet.t(50.0)})
  |> Builder.rv("sigma_obs", HalfNormal, %{sigma: Fleet.t(20.0)})

logpdf_fn = fn _x, params ->
  obs = params.__obs_data
  mu = params.mu
  sigma = Nx.max(Nx.add(params.sigma_fleet, params.sigma_obs), Fleet.t(1.0e-6))

  z = Nx.divide(Nx.subtract(obs, mu), sigma)
  Nx.sum(Nx.subtract(Nx.multiply(Fleet.t(-0.5), Nx.multiply(z, z)), Nx.log(sigma)))
end

dist = Custom.new(logpdf_fn, support: :real)

ir =
  Custom.rv(ir, "lik", dist, %{
    mu: "mu", sigma_fleet: "sigma_fleet", sigma_obs: "sigma_obs",
    __obs_data: "__obs_data"
  })
  |> Builder.obs("lik_obs", "lik", Nx.tensor(0.0, type: :f64))
{trace, stats} = Sampler.sample(ir,
  %{"mu" => 200.0, "sigma_fleet" => 40.0, "sigma_obs" => 10.0},
  num_warmup: 1000,
  num_samples: 1000,
  ncp: false
)

mu_mean = Nx.mean(trace["mu"]) |> Nx.to_number()
sigma_fleet = Nx.mean(trace["sigma_fleet"]) |> Nx.to_number()
sigma_obs = Nx.mean(trace["sigma_obs"]) |> Nx.to_number()

IO.puts("Divergences: #{stats.divergences}")
IO.puts("Fleet mean lifetime: #{Float.round(mu_mean, 1)} cycles")
IO.puts("Fleet std (between engines): #{Float.round(sigma_fleet, 1)} cycles")
IO.puts("Observation noise: #{Float.round(sigma_obs, 1)} cycles")

IV. Predicting RUL for a Running Engine

The fleet posterior gives us a prior for any new engine. Given that engine 42 has survived 150 cycles, what is its remaining useful life?

This is a truncated prediction: we know the engine has survived at least 150 cycles, so the RUL distribution is the fleet posterior conditioned on L > 150.

# Engine 42: observed 150 cycles, still running. What's its RUL?
observed_cycles = 150

mu_samples = Nx.to_flat_list(trace["mu"])
sigma_samples = Nx.to_flat_list(trace["sigma_fleet"])

# For each posterior sample, draw a lifetime conditioned on survival past observed_cycles
rul_predictions =
  Enum.zip(mu_samples, sigma_samples)
  |> Enum.flat_map(fn {mu, sigma} ->
    sigma = max(sigma + sigma_obs, 1.0)
    # Sample from truncated normal (rejection sampling)
    for _ <- 1..10 do
      l = mu + :rand.normal() * sigma
      if l > observed_cycles, do: l - observed_cycles, else: nil
    end
  end)
  |> Enum.reject(&amp;is_nil/1)

rul_mean = Enum.sum(rul_predictions) / max(length(rul_predictions), 1)
rul_sorted = Enum.sort(rul_predictions)
n_rul = length(rul_sorted)
rul_10 = Enum.at(rul_sorted, div(n_rul * 10, 100))
rul_50 = Enum.at(rul_sorted, div(n_rul * 50, 100))
rul_90 = Enum.at(rul_sorted, div(n_rul * 90, 100))

# Actual RUL for engine 42 (from data)
actual_life = elem(Enum.at(lifetimes, 41), 1)
actual_rul = actual_life - observed_cycles

IO.puts("=== Engine 42 RUL Prediction ===")
IO.puts("Observed: #{observed_cycles} cycles")
IO.puts("Predicted RUL: #{Float.round(rul_mean, 0)} cycles (mean)")
IO.puts("10th percentile: #{Float.round(rul_10, 0)} cycles (conservative)")
IO.puts("Median: #{Float.round(rul_50, 0)} cycles")
IO.puts("90th percentile: #{Float.round(rul_90, 0)} cycles (optimistic)")
IO.puts("Actual RUL: #{actual_rul} cycles")
# Plot RUL distribution
rul_hist = rul_predictions
  |> Enum.map(&amp;Float.round(&amp;1, 0))
  |> Enum.filter(&amp; &amp;1 > 0 and &amp;1 < 300)
  |> Enum.frequencies()
  |> Enum.map(fn {rul, count} -> %{"rul" => rul, "count" => count} end)

Vl.new(width: 700, height: 250, title: "Predicted RUL Distribution for Engine 42 (at cycle #{observed_cycles})")
|> Vl.data_from_values(rul_hist)
|> Vl.layers([
  Vl.new()
  |> Vl.mark(:bar, color: "steelblue", opacity: 0.7)
  |> Vl.encode_field(:x, "rul", type: :quantitative, bin: %{step: 10}, title: "Remaining Useful Life (cycles)")
  |> Vl.encode_field(:y, "count", type: :quantitative, title: "Posterior samples"),
  Vl.new()
  |> Vl.mark(:rule, color: "red", stroke_width: 2)
  |> Vl.encode(:x, datum: actual_rul)
])

The red line is the truth. The histogram is what the model believes. The width of the distribution is the honest uncertainty — and it is the most valuable output. A point estimate of “87 cycles” is useful. A distribution that says “between 30 and 150 cycles, with 80% probability” is actionable: schedule the inspection at 30 cycles, plan the replacement part for 80.

V. Fleet Comparison: Who’s At Risk?

# Predict RUL for all engines at their midlife point
fleet_rul =
  lifetimes
  |> Enum.take(n_fleet)
  |> Enum.map(fn {id, life} ->
    midpoint = div(life, 2)
    actual_rul = life - midpoint

    # Posterior predictive RUL given survival to midpoint
    preds =
      Enum.zip(mu_samples, sigma_samples)
      |> Enum.flat_map(fn {mu, sigma} ->
        sigma = max(sigma + sigma_obs, 1.0)
        for _ <- 1..5 do
          l = mu + :rand.normal() * sigma
          if l > midpoint, do: l - midpoint, else: nil
        end
      end)
      |> Enum.reject(&amp;is_nil/1)

    pred_mean = if preds != [], do: Enum.sum(preds) / length(preds), else: 0

    %{"engine" => "E#{id}", "predicted_rul" => Float.round(pred_mean, 0),
      "actual_rul" => actual_rul,
      "error" => Float.round(pred_mean - actual_rul, 0)}
  end)

# Summary
errors = Enum.map(fleet_rul, &amp; &amp;1["error"])
mae = Enum.sum(Enum.map(errors, &amp;abs/1)) / length(errors)
IO.puts("Fleet RUL prediction (at midlife):")
IO.puts("  MAE: #{Float.round(mae, 1)} cycles")
IO.puts("  Mean error: #{Float.round(Enum.sum(errors) / length(errors), 1)} cycles (+ = overestimate)")
# Scatter: predicted vs actual RUL
Vl.new(width: 500, height: 500, title: "Predicted vs Actual RUL (at midlife)")
|> Vl.data_from_values(fleet_rul)
|> Vl.layers([
  # Diagonal
  Vl.new()
  |> Vl.mark(:rule, color: "gray", stroke_dash: [4, 4])
  |> Vl.encode(:x, datum: 0)
  |> Vl.encode(:x2, datum: 200)
  |> Vl.encode(:y, datum: 0)
  |> Vl.encode(:y2, datum: 200),
  # Points
  Vl.new()
  |> Vl.mark(:point, size: 60, filled: true, color: "steelblue")
  |> Vl.encode_field(:x, "actual_rul", type: :quantitative, title: "Actual RUL (cycles)")
  |> Vl.encode_field(:y, "predicted_rul", type: :quantitative, title: "Predicted RUL (cycles)")
])

Points near the diagonal are good predictions. Points above overestimate remaining life (dangerous — the engine fails before expected). Points below underestimate (conservative — the engine has more life than predicted, which wastes maintenance budget but keeps things safe).

Summary

Model Data What it answers
Changepoint detector Single bearing RMS When did degradation begin?
Exponential degradation Post-onset trajectory When will this bearing cross the failure threshold?
Hierarchical fleet 100 engine lifetimes What’s the RUL for a running engine, informed by the fleet?

The hierarchical model is the production workhorse. A new engine with 50 cycles of data gets its RUL estimate anchored to the fleet posterior — the same partial pooling that found the bad batch in the piston ring story and the bad factory in the laptop story. The math is the same. The stakes are different.


Data: NASA C-MAPSS FD001, 100 turbofan engines, 21 sensors, single operating condition, single fault mode (HPC degradation). Model: hierarchical Normal lifetime with fleet-level partial pooling. Inference: NUTS via eXMC.

References

  • Saxena, A. et al. (2008). “Damage Propagation Modeling for Aircraft Engine Run-to-Failure Simulation.” IEEE PHM 2008.
  • Bull, L. et al. (2023). “Hierarchical Bayesian modeling for knowledge transfer across engineering fleets.” Computer-Aided Civil & Infra. Eng.
  • Gebraeel, N. et al. (2005). “Residual-life distributions from component degradation signals.” IIE Transactions, 37, 543-557.
  • Ramasso, E. & Saxena, A. (2014). “Performance Benchmarking and Analysis of Prognostic Methods for CMAPSS Datasets.” Int. J. Prognostics & Health Management, 5(2).