TimelessLogs Architecture

Supervision Tree

The application starts a flat supervision tree with the :one_for_one strategy, so each child restarts independently without affecting the others. The HTTP server is optional and is started only when configured.

graph TD
    S["TimelessLogs.Application<br/><i>Supervisor :one_for_one</i>"]
    S --> R["TimelessLogs.Registry<br/><i>Registry :duplicate</i>"]
    S --> I["TimelessLogs.Index<br/><i>GenServer</i>"]
    S --> FS["FlushSupervisor<br/><i>Task.Supervisor</i>"]
    S --> B["TimelessLogs.Buffer<br/><i>GenServer</i>"]
    S --> C["TimelessLogs.Compactor<br/><i>GenServer</i>"]
    S --> RT["TimelessLogs.Retention<br/><i>GenServer</i>"]
    S -.-> H["TimelessLogs.HTTP<br/><i>Bandit (optional)</i>"]

    style H stroke-dasharray: 5 5
  • Registry — Duplicate-key registry for real-time log subscriptions
  • Index — ETS tables for block metadata and term index, persisted via snapshots + disk log
  • FlushSupervisor — Task.Supervisor managing concurrent async flush tasks
  • Buffer — Accumulates log entries, auto-flushes on size or interval threshold
  • Compactor — Merges raw blocks into compressed blocks (zstd or OpenZL)
  • Retention — Deletes blocks by age or total size on a periodic timer
  • HTTP — VictoriaLogs-compatible REST API (only started when http config is set)
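
The tree above can be approximated with a minimal sketch. It is not the library's actual `Application` module: `SupervisionSketch`, its `Worker` stand-in, and the `:http` option name are hypothetical, and the real Index, Buffer, Compactor, and Retention processes carry their own state and callbacks.

```elixir
defmodule SupervisionSketch do
  # Stand-in worker. The real Index, Buffer, Compactor, and Retention
  # processes are GenServers with their own state and callbacks.
  defmodule Worker do
    use GenServer

    def start_link(name), do: GenServer.start_link(__MODULE__, :ok, name: name)

    @impl true
    def init(:ok), do: {:ok, %{}}
  end

  # Builds the child list in the start order shown in the diagram.
  def children(opts) do
    base = [
      {Registry, keys: :duplicate, name: SupervisionSketch.Registry},
      worker(:index),
      {Task.Supervisor, name: SupervisionSketch.FlushSupervisor},
      worker(:buffer),
      worker(:compactor),
      worker(:retention)
    ]

    # The HTTP child is appended only when an :http option is present
    # (hypothetical option name).
    if Keyword.has_key?(opts, :http), do: base ++ [worker(:http)], else: base
  end

  def start_link(opts \\ []) do
    Supervisor.start_link(children(opts), strategy: :one_for_one)
  end

  # Each stand-in needs a unique child id and a unique registered name.
  defp worker(id) do
    Supervisor.child_spec({Worker, Module.concat(__MODULE__, id)}, id: id)
  end
end
```

Calling `SupervisionSketch.start_link(http: true)` starts seven children; without the option, six. Because the strategy is `:one_for_one`, a crash in one stand-in restarts only that child.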

Ingestion Pipeline

Log entries flow from the Elixir Logger through the handler and into the buffer, which flushes them asynchronously to disk, where they are indexed. The buffer broadcasts each entry to subscribers before accumulating it, so subscribers see entries in real time without waiting for a flush.

sequenceDiagram
    participant L as Logger
    participant H as Handler
    participant B as Buffer
    participant Sub as Subscribers
    participant FS as FlushSupervisor
    participant W as Writer
    participant D as Disk / Memory
    participant I as Index

    L->>H: Logger event
    H->>B: GenServer.cast(:log, entry)
    B->>Sub: Registry.dispatch (broadcast)
    B->>B: Accumulate in list

    Note over B: Flush triggers:<br/>buffer ≥ 1000 entries<br/>OR 1s timer expires<br/>OR manual flush()

    B->>FS: Task.Supervisor.async
    FS->>W: Write block
    W->>D: Serialize + compress → file
    W->>I: GenServer.cast(:index, block_meta)
    I->>I: ETS insert (hot cache)

    Note over I: Every 100ms:<br/>flush pending to disk log
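
The flush triggers in the diagram can be sketched as a small pure state machine. This is an illustration under assumptions, not the library's Buffer: `BufferSketch` and its function names are hypothetical, and the 1s timer would live in the owning GenServer (for example via `Process.send_after/3`) and simply call `flush/1`.

```elixir
defmodule BufferSketch do
  # Size threshold taken from the diagram above.
  @max_entries 1_000

  def new, do: %{entries: [], count: 0}

  # Prepending is O(1); insertion order is restored at flush time.
  def add(%{entries: entries, count: count}, entry, max \\ @max_entries) do
    state = %{entries: [entry | entries], count: count + 1}
    if state.count >= max, do: flush(state), else: {:ok, state}
  end

  # Called when the size threshold is hit, when the interval timer
  # fires, or on a manual flush. An empty buffer is a no-op.
  def flush(%{count: 0} = state), do: {:ok, state}
  def flush(%{entries: entries}), do: {:flush, Enum.reverse(entries), new()}
end
```

A caller that receives `{:flush, entries, state}` would hand `entries` to a flush task and continue with the fresh `state`.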

Backpressure

The flush pipeline limits concurrent flush tasks to System.schedulers_online(). When the limit is reached, the buffer falls back to synchronous flushing, which applies natural backpressure to the Logger handler.
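
A minimal sketch of that fallback, assuming the buffer tracks the number of in-flight tasks itself. `BackpressureSketch`, `in_flight`, and `write_fun` are hypothetical names; only `Task.Supervisor.async/2` and `System.schedulers_online/0` are real calls.

```elixir
defmodule BackpressureSketch do
  # Returns {:async, task} while capacity remains. Otherwise the write
  # runs in the calling process, which blocks the caller until it is
  # done, and {:sync, result} is returned.
  def flush(task_sup, in_flight, entries, write_fun, max \\ System.schedulers_online()) do
    if in_flight < max do
      {:async, Task.Supervisor.async(task_sup, fn -> write_fun.(entries) end)}
    else
      {:sync, write_fun.(entries)}
    end
  end
end
```

The synchronous branch is what slows producers down: while the buffer process is busy writing, casts from the handler queue up in its mailbox instead of spawning more work.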

Storage Layer

Blocks are written to disk as individual files under data_dir/blocks/. Each block has a 12-digit numeric ID and a format extension.

flowchart TD
    subgraph "Block Formats"
        RAW[".raw<br/><i>Uncompressed ETF</i>"]
        ZST[".zst<br/><i>Zstandard compressed</i>"]
        OZL[".ozl<br/><i>OpenZL columnar</i>"]
    end

    subgraph "Serialization"
        E["Entries list"] --> ETF[":erlang.term_to_binary"]
        ETF --> RAW
        ETF --> ZSTD["ezstd.compress(level)"]
        ZSTD --> ZST

        E --> COL["Columnar split"]
        COL --> TS["Timestamps<br/>8-byte numeric"]
        COL --> LV["Levels<br/>1-byte numeric"]
        COL --> MSG["Messages<br/>length-prefixed strings"]
        COL --> MD["Metadata<br/>ETF-serialized maps"]
        TS & LV & MSG & MD --> OZL
    end

    subgraph "On-Disk Layout"
        DIR["data_dir/"]
        DIR --> BLK["blocks/<br/>000000000001.raw<br/>000000000002.zst<br/>000000000003.ozl"]
        DIR --> IDX["index.snapshot + index.log<br/><i>ETS persistence</i>"]
    end
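
The naming scheme and the two serialization paths can be sketched as follows. This is a rough illustration, not the on-disk format: the level codes, the 32-bit message length prefix, and the `BlockSketch` module are assumptions, and compression (ezstd or OpenZL) is left out.

```elixir
defmodule BlockSketch do
  # Hypothetical level codes for the 1-byte level column.
  @level_codes %{debug: 0, info: 1, warning: 2, error: 3}

  # 12-digit zero-padded ID plus a format extension.
  def path(data_dir, id, ext) when ext in [:raw, :zst, :ozl] do
    name = String.pad_leading(Integer.to_string(id), 12, "0") <> "." <> Atom.to_string(ext)
    Path.join([data_dir, "blocks", name])
  end

  # Raw blocks: the entry list in Erlang External Term Format.
  def encode_raw(entries), do: :erlang.term_to_binary(entries)
  def decode_raw(binary), do: :erlang.binary_to_term(binary)

  # Columnar split: one binary per column, ready for a columnar compressor.
  def columns(entries) do
    %{
      timestamps: for(%{timestamp: ts} <- entries, into: <<>>, do: <<ts::64>>),
      levels: for(%{level: l} <- entries, into: <<>>, do: <<level_code(l)::8>>),
      # Assumed 32-bit length prefix in front of each message.
      messages: for(%{message: m} <- entries, into: <<>>, do: <<byte_size(m)::32, m::binary>>),
      metadata: :erlang.term_to_binary(for(%{metadata: md} <- entries, do: md))
    }
  end

  defp level_code(level), do: Map.fetch!(@level_codes, level)
end
```

Grouping values of the same type into contiguous columns is what gives a columnar compressor more regular input than the interleaved ETF form.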

Memory Mode

When storage: :memory is configured, block data is stored only as binaries inside ETS tables. No files are written to disk, and data does not survive restarts.
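
As a rough picture of what keeping block data in ETS looks like, the sketch below stores each block as a serialized binary keyed by block ID. The module and table names are hypothetical and the real table layout may differ.

```elixir
defmodule MemoryStoreSketch do
  # An unnamed ETS table owned by the calling process. When the owner
  # exits, the table and all block data disappear with it.
  def new, do: :ets.new(:block_data, [:set, :public])

  def put(table, block_id, entries) do
    :ets.insert(table, {block_id, :erlang.term_to_binary(entries)})
    :ok
  end

  def get(table, block_id) do
    case :ets.lookup(table, block_id) do
      [{^block_id, binary}] -> {:ok, :erlang.binary_to_term(binary)}
      [] -> :error
    end
  end
end
```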

Indexing Strategy

The index uses ETS tables as the authoritative runtime state, with a snapshot + write-ahead log for persistence across restarts.

flowchart LR
    subgraph "ETS (Hot Cache)"
        BT["blocks<br/><i>ordered_set</i><br/>block_id → metadata"]
        TI["term_index<br/><i>bag</i><br/>term → block_id"]
    end

    subgraph "Persistence"
        SNAP["index.snapshot<br/><i>periodic ETS dump</i>"]
        LOG["index.log<br/><i>write-ahead log</i>"]
    end

    W["Writer"] -->|"cast :index"| I["Index GenServer"]
    I -->|"immediate"| BT
    I -->|"immediate"| TI
    I -->|"journal"| LOG
    I -->|"every 1000 ops"| SNAP
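
The two ETS tables can be sketched with plain `:ets` calls. This is an assumption-laden illustration: `IndexSketch` is hypothetical, the metadata shape is invented for the example, and the journal and snapshot steps are omitted.

```elixir
defmodule IndexSketch do
  def new do
    %{
      # block_id => metadata, ordered by block_id
      blocks: :ets.new(:blocks, [:ordered_set, :public]),
      # term => block_id, many rows per term
      term_index: :ets.new(:term_index, [:bag, :public])
    }
  end

  # Both tables are updated immediately. Journaling to index.log and
  # periodic snapshots are left out of this sketch.
  def index_block(idx, block_id, meta, terms) do
    :ets.insert(idx.blocks, {block_id, meta})
    :ets.insert(idx.term_index, Enum.map(terms, &{&1, block_id}))
    :ok
  end

  def block_ids(idx, term) do
    for {_term, id} <- :ets.lookup(idx.term_index, term), do: id
  end
end
```

A `:bag` table allows several rows under the same key, which is what lets one term point at many blocks.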

Term Format

Terms are stored as "key:value" strings for inverted-index lookup:

  • "level:error" — level-based filtering
  • "service:api" — indexed low-cardinality metadata
  • "path:/checkout" — indexed low-cardinality metadata

Only a curated set of stable low-cardinality metadata keys is indexed. Full metadata is still stored with every log entry, and substring search still scans message text plus metadata values at query time.
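
Term extraction along these lines might look like the sketch below. The allow-list `[:service, :path]` is hypothetical, taken only from the examples above; the library's actual set of indexed keys is not listed here.

```elixir
defmodule TermSketch do
  # Hypothetical allow-list of low-cardinality metadata keys.
  @indexed_keys [:service, :path]

  # Builds "key:value" terms for one entry. Metadata keys outside the
  # allow-list (request IDs, user IDs, ...) produce no terms.
  def terms(%{level: level, metadata: metadata}) do
    meta_terms =
      for key <- @indexed_keys, Map.has_key?(metadata, key) do
        "#{key}:#{Map.fetch!(metadata, key)}"
      end

    ["level:#{level}" | meta_terms]
  end
end
```

For an entry with level `:error` and metadata `%{service: "api", request_id: "r-1"}`, this yields `["level:error", "service:api"]`; the high-cardinality `request_id` stays out of the index but remains in the stored entry.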

Query Path

Queries are lock-free on the read path. The caller’s process reads directly from ETS without going through a GenServer, keeping latency low.

sequenceDiagram
    participant C as Caller
    participant ETS as ETS Caches
    participant D as Disk
    participant TS as Task.async_stream

    C->>ETS: Lookup terms → block_ids
    Note over C,ETS: MapSet intersection<br/>across all filter terms

    C->>ETS: Filter blocks by ts_min/ts_max
    C->>TS: Parallel block reads
    TS->>D: Read + decompress blocks
    TS-->>C: Entries stream

    C->>C: Apply message filter
    C->>C: Apply metadata filter
    C->>C: Sort + paginate
    C-->>C: Return Result
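
The planning steps in the diagram (term lookup, set intersection, time-range pruning, parallel reads) can be sketched with plain maps standing in for the ETS tables. `QuerySketch`, its argument shapes, and `read_fun` are assumptions made for illustration.

```elixir
defmodule QuerySketch do
  # term_index: %{term => [block_id]}
  # blocks:     %{block_id => %{ts_min: integer, ts_max: integer}}
  def candidate_blocks(term_index, blocks, terms, from, to) do
    ids =
      terms
      |> Enum.map(&MapSet.new(Map.get(term_index, &1, [])))
      |> case do
        # No term filters: every block is a candidate.
        [] -> MapSet.new(Map.keys(blocks))
        # A block must match all filter terms.
        sets -> Enum.reduce(sets, &MapSet.intersection/2)
      end

    ids
    |> Enum.filter(fn id ->
      %{ts_min: ts_min, ts_max: ts_max} = Map.fetch!(blocks, id)
      # Keep blocks whose time range overlaps [from, to].
      ts_min <= to and ts_max >= from
    end)
    |> Enum.sort()
  end

  # Reads the surviving blocks in parallel and concatenates their entries.
  def read_blocks(block_ids, read_fun) do
    block_ids
    |> Task.async_stream(read_fun, ordered: true)
    |> Enum.flat_map(fn {:ok, entries} -> entries end)
  end
end
```

Pruning by terms and time range before any block is read is what keeps the narrow queries cheap: only the blocks that survive both checks are decompressed.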

Query Performance

| Query pattern | Latency (1.1M entries) |
| --- | --- |
| Specific indexed metadata key | 845 µs |
| Last 1h + level filter | 3.4 ms |
| Last 1 hour, all levels | 3.9 ms |
| Level filter, all time | 215 ms |
| Message substring | 238 ms |
| Last 24 hours | 157 ms |
| Full scan, no filters | 2.6 s |

Compaction Lifecycle

Raw blocks are compacted into compressed blocks on a periodic timer. In the benchmarks under Compression Ratios, this reduces disk usage by roughly 11-13x while keeping recent data immediately queryable.

flowchart TD
    subgraph "Triggers"
        T1["raw entries ≥ 500"]
        T2["oldest raw block ≥ 60s"]
        T3["30s timer tick"]
    end

    T1 & T2 & T3 --> CHECK["Compactor checks conditions"]

    CHECK -->|"threshold met"| READ["Read all raw blocks"]
    READ --> MERGE["Merge entries"]
    MERGE --> COMPRESS["Compress to zstd or OpenZL"]
    COMPRESS --> WRITE["Write compressed block(s)<br/><i>parallel, chunked by schedulers</i>"]
    WRITE --> REINDEX["Update Index<br/>(ETS + disk log)"]
    REINDEX --> DELETE["Delete raw block files"]
    DELETE --> STATS["Update compression stats"]
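
A minimal sketch of the check and merge steps. Two assumptions are baked in: either threshold alone is treated as sufficient on a timer tick, and merged entries are sorted by timestamp. zlib compression through `:erlang.term_to_binary/2` stands in for zstd or OpenZL so the example has no dependencies; `CompactionSketch` itself is hypothetical.

```elixir
defmodule CompactionSketch do
  # Thresholds taken from the diagram above.
  @max_raw_entries 500
  @max_raw_age_s 60

  # Assumption: either threshold alone triggers compaction on a tick.
  def compact?(raw_entry_count, oldest_raw_age_s) do
    raw_entry_count >= @max_raw_entries or oldest_raw_age_s >= @max_raw_age_s
  end

  # Merges raw blocks (lists of entries) into one time-ordered block.
  # zlib is a stand-in here, not the compressor the library uses.
  def compact(raw_blocks) do
    entries = raw_blocks |> Enum.concat() |> Enum.sort_by(& &1.timestamp)
    {:erlang.term_to_binary(entries, compressed: 6), length(entries)}
  end
end
```

In the real pipeline the index would be updated before the raw files are deleted, so a crash between the two steps leaves redundant files rather than missing data.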

Compression Ratios

| Format | Ratio | Throughput |
| --- | --- | --- |
| Zstd (level 5) | 11.1x | 1.2M entries/sec |
| OpenZL (level 1) | 11.2x | 706K entries/sec |
| OpenZL (level 5) | 11.6x | 1.2M entries/sec |
| OpenZL (level 9) | 12.8x | 702K entries/sec |

Merge Compaction

After initial compaction, many small compressed blocks accumulate (one per flush cycle). The Compactor runs a second merge pass that consolidates them into fewer, larger blocks for better compression ratios and reduced per-block I/O during reads. The merge pass runs automatically after every compaction timer tick and can also be triggered manually via TimelessLogs.merge_now/0.

flowchart TD
    subgraph "Trigger"
        T1["compressed blocks with<br/>entry_count < target_size"]
        T2["block count ≥ min_blocks<br/><i>default: 4</i>"]
    end

    T1 & T2 --> GROUP["Group by ts_min into batches<br/>targeting ~2000 entries each"]

    GROUP --> B1["Batch 1"]
    GROUP --> B2["Batch 2"]
    GROUP --> BN["Batch N"]

    B1 & B2 & BN --> DECOMP["Decompress all blocks in batch"]
    DECOMP --> MERGE["Merge entries"]
    MERGE --> RECOMP["Recompress as single block<br/><i>zstd or OpenZL</i>"]
    RECOMP --> REINDEX["Update Index<br/>(ETS + disk log)"]
    REINDEX --> DELETE["Delete old block files"]
    DELETE --> TEL["Emit telemetry:<br/>[:timeless_logs, :merge_compaction, :stop]"]

| Configuration | Default | Description |
| --- | --- | --- |
| merge_compaction_target_size | 2000 | Target entries per merged block |
| merge_compaction_min_blocks | 4 | Min small blocks before merge triggers |
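
The batching step can be sketched with `Enum.chunk_while/4`. This is one plausible reading of the diagram, not the library's algorithm: `MergeSketch` and the block map shape are hypothetical, and dropping single-block batches (nothing to merge) is an assumption.

```elixir
defmodule MergeSketch do
  # blocks: list of %{id, ts_min, entry_count}
  def batches(blocks, target_size \\ 2000, min_blocks \\ 4) do
    small =
      blocks
      |> Enum.filter(&(&1.entry_count < target_size))
      |> Enum.sort_by(& &1.ts_min)

    if length(small) < min_blocks do
      []
    else
      small
      |> Enum.chunk_while({[], 0}, &chunk(&1, &2, target_size), &finish/1)
      # Assumption: a batch holding a single block is not worth rewriting.
      |> Enum.filter(&(length(&1) > 1))
    end
  end

  # Adds blocks to the current batch until it reaches the target size.
  defp chunk(block, {batch, count}, target) do
    count = count + block.entry_count
    batch = [block | batch]

    if count >= target do
      {:cont, Enum.reverse(batch), {[], 0}}
    else
      {:cont, {batch, count}}
    end
  end

  # Emits whatever is left over as a final, possibly undersized batch.
  defp finish({[], _}), do: {:cont, {[], 0}}
  defp finish({batch, _}), do: {:cont, Enum.reverse(batch), {[], 0}}
end
```

Sorting by `ts_min` first keeps each merged block covering a contiguous time range, which helps time-range pruning at query time.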

Retention Policy

Retention runs on a 5-minute timer and enforces two independent limits: age-based and size-based. Blocks are deleted oldest-first.

flowchart TD
    TICK["5-minute timer tick"] --> AGE["Age check:<br/>delete blocks where<br/>ts_max < now - max_age"]
    TICK --> SIZE["Size check:<br/>delete oldest blocks<br/>until total ≤ max_bytes"]

    AGE --> DEL["Delete block files"]
    SIZE --> DEL
    DEL --> IDX["Remove from Index<br/>(ETS + disk log)"]
    IDX --> TEL["Emit telemetry:<br/>[:timeless_logs, :retention, :stop]"]

Defaults: 7-day max age, 512 MB max size.
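
Both checks can be sketched as one selection function over block metadata. The sketch assumes timestamps in seconds and reads "512 MB" as 512 × 1024 × 1024 bytes; `RetentionSketch` and the block map shape are hypothetical.

```elixir
defmodule RetentionSketch do
  # Defaults from the text above (MB interpreted as MiB, an assumption).
  @max_age_s 7 * 24 * 60 * 60
  @max_bytes 512 * 1024 * 1024

  # blocks: list of %{id, ts_max, byte_size}.
  # Returns the ids to delete, oldest first.
  def expired(blocks, now, max_age_s \\ @max_age_s, max_bytes \\ @max_bytes) do
    sorted = Enum.sort_by(blocks, & &1.ts_max)

    # Age check: everything older than the cutoff goes.
    {too_old, kept} = Enum.split_with(sorted, &(&1.ts_max < now - max_age_s))

    # Size check: drop the oldest remaining blocks until the total fits.
    total = kept |> Enum.map(& &1.byte_size) |> Enum.sum()

    {over_size, _remaining} =
      Enum.reduce_while(kept, {[], total}, fn block, {acc, bytes} ->
        if bytes > max_bytes do
          {:cont, {[block | acc], bytes - block.byte_size}}
        else
          {:halt, {acc, bytes}}
        end
      end)

    Enum.map(too_old ++ Enum.reverse(over_size), & &1.id)
  end
end
```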

Real-Time Subscriptions

Subscribers receive log entries in real time via the Registry. The buffer dispatches each entry before accumulating it, so subscribers see entries with minimal latency and never wait for a flush.

sequenceDiagram
    participant H as Handler
    participant B as Buffer
    participant R as Registry
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2

    H->>B: cast(:log, entry)
    B->>R: Registry.dispatch
    R->>S1: {:timeless_logs, :entry, entry}
    R->>S2: {:timeless_logs, :entry, entry}
    B->>B: Accumulate in buffer

    Note over S1,S2: Subscribers can filter<br/>by :level and :metadata
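
The dispatch step maps directly onto Elixir's `Registry` with duplicate keys. The sketch below is not the library's subscription API: the `SubscriptionSketch` module, the `:logs` key, and treating `:level` as a minimum severity are assumptions. Only the message shape `{:timeless_logs, :entry, entry}` comes from the diagram.

```elixir
defmodule SubscriptionSketch do
  @registry SubscriptionSketch.Registry
  # Hypothetical registry key shared by all subscribers.
  @topic :logs

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the calling process. opts may carry a minimum :level.
  def subscribe(opts \\ []), do: Registry.register(@registry, @topic, opts)

  # Sends the entry to every subscriber whose filter accepts it.
  def broadcast(entry) do
    Registry.dispatch(@registry, @topic, fn subscribers ->
      for {pid, opts} <- subscribers, wanted?(entry, opts) do
        send(pid, {:timeless_logs, :entry, entry})
      end
    end)
  end

  # Assumption: :level means "this severity or higher".
  defp wanted?(entry, opts) do
    case Keyword.fetch(opts, :level) do
      {:ok, min} -> Logger.compare_levels(entry.level, min) != :lt
      :error -> true
    end
  end
end
```

Because the registry uses duplicate keys, any number of processes can register under the same key, and a subscriber that exits is removed automatically.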

HTTP API

The optional HTTP layer provides a VictoriaLogs-compatible interface for ingestion and querying from external tools.

flowchart LR
    subgraph "Ingest"
        POST["/insert/jsonline<br/>POST NDJSON"]
    end

    subgraph "Query"
        Q["/select/logsql/query<br/>GET → NDJSON"]
        ST["/select/logsql/stats<br/>GET → JSON"]
    end

    subgraph "Operations"
        FL["/api/v1/flush<br/>GET → JSON"]
        BK["/api/v1/backup<br/>POST → JSON"]
        HE["/health<br/>GET → JSON"]
    end

    POST --> B["Buffer"]
    Q --> IDX["Index + Blocks"]
    ST --> IDX
    FL --> B
    BK --> IDX

Telemetry Events

TimelessLogs emits telemetry events at key points in the pipeline for monitoring and alerting integration.

flowchart TD
    FLUSH["[:timeless_logs, :flush, :stop]<br/><i>duration, entry_count, byte_size</i>"]
    QUERY["[:timeless_logs, :query, :stop]<br/><i>duration, total, blocks_read</i>"]
    RET["[:timeless_logs, :retention, :stop]<br/><i>duration, blocks_deleted</i>"]
    COMP["[:timeless_logs, :compaction, :stop]<br/><i>duration, raw_blocks, entry_count, byte_size</i>"]
    MCOMP["[:timeless_logs, :merge_compaction, :stop]<br/><i>duration, batches_merged, blocks_consumed</i>"]
    ERR["[:timeless_logs, :block, :error]<br/><i>file_path, reason</i>"]

    B["Buffer flush"] --> FLUSH
    Q["Query execution"] --> QUERY
    RT["Retention run"] --> RET
    C["Compaction run"] --> COMP
    MC["Merge compaction run"] --> MCOMP
    R["Block read failure"] --> ERR
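
A handler for two of these events might look like the sketch below. It assumes `duration`, `entry_count`, and `byte_size` arrive in the measurements map, `file_path` and `reason` in the metadata map, and that `duration` is in native time units; none of that is confirmed above. `TelemetrySketch` is a hypothetical module, and the attach call is shown only as a comment because it needs the `:telemetry` dependency.

```elixir
defmodule TelemetrySketch do
  # Matches the 4-arity handler shape expected by :telemetry.attach/4.
  def handle_event([:timeless_logs, :flush, :stop], measurements, _metadata, _config) do
    # Assumption: duration is reported in native time units.
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    "flush: #{measurements.entry_count} entries, #{measurements.byte_size} bytes, #{ms} ms"
  end

  def handle_event([:timeless_logs, :block, :error], _measurements, metadata, _config) do
    "block read failed: #{metadata.file_path} (#{inspect(metadata.reason)})"
  end
end

# With the :telemetry dependency available, attaching would look like:
#
#   :telemetry.attach(
#     "timeless-logs-flush",
#     [:timeless_logs, :flush, :stop],
#     &TelemetrySketch.handle_event/4,
#     nil
#   )
```

The handler returns strings here so it can be called directly; a production handler would more likely forward the values to a metrics reporter or logger.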