Powered by AppSignal & Oban Pro

Timeless Observability Demo

livebook/demo.livemd

Timeless Observability Demo

Setup

Install all three observability engines and Kino for rich output. Works from a Livebook container or any machine — deps are pulled from GitHub.

Mix.install(
  [
    # The three observability engines, pulled straight from GitHub
    {:timeless_metrics, github: "awksedgreep/timeless_metrics"},
    {:timeless_logs, github: "awksedgreep/timeless_logs"},
    {:timeless_traces, github: "awksedgreep/timeless_traces"},
    # Kino renders tables/HTML/Markdown inline in Livebook
    {:kino, "~> 0.14"}
  ],
  config: [
    # Route OpenTelemetry spans into TimelessTraces via its exporter
    opentelemetry: [traces_exporter: {TimelessTraces.Exporter, []}],
    # Small flush/compaction settings so compression happens within the demo's
    # lifetime — presumably flush_interval is ms and threshold is entry count;
    # TODO confirm units against the engines' docs.
    timeless_logs: [compaction_format: :openzl, flush_interval: 5_000, compaction_threshold: 100],
    timeless_traces: [compaction_format: :openzl, flush_interval: 5_000, compaction_threshold: 100]
  ]
)

Start TimelessMetrics as a named store. TimelessLogs and TimelessTraces start automatically as OTP applications — we just need to configure their data directories.

# Use a temp directory so each run starts fresh
data_dir = Path.join(System.tmp_dir!(), "timeless_demo_#{System.os_time(:second)}")
File.mkdir_p!(data_dir)

# One subdirectory per engine
metrics_dir = Path.join(data_dir, "metrics")
logs_dir = Path.join(data_dir, "logs")
spans_dir = Path.join(data_dir, "spans")

# Stop auto-started engines so we can point them at the temp data directory.
# Order matters: the apps must be stopped BEFORE :data_dir is changed, and
# restarted only afterwards so they pick up the new value on boot.
Application.stop(:timeless_logs)
Application.stop(:timeless_traces)

Application.put_env(:timeless_logs, :data_dir, logs_dir)
Application.put_env(:timeless_traces, :data_dir, spans_dir)

# Start TimelessMetrics (named instance) under our own supervisor; the :demo
# name is how every later cell addresses this store.
{:ok, _} =
  Supervisor.start_link(
    [{TimelessMetrics, name: :demo, data_dir: metrics_dir}],
    strategy: :one_for_one
  )

# Restart log and trace engines with configured data directories
{:ok, _} = Application.ensure_all_started(:timeless_logs)
{:ok, _} = Application.ensure_all_started(:timeless_traces)

IO.puts("All engines started. Data directory: #{data_dir}")
:ok

A helper to format byte sizes:

defmodule Format do
  @moduledoc """
  Tiny formatting helpers for the demo's output tables.
  """

  @kib 1_024
  @mib 1_048_576

  @doc "Formats a byte count as MB, KB, or plain bytes."
  def bytes(n) when n >= @mib, do: "#{Float.round(n / @mib, 2)} MB"
  def bytes(n) when n >= @kib, do: "#{Float.round(n / @kib, 1)} KB"
  def bytes(n), do: "#{n} B"

  @doc ~S(Compression ratio rendered as "X.Yx"; "N/A" when nothing was compressed.)
  def ratio(raw_bytes, compressed_bytes) when compressed_bytes > 0 do
    "#{Float.round(raw_bytes / compressed_bytes, 1)}x"
  end

  def ratio(_raw_bytes, _compressed_bytes), do: "N/A"

  @doc ~S(Converts nanoseconds to a millisecond string like "12.3ms".)
  def duration_ms(nanos), do: "#{Float.round(nanos / 1_000_000, 1)}ms"
end

:ok

Metrics — Write, Query, Chart

Write simulated time series — CPU usage for two hosts over the last hour, one point per minute per host.

now = System.os_time(:second)

