# TimelessLogs Architecture

`livebook/architecture.livemd`


## Supervision Tree

The application starts a flat supervision tree with the `:one_for_one` strategy. Each child can restart independently without affecting the others. The HTTP server is optional and only started when configured.
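
The tree described above can be sketched as an ordinary `Application` callback. This is a minimal sketch: the child names follow the diagram, but the exact option keys (e.g. how the HTTP config is read) are assumptions, not the library's actual source.

```elixir
defmodule TimelessLogs.Application do
  use Application

  @impl true
  def start(_type, _args) do
    http_config = Application.get_env(:timeless_logs, :http)

    children =
      [
        {Registry, keys: :duplicate, name: TimelessLogs.Registry},
        TimelessLogs.Index,
        {Task.Supervisor, name: TimelessLogs.FlushSupervisor},
        TimelessLogs.Buffer,
        TimelessLogs.Compactor,
        TimelessLogs.Retention
      ] ++
        # The HTTP server is only added to the tree when configured.
        if http_config do
          [{Bandit, plug: TimelessLogs.HTTP, port: http_config[:port]}]
        else
          []
        end

    # :one_for_one — each child restarts independently of its siblings.
    Supervisor.start_link(children, strategy: :one_for_one, name: TimelessLogs.Supervisor)
  end
end
```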

```mermaid
graph TD
    S["TimelessLogs.Application<br/>Supervisor :one_for_one"]
    S --> R["TimelessLogs.Registry<br/>Registry :duplicate"]
    S --> I["TimelessLogs.Index<br/>GenServer"]
    S --> FS["FlushSupervisor<br/>Task.Supervisor"]
    S --> B["TimelessLogs.Buffer<br/>GenServer"]
    S --> C["TimelessLogs.Compactor<br/>GenServer"]
    S --> RT["TimelessLogs.Retention<br/>GenServer"]
    S -.-> H["TimelessLogs.HTTP<br/>Bandit (optional)"]
    style H stroke-dasharray: 5 5
```
* **Registry** — Duplicate-key registry for real-time log subscriptions
* **Index** — SQLite persistence + ETS caching for block metadata and term index
* **FlushSupervisor** — Task.Supervisor managing concurrent async flush tasks
* **Buffer** — Accumulates log entries, auto-flushes on size or interval threshold
* **Compactor** — Merges raw blocks into compressed blocks (zstd or OpenZL)
* **Retention** — Deletes blocks by age or total size on a periodic timer
* **HTTP** — VictoriaLogs-compatible REST API (only started when `http` config is set)

## Ingestion Pipeline

Log entries flow from the Elixir Logger through the handler, into the buffer, then are flushed asynchronously to disk and indexed. Subscribers receive entries in real-time before they hit the buffer.
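
The entry point is an OTP logger handler. A minimal sketch of attaching it (the handler module name comes from the diagram; the handler id and config map are assumptions):

```elixir
# Attach TimelessLogs as an OTP logger handler so every Logger event
# flows through TimelessLogs.Handler into the Buffer.
:ok = :logger.add_handler(:timeless_logs, TimelessLogs.Handler, %{config: %{}})

# From here on, ordinary Logger calls are captured:
require Logger
Logger.info("user signed in", request_id: "abc123")
```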

```mermaid
sequenceDiagram
    participant L as Logger
    participant H as Handler
    participant B as Buffer
    participant Sub as Subscribers
    participant FS as FlushSupervisor
    participant W as Writer
    participant D as Disk / Memory
    participant I as Index

    L->>H: Logger event
    H->>B: GenServer.cast(:log, entry)
    B->>Sub: Registry.dispatch (broadcast)
    B->>B: Accumulate in list

    Note over B: Flush triggers:<br/>buffer ≥ 1000 entries<br/>OR 1s timer expires<br/>OR manual flush()
    B->>FS: Task.Supervisor.async
    FS->>W: Write block
    W->>D: Serialize + compress → file
    W->>I: GenServer.cast(:index, block_meta)
    I->>I: ETS insert (hot cache)
    Note over I: Every 100-500ms:<br/>ETS → SQLite batch persist
```

### Backpressure

The flush pipeline limits concurrent flush tasks to `System.schedulers_online()`. When the limit is reached, the buffer falls back to synchronous flushing, which applies natural backpressure to the Logger handler.
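
The async-with-fallback behaviour can be sketched as follows. The `in_flight` field, the `flush/2` helper, and `TimelessLogs.Writer.write_block/1` are illustrative names; task-completion bookkeeping is omitted.

```elixir
# Inside the Buffer GenServer: flush asynchronously while fewer than
# System.schedulers_online() flush tasks are in flight, otherwise flush
# synchronously so the caller (the Logger handler) is slowed down.
defp flush(entries, %{in_flight: in_flight} = state) do
  if in_flight < System.schedulers_online() do
    Task.Supervisor.async_nolink(TimelessLogs.FlushSupervisor, fn ->
      TimelessLogs.Writer.write_block(entries)
    end)

    # Decremented again when the task's completion message arrives.
    %{state | in_flight: in_flight + 1}
  else
    # Synchronous fallback: natural backpressure on the ingest path.
    TimelessLogs.Writer.write_block(entries)
    state
  end
end
```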

## Storage Layer

Blocks are written to disk as individual files under `data_dir/blocks/`. Each block has a 12-digit numeric ID and a format extension.
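
A 12-digit zero-padded ID plus a format extension can be produced like this (a sketch; the real naming code may differ):

```elixir
# block_filename(42, :zst) → "000000000042.zst"
defp block_filename(id, format) when format in [:raw, :zst, :ozl] do
  String.pad_leading(Integer.to_string(id), 12, "0") <> "." <> Atom.to_string(format)
end

# Full path: Path.join([data_dir, "blocks", block_filename(42, :zst)])
```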

```mermaid
flowchart TD
    subgraph "Block Formats"
        RAW[".raw<br/>Uncompressed ETF"]
        ZST[".zst<br/>Zstandard compressed"]
        OZL[".ozl<br/>OpenZL columnar"]
    end
    subgraph "Serialization"
        E["Entries list"] --> ETF[":erlang.term_to_binary"]
        ETF --> RAW
        ETF --> ZSTD["ezstd.compress(level)"]
        ZSTD --> ZST
        E --> COL["Columnar split"]
        COL --> TS["Timestamps<br/>8-byte numeric"]
        COL --> LV["Levels<br/>1-byte numeric"]
        COL --> MSG["Messages<br/>length-prefixed strings"]
        COL --> MD["Metadata<br/>ETF-serialized maps"]
        TS & LV & MSG & MD --> OZL
    end
    subgraph "On-Disk Layout"
        DIR["data_dir/"]
        DIR --> BLK["blocks/<br/>000000000001.raw<br/>000000000002.zst<br/>000000000003.ozl"]
        DIR --> IDX["index.db<br/>SQLite"]
    end
```

### Memory Mode

When `storage: :memory` is configured, block data is stored as BLOBs inside SQLite instead of individual files. The index database and ETS caches still operate identically.
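
Assuming the library reads its options from the application environment, this would look like:

```elixir
# config/config.exs — store blocks as SQLite BLOBs instead of files
config :timeless_logs,
  storage: :memory
```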

## Indexing Strategy

The index maintains two layers: a hot ETS cache for fast reads and a SQLite database for persistence across restarts.
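
The two ETS tables in the diagram could be created like this. The table-name atoms are assumptions, and `read_concurrency: true` is an assumption that fits the lock-free read path; the table types (`ordered_set`, `bag`) come from the diagram.

```elixir
# blocks: ordered_set keyed by block_id, so time-ordered scans are cheap.
:ets.new(:timeless_logs_blocks, [:ordered_set, :named_table, :public, read_concurrency: true])

# term_index: bag, so one term can map to many block_ids.
:ets.new(:timeless_logs_term_index, [:bag, :named_table, :public, read_concurrency: true])

:ets.insert(:timeless_logs_blocks, {1, %{file_path: "blocks/000000000001.raw", entry_count: 1000}})
:ets.insert(:timeless_logs_term_index, {"level:error", 1})
```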

```mermaid
flowchart LR
    subgraph "ETS (Hot Cache)"
        BT["blocks<br/>ordered_set<br/>block_id → metadata"]
        TI["term_index<br/>bag<br/>term → block_id"]
    end
    subgraph "SQLite (Persistent)"
        BM["blocks table<br/>id, file_path, format,<br/>byte_size, entry_count,<br/>ts_min, ts_max"]
        BTM["block_terms table<br/>block_id, term"]
    end
    W["Writer"] -->|"cast :index"| I["Index GenServer"]
    I -->|"immediate"| BT
    I -->|"immediate"| TI
    I -->|"batched 100-500ms"| BM
    I -->|"batched 100-500ms"| BTM
```

### Term Format

Terms are stored as `"key:value"` strings for inverted-index lookup:

* `"level:error"` — level-based filtering
* `"request_id:abc123"` — metadata key/value pair
* `"service:api"` — any metadata field
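
Building these terms from an entry is straightforward (a sketch; the entry shape and function name are assumptions):

```elixir
# One term per metadata pair, plus one for the level.
defp terms_for(entry) do
  level_term = "level:" <> Atom.to_string(entry.level)
  meta_terms = Enum.map(entry.metadata, fn {k, v} -> "#{k}:#{v}" end)
  [level_term | meta_terms]
end

# terms_for(%{level: :error, metadata: [request_id: "abc123", service: "api"]})
# → ["level:error", "request_id:abc123", "service:api"]
```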

## Query Path

Queries are lock-free on the read path. The caller’s process reads directly from ETS without going through a GenServer, keeping latency low.
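
The term-intersection step of the read path can be sketched with plain `MapSet` operations (the function name and ETS table name are illustrative):

```elixir
# Each filter term maps to the set of block_ids that contain it;
# a matching block must contain every term, so intersect the sets.
defp candidate_blocks(terms) do
  terms
  |> Enum.map(fn term ->
    :ets.lookup(:timeless_logs_term_index, term)
    |> Enum.map(fn {_term, block_id} -> block_id end)
    |> MapSet.new()
  end)
  |> Enum.reduce(&MapSet.intersection/2)
end
```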

```mermaid
sequenceDiagram
    participant C as Caller
    participant ETS as ETS Caches
    participant D as Disk
    participant TS as Task.async_stream

    C->>ETS: Lookup terms → block_ids
    Note over C,ETS: MapSet intersection<br/>across all filter terms
    C->>ETS: Filter blocks by ts_min/ts_max
    C->>TS: Parallel block reads
    TS->>D: Read + decompress blocks
    TS-->>C: Entries stream
    C->>C: Apply message filter
    C->>C: Apply metadata filter
    C->>C: Sort + paginate
    C-->>C: Return Result
```

### Query Performance

| Query Pattern | Latency (1.1M entries) |
| --- | --- |
| Specific metadata key | 0.6ms |
| Last 1h + level filter | 2.4ms |
| Last 1 hour, all levels | 4.4ms |
| Level filter, all time | 226ms |
| Message substring | 420ms |
| Last 24 hours | 244ms |
| Full scan, no filters | 1.4s |

## Compaction Lifecycle

Raw blocks are compacted into compressed blocks on a periodic timer. This reduces disk usage by 11-14x while keeping recent data immediately queryable.

```mermaid
flowchart TD
    subgraph "Triggers"
        T1["raw entries ≥ 500"]
        T2["oldest raw block ≥ 60s"]
        T3["30s timer tick"]
    end

    T1 & T2 & T3 --> CHECK["Compactor checks conditions"]

    CHECK -->|"threshold met"| READ["Read all raw blocks"]
    READ --> MERGE["Merge entries"]
    MERGE --> COMPRESS["Compress to zstd or OpenZL"]
    COMPRESS --> WRITE["Write compressed block(s)<br/>parallel, chunked by schedulers"]
    WRITE --> REINDEX["Update Index<br/>(ETS + SQLite)"]
    REINDEX --> DELETE["Delete raw block files"]
    DELETE --> STATS["Update compression stats"]
```

### Compression Ratios

| Format | Ratio | Throughput |
| --- | --- | --- |
| Zstd (level 3) | 11.2x | 500K entries/sec |
| OpenZL (level 1) | 10.9x | 1.7M entries/sec |
| OpenZL (level 5) | 11.4x | 1.2M entries/sec |
| OpenZL (level 9) | 12.5x | 763K entries/sec |

### Merge Compaction

After initial compaction, many small compressed blocks accumulate (one per flush cycle). The Compactor runs a second merge pass that consolidates them into fewer, larger blocks for better compression ratios and reduced per-block I/O during reads. The merge pass runs automatically after every compaction timer tick and can also be triggered manually via `TimelessLogs.merge_now/0`.

```mermaid
flowchart TD
    subgraph "Trigger"
        T1["compressed blocks with<br/>entry_count < target_size"]
        T2["block count ≥ min_blocks<br/>default: 4"]
    end
    T1 & T2 --> GROUP["Group by ts_min into batches<br/>targeting ~2000 entries each"]
    GROUP --> B1["Batch 1"]
    GROUP --> B2["Batch 2"]
    GROUP --> BN["Batch N"]
    B1 & B2 & BN --> DECOMP["Decompress all blocks in batch"]
    DECOMP --> MERGE["Merge entries"]
    MERGE --> RECOMP["Recompress as single block<br/>zstd or OpenZL"]
    RECOMP --> REINDEX["Update Index<br/>(ETS + SQLite)"]
    REINDEX --> DELETE["Delete old block files"]
    DELETE --> TEL["Emit telemetry:<br/>[:timeless_logs, :merge_compaction, :stop]"]
```

| Configuration | Default | Description |
| --- | --- | --- |
| `merge_compaction_target_size` | 2000 | Target entries per merged block |
| `merge_compaction_min_blocks` | 4 | Min small blocks before merge triggers |
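
In application config these would look like (key names taken from the table above; assuming options live in the application environment):

```elixir
# config/config.exs
config :timeless_logs,
  merge_compaction_target_size: 2000,
  merge_compaction_min_blocks: 4
```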

## Retention Policy

Retention runs on a 5-minute timer and enforces two independent limits: age-based and size-based. Blocks are deleted oldest-first.

```mermaid
flowchart TD
    TICK["5-minute timer tick"] --> AGE["Age check:<br/>delete blocks where<br/>ts_max < now - max_age"]
    TICK --> SIZE["Size check:<br/>delete oldest blocks<br/>until total ≤ max_bytes"]
    AGE --> DEL["Delete block files"]
    SIZE --> DEL
    DEL --> IDX["Remove from Index<br/>(ETS + SQLite)"]
    IDX --> TEL["Emit telemetry:<br/>[:timeless_logs, :retention, :stop]"]
```

Defaults: 7-day max age, 512 MB max size.
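
Expressed as configuration, the defaults match the values above; the `max_age` and `max_bytes` option names are taken from the diagram but are otherwise assumptions:

```elixir
# config/config.exs — 7 days in milliseconds, 512 MB in bytes
config :timeless_logs,
  max_age: :timer.hours(24 * 7),
  max_bytes: 512 * 1024 * 1024
```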

## Real-Time Subscriptions

Subscribers receive log entries in real-time via the Registry. Entries are broadcast before they enter the buffer, so subscribers see them with minimal latency.
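
A subscriber might look like this. The message shape `{:timeless_logs, :entry, entry}` and the filter options come from the diagram; the registry key `:subscribers` is an assumption (the library may expose a `subscribe/1` helper instead of raw `Registry.register/3`).

```elixir
defmodule ErrorWatcher do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil) do
    # Duplicate-key registry: many subscribers can register under one key.
    {:ok, _} = Registry.register(TimelessLogs.Registry, :subscribers, level: :error)
    {:ok, nil}
  end

  @impl true
  def handle_info({:timeless_logs, :entry, entry}, state) do
    IO.inspect(entry, label: "error log")
    {:noreply, state}
  end
end
```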

```mermaid
sequenceDiagram
    participant H as Handler
    participant B as Buffer
    participant R as Registry
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2

    H->>B: cast(:log, entry)
    B->>R: Registry.dispatch
    R->>S1: {:timeless_logs, :entry, entry}
    R->>S2: {:timeless_logs, :entry, entry}
    B->>B: Accumulate in buffer

    Note over S1,S2: Subscribers can filter<br/>by :level and :metadata
```

## HTTP API

The optional HTTP layer provides a VictoriaLogs-compatible interface for ingestion and querying from external tools.
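
For example, ingesting one NDJSON line over the VictoriaLogs-compatible endpoint using OTP's built-in `:httpc`. The endpoint path comes from the diagram; the port and the field names in the payload are illustrative assumptions.

```elixir
:inets.start()

body = ~s({"_msg":"user signed in","level":"info","service":"api"}\n)

{:ok, {{_, 200, _}, _headers, _resp}} =
  :httpc.request(
    :post,
    {~c"http://localhost:9428/insert/jsonline", [], ~c"application/x-ndjson", body},
    [],
    []
  )
```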

```mermaid
flowchart LR
    subgraph "Ingest"
        POST["/insert/jsonline<br/>POST NDJSON"]
    end
    subgraph "Query"
        Q["/select/logsql/query<br/>GET → NDJSON"]
        ST["/select/logsql/stats<br/>GET → JSON"]
    end
    subgraph "Operations"
        FL["/api/v1/flush<br/>GET → JSON"]
        BK["/api/v1/backup<br/>POST → JSON"]
        HE["/health<br/>GET → JSON"]
    end
    POST --> B["Buffer"]
    Q --> IDX["Index + Blocks"]
    ST --> IDX
    FL --> B
    BK --> IDX
```

## Telemetry Events

TimelessLogs emits telemetry events at key points in the pipeline for monitoring and alerting integration.
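
A consumer attaches with `:telemetry.attach/4`; for example, logging flush timings (the event name and measurement keys come from the diagram; the handler id is arbitrary):

```elixir
:telemetry.attach(
  "timeless-logs-flush-logger",
  [:timeless_logs, :flush, :stop],
  fn _event, measurements, _metadata, _config ->
    # duration is in native time units; convert before displaying if needed.
    IO.puts("flush wrote #{measurements.entry_count} entries (#{measurements.byte_size} bytes)")
  end,
  nil
)
```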

```mermaid
flowchart TD
    FLUSH["[:timeless_logs, :flush, :stop]<br/>duration, entry_count, byte_size"]
    QUERY["[:timeless_logs, :query, :stop]<br/>duration, total, blocks_read"]
    RET["[:timeless_logs, :retention, :stop]<br/>duration, blocks_deleted"]
    COMP["[:timeless_logs, :compaction, :stop]<br/>duration, raw_blocks, entry_count, byte_size"]
    MCOMP["[:timeless_logs, :merge_compaction, :stop]<br/>duration, batches_merged, blocks_consumed"]
    ERR["[:timeless_logs, :block, :error]<br/>file_path, reason"]
    B["Buffer flush"] --> FLUSH
    Q["Query execution"] --> QUERY
    RT["Retention run"] --> RET
    C["Compaction run"] --> COMP
    MC["Merge compaction run"] --> MCOMP
    R["Block read failure"] --> ERR
```