TimelessLogs User’s Guide

Setup

Mix.install([
  {:timeless_logs, github: "awksedgreep/timeless_logs"}
])

Start the application (this boots the full supervision tree — buffer, index, compactor, retention):

Application.ensure_all_started(:timeless_logs)

Seed some log data so the query examples below have something to return:

require Logger

for i <- 1..100 do
  level = Enum.random([:debug, :info, :warning, :error])
  service = Enum.random(["api", "payments", "auth", "web"])
  host = Enum.random(["web-1", "web-2", "worker-1"])

  Logger.log(level, "Request #{i} processed",
    service: service,
    host: host,
    request_id: "req-#{i}",
    duration_ms: :rand.uniform(500)
  )
end

# Wait briefly for async handling, then force any remaining entries to disk
Process.sleep(2_000)
TimelessLogs.flush()

Getting Started

TimelessLogs is an embedded log compression and indexing library for Elixir. It hooks into the standard Logger, compresses logs with zstd or OpenZL (~11-14x compression), indexes them in SQLite, and provides querying with sub-millisecond latency. Zero external infrastructure required.

Add the Dependency

# mix.exs
defp deps do
  [
    {:timeless_logs, "~> 0.10"}
  ]
end

Configuration

TimelessLogs is configured via application config. All options have sensible defaults — only data_dir is typically set explicitly:

# config/config.exs
config :timeless_logs,
  data_dir: "priv/log_stream"
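
For releases, the same options can be set at runtime. A minimal sketch, assuming you read the directory from an environment variable (the TIMELESS_LOGS_DIR name is illustrative):

# config/runtime.exs
import Config

config :timeless_logs,
  data_dir: System.get_env("TIMELESS_LOGS_DIR", "priv/log_stream")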

Configuration Options

| Option | Default | Description |
|---|---|---|
| :data_dir | "priv/log_stream" | Directory for block files and SQLite index |
| :storage | :disk | :disk or :memory (memory stores blocks as SQLite BLOBs) |
| :flush_interval | 1_000 | Buffer flush interval (ms) |
| :max_buffer_size | 1_000 | Entries before auto-flush |
| :query_timeout | 30_000 | Query operation timeout (ms) |
| :retention_max_age | 604_800 | Delete logs older than this (seconds; default 7 days) |
| :retention_max_size | 536_870_912 | Max total block size (bytes; default 512 MB) |
| :retention_check_interval | 300_000 | Retention check interval (ms; default 5 min) |
| :compaction_threshold | 500 | Min raw entries to trigger compaction |
| :compaction_interval | 30_000 | Compaction check interval (ms) |
| :compaction_max_raw_age | 60 | Force-compact raw blocks older than this (seconds) |
| :compaction_format | :openzl | Compression format: :zstd or :openzl |
| :zstd_compression_level | 5 | Zstd compression level (1-22) |
| :openzl_compression_level | 9 | OpenZL compression level (1-22) |
| :index_publish_interval | 2_000 | ETS → SQLite batch flush interval (ms) |
| :http | false | true, or [port: 9428, bearer_token: "secret"] |

Full Configuration Example

# config/config.exs
config :timeless_logs,
  data_dir: "/var/data/logs",
  storage: :disk,
  flush_interval: 1_000,
  max_buffer_size: 1_000,
  retention_max_age: 30 * 86_400,
  retention_max_size: 2 * 1_073_741_824,
  compaction_format: :openzl,
  openzl_compression_level: 9,
  http: [port: 9428, bearer_token: "my-secret-token"]

Enable the HTTP API

Set the :http config option to start the VictoriaLogs-compatible HTTP server:

# Defaults to port 9428, no auth
config :timeless_logs, http: true

# Custom port + bearer token auth
config :timeless_logs, http: [port: 9500, bearer_token: "secret"]

Writing Logs (Elixir)

TimelessLogs installs a Logger handler automatically on startup. Any standard Logger call is captured, indexed, and compressed:

require Logger

Logger.info("Request completed", request_id: "abc123", duration_ms: 42)
Logger.warning("Slow query detected", query: "SELECT *", duration_ms: 1500)
Logger.error("Connection timeout", service: "payments", host: "pay-1")

All metadata key/value pairs are indexed and queryable. There is no need to call any TimelessLogs function directly for ingestion — just use Logger.
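
Process-level metadata set with Logger.metadata/1 applies to every subsequent Logger call in that process. A minimal sketch, assuming the handler indexes process metadata the same way as inline metadata:

require Logger

# Tag every log line emitted by this process
Logger.metadata(service: "checkout", request_id: "req-7f3a")

Logger.info("Cart loaded")                      # carries service + request_id
Logger.info("Coupon applied", coupon: "SAVE10") # inline keys merge with process metadata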

Flush the Buffer

Logs are buffered and flushed automatically every second or when the buffer reaches 1000 entries. To force an immediate flush:

TimelessLogs.flush()

Writing Logs (HTTP)

The HTTP ingest endpoint accepts NDJSON (newline-delimited JSON), compatible with VictoriaLogs:

curl -X POST "http://localhost:9428/insert/jsonline?_msg_field=_msg&_time_field=_time" \
  --data-binary '
{"_msg":"Request completed","_time":"2024-01-15T10:30:00Z","level":"info","request_id":"abc123"}
{"_msg":"Connection timeout","_time":"2024-01-15T10:30:05Z","level":"error","service":"payments"}
'

Query parameters control which JSON fields map to the message and timestamp:

| Parameter | Default | Description |
|---|---|---|
| _msg_field | "_msg" | JSON field containing the log message |
| _time_field | "_time" | JSON field containing the timestamp |

All other JSON fields become searchable metadata. Returns 204 No Content on success.
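
The same endpoint can be exercised from Elixir with OTP's built-in :httpc. A minimal sketch (the NDJSON payload is hand-written, so no JSON library is needed; the content type is an assumption the server is not known to check):

Application.ensure_all_started(:inets)

ndjson = """
{"_msg":"Cache miss","_time":"2024-01-15T10:31:00Z","level":"debug","service":"api"}
{"_msg":"Cache rebuilt","_time":"2024-01-15T10:31:02Z","level":"info","service":"api"}
"""

# Expect 204 No Content on success
{:ok, {{_, 204, _}, _headers, _body}} =
  :httpc.request(
    :post,
    {~c"http://localhost:9428/insert/jsonline", [], ~c"application/x-ndjson", ndjson},
    [],
    []
  )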

Querying Logs (Elixir)

Basic Query

query/1 returns matching log entries with pagination:

{:ok, result} = TimelessLogs.query(level: :error, limit: 10)

The result is a %TimelessLogs.Result{} struct:

%TimelessLogs.Result{
  entries: [%TimelessLogs.Entry{...}, ...],
  total: 42,
  limit: 10,
  offset: 0
}
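
The total field makes offset-based pagination straightforward. A sketch that walks every page of error entries using only the documented :limit and :offset options (the PageWalker module is illustrative, not part of the library):

defmodule PageWalker do
  @per_page 100

  def all_errors(offset \\ 0, acc \\ []) do
    {:ok, result} = TimelessLogs.query(level: :error, limit: @per_page, offset: offset)
    acc = acc ++ result.entries

    # Stop once we have paged past the reported total
    if offset + @per_page >= result.total do
      acc
    else
      all_errors(offset + @per_page, acc)
    end
  end
end

PageWalker.all_errors() |> length()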

Query Filters

All filters are optional and can be combined:

# By level
{:ok, result} = TimelessLogs.query(level: :error)

# By message substring (case-insensitive)
{:ok, result} = TimelessLogs.query(message: "timeout")

# By time range
{:ok, result} = TimelessLogs.query(
  since: DateTime.utc_now() |> DateTime.add(-3600, :second),
  until: DateTime.utc_now()
)

# By metadata key/value
{:ok, result} = TimelessLogs.query(metadata: %{"service" => "payments"})

# Combined filters with pagination
{:ok, result} = TimelessLogs.query(
  level: :error,
  message: "timeout",
  since: DateTime.utc_now() |> DateTime.add(-86_400, :second),
  metadata: %{"service" => "payments"},
  limit: 50,
  offset: 0,
  order: :desc
)

Filter Reference

| Filter | Type | Description |
|---|---|---|
| :level | atom | :debug, :info, :warning, :error |
| :message | string | Case-insensitive substring match |
| :since | DateTime or unix seconds | Lower time bound |
| :until | DateTime or unix seconds | Upper time bound |
| :metadata | map | Exact key/value matches |
| :limit | integer | Max entries returned (default 100) |
| :offset | integer | Skip N entries (default 0) |
| :order | atom | :asc (oldest first) or :desc (newest first, default) |

Working with Entries

Each entry is a %TimelessLogs.Entry{} struct:

{:ok, result} = TimelessLogs.query(level: :error, limit: 5)

for entry <- result.entries do
  IO.puts("#{entry.timestamp} [#{entry.level}] #{entry.message}")
  IO.inspect(entry.metadata, label: "  metadata")
end

Streaming

stream/1 returns a lazy Stream that decompresses blocks on demand. Use this for large result sets to avoid loading everything into memory:

TimelessLogs.stream(level: :error, since: DateTime.utc_now() |> DateTime.add(-86_400, :second))
|> Stream.filter(fn entry -> String.contains?(entry.message, "timeout") end)
|> Enum.take(100)

The stream accepts all query filters except :limit, :offset, and :order. Use Enum.take/2 and Stream.drop/2 for pagination instead.
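
For example, a lazy equivalent of limit/offset paging:

# Page 3 at 50 entries per page, without holding earlier pages in memory
page = 3
per_page = 50

TimelessLogs.stream(level: :error)
|> Stream.drop((page - 1) * per_page)
|> Enum.take(per_page)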

Querying Logs (HTTP)

Query Endpoint

curl "http://localhost:9428/select/logsql/query?\
level=error&\
message=timeout&\
start=2024-01-15T00:00:00Z&\
end=2024-01-16T00:00:00Z&\
limit=50&\
order=desc"

Response is NDJSON (one JSON object per line):

{"_time":"2024-01-15T10:30:05Z","_msg":"Connection timeout","level":"error","service":"payments"}
{"_time":"2024-01-15T09:15:22Z","_msg":"Read timeout","level":"error","service":"api"}

Query Parameters

| Parameter | Description |
|---|---|
| level | Filter by level |
| message | Substring search |
| start | Lower time bound (ISO8601 or unix seconds) |
| end | Upper time bound (ISO8601 or unix seconds) |
| limit | Max entries |
| offset | Skip N entries |
| order | "asc" or "desc" |
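
The same query can be issued from Elixir with :httpc, splitting the NDJSON response line by line (sketch; assumes no bearer token is configured):

Application.ensure_all_started(:inets)

url = ~c"http://localhost:9428/select/logsql/query?level=error&limit=10"
{:ok, {{_, 200, _}, _headers, body}} = :httpc.request(:get, {url, []}, [], [])

body
|> to_string()
|> String.split("\n", trim: true)
|> Enum.each(&IO.puts/1)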

Real-Time Subscriptions

Subscribe to receive log entries in real time, as they arrive, without waiting for the buffer to flush to disk:

{:ok, _pid} = TimelessLogs.subscribe(level: :error)

# Generate some errors so we see them come through
Task.start(fn ->
  Process.sleep(500)
  require Logger
  Logger.error("Disk full on /data", host: "worker-1")
  Logger.error("Connection refused", service: "payments", host: "pay-1")
end)

# Collect the entries that arrive
for _ <- 1..2 do
  receive do
    {:timeless_logs, :entry, entry} ->
      IO.puts("[#{entry.level}] #{entry.message} #{inspect(entry.metadata)}")
  after
    3_000 -> IO.puts("(no more entries)")
  end
end

Filter subscriptions by level or metadata:

# Only entries with specific metadata
{:ok, _pid} = TimelessLogs.subscribe(metadata: %{"service" => "payments"})

Task.start(fn ->
  Process.sleep(500)
  require Logger
  Logger.error("Payment failed", service: "payments", order_id: "ord-99")
  Logger.info("Health check ok", service: "api")
end)

receive do
  {:timeless_logs, :entry, entry} ->
    IO.puts("Got: [#{entry.level}] #{entry.message}")
after
  3_000 -> IO.puts("(no entry)")
end

Unsubscribe when done:

TimelessLogs.unsubscribe()

Statistics

Elixir API

{:ok, stats} = TimelessLogs.stats()

Returns a %TimelessLogs.Stats{} struct with fields:

| Field | Description |
|---|---|
| total_blocks | Total block count |
| total_entries | Total log entries stored |
| total_bytes | Total block data bytes |
| disk_size | On-disk storage size |
| index_size | SQLite index size |
| oldest_timestamp | Oldest entry timestamp (microseconds) |
| newest_timestamp | Newest entry timestamp (microseconds) |
| raw_blocks / raw_bytes / raw_entries | Uncompressed block stats |
| zstd_blocks / zstd_bytes / zstd_entries | Zstd-compressed block stats |
| openzl_blocks / openzl_bytes / openzl_entries | OpenZL-compressed block stats |
| compression_raw_bytes_in | Total bytes before compression |
| compression_compressed_bytes_out | Total bytes after compression |
| compaction_count | Number of compaction runs |
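
The compression_* counters make the effective ratio easy to derive:

{:ok, stats} = TimelessLogs.stats()

# Guard against division by zero before anything has been compacted
ratio =
  if stats.compression_compressed_bytes_out > 0 do
    stats.compression_raw_bytes_in / stats.compression_compressed_bytes_out
  else
    0.0
  end

IO.puts(
  "#{stats.total_entries} entries in #{stats.total_blocks} blocks, " <>
    "#{Float.round(ratio, 1)}x compression"
)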

HTTP API

curl "http://localhost:9428/select/logsql/stats"
{
  "total_blocks": 48,
  "total_entries": 125000,
  "total_bytes": 24000000,
  "disk_size": 24000000,
  "index_size": 3200000,
  "oldest_timestamp": 1700000000000000,
  "newest_timestamp": 1700086400000000,
  "raw_blocks": 2,
  "raw_bytes": 50000,
  "zstd_blocks": 46,
  "zstd_bytes": 23950000,
  "openzl_blocks": 0,
  "openzl_bytes": 0
}

Operations

Flush

Force all buffered entries to disk immediately:

TimelessLogs.flush()

Via HTTP:

curl "http://localhost:9428/api/v1/flush"
{"status": "ok"}

Backup

Create a consistent online backup without stopping the application. Uses SQLite VACUUM INTO for an atomic index snapshot and copies block files in parallel:

{:ok, result} = TimelessLogs.backup("/tmp/logs_backup")
# result => %{path: "/tmp/logs_backup", files: ["index.db", ...], total_bytes: 24000000}

Via HTTP:

curl -X POST http://localhost:9428/api/v1/backup \
  -H "Content-Type: application/json" \
  -d '{"path": "/tmp/logs_backup"}'
{"status":"ok","path":"/tmp/logs_backup","files":["index.db","blocks"],"total_bytes":24000000}

To download the backup, archive it from the server filesystem:

tar czf logs_backup.tar.gz -C /tmp/logs_backup .

To restore, stop the application, replace the data_dir contents with the backup files, and restart.
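
Because backups are online, they are easy to schedule. A sketch of a daily backup process (the module name, directory, and cadence are illustrative, not part of the library):

defmodule NightlyBackup do
  use GenServer

  @day :timer.hours(24)

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    schedule()
    {:ok, Keyword.get(opts, :dir, "/var/backups/timeless_logs")}
  end

  @impl true
  def handle_info(:backup, dir) do
    # One timestamped directory per day, e.g. .../2024-01-15
    stamp = Date.to_iso8601(Date.utc_today())
    {:ok, _result} = TimelessLogs.backup(Path.join(dir, stamp))
    schedule()
    {:noreply, dir}
  end

  defp schedule, do: Process.send_after(self(), :backup, @day)
end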

Health Endpoint

Always accessible without authentication:

curl "http://localhost:9428/health"
{"status":"ok","blocks":48,"entries":125000,"disk_size":24000000}

Authentication

When a bearer_token is configured, all endpoints except /health require Bearer token authentication:

# Via header
curl -H "Authorization: Bearer my-secret-token" \
  "http://localhost:9428/select/logsql/query?level=error"

# Via query parameter
curl "http://localhost:9428/select/logsql/query?level=error&token=my-secret-token"

Telemetry

TimelessLogs emits telemetry events for monitoring integration:

| Event | Measurements | Metadata |
|---|---|---|
| [:timeless_logs, :flush, :stop] | duration, entry_count, byte_size | block_id |
| [:timeless_logs, :query, :stop] | duration, total, blocks_read | filters |
| [:timeless_logs, :retention, :stop] | duration, blocks_deleted | |
| [:timeless_logs, :compaction, :stop] | duration, raw_blocks, entry_count, byte_size | |
| [:timeless_logs, :block, :error] | | file_path, reason |

Attach a handler to monitor query performance:

require Logger

:telemetry.attach(
  "log-query-monitor",
  [:timeless_logs, :query, :stop],
  fn _event, measurements, _metadata, _config ->
    Logger.info(
      "Query took #{measurements.duration}ns, scanned " <>
        "#{measurements.blocks_read} blocks, found #{measurements.total} entries"
    )
  end,
  nil
)
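
To watch several events with one handler, :telemetry.attach_many/4 takes a list of event names:

# Print flush and compaction measurements as they happen
:telemetry.attach_many(
  "timeless-logs-writes",
  [
    [:timeless_logs, :flush, :stop],
    [:timeless_logs, :compaction, :stop]
  ],
  fn event, measurements, _metadata, _config ->
    IO.inspect({event, measurements}, label: "timeless_logs")
  end,
  nil
)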