# One point per minute over the past hour, oldest first
Enum.each(0..59, fn minute ->
  ts = now - (59 - minute) * 60

  # web-1 hovers around 65% with +/-10 noise
  web1_cpu = 65 + :rand.uniform() * 20 - 10
  TimelessMetrics.write(:demo, "cpu_usage", %{"host" => "web-1"}, web1_cpu, timestamp: ts)

  # web-2 sits near 45% but jumps +30 between minutes 35 and 45
  bump = if minute in 35..45, do: 30, else: 0
  web2_cpu = 45 + bump + :rand.uniform() * 10 - 5
  TimelessMetrics.write(:demo, "cpu_usage", %{"host" => "web-2"}, web2_cpu, timestamp: ts)
end)

# Request latency for a single endpoint over the same hour
Enum.each(0..59, fn minute ->
  ts = now - (59 - minute) * 60
  duration = 50 + :rand.uniform() * 100

  TimelessMetrics.write(:demo, "http_request_duration_ms", %{"path" => "/api/users"}, duration,
    timestamp: ts
  )
end)

IO.puts("Wrote 180 metric points (60 min x 2 CPU hosts + 60 latency points)")
:ok

Query the data back and render an SVG chart inline:

now = System.os_time(:second)

# Fetch every cpu_usage series from the last hour (empty label filter = all)
{:ok, all_series} =
  TimelessMetrics.query_multi(:demo, "cpu_usage", %{}, from: now - 3600, to: now)

# query_multi yields %{points: ...}; Chart.render wants %{data: ...}
chart_input = for s <- all_series, do: %{labels: s.labels, data: s.points}

chart_svg =
  TimelessMetrics.Chart.render("cpu_usage (%)", chart_input,
    width: 800,
    height: 300,
    theme: :dark
  )

Kino.HTML.new(chart_svg)

Render the request latency chart:

# Latency for the same one-hour window (reuses `now` from the previous cell)
{:ok, latency_series} =
  TimelessMetrics.query_multi(:demo, "http_request_duration_ms", %{}, from: now - 3600, to: now)

# Rename :points -> :data for the chart renderer
latency_input = Enum.map(latency_series, &%{labels: &1.labels, data: &1.points})

latency_svg =
  TimelessMetrics.Chart.render("http_request_duration_ms", latency_input,
    width: 800,
    height: 250,
    theme: :dark
  )

Kino.HTML.new(latency_svg)

Check metrics storage info:

# Summarize the metrics store: series/point counts and storage footprint
store_info = TimelessMetrics.info(:demo)

[
  {"Series", store_info.series_count},
  {"Total points", store_info.total_points},
  {"Compressed blocks", store_info.block_count},
  {"Raw buffer points", store_info.raw_buffer_points},
  {"Compressed bytes", Format.bytes(store_info.compressed_bytes)},
  {"Bytes per point", store_info.bytes_per_point},
  {"Storage on disk", Format.bytes(store_info.storage_bytes)}
]
|> Enum.map(fn {metric, value} -> %{metric: metric, value: value} end)
|> Kino.DataTable.new()

Logs — Write, Query, Compress

Emit a batch of realistic log entries at various levels:

require Logger

# 50 request logs at info level
Enum.each(1..50, fn i ->
  Logger.info("GET /api/users/#{i} completed in #{Enum.random(10..200)}ms",
    request_id: Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false),
    path: "/api/users/#{i}",
    status: 200
  )
end)

# 30 query logs at debug level
Enum.each(1..30, fn _ ->
  table = Enum.random(["users", "orders", "products", "sessions"])

  Logger.debug("SQL query on #{table} completed in #{Enum.random(1..50)}ms",
    table: table,
    duration_ms: Enum.random(1..50)
  )
end)

# 10 warnings
Enum.each(1..10, fn _ ->
  Logger.warning("Connection pool running low",
    pool_size: 10,
    available: Enum.random(1..3)
  )
end)

# 10 errors
Enum.each(1..10, fn _ ->
  Logger.error("Connection timeout to external API",
    service: Enum.random(["payments", "auth", "notifications"]),
    timeout_ms: 5000
  )
end)

# Give the async handler time to receive everything, then force a flush
Process.sleep(1_500)
TimelessLogs.flush()
Process.sleep(500)

IO.puts("Emitted 100 log entries (50 info, 30 debug, 10 warning, 10 error)")
:ok

