
TimelessTraces Architecture

Supervision Tree

The application starts a flat supervision tree using the :one_for_one strategy, so each child can restart independently. The HTTP server is optional and is started only when configured.

```mermaid
graph TD
    S["TimelessTraces.Application<br/>Supervisor :one_for_one"]
    S --> R["TimelessTraces.Registry<br/>Registry :duplicate"]
    S --> I["TimelessTraces.Index<br/>GenServer"]
    S --> FS["FlushSupervisor<br/>Task.Supervisor"]
    S --> B["TimelessTraces.Buffer<br/>GenServer"]
    S --> C["TimelessTraces.Compactor<br/>GenServer"]
    S --> RT["TimelessTraces.Retention<br/>GenServer"]
    S -.-> H["TimelessTraces.HTTP<br/>Bandit (optional)"]
    style H stroke-dasharray: 5 5
```
  • Registry — Duplicate-key registry for real-time span subscriptions
  • Index — ETS tables for block metadata, term index, and trace index, persisted via snapshots + disk log
  • FlushSupervisor — Task.Supervisor managing concurrent async flush tasks
  • Buffer — Accumulates spans, auto-flushes on size or interval threshold
  • Compactor — Merges raw blocks into compressed blocks (zstd or OpenZL)
  • Retention — Deletes blocks by age or total size on a periodic timer
  • HTTP — Jaeger-compatible REST API (only started when http config is set)
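
Under these assumptions, the Application callback might look roughly like the sketch below. The child modules are the ones listed above; the option names and the `:http` config key are illustrative, not the library's confirmed API.

```elixir
# Hypothetical sketch of the supervision tree described above.
defmodule TimelessTraces.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Assumption: the HTTP child is appended only when an :http config exists
    http_config = Application.get_env(:timeless_traces, :http)

    children =
      [
        {Registry, keys: :duplicate, name: TimelessTraces.Registry},
        TimelessTraces.Index,
        {Task.Supervisor, name: TimelessTraces.FlushSupervisor},
        TimelessTraces.Buffer,
        TimelessTraces.Compactor,
        TimelessTraces.Retention
      ] ++ if http_config, do: [{TimelessTraces.HTTP, http_config}], else: []

    # :one_for_one — each child restarts independently of its siblings
    Supervisor.start_link(children, strategy: :one_for_one, name: TimelessTraces.Supervisor)
  end
end
```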

OpenTelemetry Integration

TimelessTraces plugs into the OpenTelemetry SDK as a span exporter. When configured, all spans produced by the OTel SDK are automatically ingested into local storage.

```mermaid
flowchart LR
    subgraph "Your Application"
        APP["Instrumented Code"]
        SDK["OpenTelemetry SDK"]
    end

    subgraph "TimelessTraces"
        EXP["Exporter<br/>:otel_exporter_traces"]
        BUF["Buffer"]
    end

    APP -->|"start_span / end_span"| SDK
    SDK -->|"export(spans)"| EXP
    EXP -->|"ingest(spans)"| BUF
```

The exporter normalizes OTel span records into %TimelessTraces.Span{} structs:

  • Integer trace/span IDs → 32/16-char hex strings
  • Kind integers → atoms (:internal, :server, :client, :producer, :consumer)
  • Status codes (0/1/2) → atoms (:unset, :ok, :error)
  • ETS attribute format → plain maps
  • Events, links, resource, instrumentation scope → normalized maps
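
The first three rules can be sketched as plain functions. This is a minimal illustration of the conversions listed above, not the exporter's actual code; the module name and the exact kind-integer mapping (taken from the OTLP SpanKind enum) are assumptions.

```elixir
defmodule NormalizeDemo do
  # Integer trace/span IDs -> fixed-width lowercase hex strings
  def hex_id(int, chars) do
    int
    |> Integer.to_string(16)
    |> String.downcase()
    |> String.pad_leading(chars, "0")
  end

  # Kind integers -> atoms (OTLP SpanKind numbering assumed)
  @kinds %{1 => :internal, 2 => :server, 3 => :client, 4 => :producer, 5 => :consumer}
  def kind(int), do: Map.get(@kinds, int, :internal)

  # Status codes 0/1/2 -> atoms
  def status(0), do: :unset
  def status(1), do: :ok
  def status(2), do: :error
end

NormalizeDemo.hex_id(0x5B8EFFF798038103D269B633813FC60C, 32)
# => "5b8efff798038103d269b633813fc60c"
```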

Span Data Model

Each span captures a unit of work in a distributed trace:

```mermaid
classDiagram
    class Span {
        trace_id: String (32 hex chars)
        span_id: String (16 hex chars)
        parent_span_id: String | nil
        name: String
        kind: :internal | :server | :client | :producer | :consumer
        start_time: integer (nanoseconds)
        end_time: integer (nanoseconds)
        duration_ns: integer
        status: :ok | :error | :unset
        status_message: String | nil
        attributes: map
        events: list
        resource: map
        instrumentation_scope: map | nil
    }
```

Spans sharing the same trace_id form a trace. The parent_span_id field links child spans to their parent, forming a tree structure.
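
For example, a server span and its database child might look like this (plain maps with the field names from the model above; the actual `%TimelessTraces.Span{}` struct and the concrete values are illustrative):

```elixir
parent = %{
  trace_id: "5b8efff798038103d269b633813fc60c",
  span_id: "eee19b7ec3c1b174",
  parent_span_id: nil,
  name: "GET /api/orders",
  kind: :server,
  status: :ok
}

# Child shares the trace_id and points at the parent's span_id
child = %{
  parent
  | span_id: "051581bf3cb55c13",
    parent_span_id: "eee19b7ec3c1b174",
    name: "db.query",
    kind: :client
}

child.trace_id == parent.trace_id and child.parent_span_id == parent.span_id
# => true
```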

Ingestion Pipeline

Spans flow from the exporter through the buffer into storage and indexing. Subscribers receive spans in real-time before they hit the buffer.

```mermaid
sequenceDiagram
    participant OTel as OpenTelemetry SDK
    participant EXP as Exporter
    participant B as Buffer
    participant Sub as Subscribers
    participant FS as FlushSupervisor
    participant W as Writer
    participant D as Disk / Memory
    participant I as Index

    OTel->>EXP: export(ets_tab, resource)
    EXP->>EXP: Normalize spans
    EXP->>B: Buffer.ingest(spans)
    B->>Sub: Registry.dispatch (broadcast)
    B->>B: Accumulate in list

    Note over B: Flush triggers:<br/>buffer ≥ 1000 spans<br/>OR 1s timer expires<br/>OR manual flush()
    B->>B: Index.precompute(entries)
    Note over B: Extract terms + trace rows<br/>before flush for fast indexing
    B->>FS: Task.Supervisor.async
    FS->>W: write_block(entries, :raw)
    W->>D: Serialize → .raw file
    W->>I: index_block_async(meta, terms, trace_rows)
    I->>I: ETS insert (hot cache)
    Note over I: Every 100ms:<br/>flush pending to disk log
```

Backpressure

Concurrent flush tasks are capped at System.schedulers_online(). When the limit is reached, the buffer falls back to synchronous flushing, applying natural backpressure to the exporter.
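
A minimal sketch of this capped-async-with-sync-fallback pattern, assuming a `Task.Supervisor` and an in-flight counter (all names here are hypothetical, not the Buffer's actual internals):

```elixir
defmodule FlushDemo do
  @max_concurrent System.schedulers_online()

  def flush(sup, entries, in_flight) do
    if in_flight < @max_concurrent do
      # Below the cap: flush asynchronously under the Task.Supervisor
      task = Task.Supervisor.async_nolink(sup, fn -> write(entries) end)
      {:async, task}
    else
      # At the cap: flush in the caller's process, blocking it and
      # thereby back-pressuring the exporter upstream
      {:sync, write(entries)}
    end
  end

  # Stand-in for the real block writer
  defp write(entries), do: length(entries)
end

{:ok, sup} = Task.Supervisor.start_link()
{:async, task} = FlushDemo.flush(sup, [1, 2, 3], 0)
Task.await(task)
# => 3
```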

Storage Layer

Blocks are written to disk as individual files under data_dir/blocks/. Each block has a 12-digit numeric ID and a format extension.

```mermaid
flowchart TD
    subgraph "Block Formats"
        RAW[".raw<br/>Uncompressed ETF"]
        ZST[".zst<br/>Zstandard compressed<br/>~6.8x ratio"]
        OZL[".ozl<br/>OpenZL columnar<br/>~10x ratio"]
    end

    subgraph "OpenZL Columnar Layout"
        E["Span list"] --> COL["Columnar split"]
        COL --> TS["Timestamps<br/>numeric columns"]
        COL --> STR["Names, IDs<br/>string columns"]
        COL --> BLOB["Attributes, Events,<br/>Resource<br/>ETF blob"]
    end

    subgraph "On-Disk Layout"
        DIR["data_dir/"]
        DIR --> BLK["blocks/<br/>000000000001.raw<br/>000000000002.zst<br/>000000000003.ozl"]
        DIR --> IDX["index.snapshot + index.log<br/>ETS persistence"]
    end
```

Memory Mode

When storage: :memory is configured, block data is stored as BLOBs inside ETS tables only. No files are written to disk. Data does not survive restarts.
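
Enabling memory mode might look like the fragment below. The `storage: :memory` key is taken from this document; the application name and config file location are assumptions.

```elixir
# config/runtime.exs (location assumed) — storage: :memory keeps
# block data in ETS only; nothing is written under data_dir/.
import Config

config :timeless_traces,
  storage: :memory
```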

Indexing Strategy

The index maintains three layers: block metadata, a term inverted index, and a trace-level index for fast trace lookups.

```mermaid
flowchart LR
    subgraph "ETS (Hot Cache)"
        BT["blocks<br/>ordered_set<br/>block_id → metadata"]
        TI["term_index<br/>bag<br/>term → block_id"]
        TR["trace_index<br/>bag<br/>trace_id → block_id"]
    end

    subgraph "Persistence"
        SNAP["index.snapshot<br/>periodic ETS dump"]
        LOG["index.log<br/>write-ahead log"]
    end

    W["Writer"] -->|"index_block_async"| I["Index GenServer"]
    I -->|"immediate"| BT & TI & TR
    I -->|"journal"| LOG
    I -->|"every 1000 ops"| SNAP
```

Indexed Terms

Terms are extracted from span attributes for inverted-index lookup:

| Term Pattern | Source |
| --- | --- |
| name:{span_name} | Span name |
| kind:{kind} | Span kind |
| status:{status} | Span status |
| service.name:{name} | Service name from attributes or resource |
| http.method:{method} | HTTP method |
| http.status_code:{code} | HTTP status code |
| http.route:{route} | HTTP route |
| db.system:{system} | Database system |
| rpc.system:{system} | RPC system |
| messaging.system:{system} | Messaging system |
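
Term extraction along these lines can be sketched as follows; the function and module names are hypothetical, but the term patterns and the attributes-or-resource fallback for service.name follow the table above.

```elixir
defmodule TermsDemo do
  # Attribute keys that become inverted-index terms (per the table)
  @attr_keys ~w(service.name http.method http.status_code http.route
                db.system rpc.system messaging.system)

  def terms(span) do
    base = [
      "name:#{span.name}",
      "kind:#{span.kind}",
      "status:#{span.status}"
    ]

    # Look in attributes first, then fall back to the resource;
    # spans missing a key simply produce no term for it
    attr_terms =
      for key <- @attr_keys,
          value = span.attributes[key] || span.resource[key],
          do: "#{key}:#{value}"

    base ++ attr_terms
  end
end

span = %{
  name: "GET /orders",
  kind: :server,
  status: :ok,
  attributes: %{"http.method" => "GET", "http.status_code" => 200},
  resource: %{"service.name" => "shop"}
}

TermsDemo.terms(span)
# => ["name:GET /orders", "kind:server", "status:ok",
#     "service.name:shop", "http.method:GET", "http.status_code:200"]
```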

Trace Index

The trace_index table provides fast trace-level lookups without reading blocks. It stores pre-computed metadata per trace per block:

  • span_count — number of spans in this block for this trace
  • root_span_name — name of the root span (if present in this block)
  • duration_ns — trace duration (if root span is in this block)
  • has_error — whether any span in this trace has status: :error

Query Path

Queries are lock-free on the read path. The caller’s process reads directly from ETS without GenServer calls.

```mermaid
sequenceDiagram
    participant C as Caller
    participant ETS as ETS Caches
    participant D as Disk
    participant TS as Task.async_stream

    C->>ETS: Lookup terms → block_ids
    Note over C,ETS: MapSet intersection<br/>across all filter terms
    C->>ETS: Filter blocks by ts_min/ts_max
    C->>TS: Parallel block reads
    TS->>D: Read + decompress blocks
    TS-->>C: Spans stream
    C->>C: Apply in-memory filters<br/>(duration, attributes, etc.)
    C->>C: Sort by start_time
    C->>C: Paginate (limit/offset)
    C-->>C: Return Result
```
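
The term-intersection step can be illustrated directly against a bag-type ETS table: each filter term maps to a set of block ids, and only blocks matching every term are read. The table name and contents here are toy data, not the library's real schema.

```elixir
# A bag table, mirroring the term_index shape: {term, block_id}
:ets.new(:term_index_demo, [:bag, :named_table])

:ets.insert(:term_index_demo, [
  {"status:error", 1}, {"status:error", 3},
  {"service.name:shop", 1}, {"service.name:shop", 2}
])

# All block ids posted under one term
block_ids = fn term ->
  :term_index_demo
  |> :ets.lookup(term)
  |> MapSet.new(fn {_term, id} -> id end)
end

# Intersect across every filter term: only block 1 matches both
["status:error", "service.name:shop"]
|> Enum.map(block_ids)
|> Enum.reduce(&MapSet.intersection/2)
|> MapSet.to_list()
# => [1]
```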

Trace Lookup

Looking up a trace by ID uses the dedicated trace_index ETS table, skipping the term index entirely:

```mermaid
flowchart LR
    TID["trace_id"] --> ETS["trace_index ETS<br/>trace_id → block_ids"]
    ETS --> READ["Read only those blocks"]
    READ --> FILTER["Filter spans by trace_id"]
    FILTER --> SORT["Sort by start_time"]
    SORT --> RESULT["All spans for trace"]
```

Compaction Lifecycle

Raw blocks are compacted into compressed blocks on a periodic timer.

```mermaid
flowchart TD
    subgraph "Triggers"
        T1["raw entries ≥ 500"]
        T2["oldest raw block ≥ 60s"]
        T3["30s timer tick"]
    end

    T1 & T2 & T3 --> CHECK["Compactor checks conditions"]

    CHECK -->|"threshold met"| READ["Read all raw blocks"]
    READ --> SORT["Sort by start_time"]
    SORT --> COMPRESS["Compress to zstd or OpenZL"]
    COMPRESS --> WRITE["Write compressed block"]
    WRITE --> REINDEX["Update Index<br/>(ETS + disk log)"]
    REINDEX --> DELETE["Delete raw block files"]
    DELETE --> STATS["Update compression stats"]
```

Compression Performance (500K spans)

| Format | Ratio | Query (status=error) |
| --- | --- | --- |
| Zstd (level 6) | ~6.8x | 327ms |
| OpenZL (level 6) | ~10.2x | 158ms |

OpenZL achieves better compression and faster queries due to columnar layout.

Merge Compaction

After initial compaction, many small compressed blocks accumulate (one per flush cycle). The Compactor runs a second merge pass that consolidates them into fewer, larger blocks for better compression ratios and reduced per-block I/O during reads. The merge pass runs automatically after every compaction timer tick and can also be triggered manually via TimelessTraces.merge_now/0.
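
The batching step can be sketched as a greedy grouping of small blocks, sorted by ts_min, into runs of roughly target_size entries. This is an illustration of the strategy, not the Compactor's actual code; the metadata shape and function names are assumptions.

```elixir
defmodule MergeDemo do
  # Group blocks (each a map with :ts_min and :entry_count) into
  # batches whose entry counts sum to about target_size.
  def batches(blocks, target_size) do
    blocks
    |> Enum.sort_by(& &1.ts_min)
    |> Enum.chunk_while(
      {[], 0},
      fn block, {batch, count} ->
        if count + block.entry_count > target_size and batch != [] do
          # Current batch is full: emit it, start a new one
          {:cont, Enum.reverse(batch), {[block], block.entry_count}}
        else
          {:cont, {[block | batch], count + block.entry_count}}
        end
      end,
      fn
        {[], _count} -> {:cont, {[], 0}}
        {batch, _count} -> {:cont, Enum.reverse(batch), {[], 0}}
      end
    )
  end
end

# Four 900-entry blocks with a 2000-entry target -> two batches of 1800
blocks = for {n, i} <- Enum.with_index([900, 900, 900, 900]), do: %{ts_min: i, entry_count: n}

MergeDemo.batches(blocks, 2000)
|> Enum.map(fn batch -> batch |> Enum.map(& &1.entry_count) |> Enum.sum() end)
# => [1800, 1800]
```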

```mermaid
flowchart TD
    subgraph "Trigger"
        T1["compressed blocks with<br/>entry_count < target_size"]
        T2["block count ≥ min_blocks<br/>default: 4"]
    end

    T1 & T2 --> GROUP["Group by ts_min into batches<br/>targeting ~2000 entries each"]
    GROUP --> B1["Batch 1"]
    GROUP --> B2["Batch 2"]
    GROUP --> BN["Batch N"]
    B1 & B2 & BN --> DECOMP["Decompress all blocks in batch"]
    DECOMP --> SORT["Sort spans by start_time"]
    SORT --> RECOMP["Recompress as single block<br/>zstd or OpenZL"]
    RECOMP --> REINDEX["Update Index<br/>(ETS + disk log + trace_index)"]
    REINDEX --> DELETE["Delete old block files"]
    DELETE --> TEL["Emit telemetry:<br/>[:timeless_traces, :merge_compaction, :stop]"]
```

| Configuration | Default | Description |
| --- | --- | --- |
| merge_compaction_target_size | 2000 | Target entries per merged block |
| merge_compaction_min_blocks | 4 | Min small blocks before merge triggers |

Retention Policy

Retention runs on a 5-minute timer and enforces two independent limits:

```mermaid
flowchart TD
    TICK["5-minute timer tick"] --> AGE["Age check:<br/>delete blocks where<br/>ts_max < now - max_age"]
    TICK --> SIZE["Size check:<br/>delete oldest blocks<br/>until total ≤ max_bytes"]
    AGE --> DEL["Delete block files"]
    SIZE --> DEL
    DEL --> IDX["Remove from Index<br/>(blocks + terms + trace_index)"]
    IDX --> TEL["Emit telemetry:<br/>[:timeless_traces, :retention, :stop]"]
```

Defaults: 7-day max age, 512 MB max size.
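
Both checks can be sketched over block metadata as follows. The metadata shape (`ts_max`, `byte_size`) matches the fields mentioned above; the module and function names are hypothetical.

```elixir
defmodule RetentionDemo do
  # Age check: blocks whose newest span predates the cutoff
  def expired(blocks, now, max_age) do
    Enum.filter(blocks, fn b -> b.ts_max < now - max_age end)
  end

  # Size check: walk oldest-first, marking blocks for deletion
  # until the remaining total fits under max_bytes
  def over_budget(blocks, max_bytes) do
    total = blocks |> Enum.map(& &1.byte_size) |> Enum.sum()

    blocks
    |> Enum.sort_by(& &1.ts_max)
    |> Enum.reduce_while({total, []}, fn b, {t, doomed} ->
      if t > max_bytes,
        do: {:cont, {t - b.byte_size, [b | doomed]}},
        else: {:halt, {t, doomed}}
    end)
    |> then(fn {_t, doomed} -> Enum.reverse(doomed) end)
  end
end

blocks = [
  %{id: 1, ts_max: 100, byte_size: 300},
  %{id: 2, ts_max: 200, byte_size: 300},
  %{id: 3, ts_max: 300, byte_size: 300}
]

# 900 bytes total with a 500-byte budget: the two oldest blocks go
RetentionDemo.over_budget(blocks, 500) |> Enum.map(& &1.id)
# => [1, 2]
```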

Real-Time Subscriptions

Subscribers receive spans in real-time via the Registry. Spans are broadcast before they enter the buffer.

```mermaid
sequenceDiagram
    participant EXP as Exporter
    participant B as Buffer
    participant R as Registry
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2

    EXP->>B: ingest(spans)
    B->>R: Registry.dispatch
    R->>S1: {:timeless_traces, :span, span}
    R->>S2: {:timeless_traces, :span, span}
    B->>B: Accumulate in buffer

    Note over S1,S2: Subscribers can filter<br/>by :name, :kind, :status, :service
```
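
The duplicate-key Registry broadcast can be demonstrated in isolation. The message shape `{:timeless_traces, :span, span}` is the one shown above; the registry name and topic key here are stand-ins for the library's internals.

```elixir
# A :duplicate registry lets many subscribers register the same key
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoRegistry)

# A subscriber registers itself under a shared topic
Registry.register(DemoRegistry, :spans, nil)

# The broadcast side — conceptually what Buffer does on ingest
span = %{name: "GET /orders", status: :ok}

Registry.dispatch(DemoRegistry, :spans, fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:timeless_traces, :span, span})
end)

receive do
  {:timeless_traces, :span, s} -> s.name
end
# => "GET /orders"
```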

HTTP API

The optional HTTP layer provides Jaeger-compatible endpoints for trace visualization, plus OTLP JSON ingest.

```mermaid
flowchart LR
    subgraph "Ingest"
        OTLP["/insert/opentelemetry/v1/traces<br/>POST OTLP JSON"]
    end

    subgraph "Jaeger Query API"
        SVC["/select/jaeger/api/services<br/>GET"]
        OPS["/select/jaeger/api/services/:svc/operations<br/>GET"]
        TRS["/select/jaeger/api/traces<br/>GET (search)"]
        TR["/select/jaeger/api/traces/:id<br/>GET (single trace)"]
    end

    subgraph "Operations"
        FL["/api/v1/flush<br/>GET"]
        BK["/api/v1/backup<br/>POST"]
        HE["/health<br/>GET"]
    end

    OTLP --> B["Buffer"]
    SVC & OPS & TRS & TR --> IDX["Index + Blocks"]
    FL --> B
    BK --> IDX
```

Telemetry Events

TimelessTraces emits telemetry events at key points in the pipeline:

```mermaid
flowchart TD
    FLUSH["[:timeless_traces, :flush, :stop]<br/>duration, entry_count, byte_size"]
    QUERY["[:timeless_traces, :query, :stop]<br/>duration, total, blocks_read"]
    COMP["[:timeless_traces, :compaction, :stop]<br/>duration, raw_blocks, entry_count, byte_size"]
    MCOMP["[:timeless_traces, :merge_compaction, :stop]<br/>duration, batches_merged, blocks_consumed"]
    RET["[:timeless_traces, :retention, :stop]<br/>duration, blocks_deleted"]
    ERR["[:timeless_traces, :block, :error]<br/>file_path, reason"]

    B["Buffer flush"] --> FLUSH
    Q["Query execution"] --> QUERY
    C["Compaction run"] --> COMP
    MC["Merge compaction run"] --> MCOMP
    RT["Retention run"] --> RET
    R["Block read failure"] --> ERR
```
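
A handler can be attached with the standard :telemetry API. The event name and measurement keys are taken from the diagram above; fetching the :telemetry package via Mix.install (as one would in a Livebook) is an assumption about the execution environment.

```elixir
Mix.install([{:telemetry, "~> 1.2"}])

parent = self()

:ok =
  :telemetry.attach(
    "demo-flush-logger",
    [:timeless_traces, :flush, :stop],
    fn _event, measurements, _metadata, _config ->
      # Forward the measurement we care about to the attaching process
      send(parent, {:flushed, measurements.entry_count})
    end,
    nil
  )

# Simulate the event the Buffer emits after a flush
:telemetry.execute(
  [:timeless_traces, :flush, :stop],
  %{duration: 1_000_000, entry_count: 42, byte_size: 2_048},
  %{}
)

receive do
  {:flushed, n} -> n
end
# => 42
```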