Timeless Observability Demo
Setup
Install all three observability engines and Kino for rich output. Works from a Livebook container or any machine — deps are pulled from GitHub.
# Pull the three Timeless engines straight from GitHub, plus Kino for rich
# Livebook output. The :config key seeds application env before the apps boot:
# the OpenTelemetry exporter is pointed at TimelessTraces, and both the log and
# trace engines are given identical compaction settings (OpenZL format;
# flush_interval / compaction_threshold semantics are engine-defined — see the
# timeless_logs / timeless_traces docs).
Mix.install(
  [
    {:timeless_metrics, github: "awksedgreep/timeless_metrics"},
    {:timeless_logs, github: "awksedgreep/timeless_logs"},
    {:timeless_traces, github: "awksedgreep/timeless_traces"},
    {:kino, "~> 0.14"}
  ],
  config: [
    # Route exported OTel spans into the TimelessTraces storage engine
    opentelemetry: [traces_exporter: {TimelessTraces.Exporter, []}],
    timeless_logs: [compaction_format: :openzl, flush_interval: 5_000, compaction_threshold: 100],
    timeless_traces: [compaction_format: :openzl, flush_interval: 5_000, compaction_threshold: 100]
  ]
)
Start TimelessMetrics as a named store. TimelessLogs and TimelessTraces start automatically as OTP applications — we just need to configure their data directories.
# Use a temp directory so each run starts fresh
data_dir = Path.join(System.tmp_dir!(), "timeless_demo_#{System.os_time(:second)}")
File.mkdir_p!(data_dir)
# Each engine gets its own subdirectory under the run's temp root.
metrics_dir = Path.join(data_dir, "metrics")
logs_dir = Path.join(data_dir, "logs")
spans_dir = Path.join(data_dir, "spans")
# Stop auto-started engines so we can point them at the temp data directory.
# Return values are deliberately ignored: stopping an app that is not running
# returns an error tuple, and either outcome is fine here.
Application.stop(:timeless_logs)
Application.stop(:timeless_traces)
Application.put_env(:timeless_logs, :data_dir, logs_dir)
Application.put_env(:timeless_traces, :data_dir, spans_dir)
# Start TimelessMetrics (named instance :demo, referenced by every metrics cell below)
{:ok, _} =
  Supervisor.start_link(
    [{TimelessMetrics, name: :demo, data_dir: metrics_dir}],
    strategy: :one_for_one
  )
# Restart log and trace engines; they pick up the :data_dir env set above
{:ok, _} = Application.ensure_all_started(:timeless_logs)
{:ok, _} = Application.ensure_all_started(:timeless_traces)
IO.puts("All engines started. Data directory: #{data_dir}")
:ok
A helper to format byte sizes:
defmodule Format do
  @moduledoc false

  @kib 1_024
  @mib 1_048_576

  # Human-readable byte size: MB with 2 decimals, KB with 1, raw bytes below 1 KiB.
  def bytes(n) when n >= @mib, do: "#{Float.round(n / @mib, 2)} MB"
  def bytes(n) when n >= @kib, do: "#{Float.round(n / @kib, 1)} KB"
  def bytes(n), do: "#{n} B"

  # Compression ratio as "X.Yx"; "N/A" when nothing has been compressed yet.
  def ratio(raw, compressed) when compressed > 0 do
    "#{Float.round(raw / compressed, 1)}x"
  end

  def ratio(_raw, _compressed), do: "N/A"

  # Nanoseconds rendered as milliseconds with one decimal place.
  def duration_ms(nanos), do: "#{Float.round(nanos / 1_000_000, 1)}ms"
end

:ok
Metrics — Write, Query, Chart
Write two simulated time series — CPU usage for two hosts over the last hour, one point per minute.
now = System.os_time(:second)

# One CPU sample per minute for the past hour, for each of two hosts.
Enum.each(0..59, fn minute ->
  ts = now - (59 - minute) * 60

  # web-1: baseline 65% with some noise
  web1_cpu = 65 + :rand.uniform() * 20 - 10
  TimelessMetrics.write(:demo, "cpu_usage", %{"host" => "web-1"}, web1_cpu, timestamp: ts)

  # web-2: baseline 45% with a spike at minute 35-45
  spike = if minute in 35..45, do: 30, else: 0
  web2_cpu = 45 + spike + :rand.uniform() * 10 - 5
  TimelessMetrics.write(:demo, "cpu_usage", %{"host" => "web-2"}, web2_cpu, timestamp: ts)
end)

