TimelessLogs User’s Guide
Setup
Mix.install([
{:timeless_logs, github: "awksedgreep/timeless_logs"}
])
Start the application (this boots the full supervision tree — buffer, index, compactor, retention):
Application.ensure_all_started(:timeless_logs)
Seed some log data so the query examples below have something to return:
require Logger
for i <- 1..100 do
level = Enum.random([:debug, :info, :warning, :error])
service = Enum.random(["api", "payments", "auth", "web"])
host = Enum.random(["web-1", "web-2", "worker-1"])
Logger.log(level, "Request #{i} processed",
service: service,
host: host,
request_id: "req-#{i}",
duration_ms: :rand.uniform(500)
)
end
# Give the buffer time to flush
Process.sleep(2_000)
TimelessLogs.flush()
Getting Started
TimelessLogs is an embedded log compression and indexing library for Elixir.
It hooks into the standard Logger, compresses logs with zstd or OpenZL
(~11-14x compression), indexes them in SQLite, and provides querying with
sub-millisecond latency. Zero external infrastructure required.
Add the Dependency
# mix.exs
defp deps do
[
{:timeless_logs, "~> 0.10"}
]
end
Configuration
TimelessLogs is configured via application config. All options have sensible
defaults — only data_dir is typically set explicitly:
# config/config.exs
config :timeless_logs,
data_dir: "priv/log_stream"
Configuration Options
| Option | Default | Description |
|---|---|---|
:data_dir |
"priv/log_stream" |
Directory for block files and SQLite index |
:storage |
:disk |
:disk or :memory (memory stores blocks as SQLite BLOBs) |
:flush_interval |
1_000 |
Buffer flush interval (ms) |
:max_buffer_size |
1_000 |
Entries before auto-flush |
:query_timeout |
30_000 |
Query operation timeout (ms) |
:retention_max_age |
604_800 |
Delete logs older than this (seconds, default 7 days) |
:retention_max_size |
536_870_912 |
Max total block size (bytes, default 512 MB) |
:retention_check_interval |
300_000 |
Retention check interval (ms, default 5 min) |
:compaction_threshold |
500 |
Min raw entries to trigger compaction |
:compaction_interval |
30_000 |
Compaction check interval (ms) |
:compaction_max_raw_age |
60 |
Force compact raw blocks older than this (seconds) |
:compaction_format |
:openzl |
Compression format: :zstd or :openzl |
:zstd_compression_level |
5 |
Zstd compression level (1-22) |
:openzl_compression_level |
9 |
OpenZL compression level (1-22) |
:index_publish_interval |
2_000 |
ETS → SQLite batch flush interval (ms) |
:http |
false |
true, or [port: 9428, bearer_token: "secret"] |
Full Configuration Example
# config/config.exs
config :timeless_logs,
data_dir: "/var/data/logs",
storage: :disk,
flush_interval: 1_000,
max_buffer_size: 1_000,
retention_max_age: 30 * 86_400,
retention_max_size: 2 * 1_073_741_824,
compaction_format: :openzl,
openzl_compression_level: 9,
http: [port: 9428, bearer_token: "my-secret-token"]
Enable the HTTP API
Set the :http config option to start the VictoriaLogs-compatible HTTP server:
# Defaults to port 9428, no auth
config :timeless_logs, http: true
# Custom port + bearer token auth
config :timeless_logs, http: [port: 9500, bearer_token: "secret"]
Writing Logs (Elixir)
TimelessLogs installs a Logger handler automatically on startup. Any standard
Logger call is captured, indexed, and compressed:
require Logger
Logger.info("Request completed", request_id: "abc123", duration_ms: 42)
Logger.warning("Slow query detected", query: "SELECT *", duration_ms: 1500)
Logger.error("Connection timeout", service: "payments", host: "pay-1")
All metadata key/value pairs are indexed and queryable. There is no need to
call any TimelessLogs function directly for ingestion — just use Logger.
Flush the Buffer
Logs are buffered and flushed automatically every second or when the buffer reaches 1000 entries. To force an immediate flush:
TimelessLogs.flush()
Writing Logs (HTTP)
The HTTP ingest endpoint accepts NDJSON (newline-delimited JSON), compatible with VictoriaLogs:
curl -X POST "http://localhost:9428/insert/jsonline?_msg_field=_msg&_time_field=_time" \
--data-binary '
{"_msg":"Request completed","_time":"2024-01-15T10:30:00Z","level":"info","request_id":"abc123"}
{"_msg":"Connection timeout","_time":"2024-01-15T10:30:05Z","level":"error","service":"payments"}
'
Query parameters control which JSON fields map to the message and timestamp:
| Parameter | Default | Description |
|---|---|---|
_msg_field |
"_msg" |
JSON field containing the log message |
_time_field |
"_time" |
JSON field containing the timestamp |
All other JSON fields become searchable metadata. Returns 204 No Content on
success.
Querying Logs (Elixir)
Basic Query
query/1 returns matching log entries with pagination:
{:ok, result} = TimelessLogs.query(level: :error, limit: 10)
The result is a %TimelessLogs.Result{} struct:
%TimelessLogs.Result{
entries: [%TimelessLogs.Entry{...}, ...],
total: 42,
limit: 10,
offset: 0
}
Query Filters
All filters are optional and can be combined:
# By level
{:ok, result} = TimelessLogs.query(level: :error)
# By message substring (case-insensitive)
{:ok, result} = TimelessLogs.query(message: "timeout")
# By time range
{:ok, result} = TimelessLogs.query(
since: DateTime.utc_now() |> DateTime.add(-3600, :second),
until: DateTime.utc_now()
)
# By metadata key/value
{:ok, result} = TimelessLogs.query(metadata: %{"service" => "payments"})
# Combined filters with pagination
{:ok, result} = TimelessLogs.query(
level: :error,
message: "timeout",
since: DateTime.utc_now() |> DateTime.add(-86_400, :second),
metadata: %{"service" => "payments"},
limit: 50,
offset: 0,
order: :desc
)
Filter Reference
| Filter | Type | Description |
|---|---|---|
:level |
atom |
:debug, :info, :warning, :error |
:message |
string | Case-insensitive substring match |
:since |
DateTime or unix seconds | Lower time bound |
:until |
DateTime or unix seconds | Upper time bound |
:metadata |
map | Exact key/value matches |
:limit |
integer | Max entries returned (default 100) |
:offset |
integer | Skip N entries (default 0) |
:order |
atom |
:asc (oldest first) or :desc (newest first, default) |
Working with Entries
Each entry is a %TimelessLogs.Entry{} struct:
{:ok, result} = TimelessLogs.query(level: :error, limit: 5)
for entry <- result.entries do
IO.puts("#{entry.timestamp} [#{entry.level}] #{entry.message}")
IO.inspect(entry.metadata, label: " metadata")
end
Streaming
stream/1 returns a lazy Stream that decompresses blocks on demand. Use
this for large result sets to avoid loading everything into memory:
TimelessLogs.stream(level: :error, since: DateTime.utc_now() |> DateTime.add(-86_400, :second))
|> Stream.filter(fn entry -> String.contains?(entry.message, "timeout") end)
|> Enum.take(100)
The stream accepts all query filters except :limit, :offset, and :order.
Use Enum.take/2 and Stream.drop/2 for pagination instead.
Querying Logs (HTTP)
Query Endpoint
curl "http://localhost:9428/select/logsql/query?\
level=error&\
message=timeout&\
start=2024-01-15T00:00:00Z&\
end=2024-01-16T00:00:00Z&\
limit=50&\
order=desc"
Response is NDJSON (one JSON object per line):
{"_time":"2024-01-15T10:30:05Z","_msg":"Connection timeout","level":"error","service":"payments"}
{"_time":"2024-01-15T09:15:22Z","_msg":"Read timeout","level":"error","service":"api"}
Query Parameters
| Parameter | Description |
|---|---|
level |
Filter by level |
message |
Substring search |
start |
Lower time bound (ISO8601 or unix seconds) |
end |
Upper time bound (ISO8601 or unix seconds) |
limit |
Max entries |
offset |
Skip N entries |
order |
"asc" or "desc" |
Real-Time Subscriptions
Subscribe to receive log entries as they arrive, before they are buffered and flushed to disk:
{:ok, _pid} = TimelessLogs.subscribe(level: :error)
# Generate some errors so we see them come through
Task.start(fn ->
Process.sleep(500)
require Logger
Logger.error("Disk full on /data", host: "worker-1")
Logger.error("Connection refused", service: "payments", host: "pay-1")
end)
# Collect the entries that arrive
for _ <- 1..2 do
receive do
{:timeless_logs, :entry, entry} ->
IO.puts("[#{entry.level}] #{entry.message} #{inspect(entry.metadata)}")
after
3_000 -> IO.puts("(no more entries)")
end
end
Filter subscriptions by level or metadata:
# Only entries with specific metadata
{:ok, _pid} = TimelessLogs.subscribe(metadata: %{"service" => "payments"})
Task.start(fn ->
Process.sleep(500)
require Logger
Logger.error("Payment failed", service: "payments", order_id: "ord-99")
Logger.info("Health check ok", service: "api")
end)
receive do
{:timeless_logs, :entry, entry} ->
IO.puts("Got: [#{entry.level}] #{entry.message}")
after
3_000 -> IO.puts("(no entry)")
end
Unsubscribe when done:
TimelessLogs.unsubscribe()
Statistics
Elixir API
{:ok, stats} = TimelessLogs.stats()
Returns a %TimelessLogs.Stats{} struct with fields:
| Field | Description |
|---|---|
total_blocks |
Total block count |
total_entries |
Total log entries stored |
total_bytes |
Total block data bytes |
disk_size |
On-disk storage size |
index_size |
SQLite index size |
oldest_timestamp |
Oldest entry timestamp (microseconds) |
newest_timestamp |
Newest entry timestamp (microseconds) |
raw_blocks / raw_bytes / raw_entries |
Uncompressed block stats |
zstd_blocks / zstd_bytes / zstd_entries |
Zstd-compressed block stats |
openzl_blocks / openzl_bytes / openzl_entries |
OpenZL-compressed block stats |
compression_raw_bytes_in |
Total bytes before compression |
compression_compressed_bytes_out |
Total bytes after compression |
compaction_count |
Number of compaction runs |
HTTP API
curl "http://localhost:9428/select/logsql/stats"
{
"total_blocks": 48,
"total_entries": 125000,
"total_bytes": 24000000,
"disk_size": 24000000,
"index_size": 3200000,
"oldest_timestamp": 1700000000000000,
"newest_timestamp": 1700086400000000,
"raw_blocks": 2,
"raw_bytes": 50000,
"zstd_blocks": 46,
"zstd_bytes": 23950000,
"openzl_blocks": 0,
"openzl_bytes": 0
}
Operations
Flush
Force all buffered entries to disk immediately:
TimelessLogs.flush()
Via HTTP:
curl "http://localhost:9428/api/v1/flush"
{"status": "ok"}
Backup
Create a consistent online backup without stopping the application. Uses
SQLite VACUUM INTO for an atomic index snapshot and copies block files in
parallel:
{:ok, result} = TimelessLogs.backup("/tmp/logs_backup")
# result => %{path: "/tmp/logs_backup", files: ["index.db", ...], total_bytes: 24000000}
Via HTTP:
curl -X POST http://localhost:9428/api/v1/backup \
-H "Content-Type: application/json" \
-d '{"path": "/tmp/logs_backup"}'
{"status":"ok","path":"/tmp/logs_backup","files":["index.db","blocks"],"total_bytes":24000000}
To download the backup, archive it from the server filesystem:
tar czf logs_backup.tar.gz -C /tmp/logs_backup .
To restore, stop the application, replace the data_dir contents with the
backup files, and restart.
Health Endpoint
Always accessible without authentication:
curl "http://localhost:9428/health"
{"status":"ok","blocks":48,"entries":125000,"disk_size":24000000}
Authentication
All endpoints except /health support optional Bearer token authentication
when configured:
# Via header
curl -H "Authorization: Bearer my-secret-token" \
"http://localhost:9428/select/logsql/query?level=error"
# Via query parameter
curl "http://localhost:9428/select/logsql/query?level=error&token=my-secret-token"
Telemetry
TimelessLogs emits telemetry events for monitoring integration:
| Event | Measurements | Metadata |
|---|---|---|
[:timeless_logs, :flush, :stop] |
duration, entry_count, byte_size |
block_id |
[:timeless_logs, :query, :stop] |
duration, total, blocks_read |
filters |
[:timeless_logs, :retention, :stop] |
duration, blocks_deleted |
— |
[:timeless_logs, :compaction, :stop] |
duration, raw_blocks, entry_count, byte_size |
— |
[:timeless_logs, :block, :error] |
— |
file_path, reason |
Attach a handler to monitor query performance:
:telemetry.attach("log-query-monitor", [:timeless_logs, :query, :stop], fn _event, measurements, metadata, _config ->
Logger.info("Query took #{measurements.duration}ns, scanned #{measurements.blocks_read} blocks, found #{measurements.total} entries")
end, nil)