
TimelessTraces Architecture

livebook/architecture.livemd


Supervision Tree

The application starts a flat supervision tree using the :one_for_one strategy, so each child restarts independently of its siblings. The HTTP server is optional and is only started when an http config is present.

graph TD
    S["TimelessTraces.Application<br/>Supervisor :one_for_one"]
    S --> R["TimelessTraces.Registry<br/>Registry :duplicate"]
    S --> I["TimelessTraces.Index<br/>GenServer"]
    S --> FS["FlushSupervisor<br/>Task.Supervisor"]
    S --> B["TimelessTraces.Buffer<br/>GenServer"]
    S --> C["TimelessTraces.Compactor<br/>GenServer"]
    S --> RT["TimelessTraces.Retention<br/>GenServer"]
    S -.-> H["TimelessTraces.HTTP<br/>Bandit (optional)"]
    style H stroke-dasharray: 5 5
  • Registry — Duplicate-key registry for real-time span subscriptions
  • Index — SQLite persistence + ETS caching for block metadata, term index, and trace index
  • FlushSupervisor — Task.Supervisor managing concurrent async flush tasks
  • Buffer — Accumulates spans, auto-flushes on size or interval threshold
  • Compactor — Merges raw blocks into compressed blocks (zstd or OpenZL)
  • Retention — Deletes blocks by age or total size on a periodic timer
  • HTTP — Jaeger-compatible REST API (only started when http config is set)
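The tree above can be sketched with stock OTP pieces. In this runnable sketch the TimelessTraces workers are stubbed with a no-op GenServer; the Demo.* names and child options are illustrative, not the library's real child specs:

```elixir
# Stand-in for the Index/Buffer/etc. workers (illustrative only)
defmodule Noop do
  use GenServer
  def start_link(opts), do: GenServer.start_link(__MODULE__, :ok, opts)
  @impl true
  def init(:ok), do: {:ok, nil}
end

# HTTP child is only appended when an http config is present
http_config = nil

children =
  [
    {Registry, keys: :duplicate, name: Demo.Registry},
    Supervisor.child_spec({Noop, name: Demo.Index}, id: :index),
    {Task.Supervisor, name: Demo.FlushSupervisor},
    Supervisor.child_spec({Noop, name: Demo.Buffer}, id: :buffer)
  ] ++
    if http_config,
      do: [Supervisor.child_spec({Noop, name: Demo.HTTP}, id: :http)],
      else: []

# :one_for_one — each child restarts independently
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.count_children(sup))
```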

OpenTelemetry Integration

TimelessTraces plugs into the OpenTelemetry SDK as a span exporter. When configured, all spans produced by the OTel SDK are automatically ingested into local storage.

flowchart LR
    subgraph "Your Application"
        APP["Instrumented Code"]
        SDK["OpenTelemetry SDK"]
    end

    subgraph "TimelessTraces"
        EXP["Exporter<br/>:otel_exporter_traces"]
        BUF["Buffer"]
    end

    APP -->|"start_span / end_span"| SDK
    SDK -->|"export(spans)"| EXP
    EXP -->|"ingest(spans)"| BUF

The exporter normalizes OTel span records into %TimelessTraces.Span{} structs:

  • Integer trace/span IDs → 32/16-char hex strings
  • Kind integers → atoms (:internal, :server, :client, :producer, :consumer)
  • Status codes (0/1/2) → atoms (:unset, :ok, :error)
  • ETS attribute format → plain maps
  • Events, links, resource, instrumentation scope → normalized maps
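A minimal sketch of the first three rules (the Normalize module and function names are illustrative, not the library's API; the kind mapping assumes the OTLP enum order INTERNAL=1 … CONSUMER=5):

```elixir
defmodule Normalize do
  # Assumed OTLP enum order for span kinds
  @kinds %{1 => :internal, 2 => :server, 3 => :client, 4 => :producer, 5 => :consumer}
  @statuses %{0 => :unset, 1 => :ok, 2 => :error}

  # 128-bit trace id → 32-char lowercase hex; 64-bit span id → 16 chars
  def hex_id(int, chars) do
    int
    |> Integer.to_string(16)
    |> String.downcase()
    |> String.pad_leading(chars, "0")
  end

  def kind(int), do: Map.fetch!(@kinds, int)
  def status(int), do: Map.fetch!(@statuses, int)
end

IO.puts(Normalize.hex_id(0x5B8EFFF798038103D269B633813FC60C, 32))
```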

Span Data Model

Each span captures a unit of work in a distributed trace:

classDiagram
    class Span {
        trace_id: String (32 hex chars)
        span_id: String (16 hex chars)
        parent_span_id: String | nil
        name: String
        kind: :internal | :server | :client | :producer | :consumer
        start_time: integer (nanoseconds)
        end_time: integer (nanoseconds)
        duration_ns: integer
        status: :ok | :error | :unset
        status_message: String | nil
        attributes: map
        events: list
        resource: map
        instrumentation_scope: map | nil
    }

Spans sharing the same trace_id form a trace. The parent_span_id field links child spans to their parent, forming a tree structure.
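The parent links make tree reconstruction a one-pass group-by plus a recursive walk; a toy sketch:

```elixir
# Minimal spans from one trace (toy data)
spans = [
  %{span_id: "a1", parent_span_id: nil, name: "GET /orders"},
  %{span_id: "b2", parent_span_id: "a1", name: "db.query"},
  %{span_id: "c3", parent_span_id: "a1", name: "cache.get"}
]

# Index children by their parent id in one pass
by_parent = Enum.group_by(spans, & &1.parent_span_id)

# Recursive walk: each span pulls its children from the map
build = fn build, span ->
  children = Map.get(by_parent, span.span_id, [])
  %{span: span.name, children: Enum.map(children, &build.(build, &1))}
end

# The root is the span with no parent
[root] = Map.fetch!(by_parent, nil)
IO.inspect(build.(build, root))
```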

Ingestion Pipeline

Spans flow from the exporter through the buffer into storage and indexing. Subscribers receive spans in real-time before they hit the buffer.

sequenceDiagram
    participant OTel as OpenTelemetry SDK
    participant EXP as Exporter
    participant B as Buffer
    participant Sub as Subscribers
    participant FS as FlushSupervisor
    participant W as Writer
    participant D as Disk / Memory
    participant I as Index

    OTel->>EXP: export(ets_tab, resource)
    EXP->>EXP: Normalize spans
    EXP->>B: Buffer.ingest(spans)
    B->>Sub: Registry.dispatch (broadcast)
    B->>B: Accumulate in list

    Note over B: Flush triggers:<br/>buffer ≥ 1000 spans<br/>OR 1s timer expires<br/>OR manual flush()
    B->>B: Index.precompute(entries)
    Note over B: Extract terms + trace rows<br/>before flush for fast indexing
    B->>FS: Task.Supervisor.async
    FS->>W: write_block(entries, :raw)
    W->>D: Serialize → .raw file
    W->>I: index_block_async(meta, terms, trace_rows)
    I->>I: ETS insert (hot cache)
    Note over I: Every 2s:<br/>ETS → SQLite batch persist
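The flush triggers in the diagram reduce to a small predicate; a sketch with the thresholds shown above (FlushPolicy is an illustrative name, not the library's API):

```elixir
defmodule FlushPolicy do
  # Thresholds from the diagram: 1000 spans or a 1 s timer
  @max_spans 1000
  @max_age_ms 1_000

  # Flush when the buffer is full, the interval has elapsed,
  # or a manual flush() was requested
  def flush?(buffer_len, ms_since_last_flush, manual? \\ false) do
    manual? or buffer_len >= @max_spans or ms_since_last_flush >= @max_age_ms
  end
end

IO.inspect(FlushPolicy.flush?(5, 1_200))
```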

Backpressure

Concurrent flush tasks are capped at System.schedulers_online(). When the limit is reached, the buffer falls back to synchronous flushing, applying natural backpressure to the exporter.
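A hedged sketch of that fallback rule using a plain Task.Supervisor (the flush body is a stand-in; the real Buffer does the serialization work here):

```elixir
{:ok, sup} = Task.Supervisor.start_link()
cap = System.schedulers_online()

flush = fn entries ->
  if length(Task.Supervisor.children(sup)) < cap do
    # A slot is free: flush asynchronously in a supervised task
    Task.Supervisor.async_nolink(sup, fn -> {:flushed, length(entries)} end)
    :async
  else
    # Cap reached: flush in the caller, blocking the exporter (backpressure)
    :sync
  end
end

IO.inspect(flush.([1, 2, 3]))
```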

Storage Layer

Blocks are written to disk as individual files under data_dir/blocks/. Each block has a 12-digit numeric ID and a format extension.

flowchart TD
    subgraph "Block Formats"
        RAW[".raw<br/>Uncompressed ETF"]
        ZST[".zst<br/>Zstandard compressed<br/>~6.8x ratio"]
        OZL[".ozl<br/>OpenZL columnar<br/>~10x ratio"]
    end

    subgraph "OpenZL Columnar Layout"
        E["Span list"] --> COL["Columnar split"]
        COL --> TS["Timestamps<br/>numeric columns"]
        COL --> STR["Names, IDs<br/>string columns"]
        COL --> BLOB["Attributes, Events,<br/>Resource<br/>ETF blob"]
    end

    subgraph "On-Disk Layout"
        DIR["data_dir/"]
        DIR --> BLK["blocks/<br/>000000000001.raw<br/>000000000002.zst<br/>000000000003.ozl"]
        DIR --> IDX["index.db<br/>SQLite"]
    end
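The .raw format is plain Erlang External Term Format; a runnable sketch of writing and reading one block under the naming scheme above (the tt_blocks_demo directory is just for this demo):

```elixir
block_id = 1
spans = [%{trace_id: "abc", name: "GET /"}]

dir = Path.join(System.tmp_dir!(), "tt_blocks_demo")
File.mkdir_p!(dir)

# 12-digit zero-padded numeric id plus the format extension
name = String.pad_leading(Integer.to_string(block_id), 12, "0") <> ".raw"
path = Path.join(dir, name)

# Serialize the span list as an ETF binary and write it out
File.write!(path, :erlang.term_to_binary(spans))

# Reading a raw block is the inverse: binary_to_term on the file contents
^spans = path |> File.read!() |> :erlang.binary_to_term()
IO.puts(Path.basename(path))
```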

Memory Mode

When storage: :memory is configured, block data is stored as BLOBs inside SQLite instead of individual files. The index and ETS caches operate identically.

Indexing Strategy

The index maintains three layers: block metadata, a term inverted index, and a trace-level index for fast trace lookups.

flowchart LR
    subgraph "ETS (Hot Cache)"
        BT["blocks<br/>ordered_set<br/>block_id → metadata"]
        TI["term_index<br/>bag<br/>term → block_id"]
        TR["trace_index<br/>bag<br/>trace_id → block_id"]
    end

    subgraph "SQLite (Persistent)"
        BM["blocks table<br/>id, file_path, format,<br/>byte_size, entry_count,<br/>ts_min, ts_max"]
        BTM["block_terms table<br/>term, block_id"]
        TRM["trace_index table<br/>trace_id, block_id,<br/>span_count, root_span_name,<br/>duration_ns, has_error"]
    end

    W["Writer"] -->|"index_block_async"| I["Index GenServer"]
    I -->|"immediate"| BT & TI & TR
    I -->|"batched 2s"| BM & BTM & TRM
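The three ETS caches can be sketched directly; table types follow the diagram, while the exact record layouts are assumptions:

```elixir
# ordered_set: one metadata row per block, ordered by block_id
blocks = :ets.new(:blocks, [:ordered_set, :public])
# bags: many block_ids per term / per trace_id
term_index = :ets.new(:term_index, [:bag, :public])
trace_index = :ets.new(:trace_index, [:bag, :public])

:ets.insert(blocks, {1, %{format: :raw, ts_min: 100, ts_max: 200, entry_count: 2}})
:ets.insert(term_index, [{"status:error", 1}, {"name:GET /orders", 1}])
:ets.insert(trace_index, {"abc123", 1})

# Bag lookup: every block_id that contains a given term
IO.inspect(:ets.lookup(term_index, "status:error"))
```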

Indexed Terms

Terms are extracted from span attributes for inverted-index lookup:

Term Pattern               Source
name:{span_name}           Span name
kind:{kind}                Span kind
status:{status}            Span status
service.name:{name}        Service name from attributes or resource
http.method:{method}       HTTP method
http.status_code:{code}    HTTP status code
http.route:{route}         HTTP route
db.system:{system}         Database system
rpc.system:{system}        RPC system
messaging.system:{system}  Messaging system
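A sketch of term extraction following this table (extract_terms is an illustrative name; spans missing an attribute simply produce no term for it, because a nil binding is falsy and acts as a comprehension filter):

```elixir
# Attribute keys from the table above (OTel semantic-convention names)
attr_keys = ~w(service.name http.method http.status_code http.route
               db.system rpc.system messaging.system)

extract_terms = fn span ->
  attr_terms =
    for key <- attr_keys,
        # nil values filter the key out; attributes win over resource
        value = span.attributes[key] || span.resource[key],
        do: "#{key}:#{value}"

  ["name:#{span.name}", "kind:#{span.kind}", "status:#{span.status}" | attr_terms]
end

span = %{
  name: "GET /orders",
  kind: :server,
  status: :error,
  attributes: %{"http.method" => "GET", "http.status_code" => 500},
  resource: %{"service.name" => "shop"}
}

IO.inspect(extract_terms.(span))
```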

Trace Index

The trace_index table provides fast trace-level lookups without reading blocks. It stores pre-computed metadata per trace per block:

  • span_count — number of spans in this block for this trace
  • root_span_name — name of the root span (if present in this block)
  • duration_ns — trace duration (if root span is in this block)
  • has_error — whether any span in this trace has status: :error

Query Path

Queries are lock-free on the read path. The caller’s process reads directly from ETS without GenServer calls.

sequenceDiagram
    participant C as Caller
    participant ETS as ETS Caches
    participant D as Disk
    participant TS as Task.async_stream

    C->>ETS: Lookup terms → block_ids
    Note over C,ETS: MapSet intersection<br/>across all filter terms
    C->>ETS: Filter blocks by ts_min/ts_max
    C->>TS: Parallel block reads
    TS->>D: Read + decompress blocks
    TS-->>C: Spans stream
    C->>C: Apply in-memory filters<br/>(duration, attributes, etc.)
    C->>C: Sort by start_time
    C->>C: Paginate (limit/offset)
    C-->>C: Return Result
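The block-pruning steps reduce to a MapSet intersection followed by a time-range overlap check; a toy sketch with fabricated posting lists:

```elixir
# Per-term posting lists: term → set of block_ids (toy data)
postings = %{
  "status:error" => MapSet.new([1, 2, 5]),
  "service.name:shop" => MapSet.new([2, 3, 5])
}

# block_id → {ts_min, ts_max}
block_meta = %{1 => {100, 199}, 2 => {200, 299}, 3 => {300, 399}, 5 => {500, 599}}

# Intersect all posting lists: a block must match every filter term
candidate_blocks =
  postings
  |> Map.values()
  |> Enum.reduce(&MapSet.intersection/2)

{from, to} = {150, 350}

# Keep only blocks whose [ts_min, ts_max] overlaps the query window
blocks_to_read =
  for id <- candidate_blocks,
      {ts_min, ts_max} = block_meta[id],
      ts_min <= to and ts_max >= from,
      do: id

IO.inspect(Enum.sort(blocks_to_read))
```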

Trace Lookup

Looking up a trace by ID uses the dedicated trace_index ETS table, skipping the term index entirely:

flowchart LR
    TID["trace_id"] --> ETS["trace_index ETS<br/>trace_id → block_ids"]
    ETS --> READ["Read only those blocks"]
    READ --> FILTER["Filter spans by trace_id"]
    FILTER --> SORT["Sort by start_time"]
    SORT --> RESULT["All spans for trace"]

Compaction Lifecycle

Raw blocks are compacted into compressed blocks on a periodic timer.

flowchart TD
    subgraph "Triggers"
        T1["raw entries ≥ 500"]
        T2["oldest raw block ≥ 60s"]
        T3["30s timer tick"]
    end

    T1 & T2 & T3 --> CHECK["Compactor checks conditions"]

    CHECK -->|"threshold met"| READ["Read all raw blocks"]
    READ --> SORT["Sort by start_time"]
    SORT --> COMPRESS["Compress to zstd or OpenZL"]
    COMPRESS --> WRITE["Write compressed block"]
    WRITE --> REINDEX["Update Index<br/>(ETS + SQLite)"]
    REINDEX --> DELETE["Delete raw block files"]
    DELETE --> STATS["Update compression stats"]

Compression Performance (500K spans)

Format            Ratio  Query time (status=error)
Zstd (level 6)    ~6.8x  289ms
OpenZL (level 6)  ~10x   148ms

OpenZL achieves better compression and faster queries due to columnar layout.

Merge Compaction

After initial compaction, many small compressed blocks accumulate (one per flush cycle). The Compactor runs a second merge pass that consolidates them into fewer, larger blocks for better compression ratios and reduced per-block I/O during reads. The merge pass runs automatically after every compaction timer tick and can also be triggered manually via TimelessTraces.merge_now/0.

flowchart TD
    subgraph "Trigger"
        T1["compressed blocks with<br/>entry_count < target_size"]
        T2["block count ≥ min_blocks<br/>default: 4"]
    end

    T1 & T2 --> GROUP["Group by ts_min into batches<br/>targeting ~2000 entries each"]
    GROUP --> B1["Batch 1"]
    GROUP --> B2["Batch 2"]
    GROUP --> BN["Batch N"]
    B1 & B2 & BN --> DECOMP["Decompress all blocks in batch"]
    DECOMP --> SORT["Sort spans by start_time"]
    SORT --> RECOMP["Recompress as single block<br/>zstd or OpenZL"]
    RECOMP --> REINDEX["Update Index<br/>(ETS + SQLite + trace_index)"]
    REINDEX --> DELETE["Delete old block files"]
    DELETE --> TEL["Emit telemetry:<br/>[:timeless_traces, :merge_compaction, :stop]"]

Configuration                 Default  Description
merge_compaction_target_size  2000     Target entries per merged block
merge_compaction_min_blocks   4        Min small blocks before merge triggers

Retention Policy

Retention runs on a 5-minute timer and enforces two independent limits:

flowchart TD
    TICK["5-minute timer tick"] --> AGE["Age check:<br/>delete blocks where<br/>ts_max < now - max_age"]
    TICK --> SIZE["Size check:<br/>delete oldest blocks<br/>until total ≤ max_bytes"]
    AGE --> DEL["Delete block files"]
    SIZE --> DEL
    DEL --> IDX["Remove from Index<br/>(blocks + terms + trace_index)"]
    IDX --> TEL["Emit telemetry:<br/>[:timeless_traces, :retention, :stop]"]

Defaults: 7-day max age, 512 MB max size.
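Both checks can be sketched as plain passes over block metadata (toy timestamps and sizes; a real run also deletes the files and updates the index):

```elixir
now = 1_000_000
max_age = 100_000
max_bytes = 250

blocks = [
  %{id: 1, ts_max: 850_000, bytes: 100},
  %{id: 2, ts_max: 920_000, bytes: 150},
  %{id: 3, ts_max: 990_000, bytes: 150}
]

# Age check: delete anything that ended before now - max_age
{expired, live} = Enum.split_with(blocks, &(&1.ts_max < now - max_age))

# Size check: walk newest-first, marking blocks past the byte budget
{_total, over_budget} =
  live
  |> Enum.sort_by(& &1.ts_max, :desc)
  |> Enum.reduce({0, []}, fn b, {total, over} ->
    total = total + b.bytes
    if total > max_bytes, do: {total, [b | over]}, else: {total, over}
  end)

IO.inspect({Enum.map(expired, & &1.id), Enum.map(over_budget, & &1.id)})
```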

Real-Time Subscriptions

Subscribers receive spans in real-time via the Registry. Spans are broadcast before they enter the buffer.

sequenceDiagram
    participant EXP as Exporter
    participant B as Buffer
    participant R as Registry
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2

    EXP->>B: ingest(spans)
    B->>R: Registry.dispatch
    R->>S1: {:timeless_traces, :span, span}
    R->>S2: {:timeless_traces, :span, span}
    B->>B: Accumulate in buffer

    Note over S1,S2: Subscribers can filter<br/>by :name, :kind, :status, :service
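A runnable sketch of this path with a plain duplicate-keyed Registry. The message shape follows the diagram; storing the subscriber's filter as the registry value is an assumption for illustration:

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: Subs)

# Subscribe the current process; the registry value carries a filter
{:ok, _} = Registry.register(Subs, :spans, %{status: :error})

span = %{name: "GET /orders", status: :error}

# What the Buffer does on ingest: broadcast to matching subscribers
Registry.dispatch(Subs, :spans, fn entries ->
  for {pid, filter} <- entries, span.status == filter.status do
    send(pid, {:timeless_traces, :span, span})
  end
end)

received =
  receive do
    {:timeless_traces, :span, s} -> s
  after
    1_000 -> :timeout
  end

IO.inspect(received.name)
```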

HTTP API

The optional HTTP layer provides Jaeger-compatible endpoints for trace visualization, plus OTLP JSON ingest.

flowchart LR
    subgraph "Ingest"
        OTLP["/insert/opentelemetry/v1/traces<br/>POST OTLP JSON"]
    end

    subgraph "Jaeger Query API"
        SVC["/select/jaeger/api/services<br/>GET"]
        OPS["/select/jaeger/api/services/:svc/operations<br/>GET"]
        TRS["/select/jaeger/api/traces<br/>GET (search)"]
        TR["/select/jaeger/api/traces/:id<br/>GET (single trace)"]
    end

    subgraph "Operations"
        FL["/api/v1/flush<br/>GET"]
        BK["/api/v1/backup<br/>POST"]
        HE["/health<br/>GET"]
    end

    OTLP --> B["Buffer"]
    SVC & OPS & TRS & TR --> IDX["Index + Blocks"]
    FL --> B
    BK --> IDX

Telemetry Events

TimelessTraces emits telemetry events at key points in the pipeline:

flowchart TD
    FLUSH["[:timeless_traces, :flush, :stop]<br/>duration, entry_count, byte_size"]
    QUERY["[:timeless_traces, :query, :stop]<br/>duration, total, blocks_read"]
    COMP["[:timeless_traces, :compaction, :stop]<br/>duration, raw_blocks, entry_count, byte_size"]
    MCOMP["[:timeless_traces, :merge_compaction, :stop]<br/>duration, batches_merged, blocks_consumed"]
    RET["[:timeless_traces, :retention, :stop]<br/>duration, blocks_deleted"]
    ERR["[:timeless_traces, :block, :error]<br/>file_path, reason"]

    B["Buffer flush"] --> FLUSH
    Q["Query execution"] --> QUERY
    C["Compaction run"] --> COMP
    MC["Merge compaction run"] --> MCOMP
    RT["Retention run"] --> RET
    R["Block read failure"] --> ERR