# Request latency for one endpoint over the same hour.
Enum.each(0..59, fn minute ->
  ts = now - (59 - minute) * 60
  latency = 50 + :rand.uniform() * 100

  TimelessMetrics.write(:demo, "http_request_duration_ms", %{"path" => "/api/users"}, latency,
    timestamp: ts
  )
end)

IO.puts("Wrote 180 metric points (60 min x 2 CPU hosts + 60 latency points)")
:ok
Query the data back and render an SVG chart inline:
now = System.os_time(:second)

# Pull every cpu_usage series (all label sets) for the past hour.
{:ok, series} = TimelessMetrics.query_multi(:demo, "cpu_usage", %{}, from: now - 3600, to: now)

# Chart.render expects %{data: [...]} while query_multi returns %{points: [...]}.
chart_series = for s <- series, do: %{labels: s.labels, data: s.points}

# Render an inline SVG chart.
svg =
  TimelessMetrics.Chart.render("cpu_usage (%)", chart_series,
    width: 800,
    height: 300,
    theme: :dark
  )

Kino.HTML.new(svg)
Render the request latency chart:
# Same hour window as the CPU chart; `now` comes from the previous cell.
{:ok, series} =
  TimelessMetrics.query_multi(:demo, "http_request_duration_ms", %{}, from: now - 3600, to: now)

# Rename :points to :data for Chart.render.
chart_series = Enum.map(series, &%{labels: &1.labels, data: &1.points})

svg =
  TimelessMetrics.Chart.render("http_request_duration_ms", chart_series,
    width: 800,
    height: 250,
    theme: :dark
  )

Kino.HTML.new(svg)
Check metrics storage info:
info = TimelessMetrics.info(:demo)

# Render the storage counters as a two-column table.
[
  {"Series", info.series_count},
  {"Total points", info.total_points},
  {"Compressed blocks", info.block_count},
  {"Raw buffer points", info.raw_buffer_points},
  {"Compressed bytes", Format.bytes(info.compressed_bytes)},
  {"Bytes per point", info.bytes_per_point},
  {"Storage on disk", Format.bytes(info.storage_bytes)}
]
|> Enum.map(fn {metric, value} -> %{metric: metric, value: value} end)
|> Kino.DataTable.new()
Logs — Write, Query, Compress
Emit a batch of realistic log entries at various levels:
require Logger

# 50 info entries: simulated HTTP request logs with per-request metadata.
Enum.each(1..50, fn i ->
  Logger.info("GET /api/users/#{i} completed in #{Enum.random(10..200)}ms",
    request_id: Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false),
    path: "/api/users/#{i}",
    status: 200
  )
end)

# 30 debug entries: simulated SQL timings.
Enum.each(1..30, fn _ ->
  table = Enum.random(["users", "orders", "products", "sessions"])

  Logger.debug("SQL query on #{table} completed in #{Enum.random(1..50)}ms",
    table: table,
    duration_ms: Enum.random(1..50)
  )
end)

# 10 warning entries: connection-pool pressure.
Enum.each(1..10, fn _ ->
  Logger.warning("Connection pool running low",
    pool_size: 10,
    available: Enum.random(1..3)
  )
end)

# 10 error entries: upstream timeouts.
Enum.each(1..10, fn _ ->
  Logger.error("Connection timeout to external API",
    service: Enum.random(["payments", "auth", "notifications"]),
    timeout_ms: 5000
  )
end)

# Flush to ensure all entries are written and indexed
Process.sleep(1_500)
TimelessLogs.flush()
Process.sleep(500)

IO.puts("Emitted 100 log entries (50 info, 30 debug, 10 warning, 10 error)")
:ok
Query logs by level:
{:ok, errors} = TimelessLogs.query(level: :error)

# Show each error entry with a truncated message and a peek at its metadata.
errors.entries
|> Enum.map(fn entry ->
  %{
    level: entry.level,
    message: String.slice(entry.message, 0..80),
    metadata: inspect(entry.metadata, limit: 3)
  }
end)
|> Kino.DataTable.new()
Search by message:
{:ok, result} = TimelessLogs.query(message: "timeout")
IO.puts("Found #{result.total} entries matching 'timeout'")

# Tabulate the matches (messages truncated to 81 chars).
rows = for e <- result.entries, do: %{level: e.level, message: String.slice(e.message, 0..80)}
Kino.DataTable.new(rows)
Trigger compaction and check compression stats:
# Trigger compaction (raw → compressed) and merge (small → large)
TimelessLogs.Compactor.compact_now()
TimelessLogs.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessLogs.stats()