Query logs by level:

# Fetch only error-level entries and show them as a table
{:ok, error_result} = TimelessLogs.query(level: :error)

error_result.entries
|> Enum.map(fn entry ->
  %{
    level: entry.level,
    message: String.slice(entry.message, 0..80),
    metadata: inspect(entry.metadata, limit: 3)
  }
end)
|> Kino.DataTable.new()

Search by message:

# Full-text search across log messages
{:ok, matches} = TimelessLogs.query(message: "timeout")

IO.puts("Found #{matches.total} entries matching 'timeout'")

matches.entries
|> Enum.map(&%{level: &1.level, message: String.slice(&1.message, 0..80)})
|> Kino.DataTable.new()

Trigger compaction and check compression stats:

# Trigger compaction (raw → compressed) and merge (small → large)
TimelessLogs.Compactor.compact_now()
TimelessLogs.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessLogs.stats()

rows = [
  %{field: "Total blocks", value: to_string(stats.total_blocks)},
  %{field: "Total entries", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "Raw bytes", value: Format.bytes(stats.raw_bytes)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "OpenZL bytes", value: Format.bytes(stats.openzl_bytes)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

rows =
  if stats.compression_raw_bytes_in > 0 do
    rows ++
      [
        %{
          field: "Compression ratio",
          value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
        }
      ]
  else
    rows
  end

Kino.DataTable.new(rows)

Traces — Create, Query, Explore

Create spans programmatically. We’ll simulate 20 traces with parent/child relationships — each an HTTP request that makes a database query and a cache lookup.

now_ns = System.os_time(:nanosecond)

# Random 8-byte span ID, base32-encoded like the engine's other IDs.
span_id = fn -> Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false) end

# Build 20 traces, one per second going back in time. Each trace has a root
# HTTP server span plus two children: a DB query and a cache lookup.
# (Removed a stray `trace_id` binding that was never used — every trace draws
# its own `tid` inside the loop.)
spans =
  for i <- 1..20 do
    tid = Base.hex_encode32(:crypto.strong_rand_bytes(16), case: :lower, padding: false)
    root_id = span_id.()
    db_id = span_id.()
    cache_id = span_id.()

    service = Enum.random(["api-gateway", "user-service", "order-service"])
    path = Enum.random(["/users", "/orders", "/products", "/health"])
    method = Enum.random(["GET", "POST", "PUT"])
    # Every 7th trace simulates a server error
    status = if rem(i, 7) == 0, do: :error, else: :ok

    base_time = now_ns - (20 - i) * 1_000_000_000
    root_duration = Enum.random(50..300) * 1_000_000
    db_duration = Enum.random(5..50) * 1_000_000
    cache_duration = Enum.random(1..10) * 1_000_000

    # NOTE(review): children run sequentially from base_time + 5ms, so with a
    # short root_duration a child can end after its root — harmless for demo
    # data, but confirm if any viewer enforces strict parent containment.
    [
      # Root span: HTTP request
      %{
        trace_id: tid,
        span_id: root_id,
        parent_span_id: nil,
        name: "#{method} #{path}",
        kind: :server,
        start_time: base_time,
        end_time: base_time + root_duration,
        duration_ns: root_duration,
        status: status,
        status_message: if(status == :error, do: "Internal Server Error", else: nil),
        attributes: %{
          "service.name" => service,
          "http.method" => method,
          "http.target" => path,
          "http.status_code" => if(status == :error, do: "500", else: "200")
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      },
      # Child span: DB query, starts 5ms into the request
      %{
        trace_id: tid,
        span_id: db_id,
        parent_span_id: root_id,
        name: "SELECT users",
        kind: :client,
        start_time: base_time + 5_000_000,
        end_time: base_time + 5_000_000 + db_duration,
        duration_ns: db_duration,
        status: :ok,
        status_message: nil,
        attributes: %{
          "service.name" => service,
          "db.system" => "postgresql",
          "db.sql.table" => "users"
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      },
      # Child span: cache lookup, starts 1ms after the DB query ends
      %{
        trace_id: tid,
        span_id: cache_id,
        parent_span_id: root_id,
        name: "cache.get",
        kind: :internal,
        start_time: base_time + 5_000_000 + db_duration + 1_000_000,
        end_time: base_time + 5_000_000 + db_duration + 1_000_000 + cache_duration,
        duration_ns: cache_duration,
        status: :ok,
        status_message: nil,
        attributes: %{
          "service.name" => service,
          "cache.hit" => to_string(Enum.random([true, false]))
        },
        events: [],
        resource: %{"service.name" => service},
        instrumentation_scope: %{name: "timeless_demo", version: "1.0.0"}
      }
    ]
  end
  |> List.flatten()

# Ingest, give the buffer a moment, then force a flush so queries see the spans
TimelessTraces.Buffer.ingest(spans)
Process.sleep(1_500)
TimelessTraces.flush()
Process.sleep(500)

IO.puts("Created #{length(spans)} spans across 20 traces (3 spans each)")
:ok

Query error spans:

# Find spans whose status is :error
{:ok, error_spans} = TimelessTraces.query(status: :error)

IO.puts("Found #{error_spans.total} error spans")

error_spans.entries
|> Enum.map(fn span ->
  %{
    name: span.name,
    service: Map.get(span.attributes, "service.name", ""),
    kind: span.kind,
    status: span.status,
    duration_ms: Float.round(span.duration_ns / 1_000_000, 1)
  }
end)
|> Kino.DataTable.new()

Look up a full trace:

# Get one trace ID from the results, then fetch every span in that trace
{:ok, result} = TimelessTraces.query(limit: 1)
first_span = hd(result.entries)
trace_id = first_span.trace_id

{:ok, trace_spans} = TimelessTraces.trace(trace_id)

# Fixed: the trace ID and span count were previously concatenated with no
# separator ("Trace abc…3 spans:"), garbling the output.
IO.puts("Trace #{trace_id}: #{length(trace_spans)} spans:")

Kino.DataTable.new(
  Enum.map(trace_spans, fn s ->
    %{
      name: s.name,
      kind: s.kind,
      parent: s.parent_span_id || "(root)",
      duration_ms: Float.round(s.duration_ns / 1_000_000, 1),
      status: s.status
    }
  end)
)

List services and operations:

# Enumerate known services, then the operations recorded for each
{:ok, services} = TimelessTraces.services()
IO.puts("Services: #{Enum.join(services, ", ")}")

Enum.each(services, fn svc ->
  {:ok, ops} = TimelessTraces.operations(svc)
  IO.puts("  #{svc}: #{Enum.join(ops, ", ")}")
end)

:ok

Trigger compaction and check trace compression stats:

# Compact raw trace blocks to OpenZL, then merge small blocks together
TimelessTraces.Compactor.compact_now()
TimelessTraces.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessTraces.stats()

# Ratio row appears only after compaction has processed bytes
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(
  [
    %{field: "Total blocks", value: to_string(stats.total_blocks)},
    %{field: "Total spans", value: to_string(stats.total_entries)},
    %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
    %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
    %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
    %{field: "Disk size", value: Format.bytes(stats.disk_size)}
  ] ++ ratio_rows
)

Scale Test — Bulk Data & Compression

Generate a larger volume of data to see compression ratios at scale.

Metrics: 10,000 points

now = System.os_time(:second)

# 5 metrics x 2 hosts x 1000 points = 10,000 total, written as one batch
metric_names = ["cpu_usage", "mem_usage", "disk_io", "net_rx_bytes", "net_tx_bytes"]
host_names = ["web-1", "web-2"]

{elapsed_us, :ok} =
  :timer.tc(fn ->
    points =
      for metric <- metric_names, host <- host_names, i <- 0..999 do
        {metric, %{"host" => host}, :rand.uniform() * 100, now - (999 - i) * 60}
      end

    TimelessMetrics.write_batch(:demo, points)
    :ok
  end)

IO.puts("Wrote 10,000 metric points in #{Format.duration_ms(elapsed_us * 1000)}")

info = TimelessMetrics.info(:demo)

[
  {"Series", info.series_count},
  {"Total points", info.total_points},
  {"Compressed bytes", Format.bytes(info.compressed_bytes)},
  {"Bytes per point", info.bytes_per_point},
  {"Storage on disk", Format.bytes(info.storage_bytes)}
]
|> Enum.map(fn {metric, value} -> %{metric: metric, value: value} end)
|> Kino.DataTable.new()

Logs: 5,000 entries

require Logger

{elapsed_us, :ok} =
  :timer.tc(fn ->
    Enum.each(1..5_000, fn i ->
      # Weighted pick: ~2/7 debug, ~3/7 info, 1/7 warning, 1/7 error
      level = Enum.random([:debug, :debug, :info, :info, :info, :warning, :error])
      path = Enum.random(["/users", "/orders", "/products", "/api/health", "/search"])
      method = Enum.random(["GET", "POST", "PUT", "DELETE"])

      case level do
        :debug ->
          Logger.debug("Cache lookup for key:#{i}", key: "item:#{i}")

        :info ->
          Logger.info("#{method} #{path} completed in #{Enum.random(10..200)}ms",
            request_id: "req-#{i}",
            status: Enum.random([200, 201, 301])
          )

        :warning ->
          Logger.warning("Slow query on table=#{Enum.random(["users", "orders"])}",
            duration_ms: Enum.random(500..2000)
          )

        :error ->
          Logger.error("Failed to connect to #{Enum.random(["redis", "postgres", "api"])}",
            retries: Enum.random(1..5)
          )
      end
    end)

    :ok
  end)

IO.puts("Emitted 5,000 log entries in #{Format.duration_ms(elapsed_us * 1000)}")

# Let the handler drain, flush to disk, then compact and merge
Process.sleep(2_000)
TimelessLogs.flush()
Process.sleep(500)
TimelessLogs.Compactor.compact_now()
TimelessLogs.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessLogs.stats()

# Ratio row appears only after compaction has processed bytes
ratio_rows =
  if stats.compression_raw_bytes_in > 0 do
    [
      %{
        field: "Compression ratio",
        value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
      }
    ]
  else
    []
  end

Kino.DataTable.new(
  [
    %{field: "Total entries", value: to_string(stats.total_entries)},
    %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
    %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
    %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
    %{field: "Disk size", value: Format.bytes(stats.disk_size)}
  ] ++ ratio_rows
)

Traces: 1,500 spans (500 traces)

now_ns = System.os_time(:nanosecond)
span_id = fn -> Base.hex_encode32(:crypto.strong_rand_bytes(8), case: :lower, padding: false) end

{elapsed_us, :ok} =
  :timer.tc(fn ->
    # 500 traces x 3 spans (root + DB child + cache child), 2s apart going back
    spans =
      for i <- 1..500 do
        tid = Base.hex_encode32(:crypto.strong_rand_bytes(16), case: :lower, padding: false)
        root = span_id.()
        child1 = span_id.()
        child2 = span_id.()

        service = Enum.random(["api-gateway", "user-service", "order-service", "payment-service"])
        base = now_ns - (500 - i) * 2_000_000_000
        root_dur = Enum.random(20..500) * 1_000_000
        # Draw each child duration ONCE and reuse it for both end_time and
        # duration_ns. Previously two independent Enum.random calls were made,
        # so duration_ns never matched end_time - start_time.
        db_dur = Enum.random(5..50) * 1_000_000
        cache_dur = Enum.random(1..5) * 1_000_000
        status = if rem(i, 10) == 0, do: :error, else: :ok

        [
          %{
            trace_id: tid, span_id: root, parent_span_id: nil,
            name: "#{Enum.random(["GET", "POST"])} #{Enum.random(["/users", "/orders"])}",
            kind: :server, start_time: base, end_time: base + root_dur,
            duration_ns: root_dur, status: status, status_message: nil,
            attributes: %{"service.name" => service},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          },
          %{
            trace_id: tid, span_id: child1, parent_span_id: root,
            name: "DB query", kind: :client,
            start_time: base + 2_000_000,
            end_time: base + 2_000_000 + db_dur,
            duration_ns: db_dur, status: :ok, status_message: nil,
            attributes: %{"service.name" => service, "db.system" => "postgresql"},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          },
          %{
            trace_id: tid, span_id: child2, parent_span_id: root,
            name: "cache.get", kind: :internal,
            start_time: base + 10_000_000,
            end_time: base + 10_000_000 + cache_dur,
            duration_ns: cache_dur, status: :ok, status_message: nil,
            attributes: %{"service.name" => service, "cache.hit" => to_string(Enum.random([true, false]))},
            events: [], resource: %{"service.name" => service},
            instrumentation_scope: %{name: "demo", version: "1.0"}
          }
        ]
      end
      |> List.flatten()

    TimelessTraces.Buffer.ingest(spans)
    :ok
  end)

IO.puts("Ingested 1,500 spans in #{Format.duration_ms(elapsed_us * 1000)}")

# Wait for flush + compact
Process.sleep(2_000)
TimelessTraces.flush()
Process.sleep(500)
TimelessTraces.Compactor.compact_now()
TimelessTraces.merge_now()
Process.sleep(1_000)

{:ok, stats} = TimelessTraces.stats()

rows = [
  %{field: "Total spans", value: to_string(stats.total_entries)},
  %{field: "Raw blocks", value: to_string(stats.raw_blocks)},
  %{field: "OpenZL blocks", value: to_string(stats.openzl_blocks)},
  %{field: "Total storage", value: Format.bytes(stats.total_bytes)},
  %{field: "Disk size", value: Format.bytes(stats.disk_size)}
]

# Append the ratio row only once compaction has actually processed bytes
rows =
  if stats.compression_raw_bytes_in > 0 do
    rows ++
      [
        %{
          field: "Compression ratio",
          value: Format.ratio(stats.compression_raw_bytes_in, stats.compression_compressed_bytes_out)
        }
      ]
  else
    rows
  end

Kino.DataTable.new(rows)

Unified Summary

Side-by-side comparison of all three engines:

metrics_info = TimelessMetrics.info(:demo)
{:ok, logs_stats} = TimelessLogs.stats()
{:ok, traces_stats} = TimelessTraces.stats()

logs_ratio =
  if logs_stats.compression_raw_bytes_in > 0,
    do: Format.ratio(logs_stats.compression_raw_bytes_in, logs_stats.compression_compressed_bytes_out),
    else: "N/A"

traces_ratio =
  if traces_stats.compression_raw_bytes_in > 0,
    do: Format.ratio(traces_stats.compression_raw_bytes_in, traces_stats.compression_compressed_bytes_out),
    else: "N/A"

summary = [
  %{
    engine: "TimelessMetrics",
    format: "Gorilla + Zstd",
    entries: metrics_info.total_points,
    storage: Format.bytes(metrics_info.storage_bytes),
    efficiency: "#{metrics_info.bytes_per_point} bytes/point"
  },
  %{
    engine: "TimelessLogs",
    format: "OpenZL columnar",
    entries: logs_stats.total_entries,
    storage: Format.bytes(logs_stats.disk_size),
    efficiency: "#{logs_ratio} compression"
  },
  %{
    engine: "TimelessTraces",
    format: "OpenZL columnar",
    entries: traces_stats.total_entries,
    storage: Format.bytes(traces_stats.disk_size),
    efficiency: "#{traces_ratio} compression"
  }
]

total_entries = metrics_info.total_points + logs_stats.total_entries + traces_stats.total_entries
total_disk = metrics_info.storage_bytes + logs_stats.disk_size + traces_stats.disk_size

# Only a cell's final expression is rendered automatically; the table was
# previously discarded. Kino.render/1 displays it before the Markdown result.
Kino.render(Kino.DataTable.new(summary))

Kino.Markdown.new("""
### Totals

| | |
|---|---|
| **Total data points** | #{total_entries} |
| **Total disk usage** | #{Format.bytes(total_disk)} |
| **Data directory** | `#{data_dir}` |

All data is compressed, indexed, and queryable — with zero external infrastructure.
""")

Cleanup

Remove the temporary data directory:

# Delete everything the demo wrote under the temp directory, then confirm
File.rm_rf!(data_dir)
IO.puts("Cleaned up " <> data_dir)
:ok