
TimelessMetrics Architecture

livebook/architecture.livemd

Supervision Tree

The application starts a top-level one_for_one supervisor containing TimelessMetrics.Supervisor (the data store) and TimelessMetrics.HTTP (the API server). The data store supervisor uses a rest_for_one strategy so that if a foundational child (like DB or Registry) crashes, all children started after it are restarted in order. SelfMonitor and the scraper subsystem are conditionally included based on configuration.
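The restart semantics can be seen in a minimal runnable sketch: with rest_for_one, killing the first child also restarts everything started after it. The children here are stand-in Agents and a Registry, not the real TimelessMetrics modules.

```elixir
# Sketch only: stand-in children demonstrating the rest_for_one layout.
# :db_sketch and :sm_sketch are illustrative Agents, not real modules.
children = [
  %{id: :db, start: {Agent, :start_link, [fn -> :db end, [name: :db_sketch]]}},
  {Registry, keys: :unique, name: :actor_registry_sketch},
  %{id: :series_manager,
    start: {Agent, :start_link, [fn -> :sm end, [name: :sm_sketch]]}}
]

# If :db crashes, the Registry and :series_manager restart too (in order);
# a crash in :series_manager alone leaves :db and the Registry untouched.
{:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)
```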

graph TD
    App["TimelessMetrics.Application<br/>one_for_one"]
    App --> Sup
    App --> HTTP["TimelessMetrics.HTTP<br/>Bandit Plug server"]
    Sup["TimelessMetrics.Supervisor<br/>rest_for_one"]
    Sup --> DB["TimelessMetrics.DB<br/>SQLite writer + reader pool"]
    Sup --> Reg["Registry<br/>:actor_registry, keys: :unique"]
    Sup --> DynSup["DynamicSupervisor<br/>:actor_sup, one_for_one"]
    Sup --> SM["TimelessMetrics.Actor.SeriesManager<br/>ETS index + series lifecycle"]
    Sup --> Rollup["TimelessMetrics.Actor.Rollup<br/>5-min tick"]
    Sup --> Ret["TimelessMetrics.Actor.Retention<br/>1-hr tick"]
    Sup --> AE["TimelessMetrics.AlertEvaluator<br/>60-sec tick"]
    Sup --> SelfMon["TimelessMetrics.SelfMonitor<br/>optional, default: on"]
    Sup --> ScrapeSup["DynamicSupervisor<br/>:scrape_sup, one_for_one"]
    Sup --> Scraper["TimelessMetrics.Scraper<br/>optional, default: on"]
    DynSup --> SS1["SeriesServer<br/>series 1"]
    DynSup --> SS2["SeriesServer<br/>series 2"]
    DynSup --> SSN["SeriesServer<br/>series N"]
    ScrapeSup --> W1["Scraper.Worker<br/>target 1"]
    ScrapeSup --> W2["Scraper.Worker<br/>target 2"]
    style SelfMon stroke-dasharray: 5 5
    style ScrapeSup stroke-dasharray: 5 5
    style Scraper stroke-dasharray: 5 5

Write Path

A write enters through TimelessMetrics.write/4 or write_batch/2, which delegates to Engine. The engine resolves each {metric, labels} pair to a SeriesServer pid via SeriesManager.get_or_start – a hot path using ETS + Registry that completes in microseconds. The actual write is a non-blocking GenServer.cast, so callers never wait. Inside the SeriesServer, points accumulate in raw_buffer until block_size is reached, at which point they are Gorilla + zstd compressed into a block. A periodic flush timer (default 60s) persists blocks and buffer to a .dat file via BlockStore, and a stale-buffer timer (30s) compresses idle buffers early.
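The buffer-until-block_size pattern can be sketched as a small GenServer. Names mirror the document; the compression step is stubbed with :erlang.term_to_binary + :zlib.compress, since GorillaStream is not available here.

```elixir
# Sketch: cast-based writes accumulate in raw_buffer; once block_size
# points arrive, the buffer is sorted, "compressed" (stub), and enqueued.
defmodule SeriesServer.Sketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def write(pid, ts, val), do: GenServer.cast(pid, {:write, ts, val})
  def block_count(pid), do: GenServer.call(pid, :block_count)

  @impl true
  def init(opts) do
    {:ok, %{raw: [], block_size: Keyword.get(opts, :block_size, 4), blocks: :queue.new()}}
  end

  @impl true
  def handle_cast({:write, ts, val}, s) do
    raw = [{ts, val} | s.raw]                       # prepend, newest first

    if length(raw) >= s.block_size do
      block = raw |> Enum.sort() |> :erlang.term_to_binary() |> :zlib.compress()
      {:noreply, %{s | raw: [], blocks: :queue.in(block, s.blocks)}}
    else
      {:noreply, %{s | raw: raw}}
    end
  end

  @impl true
  def handle_call(:block_count, _from, s), do: {:reply, :queue.len(s.blocks), s}
end

{:ok, pid} = SeriesServer.Sketch.start_link(block_size: 2)
SeriesServer.Sketch.write(pid, 1, 1.0)
SeriesServer.Sketch.write(pid, 2, 2.0)
n = SeriesServer.Sketch.block_count(pid)
```

Because the call arrives in the same mailbox after both casts, the caller observes one sealed block.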
flowchart TD
    API["TimelessMetrics.write(store, metric, labels, value)"]
    API --> Eng["Engine.write/5"]
    Eng --> TS["Extract/default timestamp"]
    TS --> GOS["SeriesManager.get_or_start(manager, metric, labels)"]

    subgraph Hot Path
        GOS --> ETS[":ets.lookup(:actor_index,<br/>{metric, encoded_labels})"]
        ETS -->|hit| RegL["Registry.lookup(:actor_registry, series_id)"]
        RegL -->|pid found| PID["Return {series_id, pid}"]
    end

    subgraph Cold Path
        ETS -->|miss| Call["GenServer.call(manager, {:get_or_start, ...})"]
        RegL -->|not found| Call
        Call --> SQLIns["INSERT INTO series (metric_name, labels)"]
        SQLIns --> ETSIns[":ets.insert(:actor_index, {key, series_id})"]
        ETSIns --> StartChild["DynamicSupervisor.start_child → SeriesServer"]
        StartChild --> PID
    end

    PID --> Cast["GenServer.cast(pid, {:write, ts, val})"]

    subgraph SeriesServer
        Cast --> Prepend["Prepend {ts, val} to raw_buffer"]
        Prepend --> Check{"raw_count >= block_size?"}
        Check -->|yes| Compress["compress_buffer/1"]
        Check -->|no| Wait["Await more writes"]
        Compress --> Sort["Sort raw_buffer by timestamp"]
        Sort --> Gorilla["GorillaStream.compress(sorted, compression: :zstd)"]
        Gorilla --> Enqueue[":queue.in(block, blocks)"]
        Enqueue --> Ring{"block_count > max_blocks?"}
        Ring -->|yes| Drop["Drop oldest block"]
        Ring -->|no| Clear["Clear raw_buffer"]
        Drop --> Clear
    end

    subgraph Periodic Timers
        FlushTimer[":flush_to_disk every 60s"]
        FlushTimer --> BStore["BlockStore.write(path, blocks, raw_buffer)<br/>atomic tmp + rename"]
        StaleTimer[":maybe_compress_stale every 30s"]
        StaleTimer --> StaleCheck{"raw_count > 0 AND<br/>idle > 30s?"}
        StaleCheck -->|yes| Compress
        MergeTimer[":maybe_merge_blocks every 5min"]
        MergeTimer --> MergeCheck{"old blocks ≥<br/>min_count?"}
        MergeCheck -->|yes| MergeExec["Decompress → merge → recompress"]
    end

Read / Query Path

Queries enter through TimelessMetrics.query*/4 functions. For multi-series queries, SeriesManager.find_series pattern-matches the ETS index to discover all series matching a metric name and label filter. The engine then fans out GenServer.call requests in parallel via Task.async_stream (concurrency = scheduler count). Each SeriesServer decompresses overlapping blocks, filters the raw buffer by time range, merges and sorts the results. For aggregation queries, bucketing and aggregate computation happen in-process before the reply. Cross-series aggregation (group-by) merges results after the fan-out completes.
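The fan-out shape can be sketched with Task.async_stream over stand-in series processes (Agents holding point lists). The bucket_points helper here is an illustrative average-per-bucket, not the real Aggregation module.

```elixir
# Sketch: parallel query fan-out bounded by scheduler count, then merge.
series =
  for points <- [[{0, 1.0}, {30, 3.0}], [{0, 5.0}, {90, 7.0}]] do
    {:ok, pid} = Agent.start_link(fn -> points end)
    pid
  end

# Assumption: average aggregation into fixed-width buckets.
bucket_points = fn points, bucket ->
  points
  |> Enum.group_by(fn {ts, _} -> div(ts, bucket) * bucket end, fn {_, v} -> v end)
  |> Enum.map(fn {b, vs} -> {b, Enum.sum(vs) / length(vs)} end)
  |> Enum.sort()
end

results =
  series
  |> Task.async_stream(
    fn pid -> pid |> Agent.get(& &1) |> bucket_points.(60) end,
    max_concurrency: System.schedulers_online()
  )
  |> Enum.map(fn {:ok, buckets} -> buckets end)
```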

sequenceDiagram
    participant C as Caller
    participant E as Engine
    participant SM as SeriesManager
    participant ETS as ETS Index
    participant T as Task.async_stream
    participant SS1 as SeriesServer 1
    participant SS2 as SeriesServer N
    participant A as Aggregation

    C->>E: query_aggregate_multi(store, metric, label_filter, opts)
    E->>SM: find_series(manager, metric, label_filter)
    SM->>ETS: match_object({metric, :_}, :_)
    ETS-->>SM: [{key, series_id}, ...]
    SM->>SM: decode labels, filter by label_filter
    SM->>SM: Registry.lookup each series_id
    SM-->>E: [{series_id, labels, pid}, ...]

    E->>T: async_stream(matching, max_concurrency: schedulers)

    par Fan-out to all matching series
        T->>SS1: GenServer.call({:query_aggregate, from, to, bucket, agg_fn})
        SS1->>SS1: Decompress overlapping blocks
        SS1->>SS1: Filter raw_buffer by [from, to]
        SS1->>SS1: Merge + sort by timestamp
        SS1->>A: bucket_points(points, bucket_seconds, agg_fn)
        A-->>SS1: [{bucket_ts, agg_value}, ...]
        SS1-->>T: {:ok, buckets}
    and
        T->>SS2: GenServer.call({:query_aggregate, from, to, bucket, agg_fn})
        SS2-->>T: {:ok, buckets}
    end

    T-->>E: [%{labels: l, data: [...]}, ...]
    E-->>C: {:ok, results}

Storage Layer

Data lives in two places. Per-series .dat files hold the in-memory ring buffer of compressed blocks plus the unflushed raw buffer tail – written atomically via tmp-file + rename by BlockStore. SQLite (WAL mode, single writer + pooled readers) stores series metadata, daily rollups, alert rules, scrape targets, and watermarks. Blocks use a two-stage compression pipeline: Gorilla time-series encoding (delta-of-delta timestamps, XOR-based values) followed by zstd container compression.

flowchart LR
    subgraph Per-Series .dat File
        Header["Header (12 bytes)<br/>'AM' magic | version 1<br/>block_count u32 | raw_count u32 | flags"]
        Block1["Block 0<br/>start_ts i64 | end_ts i64<br/>point_count u32 | data_len u32<br/>data: Gorilla + zstd"]
        BlockN["Block N<br/>..."]
        Raw["Raw Buffer Tail<br/>repeated raw_count×<br/>timestamp i64 | value f64"]
        Header --- Block1 --- BlockN --- Raw
    end

    subgraph SQLite metrics.db
        Series["series<br/>id, metric_name, labels, created_at"]
        Metadata["metric_metadata<br/>metric_name, type, unit, description"]
        TierDaily["tier_daily<br/>series_id, bucket, avg, min, max, count, sum, last"]
        AlertRules["alert_rules<br/>id, name, metric, condition, threshold, ..."]
        AlertState["alert_state<br/>rule_id, series_labels, state, triggered_at"]
        AlertHistory["alert_history<br/>id, rule_id, state, value, created_at"]
        ScrapeTargets["scrape_targets<br/>id, job_name, address, metrics_path, ..."]
        ScrapeHealth["scrape_health<br/>target_id, health, last_scrape, last_error"]
        Watermarks["_watermarks<br/>tier, last_bucket"]
        Annotations["annotations<br/>id, timestamp, title, tags"]
    end

    SS["SeriesServer"] -->|"flush every 60s"| Header
    SS -->|"read on startup"| Header
    SM["SeriesManager"] -->|"INSERT series"| Series
    Rollup["Actor.Rollup"] -->|"INSERT tier_daily"| TierDaily
    Rollup -->|"read/write watermark"| Watermarks
    Ret["Actor.Retention"] -->|"DELETE old rows"| TierDaily
    Ret -->|"DELETE orphan series"| Series
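The 12-byte header lends itself to binary pattern matching. This sketch encodes the fields named in the diagram ('AM' magic, version, block_count u32, raw_count u32, flags); treating version and flags as single bytes is an assumption about the exact layout.

```elixir
# Sketch of the .dat header: "AM" (2) | version u8 | block_count u32
# | raw_count u32 | flags u8 = 12 bytes. Field widths beyond the
# diagram's u32s are assumptions.
defmodule BlockStore.HeaderSketch do
  def encode(block_count, raw_count, flags \\ 0) do
    <<"AM", 1, block_count::unsigned-32, raw_count::unsigned-32, flags>>
  end

  def decode(<<"AM", 1, block_count::unsigned-32, raw_count::unsigned-32, flags>>) do
    {:ok, %{block_count: block_count, raw_count: raw_count, flags: flags}}
  end

  def decode(_), do: {:error, :bad_header}
end
```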

Block Merge Compaction

Each series actor accumulates many small compressed blocks over time (one per block_size points or stale-buffer flush). Merge compaction consolidates adjacent old blocks into fewer, larger blocks for better compression and faster large-range queries. Unlike TimelessLogs/Traces, merge runs inside each SeriesServer process – there is no centralized compactor. It triggers on a periodic timer (default every 5 minutes) and can also be forced via TimelessMetrics.merge_now(store), which fans out to all series processes.

flowchart TD
    subgraph "Per-SeriesServer"
        TIMER[":maybe_merge_blocks<br/>every 5 min"]
        TIMER --> AGE["Filter: block.end_ts < now - min_age<br/>default: 300s"]
        AGE --> COUNT{"eligible blocks ≥<br/>min_count?<br/>default: 4"}
        COUNT -->|no| NOOP[":noop"]
        COUNT -->|yes| GROUP["Group into batches<br/>≈ max_points each<br/>default: 10,000"]
        GROUP --> B1["Batch 1"]
        GROUP --> BN["Batch N"]
        B1 & BN --> DECOMP["GorillaStream.decompress<br/>each block in batch"]
        DECOMP --> SORT["Concatenate + sort<br/>by timestamp"]
        SORT --> RECOMP["GorillaStream.compress<br/>(Gorilla + zstd)"]
        RECOMP --> REPLACE["Replace batch entries<br/>in :queue with<br/>single merged block"]
        REPLACE --> DIRTY["Mark dirty for<br/>next disk flush"]
    end

    API["TimelessMetrics.merge_now(store)"] -->|"fan-out via<br/>SeriesManager.merge_all"| TIMER

| Option | Default | Description |
| --- | --- | --- |
| merge_block_min_count | 4 | Min eligible blocks before merge triggers |
| merge_block_max_points | 10,000 | Target points per merged block |
| merge_block_min_age_seconds | 300 | Only merge blocks older than this |
| merge_interval | 300,000 | Merge check timer interval (ms) |
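The eligibility-then-batching step can be sketched using the defaults above. Representing blocks as maps with :end_ts and :point_count is an assumption for illustration.

```elixir
# Sketch: filter blocks older than min_age, require min_count of them,
# then chunk so each batch carries roughly max_points points.
defmodule MergeSketch do
  def batches(blocks, now, min_age \\ 300, min_count \\ 4, max_points \\ 10_000) do
    eligible = Enum.filter(blocks, fn b -> b.end_ts < now - min_age end)

    if length(eligible) < min_count do
      []
    else
      Enum.chunk_while(
        eligible,
        {[], 0},
        fn b, {acc, n} ->
          # start a new batch once adding this block would overflow max_points
          if n + b.point_count > max_points and acc != [] do
            {:cont, Enum.reverse(acc), {[b], b.point_count}}
          else
            {:cont, {[b | acc], n + b.point_count}}
          end
        end,
        fn
          {[], _n} -> {:cont, []}
          {acc, _n} -> {:cont, Enum.reverse(acc), []}
        end
      )
    end
  end
end
```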

HTTP Ingest and Query

TimelessMetrics.HTTP is a Plug.Router served by Bandit. Ingest endpoints accept InfluxDB line protocol, VictoriaMetrics JSON lines, and Prometheus text exposition format – all parsed (with optional parallel parsing above 2000 lines) into {metric, labels, value, timestamp} tuples and dispatched via Engine.write_batch. Query endpoints support raw export, latest-value, and range queries with aggregation; a /prometheus/api/v1/query_range endpoint provides Grafana-compatible PromQL. Optional bearer-token auth protects all endpoints except /health.
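Parsing one InfluxDB line-protocol line into the {metric, labels, value, ts} shape can be sketched as follows; real line protocol also handles escaping, quoting, and multiple fields, which this sketch ignores.

```elixir
# Sketch: "measurement,tag=v field=1.0 <ts>" → {metric, labels, value, ts}.
parse_line = fn line ->
  [series, fields, ts] = String.split(line, " ", parts: 3)
  [metric | tags] = String.split(series, ",")

  labels =
    Map.new(tags, fn tag ->
      [k, v] = String.split(tag, "=")
      {k, v}
    end)

  # take the first field's value only (simplification)
  [_key, raw] = fields |> String.split(",") |> hd() |> String.split("=")
  {value, _rest} = Float.parse(raw)

  {metric, labels, value, String.to_integer(ts)}
end
```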

flowchart TD
    subgraph Ingest
        ILP["POST /write<br/>InfluxDB line protocol"]
        VMJ["POST /api/v1/import<br/>VM JSON lines"]
        Prom["POST /api/v1/import/prometheus<br/>Prometheus text"]
        ILP --> Parse["Parse → [{metric, labels, value, ts}]"]
        VMJ --> Parse
        Prom --> Parse
        Parse --> WB["TimelessMetrics.write_batch(store, entries)"]
        WB --> Eng["Engine.write_batch → SeriesManager → SeriesServer cast"]
    end

    subgraph Query
        Exp["GET /api/v1/export<br/>raw points"]
        QLatest["GET /api/v1/query<br/>latest value"]
        QRange["GET /api/v1/query_range<br/>aggregated range"]
        PromQL["GET /prometheus/api/v1/query_range<br/>PromQL (Grafana)"]
        Exp --> QEng["Engine.query_multi → fan-out → merge"]
        QLatest --> QEng
        QRange --> QEng
        PromQL --> PQL["PromQL.parse → PromQL.execute"]
        PQL --> QEng
        QEng --> Resp["JSON response<br/>VM format or Prometheus format"]
    end

    subgraph Metadata
        LblV["GET /api/v1/label/:name/values"]
        SeriesL["GET /api/v1/series"]
        Meta["GET|POST /api/v1/metadata"]
        LblV --> DBR["DB read → list/filter"]
        SeriesL --> DBR
        Meta --> DBR
    end

    subgraph Operational
        Health["GET /health<br/>no auth required"]
        Backup["POST /api/v1/backup<br/>VACUUM INTO snapshot"]
        Metrics["GET /metrics<br/>Prometheus exposition"]
    end

    Auth["Bearer token check"] -.->|guards| Ingest
    Auth -.->|guards| Query
    Auth -.->|guards| Metadata

Rollup and Retention Lifecycle

Both Rollup and Retention are tick-driven GenServers with wall-clock-anchored scheduling – each tick computes the next interval boundary and sleeps until then, so processing time never causes drift. Rollup fires on 5-minute boundaries: it reads a watermark to find unprocessed completed days, fans out {:compute_daily, ...} calls to all series processes in parallel, batch-inserts the aggregated rows into tier_daily, and advances the watermark. Retention fires on 1-hour boundaries: it calculates a raw cutoff (default 7 days), fans out {:enforce_retention, cutoff} to drop expired blocks/buffers, deletes old daily rollups (default 1 year), and cleans up orphaned series that have no remaining data. AlertEvaluator uses the same anchored pattern on 60-second boundaries.
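The anchored-tick arithmetic is a one-liner: compute the delay to the next interval boundary from the current wall-clock time, so slow ticks never accumulate drift.

```elixir
# Sketch: milliseconds to sleep until the next interval boundary.
next_boundary_ms = fn now_ms, interval_ms ->
  (div(now_ms, interval_ms) + 1) * interval_ms - now_ms
end

# e.g. inside a tick handler with a 5-minute interval:
# Process.send_after(self(), :tick,
#   next_boundary_ms.(System.system_time(:millisecond), 300_000))
```

Note that at an exact boundary the function returns a full interval, so a tick that fires precisely on time schedules the next one correctly.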

sequenceDiagram
    participant RT as Rollup (5-min tick)
    participant DB as SQLite
    participant SS as SeriesServer (each)
    participant RET as Retention (1-hr tick)
    participant DynSup as DynamicSupervisor
    participant ETS as ETS Index

    Note over RT: :tick fires every 5 minutes
    RT->>DB: Read watermark (last completed day)
    DB-->>RT: last_bucket timestamp
    RT->>RT: Compute completed_days(watermark, today_start)

    loop Each unprocessed day
        RT->>SS: Task.async_stream: call {:compute_daily, day_start, day_end}
        SS-->>RT: %{avg, min, max, count, sum, last}
        RT->>DB: INSERT OR REPLACE INTO tier_daily (batch)
        RT->>DB: Update watermark → day_start
    end

    Note over RET: :tick fires every 1 hour
    RET->>RET: raw_cutoff = now - 7 days

    RET->>SS: Task.async_stream: call {:enforce_retention, raw_cutoff}
    SS-->>RET: {:ok, dropped_count, empty?}
    RET->>RET: Collect orphans (empty series)

    RET->>DB: DELETE FROM tier_daily WHERE bucket < daily_cutoff

    loop Each orphan series
        RET->>DynSup: terminate_child(pid)
        RET->>ETS: match_delete(:actor_index, {:_, series_id})
        RET->>DB: DELETE FROM series WHERE id = ?
        RET->>RET: File.rm(series_N.dat)
    end

Scraper Subsystem

The Scraper GenServer manages target CRUD and worker lifecycle. On init it loads enabled targets from SQLite and starts a Scraper.Worker under DynamicSupervisor for each. Workers schedule scrapes on fixed intervals with initial jitter to avoid thundering herd. Each scrape tick fetches the target URL via HTTP, parses the Prometheus exposition text, applies the metric relabel pipeline, and writes the resulting entries through Engine.write_batch. Workers track their own health state and flush it to scrape_health periodically.
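The relabel step can be sketched as a reduce over configs that either rewrites a label or drops the sample. The config shape (:action, :source_label, :regex, :target_label, :replacement) is an assumption modeled on Prometheus-style relabel_configs, not the module's actual API.

```elixir
# Sketch: each config either drops the sample or rewrites a label when
# its regex matches the source label's value.
apply_configs = fn labels, configs ->
  Enum.reduce_while(configs, {:keep, labels}, fn cfg, {:keep, lbls} ->
    value = Map.get(lbls, cfg.source_label, "")

    cond do
      cfg.action == :drop and Regex.match?(cfg.regex, value) ->
        {:halt, :drop}

      cfg.action == :replace and Regex.match?(cfg.regex, value) ->
        {:cont, {:keep, Map.put(lbls, cfg.target_label, cfg.replacement)}}

      true ->
        {:cont, {:keep, lbls}}
    end
  end)
end
```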

sequenceDiagram
    participant API as HTTP API
    participant S as Scraper GenServer
    participant DB as SQLite
    participant DSup as DynamicSupervisor (:scrape_sup)
    participant W as Scraper.Worker
    participant Target as Scrape Target (HTTP)
    participant R as Relabel
    participant Eng as Engine

    API->>S: add_target(params)
    S->>DB: INSERT INTO scrape_targets
    S->>DSup: start_child({Worker, target: t, store: s})
    DSup-->>S: {:ok, pid}

    Note over W: Initial scrape with random jitter
    W->>W: Process.send_after(:scrape, jitter)

    loop Every scrape_interval (default 30s)
        W->>Target: Req.get!(url, timeout: scrape_timeout)
        Target-->>W: Prometheus text body
        W->>W: Parse Prometheus text → [{metric, labels, value, ts}]
        W->>R: apply_configs(labels, metric_relabel_configs)
        R-->>W: relabeled entries (or :drop)
        W->>W: Apply honor_labels, target labels, timestamps
        W->>Eng: TimelessMetrics.write_batch(store, entries)
        W->>W: Write self-metrics (up, scrape_duration, samples_scraped)
        W->>W: Auto-register metric metadata from name suffixes
        W->>DB: Flush scrape_health (on state change or every 10 scrapes)
        W->>W: Reschedule :scrape on fixed anchor
    end

    API->>S: delete_target(id)
    S->>DSup: terminate_child(worker_pid)
    S->>DB: DELETE FROM scrape_targets WHERE id = ?

Alert System

The AlertEvaluator GenServer ticks every 60 seconds and calls Alert.evaluate/1. For each enabled rule, it queries recent metric data via query_aggregate_multi, then checks the last bucket value against the rule’s condition (above/below threshold). A per-series state machine tracks transitions through ok → pending → firing → resolved. The pending state implements configurable duration-based delays before firing. State changes to firing or resolved trigger async webhook delivery and are logged to alert_history.

stateDiagram-v2
    [*] --> ok
    ok --> pending: breaching AND duration > 0
    ok --> firing: breaching AND duration = 0
    ok --> ok: not breaching
    pending --> firing: breaching AND elapsed >= duration
    pending --> pending: breaching AND elapsed < duration
    pending --> ok: not breaching
    firing --> firing: still breaching
    firing --> resolved: not breaching
    resolved --> pending: breaching AND duration > 0
    resolved --> firing: breaching AND duration = 0
    resolved --> ok: not breaching

    note right of firing: webhook dispatched on entry
    note right of resolved: webhook dispatched on entry
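The transition table above maps directly onto a pure function; this sketch takes the current state, whether the series is breaching, the rule's pending duration, and the elapsed pending time.

```elixir
# Sketch of the alert state machine shown in the diagram.
update_state = fn
  state, false, _duration, _elapsed when state in [:ok, :pending, :resolved] -> :ok
  :firing, false, _duration, _elapsed -> :resolved
  state, true, 0, _elapsed when state in [:ok, :resolved] -> :firing
  state, true, _duration, _elapsed when state in [:ok, :resolved] -> :pending
  :pending, true, duration, elapsed when elapsed >= duration -> :firing
  :pending, true, _duration, _elapsed -> :pending
  :firing, true, _duration, _elapsed -> :firing
end
```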

sequenceDiagram
    participant AE as AlertEvaluator (60s tick)
    participant A as Alert
    participant DB as SQLite
    participant Eng as Engine
    participant SS as SeriesServer(s)
    participant WH as Webhook (HTTP POST)

    Note over AE: :tick fires every 60 seconds
    AE->>A: Alert.evaluate(store)
    A->>DB: list_rules → SELECT alert_rules + alert_state
    DB-->>A: [%{rule with states}, ...]

    loop Each enabled rule
        A->>Eng: query_aggregate_multi(store, metric, labels, lookback)
        Eng->>SS: Fan-out {:query_aggregate, ...}
        SS-->>Eng: [{bucket_ts, agg_value}, ...]
        Eng-->>A: [%{labels, data}, ...]

        loop Each matching series
            A->>A: check_condition(last_value, condition, threshold)
            A->>A: update_state(current, breaching?, duration, now)

            alt Transition to firing
                A->>DB: UPDATE alert_state SET state='firing'
                A->>DB: INSERT INTO alert_history (state='firing')
                A->>WH: Task.start → POST {alert, metric, labels, value, state}
            else Transition to resolved
                A->>DB: UPDATE alert_state SET state='resolved'
                A->>DB: INSERT INTO alert_history (state='resolved')
                A->>WH: Task.start → POST {alert, metric, labels, value, state}
            else Return to ok
                A->>DB: DELETE FROM alert_state (cleanup)
            end
        end
    end