base_rows = [
  %{field: "Total blocks", value: to_string(stats.total_blocks)},
  %{field: "Total entries", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "Raw bytes", value: Format.bytes(stats.raw_bytes)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "OpenZL bytes", value: Format.bytes(stats.openzl_bytes)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

# The ratio row only makes sense once at least one block has been compressed.
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(base_rows ++ ratio_rows)
Traces — Create, Query, Explore
Create spans programmatically. We’ll simulate 20 traces with parent/child relationships — each an HTTP request that makes a database query and a cache lookup.
# Anchor all demo span timestamps near "now".
now_ns = System.os_time(:nanosecond)

# Helper to generate 64-bit span IDs as lowercase, unpadded base32 (13 chars).
# NOTE: a dead `trace_id` binding was removed here — it was never read; each
# trace in the next cell generates its own `tid` instead.
span_id = fn -> Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false) end
# Build a batch of realistic traces: 20 traces, 3 spans each (root HTTP
# request + DB query + cache lookup), spaced one second apart ending near now.
spans =
  for i <- 1..20 do
    # Fresh 128-bit trace ID per trace; span IDs come from the helper above.
    tid = Base.hex_encode32(:crypto.strong_rand_bytes(16), case: :lower, padding: false)
    root_id = span_id.()
    db_id = span_id.()
    cache_id = span_id.()
    service = Enum.random(["api-gateway", "user-service", "order-service"])
    path = Enum.random(["/users", "/orders", "/products", "/health"])
    method = Enum.random(["GET", "POST", "PUT"])
    # Every 7th trace is marked as an error.
    status = if rem(i, 7) == 0, do: :error, else: :ok
    base_time = now_ns - (20 - i) * 1_000_000_000
    # Durations are bound once so duration_ns always equals end - start.
    root_duration = Enum.random(50..300) * 1_000_000
    db_duration = Enum.random(5..50) * 1_000_000
    cache_duration = Enum.random(1..10) * 1_000_000

    [
      # Root span: HTTP request (server side)
      %{
        trace_id: tid,
        span_id: root_id,
        parent_span_id: nil,
        name: "#{method} #{path}",
        kind: :server,
        start_time: base_time,
        end_time: base_time + root_duration,
        duration_ns: root_duration,
        status: status,
        status_message: if(status == :error, do: "Internal Server Error", else: nil),
        attributes: %{
          "service.name" => service,
          "http.method" => method,
          "http.target" => path,
          "http.status_code" => if(status == :error, do: "500", else: "200")
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      },
      # Child span: DB query, starting 5ms into the request
      %{
        trace_id: tid,
        span_id: db_id,
        parent_span_id: root_id,
        name: "SELECT users",
        kind: :client,
        start_time: base_time + 5_000_000,
        end_time: base_time + 5_000_000 + db_duration,
        duration_ns: db_duration,
        status: :ok,
        status_message: nil,
        attributes: %{
          "service.name" => service,
          "db.system" => "postgresql",
          "db.sql.table" => "users"
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      },
      # Child span: cache lookup, 1ms after the DB query finishes
      %{
        trace_id: tid,
        span_id: cache_id,
        parent_span_id: root_id,
        name: "cache.get",
        kind: :internal,
        start_time: base_time + 5_000_000 + db_duration + 1_000_000,
        end_time: base_time + 5_000_000 + db_duration + 1_000_000 + cache_duration,
        duration_ns: cache_duration,
        status: :ok,
        status_message: nil,
        attributes: %{
          "service.name" => service,
          "cache.hit" => to_string(Enum.random([true, false]))
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      }
    ]
  end
  |> List.flatten()

TimelessTraces.Buffer.ingest(spans)
# Give the buffer time to accept, then force a flush to disk.
Process.sleep(1_500)
TimelessTraces.flush()
Process.sleep(500)
IO.puts("Created #{length(spans)} spans across 20 traces (3 spans each)")
:ok
Query error spans:
{:ok, result} = TimelessTraces.query(status: :error)
IO.puts("Found #{result.total} error spans")

# Tabulate the error spans with service name and millisecond duration.
result.entries
|> Enum.map(fn span ->
  %{
    name: span.name,
    service: Map.get(span.attributes, "service.name", ""),
    kind: span.kind,
    status: span.status,
    duration_ms: Float.round(span.duration_ns / 1_000_000, 1)
  }
end)
|> Kino.DataTable.new()
Look up a full trace:
# Grab any single span, then fetch every span in its trace.
{:ok, result} = TimelessTraces.query(limit: 1)
trace_id = hd(result.entries).trace_id

{:ok, trace_spans} = TimelessTraces.trace(trace_id)
IO.puts("Trace #{trace_id} — #{length(trace_spans)} spans:")

rows =
  for span <- trace_spans do
    %{
      name: span.name,
      kind: span.kind,
      parent: span.parent_span_id || "(root)",
      duration_ms: Float.round(span.duration_ns / 1_000_000, 1),
      status: span.status
    }
  end

Kino.DataTable.new(rows)
List services and operations:
{:ok, services} = TimelessTraces.services()
IO.puts("Services: #{Enum.join(services, ", ")}")

# Print each service's known operation names, indented underneath it.
Enum.each(services, fn svc ->
  {:ok, ops} = TimelessTraces.operations(svc)
  IO.puts(" #{svc}: #{Enum.join(ops, ", ")}")
end)

:ok
Trigger compaction and check trace compression stats:
# Force compaction and merge, then give the engine a moment to settle.
TimelessTraces.Compactor.compact_now()
TimelessTraces.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessTraces.stats()

base_rows = [
  %{field: "Total blocks", value: to_string(stats.total_blocks)},
  %{field: "Total spans", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

# Ratio row only when at least one block has been compressed.
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(base_rows ++ ratio_rows)
Scale Test — Bulk Data & Compression
Generate a larger volume of data to see compression ratios at scale.
Metrics: 10,000 points
now = System.os_time(:second)

# 5 metrics x 2 hosts x 1000 points = 10,000 total
metrics = ["cpu_usage", "mem_usage", "disk_io", "net_rx_bytes", "net_tx_bytes"]
hosts = ["web-1", "web-2"]

# Time the batch build + write; :timer.tc reports microseconds.
{elapsed_us, :ok} =
  :timer.tc(fn ->
    batch =
      for metric <- metrics, host <- hosts, i <- 0..999 do
        ts = now - (999 - i) * 60
        {metric, %{"host" => host}, :rand.uniform() * 100, ts}
      end

    TimelessMetrics.write_batch(:demo, batch)
    :ok
  end)

IO.puts("Wrote 10,000 metric points in #{Format.duration_ms(elapsed_us * 1000)}")

info = TimelessMetrics.info(:demo)

[
  {"Series", info.series_count},
  {"Total points", info.total_points},
  {"Compressed bytes", Format.bytes(info.compressed_bytes)},
  {"Bytes per point", info.bytes_per_point},
  {"Storage on disk", Format.bytes(info.storage_bytes)}
]
|> Enum.map(fn {metric, value} -> %{metric: metric, value: value} end)
|> Kino.DataTable.new()
Logs: 5,000 entries
require Logger

# Emit 5,000 entries with a weighted level mix (2/7 debug, 3/7 info,
# 1/7 warning, 1/7 error), timing the whole loop.
{elapsed_us, :ok} =
  :timer.tc(fn ->
    Enum.each(1..5_000, fn i ->
      level = Enum.random([:debug, :debug, :info, :info, :info, :warning, :error])
      path = Enum.random(["/users", "/orders", "/products", "/api/health", "/search"])
      method = Enum.random(["GET", "POST", "PUT", "DELETE"])

      case level do
        :debug ->
          Logger.debug("Cache lookup for key:#{i}", key: "item:#{i}")

        :info ->
          Logger.info("#{method} #{path} completed in #{Enum.random(10..200)}ms",
            request_id: "req-#{i}",
            status: Enum.random([200, 201, 301])
          )

        :warning ->
          Logger.warning("Slow query on table=#{Enum.random(["users", "orders"])}",
            duration_ms: Enum.random(500..2000)
          )

        :error ->
          Logger.error("Failed to connect to #{Enum.random(["redis", "postgres", "api"])}",
            retries: Enum.random(1..5)
          )
      end
    end)

    :ok
  end)

IO.puts("Emitted 5,000 log entries in #{Format.duration_ms(elapsed_us * 1000)}")

# Wait for flush + compact
Process.sleep(2_000)
TimelessLogs.flush()
Process.sleep(500)
TimelessLogs.Compactor.compact_now()
TimelessLogs.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessLogs.stats()

base_rows = [
  %{field: "Total entries", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

# Ratio row only when at least one block has been compressed.
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(base_rows ++ ratio_rows)
Traces: 1,500 spans (500 traces)
now_ns = System.os_time(:nanosecond)
# 64-bit span IDs as lowercase, unpadded base32.
span_id = fn -> Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false) end

# Build 500 traces (root + DB + cache span each) and time construction + ingest.
# FIX: the child spans previously called Enum.random/1 separately for end_time
# and duration_ns, so duration_ns did not match end_time - start_time. Each
# duration is now bound once and reused, matching the demo cell above.
{elapsed_us, :ok} =
  :timer.tc(fn ->
    spans =
      for i <- 1..500 do
        tid = Base.hex_encode32(:crypto.strong_rand_bytes(16), case: :lower, padding: false)
        root = span_id.()
        child1 = span_id.()
        child2 = span_id.()
        service = Enum.random(["api-gateway", "user-service", "order-service", "payment-service"])
        # Traces spaced 2s apart, ending near now; every 10th trace errors.
        base = now_ns - (500 - i) * 2_000_000_000
        root_dur = Enum.random(20..500) * 1_000_000
        db_dur = Enum.random(5..50) * 1_000_000
        cache_dur = Enum.random(1..5) * 1_000_000
        status = if rem(i, 10) == 0, do: :error, else: :ok

        [
          %{
            trace_id: tid, span_id: root, parent_span_id: nil,
            name: "#{Enum.random(["GET", "POST"])} #{Enum.random(["/users", "/orders"])}",
            kind: :server, start_time: base, end_time: base + root_dur,
            duration_ns: root_dur, status: status, status_message: nil,
            attributes: %{"service.name" => service},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          },
          %{
            trace_id: tid, span_id: child1, parent_span_id: root,
            name: "DB query", kind: :client,
            start_time: base + 2_000_000,
            end_time: base + 2_000_000 + db_dur,
            duration_ns: db_dur, status: :ok, status_message: nil,
            attributes: %{"service.name" => service, "db.system" => "postgresql"},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          },
          %{
            trace_id: tid, span_id: child2, parent_span_id: root,
            name: "cache.get", kind: :internal,
            start_time: base + 10_000_000,
            end_time: base + 10_000_000 + cache_dur,
            duration_ns: cache_dur, status: :ok, status_message: nil,
            attributes: %{"service.name" => service, "cache.hit" => to_string(Enum.random([true, false]))},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          }
        ]
      end
      |> List.flatten()

    TimelessTraces.Buffer.ingest(spans)
    :ok
  end)
IO.puts("Ingested 1,500 spans in #{Format.duration_ms(elapsed_us * 1000)}")

# Wait for flush + compact
Process.sleep(2_000)
TimelessTraces.flush()
Process.sleep(500)
TimelessTraces.Compactor.compact_now()
TimelessTraces.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessTraces.stats()

base_rows = [
  %{field: "Total spans", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

# Ratio row only when at least one block has been compressed.
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(base_rows ++ ratio_rows)
Unified Summary
Side-by-side comparison of all three engines:
metrics_info = TimelessMetrics.info(:demo)
{:ok, logs_stats} = TimelessLogs.stats()
{:ok, traces_stats} = TimelessTraces.stats()

# Shared ratio formatter: "N/A" until a compaction pass has produced output.
ratio_or_na = fn stats ->
  if stats.compression_raw_bytes_in > 0 do
    Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
  else
    "N/A"
  end
end

logs_ratio = ratio_or_na.(logs_stats)
traces_ratio = ratio_or_na.(traces_stats)

summary = [
  %{
    engine: "TimelessMetrics",
    format: "Gorilla + Zstd",
    entries: metrics_info.total_points,
    storage: Format.bytes(metrics_info.storage_bytes),
    efficiency: "#{metrics_info.bytes_per_point} bytes/point"
  },
  %{
    engine: "TimelessLogs",
    format: "OpenZL columnar",
    entries: logs_stats.total_entries,
    storage: Format.bytes(logs_stats.disk_size),
    efficiency: "#{logs_ratio} compression"
  },
  %{
    engine: "TimelessTraces",
    format: "OpenZL columnar",
    entries: traces_stats.total_entries,
    storage: Format.bytes(traces_stats.disk_size),
    efficiency: "#{traces_ratio} compression"
  }
]

# Totals feed the markdown card in the next cell.
total_entries = metrics_info.total_points + logs_stats.total_entries + traces_stats.total_entries
total_disk = metrics_info.storage_bytes + logs_stats.disk_size + traces_stats.disk_size

Kino.DataTable.new(summary)
# Totals card; total_entries/total_disk come from the summary cell,
# data_dir from the setup cell.
Kino.Markdown.new("""
### Totals
| | |
|---|---|
| **Total data points** | #{total_entries} |
| **Total disk usage** | #{Format.bytes(total_disk)} |
| **Data directory** | `#{data_dir}` |
All data is compressed, indexed, and queryable — with zero external infrastructure.
""")
Cleanup
Remove the temporary data directory:
# Recursively delete the temp tree created in the setup cell.
File.rm_rf!(data_dir)
IO.puts("Cleaned up #{data_dir}")
:ok