TimelessMetrics Architecture
Supervision Tree
The application starts a top-level one_for_one supervisor containing
TimelessMetrics.Supervisor (the data store) and TimelessMetrics.HTTP (the API server).
The data store supervisor uses a rest_for_one strategy so that if a foundational child
(like DB or Registry) crashes, all children started after it are restarted in order.
SelfMonitor and the scraper subsystem are conditionally included based on configuration.
graph TD
App["TimelessMetrics.Application
one_for_one"]
App --> Sup
App --> HTTP["TimelessMetrics.HTTP
Bandit Plug server"]
Sup["TimelessMetrics.Supervisor
rest_for_one"]
Sup --> DB["TimelessMetrics.DB
SQLite writer + reader pool"]
Sup --> Reg["Registry
:actor_registry, keys: :unique"]
Sup --> DynSup["DynamicSupervisor
:actor_sup, one_for_one"]
Sup --> SM["TimelessMetrics.Actor.SeriesManager
ETS index + series lifecycle"]
Sup --> Rollup["TimelessMetrics.Actor.Rollup
5-min tick"]
Sup --> Ret["TimelessMetrics.Actor.Retention
1-hr tick"]
Sup --> AE["TimelessMetrics.AlertEvaluator
60-sec tick"]
Sup --> SelfMon["TimelessMetrics.SelfMonitor
optional, default: on"]
Sup --> ScrapeSup["DynamicSupervisor
:scrape_sup, one_for_one"]
Sup --> Scraper["TimelessMetrics.Scraper
optional, default: on"]
DynSup --> SS1["SeriesServer
series 1"]
DynSup --> SS2["SeriesServer
series 2"]
DynSup --> SSN["SeriesServer
series N"]
ScrapeSup --> W1["Scraper.Worker
target 1"]
ScrapeSup --> W2["Scraper.Worker
target 2"]
style SelfMon stroke-dasharray: 5 5
style ScrapeSup stroke-dasharray: 5 5
style Scraper stroke-dasharray: 5 5
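The tree above could be declared roughly as follows. Module names come from the diagram; the option keys (`:self_monitor`, `:scraper`) and exact child ordering details are assumptions — a sketch, not the actual implementation:

```elixir
defmodule TimelessMetrics.Supervisor do
  use Supervisor

  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: Supervisor.init(children_for(opts), strategy: :rest_for_one)

  # Foundational children first: rest_for_one restarts everything started
  # *after* a crashed child, so DB and Registry sit at the front.
  def children_for(opts) do
    [
      TimelessMetrics.DB,
      {Registry, keys: :unique, name: :actor_registry},
      {DynamicSupervisor, name: :actor_sup, strategy: :one_for_one},
      TimelessMetrics.Actor.SeriesManager,
      TimelessMetrics.Actor.Rollup,
      TimelessMetrics.Actor.Retention,
      TimelessMetrics.AlertEvaluator
    ] ++ optional(opts)
  end

  # SelfMonitor and the scraper subsystem are conditionally included
  # (both default to enabled, matching the diagram).
  defp optional(opts) do
    self_mon =
      if Keyword.get(opts, :self_monitor, true), do: [TimelessMetrics.SelfMonitor], else: []

    scraper =
      if Keyword.get(opts, :scraper, true) do
        [{DynamicSupervisor, name: :scrape_sup, strategy: :one_for_one}, TimelessMetrics.Scraper]
      else
        []
      end

    self_mon ++ scraper
  end
end
```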
Write Path
A write enters through TimelessMetrics.write/4 or write_batch/2, which delegates to
Engine. The engine resolves each {metric, labels} pair to a SeriesServer pid via
SeriesManager.get_or_start – a hot path using ETS + Registry that completes in
microseconds. The actual write is a non-blocking GenServer.cast, so callers never wait.
Inside the SeriesServer, points accumulate in raw_buffer until block_size is
reached, at which point they are Gorilla + zstd compressed into a block. A
periodic flush timer (default 60s) persists blocks and buffer to a
.dat file via BlockStore, and a stale-buffer timer (30s) compresses idle buffers early.
flowchart TD
API["TimelessMetrics.write(store, metric, labels, value)"]
API --> Eng["Engine.write/5"]
Eng --> TS["Extract/default timestamp"]
TS --> GOS["SeriesManager.get_or_start(manager, metric, labels)"]
subgraph Hot Path
GOS --> ETS[":ets.lookup(:actor_index,
{metric, encoded_labels})"]
ETS -->|hit| RegL["Registry.lookup(:actor_registry, series_id)"]
RegL -->|pid found| PID["Return {series_id, pid}"]
end
subgraph Cold Path
ETS -->|miss| Call["GenServer.call(manager, {:get_or_start, ...})"]
RegL -->|not found| Call
Call --> SQLIns["INSERT INTO series (metric_name, labels)"]
SQLIns --> ETSIns[":ets.insert(:actor_index, {key, series_id})"]
ETSIns --> StartChild["DynamicSupervisor.start_child → SeriesServer"]
StartChild --> PID
end
PID --> Cast["GenServer.cast(pid, {:write, ts, val})"]
subgraph SeriesServer
Cast --> Prepend["Prepend {ts, val} to raw_buffer"]
Prepend --> Check{"raw_count >= block_size?"}
Check -->|yes| Compress["compress_buffer/1"]
Check -->|no| Wait["Await more writes"]
Compress --> Sort["Sort raw_buffer by timestamp"]
Sort --> Gorilla["GorillaStream.compress(sorted, compression: :zstd)"]
Gorilla --> Enqueue[":queue.in(block, blocks)"]
Enqueue --> Ring{"block_count > max_blocks?"}
Ring -->|yes| Drop["Drop oldest block"]
Ring -->|no| Clear["Clear raw_buffer"]
Drop --> Clear
end
subgraph Periodic Timers
FlushTimer[":flush_to_disk every 60s"]
FlushTimer --> BStore["BlockStore.write(path, blocks, raw_buffer)
atomic tmp + rename"]
StaleTimer[":maybe_compress_stale every 30s"]
StaleTimer --> StaleCheck{"raw_count > 0 AND
idle > 30s?"}
StaleCheck -->|yes| Compress
MergeTimer[":maybe_merge_blocks every 5min"]
MergeTimer --> MergeCheck{"old blocks ≥
min_count?"}
MergeCheck -->|yes| MergeExec["Decompress → merge → recompress"]
end
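The hot/cold split in the flowchart can be sketched as a single lookup function. The key shape and `encode_labels` are assumptions inferred from the diagram; the real `SeriesManager.get_or_start` may differ:

```elixir
defmodule SeriesLookup do
  # Hot path: a public ETS table plus Registry means a cache hit never
  # touches the manager GenServer at all.
  def get_or_start(manager, metric, labels) do
    key = {metric, encode_labels(labels)}

    with [{^key, series_id}] <- :ets.lookup(:actor_index, key),
         [{pid, _value}] <- Registry.lookup(:actor_registry, series_id) do
      {series_id, pid}
    else
      # Cold path: serialize creation through the manager, which inserts
      # the SQLite row and ETS entry, then starts the SeriesServer child.
      _miss -> GenServer.call(manager, {:get_or_start, metric, labels})
    end
  end

  # Assumed: labels are encoded deterministically so the same label map
  # always yields the same ETS key.
  def encode_labels(labels), do: labels |> Enum.sort() |> :erlang.term_to_binary()
end
```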
Read / Query Path
Queries enter through TimelessMetrics.query*/4 functions. For multi-series queries,
SeriesManager.find_series pattern-matches the ETS index to discover all series matching
a metric name and label filter. The engine then fans out GenServer.call requests in
parallel via Task.async_stream (concurrency = scheduler count). Each SeriesServer
decompresses overlapping blocks, filters the raw buffer by time range, merges and sorts
the results. For aggregation queries, bucketing and aggregate computation happen
in-process before the reply. Cross-series aggregation (group-by) merges results after
the fan-out completes.
sequenceDiagram
participant C as Caller
participant E as Engine
participant SM as SeriesManager
participant ETS as ETS Index
participant T as Task.async_stream
participant SS1 as SeriesServer 1
participant SS2 as SeriesServer N
participant A as Aggregation
C->>E: query_aggregate_multi(store, metric, label_filter, opts)
E->>SM: find_series(manager, metric, label_filter)
SM->>ETS: match_object({{metric, :_}, :_})
ETS-->>SM: [{key, series_id}, ...]
SM->>SM: decode labels, filter by label_filter
SM->>SM: Registry.lookup each series_id
SM-->>E: [{series_id, labels, pid}, ...]
E->>T: async_stream(matching, max_concurrency: schedulers)
par Fan-out to all matching series
T->>SS1: GenServer.call({:query_aggregate, from, to, bucket, agg_fn})
SS1->>SS1: Decompress overlapping blocks
SS1->>SS1: Filter raw_buffer by [from, to]
SS1->>SS1: Merge + sort by timestamp
SS1->>A: bucket_points(points, bucket_seconds, agg_fn)
A-->>SS1: [{bucket_ts, agg_value}, ...]
SS1-->>T: {:ok, buckets}
and
T->>SS2: GenServer.call({:query_aggregate, from, to, bucket, agg_fn})
SS2-->>T: {:ok, buckets}
end
T-->>E: [%{labels: l, data: [...]}, ...]
E-->>C: {:ok, results}
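The fan-out step might look like the following; the call message and result shapes are assumed from the sequence diagram:

```elixir
defmodule QueryFanout do
  # One GenServer.call per matching series, bounded by the scheduler
  # count, with results merged into the [%{labels: ..., data: ...}] shape
  # the engine returns to the caller.
  def query_aggregate_multi(matching, from, to, bucket, agg_fn) do
    matching
    |> Task.async_stream(
      fn {_series_id, labels, pid} ->
        {:ok, buckets} = GenServer.call(pid, {:query_aggregate, from, to, bucket, agg_fn})
        %{labels: labels, data: buckets}
      end,
      max_concurrency: System.schedulers_online(),
      timeout: 5_000
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```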
Storage Layer
Data lives in two places. Per-series .dat files persist the ring buffer of
compressed blocks plus the unflushed raw-buffer tail – written atomically via
tmp-file + rename by BlockStore. SQLite (WAL mode, single writer + pooled readers)
stores series metadata, daily rollups, alert rules, scrape targets, and watermarks.
Blocks use a two-stage compression pipeline: Gorilla time-series encoding
(delta-of-delta timestamps, XOR-based values) followed by zstd container compression.
flowchart LR
subgraph Per-Series .dat File
Header["Header (12 bytes)
'AM' magic | version 1
block_count u32 | raw_count u32 | flags"]
Block1["Block 0
start_ts i64 | end_ts i64
point_count u32 | data_len u32
data: Gorilla + zstd"]
BlockN["Block N
..."]
Raw["Raw Buffer Tail
repeated raw_count×
timestamp i64 | value f64"]
Header --- Block1 --- BlockN --- Raw
end
subgraph SQLite metrics.db
Series["series
id, metric_name, labels, created_at"]
Metadata["metric_metadata
metric_name, type, unit, description"]
TierDaily["tier_daily
series_id, bucket, avg, min, max, count, sum, last"]
AlertRules["alert_rules
id, name, metric, condition, threshold, ..."]
AlertState["alert_state
rule_id, series_labels, state, triggered_at"]
AlertHistory["alert_history
id, rule_id, state, value, created_at"]
ScrapeTargets["scrape_targets
id, job_name, address, metrics_path, ..."]
ScrapeHealth["scrape_health
target_id, health, last_scrape, last_error"]
Watermarks["_watermarks
tier, last_bucket"]
Annotations["annotations
id, timestamp, title, tags"]
end
SS["SeriesServer"] -->|"flush every 60s"| Header
SS -->|"read on startup"| Header
SM["SeriesManager"] -->|"INSERT series"| Series
Rollup["Actor.Rollup"] -->|"INSERT tier_daily"| TierDaily
Rollup -->|"read/write watermark"| Watermarks
Ret["Actor.Retention"] -->|"DELETE old rows"| TierDaily
Ret -->|"DELETE orphan series"| Series
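Under the layout above, encoding and decoding the 12-byte header is a one-liner in Elixir binary syntax. Little-endian integers are an assumption beyond what the diagram states:

```elixir
defmodule DatHeader do
  # Header layout from the diagram: "AM" magic (2 bytes), version (1),
  # block_count u32, raw_count u32, flags (1) = 12 bytes total.
  def encode(block_count, raw_count, flags \\ 0) do
    <<"AM", 1, block_count::unsigned-little-32, raw_count::unsigned-little-32, flags>>
  end

  def decode(<<"AM", 1, block_count::unsigned-little-32, raw_count::unsigned-little-32,
               flags, rest::binary>>) do
    {:ok, %{block_count: block_count, raw_count: raw_count, flags: flags}, rest}
  end

  def decode(_other), do: {:error, :bad_header}
end
```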
Block Merge Compaction
Each series actor accumulates many small compressed blocks over time (one per
block_size points or stale-buffer flush). Merge compaction consolidates adjacent
old blocks into fewer, larger blocks for better compression and faster large-range
queries. Unlike TimelessLogs/Traces, merge runs inside each SeriesServer process
– there is no centralized compactor. It triggers on a periodic timer (default
every 5 minutes) and can also be forced via TimelessMetrics.merge_now(store),
which fans out to all series processes.
flowchart TD
subgraph "Per-SeriesServer"
TIMER[":maybe_merge_blocks
every 5 min"]
TIMER --> AGE["Filter: block.end_ts < now - min_age
default: 300s"]
AGE --> COUNT{"eligible blocks ≥
min_count?
default: 4"}
COUNT -->|no| NOOP[":noop"]
COUNT -->|yes| GROUP["Group into batches
≈ max_points each
default: 10,000"]
GROUP --> B1["Batch 1"]
GROUP --> BN["Batch N"]
B1 & BN --> DECOMP["GorillaStream.decompress
each block in batch"]
DECOMP --> SORT["Concatenate + sort
by timestamp"]
SORT --> RECOMP["GorillaStream.compress
(Gorilla + zstd)"]
RECOMP --> REPLACE["Replace batch entries
in :queue with
single merged block"]
REPLACE --> DIRTY["Mark dirty for
next disk flush"]
end
API["TimelessMetrics.merge_now(store)"] -->|"fan-out via
SeriesManager.merge_all"| TIMER
| Option | Default | Description |
|---|---|---|
| merge_block_min_count | 4 | Min eligible blocks before merge triggers |
| merge_block_max_points | 10,000 | Target points per merged block |
| merge_block_min_age_seconds | 300 | Only merge blocks older than this |
| merge_interval | 300,000 | Merge check timer interval (ms) |
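The eligibility filter and batching step can be sketched as a pure planning function. The block shape (`%{end_ts: ..., point_count: ...}`) and option plumbing are assumptions; the defaults follow the table above:

```elixir
defmodule MergePlan do
  # Returns batches of blocks to merge; each batch becomes one block.
  def plan(blocks, now, opts \\ []) do
    min_age = Keyword.get(opts, :min_age_seconds, 300)
    min_count = Keyword.get(opts, :min_count, 4)
    max_points = Keyword.get(opts, :max_points, 10_000)

    eligible = Enum.filter(blocks, fn b -> b.end_ts < now - min_age end)

    if length(eligible) < min_count, do: [], else: chunk_by_points(eligible, max_points)
  end

  # Greedy chunking: close a batch once adding the next block would push
  # it past max_points. Single-block batches are dropped, since merging
  # one block is a no-op.
  defp chunk_by_points(blocks, max_points) do
    blocks
    |> Enum.chunk_while(
      {[], 0},
      fn b, {acc, pts} ->
        if pts + b.point_count > max_points and acc != [] do
          {:cont, Enum.reverse(acc), {[b], b.point_count}}
        else
          {:cont, {[b | acc], pts + b.point_count}}
        end
      end,
      fn
        {[], _pts} -> {:cont, []}
        {acc, _pts} -> {:cont, Enum.reverse(acc), []}
      end
    )
    |> Enum.reject(fn batch -> length(batch) < 2 end)
  end
end
```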
HTTP Ingest and Query
TimelessMetrics.HTTP is a Plug.Router served by Bandit. Ingest endpoints accept
InfluxDB line protocol, VictoriaMetrics JSON lines, and Prometheus text exposition
format – all parsed (with optional parallel parsing above 2000 lines) into
{metric, labels, value, timestamp} tuples and dispatched via Engine.write_batch.
Query endpoints support raw export, latest-value, and range queries with aggregation;
a /prometheus/api/v1/query_range endpoint provides Grafana-compatible PromQL.
Optional bearer-token auth protects all endpoints except /health.
flowchart TD
subgraph Ingest
ILP["POST /write
InfluxDB line protocol"]
VMJ["POST /api/v1/import
VM JSON lines"]
Prom["POST /api/v1/import/prometheus
Prometheus text"]
ILP --> Parse["Parse → [{metric, labels, value, ts}]"]
VMJ --> Parse
Prom --> Parse
Parse --> WB["TimelessMetrics.write_batch(store, entries)"]
WB --> Eng["Engine.write_batch → SeriesManager → SeriesServer cast"]
end
subgraph Query
Exp["GET /api/v1/export
raw points"]
QLatest["GET /api/v1/query
latest value"]
QRange["GET /api/v1/query_range
aggregated range"]
PromQL["GET /prometheus/api/v1/query_range
PromQL (Grafana)"]
Exp --> QEng["Engine.query_multi → fan-out → merge"]
QLatest --> QEng
QRange --> QEng
PromQL --> PQL["PromQL.parse → PromQL.execute"]
PQL --> QEng
QEng --> Resp["JSON response
VM format or Prometheus format"]
end
subgraph Metadata
LblV["GET /api/v1/label/:name/values"]
SeriesL["GET /api/v1/series"]
Meta["GET|POST /api/v1/metadata"]
LblV --> DBR["DB read → list/filter"]
SeriesL --> DBR
Meta --> DBR
end
subgraph Operational
Health["GET /health
no auth required"]
Backup["POST /api/v1/backup
VACUUM INTO snapshot"]
Metrics["GET /metrics
Prometheus exposition"]
end
Auth["Bearer token check"] -.->|guards| Ingest
Auth -.->|guards| Query
Auth -.->|guards| Metadata
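A minimal line-protocol parse into the `{metric, labels, value, ts}` tuples the ingest path dispatches. This sketch assumes single-field lines and VictoriaMetrics-style `measurement_field` metric naming; the real parser also handles escaping, multiple fields, string values, and the parallel path above 2000 lines:

```elixir
defmodule LineProtocol do
  # e.g. "cpu,host=a usage=0.64 1700000000"
  def parse_line(line) do
    [head, fields | rest] = String.split(line, " ", parts: 3)
    [measurement | tags] = String.split(head, ",")

    labels =
      Map.new(tags, fn tag ->
        [k, v] = String.split(tag, "=", parts: 2)
        {k, v}
      end)

    # Single field only in this sketch, e.g. "usage=0.64".
    [field_key, raw] = String.split(fields, "=", parts: 2)
    {value, _} = Float.parse(raw)

    # Timestamp is optional in line protocol; nil lets the engine default it.
    ts =
      case rest do
        [ts_str] -> String.to_integer(String.trim(ts_str))
        [] -> nil
      end

    {"#{measurement}_#{field_key}", labels, value, ts}
  end
end
```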
Rollup and Retention Lifecycle
Both Rollup and Retention are tick-driven GenServers with wall-clock-anchored
scheduling – each tick computes the next interval boundary and sleeps until then,
so processing time never causes drift. Rollup fires on 5-minute boundaries: it reads a
watermark to find unprocessed completed days, fans out {:compute_daily, ...} calls to
all series processes in parallel, batch-inserts the aggregated rows into tier_daily,
and advances the watermark. Retention fires on 1-hour boundaries: it calculates a raw
cutoff (default 7 days), fans out {:enforce_retention, cutoff} to drop expired
blocks/buffers, deletes old daily rollups (default 1 year), and cleans up orphaned series
that have no remaining data. AlertEvaluator uses the same anchored pattern on 60-second
boundaries.
sequenceDiagram
participant RT as Rollup (5-min tick)
participant DB as SQLite
participant SS as SeriesServer (each)
participant RET as Retention (1-hr tick)
participant DynSup as DynamicSupervisor
participant ETS as ETS Index
Note over RT: :tick fires every 5 minutes
RT->>DB: Read watermark (last completed day)
DB-->>RT: last_bucket timestamp
RT->>RT: Compute completed_days(watermark, today_start)
loop Each unprocessed day
RT->>SS: Task.async_stream: call {:compute_daily, day_start, day_end}
SS-->>RT: %{avg, min, max, count, sum, last}
RT->>DB: INSERT OR REPLACE INTO tier_daily (batch)
RT->>DB: Update watermark → day_start
end
Note over RET: :tick fires every 1 hour
RET->>RET: raw_cutoff = now - 7 days
RET->>SS: Task.async_stream: call {:enforce_retention, raw_cutoff}
SS-->>RET: {:ok, dropped_count, empty?}
RET->>RET: Collect orphans (empty series)
RET->>DB: DELETE FROM tier_daily WHERE bucket < daily_cutoff
loop Each orphan series
RET->>DynSup: terminate_child(pid)
RET->>ETS: match_delete(:actor_index, {:_, series_id})
RET->>DB: DELETE FROM series WHERE id = ?
RET->>RET: File.rm(series_N.dat)
end
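The anchored scheduling that Rollup, Retention, and AlertEvaluator share reduces to one computation: milliseconds until the next interval boundary on the wall clock. A sketch (function names are assumptions):

```elixir
defmodule AnchoredTick do
  # Sleep until the *next boundary*, not "now + interval", so time spent
  # processing a tick never accumulates as drift.
  def ms_until_next_boundary(now_ms, interval_ms) do
    interval_ms - rem(now_ms, interval_ms)
  end

  # Inside a GenServer, each tick handler reschedules itself like this:
  def schedule_tick(interval_ms) do
    delay = ms_until_next_boundary(System.system_time(:millisecond), interval_ms)
    Process.send_after(self(), :tick, delay)
  end
end
```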
Scraper Subsystem
The Scraper GenServer manages target CRUD and worker lifecycle. On init it loads
enabled targets from SQLite and starts a Scraper.Worker under DynamicSupervisor
for each. Workers schedule scrapes on fixed intervals with initial jitter to avoid
thundering herd. Each scrape tick fetches the target URL via HTTP, parses the
Prometheus exposition text, applies the metric relabel pipeline, and writes the
resulting entries through Engine.write_batch. Workers track their own health state
and flush it to scrape_health periodically.
sequenceDiagram
participant API as HTTP API
participant S as Scraper GenServer
participant DB as SQLite
participant DSup as DynamicSupervisor (:scrape_sup)
participant W as Scraper.Worker
participant Target as Scrape Target (HTTP)
participant R as Relabel
participant Eng as Engine
API->>S: add_target(params)
S->>DB: INSERT INTO scrape_targets
S->>DSup: start_child({Worker, target: t, store: s})
DSup-->>S: {:ok, pid}
Note over W: Initial scrape with random jitter
W->>W: Process.send_after(:scrape, jitter)
loop Every scrape_interval (default 30s)
W->>Target: Req.get!(url, timeout: scrape_timeout)
Target-->>W: Prometheus text body
W->>W: Parse Prometheus text → [{metric, labels, value, ts}]
W->>R: apply_configs(labels, metric_relabel_configs)
R-->>W: relabeled entries (or :drop)
W->>W: Apply honor_labels, target labels, timestamps
W->>Eng: TimelessMetrics.write_batch(store, entries)
W->>W: Write self-metrics (up, scrape_duration, samples_scraped)
W->>W: Auto-register metric metadata from name suffixes
W->>DB: Flush scrape_health (on state change or every 10 scrapes)
W->>W: Reschedule :scrape on fixed anchor
end
API->>S: delete_target(id)
S->>DSup: terminate_child(worker_pid)
S->>DB: DELETE FROM scrape_targets WHERE id = ?
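The jitter-then-anchor scheduling might be sketched as below. Deriving a stable per-target phase via `:erlang.phash2` is an assumption; the source only states that the first scrape is jittered and later ones re-anchor on a fixed grid:

```elixir
defmodule ScrapeSchedule do
  # First scrape: uniform random delay in (0, interval] to avoid a
  # thundering herd when many workers start together.
  def initial_delay(interval_ms), do: :rand.uniform(interval_ms)

  # Later scrapes: land on the target's own fixed grid, offset by a
  # stable phase derived from its id, regardless of how long the
  # previous scrape took.
  def next_delay(now_ms, interval_ms, target_id) do
    phase = :erlang.phash2(target_id, interval_ms)

    case rem(rem(phase - now_ms, interval_ms) + interval_ms, interval_ms) do
      0 -> interval_ms
      d -> d
    end
  end
end
```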
Alert System
The AlertEvaluator GenServer ticks every 60 seconds and calls Alert.evaluate/1. For
each enabled rule, it queries recent metric data via query_aggregate_multi, then checks
the last bucket value against the rule’s condition (above/below threshold). A per-series
state machine tracks transitions through ok → pending → firing → resolved. The pending
state implements configurable duration-based delays before firing. State changes to firing
or resolved trigger async webhook delivery and are logged to alert_history.
stateDiagram-v2
[*] --> ok
ok --> pending: breaching AND duration > 0
ok --> firing: breaching AND duration = 0
ok --> ok: not breaching
pending --> firing: breaching AND elapsed >= duration
pending --> pending: breaching AND elapsed < duration
pending --> ok: not breaching
firing --> firing: still breaching
firing --> resolved: not breaching
resolved --> pending: breaching AND duration > 0
resolved --> firing: breaching AND duration = 0
resolved --> ok: not breaching
note right of firing: webhook dispatched on entry
note right of resolved: webhook dispatched on entry
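The state diagram transcribes directly into a pure transition function; the signature here is an assumption (the real Alert module may carry more state, such as the pending-entry timestamp):

```elixir
defmodule AlertFSM do
  # state:      :ok | :pending | :firing | :resolved
  # breaching?: last bucket value violates the rule condition
  # duration:   configured delay before firing (seconds)
  # elapsed:    seconds since the pending state was entered
  def update_state(state, breaching?, duration, elapsed \\ 0)

  def update_state(s, true, 0, _) when s in [:ok, :resolved], do: :firing
  def update_state(s, true, d, _) when s in [:ok, :resolved] and d > 0, do: :pending
  def update_state(s, false, _, _) when s in [:ok, :resolved], do: :ok
  def update_state(:pending, true, d, elapsed) when elapsed >= d, do: :firing
  def update_state(:pending, true, _, _), do: :pending
  def update_state(:pending, false, _, _), do: :ok
  def update_state(:firing, true, _, _), do: :firing
  def update_state(:firing, false, _, _), do: :resolved
end
```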
sequenceDiagram
participant AE as AlertEvaluator (60s tick)
participant A as Alert
participant DB as SQLite
participant Eng as Engine
participant SS as SeriesServer(s)
participant WH as Webhook (HTTP POST)
Note over AE: :tick fires every 60 seconds
AE->>A: Alert.evaluate(store)
A->>DB: list_rules → SELECT alert_rules + alert_state
DB-->>A: [%{rule with states}, ...]
loop Each enabled rule
A->>Eng: query_aggregate_multi(store, metric, labels, lookback)
Eng->>SS: Fan-out {:query_aggregate, ...}
SS-->>Eng: [{bucket_ts, agg_value}, ...]
Eng-->>A: [%{labels, data}, ...]
loop Each matching series
A->>A: check_condition(last_value, condition, threshold)
A->>A: update_state(current, breaching?, duration, now)
alt Transition to firing
A->>DB: UPDATE alert_state SET state='firing'
A->>DB: INSERT INTO alert_history (state='firing')
A->>WH: Task.start → POST {alert, metric, labels, value, state}
else Transition to resolved
A->>DB: UPDATE alert_state SET state='resolved'
A->>DB: INSERT INTO alert_history (state='resolved')
A->>WH: Task.start → POST {alert, metric, labels, value, state}
else Return to ok
A->>DB: DELETE FROM alert_state (cleanup)
end
end
end