Clickhouse Loadtest
Overview
A load testing tool for ClickHouse databases. Simulates concurrent query load by repeatedly fetching logs for random projects, measuring response times, and tracking bytes read. Useful for benchmarking query performance and testing ClickHouse cluster capacity.
# Notebook dependencies:
#   ch             — ClickHouse client (HTTP interface)
#   kino           — Livebook interactive widgets
#   kino_vega_lite / vega_lite — live-updating charts
#   jason          — JSON decoding of the ClickHouse summary header
Mix.install([
  {:ch, "~> 0.3.2"},
  {:kino, "~> 0.16.0"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:vega_lite, "~> 0.1.10"},
  {:jason, "~> 1.4"}
])
Setup
Establishes the ClickHouse connection using credentials from environment variables. Creates a connection pool with 25 connections and a 15-second timeout. The pid returned is used by all subsequent queries.
# ClickHouse connection options. Credentials come from Livebook secrets
# (LB_* environment variables) so nothing sensitive lives in the notebook.
defaults = [
  scheme: "https",
  hostname: System.get_env("LB_SB_CH_HOSTNAME"),
  port: 8443,
  database: "default",
  settings: [],
  # 25 pooled connections shared by all concurrent load-test tasks
  pool_size: 25,
  timeout: :timer.seconds(15),
  username: "chase_read_only",
  password: System.get_env("LB_SB_CH_AB6JFB0Q6S_LOGFLARE"),
  # DBConnection queue target (ms) before checkout requests start failing
  queue_target: 1_000
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)

# Smoke test: fail fast if the connection or credentials are wrong.
%Ch.Result{} = Ch.query!(pid, "select 1")
SqlBuilder
A composable SQL query builder for constructing dynamic ClickHouse queries. Uses the builder pattern with pipe-friendly functions. Supports named parameter substitution ({param} syntax) with automatic quoting for strings.
defmodule SqlBuilder do
  @moduledoc """
  A composable SQL query builder for constructing dynamic queries.

  ## Example

      SqlBuilder.new()
      |> SqlBuilder.select([:timestamp, :project, :id, :event_message])
      |> SqlBuilder.from("my_table")
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 10)
      |> SqlBuilder.where("project = {project}", project: "my_project")
      |> SqlBuilder.where_in("source_name", ["deno-relay-logs", "deno-subhosting-events"])
      |> SqlBuilder.order_by(:timestamp, :desc)
      |> SqlBuilder.limit(100)
      |> SqlBuilder.setting(:query_plan_optimize_lazy_materialization, 1)
      |> SqlBuilder.to_sql()
  """

  # NOTE: the type now includes :group_by, which was present in defstruct
  # but missing from the original typespec.
  @type t :: %__MODULE__{
          select: [String.t()],
          from: String.t() | nil,
          where: [String.t()],
          params: keyword(),
          order_by: [{String.t(), :asc | :desc}],
          group_by: [String.t()],
          limit: pos_integer() | nil,
          settings: keyword()
        }

  defstruct select: [],
            from: nil,
            where: [],
            params: [],
            order_by: [],
            group_by: [],
            limit: nil,
            settings: []

  @doc """
  Create a new empty query builder.

  ## Examples

      iex> SqlBuilder.new()
      %SqlBuilder{select: [], from: nil, where: [], params: [], order_by: [], group_by: [], limit: nil, settings: []}
  """
  @spec new() :: t()
  def new, do: %__MODULE__{}

  @doc """
  Set the SELECT columns. Accepts atoms or strings.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.select(:id) |> Map.get(:select)
      ["id"]

      iex> SqlBuilder.new() |> SqlBuilder.select([:id, :name, "count(*)"]) |> Map.get(:select)
      ["id", "name", "count(*)"]
  """
  @spec select(t(), [atom() | String.t()] | atom() | String.t()) :: t()
  def select(%__MODULE__{} = q, columns) when is_list(columns) do
    cols = Enum.map(columns, &to_string/1)
    %{q | select: q.select ++ cols}
  end

  def select(%__MODULE__{} = q, column), do: select(q, [column])

  @doc """
  Set the FROM table. Accepts a string or atom.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.from(:users) |> Map.get(:from)
      "users"

      iex> SqlBuilder.new() |> SqlBuilder.from("events") |> Map.get(:from)
      "events"
  """
  @spec from(t(), String.t() | atom()) :: t()
  def from(%__MODULE__{} = q, table), do: %{q | from: to_string(table)}

  @doc """
  Add a WHERE clause with optional named parameters.

  Parameters are written as `{param_name}` in the clause string and
  passed as a keyword list. String values are automatically quoted, and
  embedded single quotes are escaped by doubling.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.where("active = true") |> Map.get(:where)
      ["active = true"]

      iex> SqlBuilder.new() |> SqlBuilder.where("name = {name}", name: "alice") |> Map.get(:where)
      ["name = 'alice'"]

      iex> SqlBuilder.new() |> SqlBuilder.where("name = {n}", n: "o'brien") |> Map.get(:where)
      ["name = 'o''brien'"]

      iex> SqlBuilder.new() |> SqlBuilder.where("count > {n}", n: 10) |> Map.get(:where)
      ["count > 10"]
  """
  @spec where(t(), String.t(), keyword()) :: t()
  def where(%__MODULE__{} = q, clause, params \\ []) do
    rendered = render_params(clause, params)
    %{q | where: q.where ++ [rendered]}
  end

  @doc """
  Add a WHERE col IN (...) clause.

  An empty value list produces the always-false clause `1 = 0` instead
  of the invalid SQL `col in ()`.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.where_in(:status, ["active", "pending"]) |> Map.get(:where)
      ["status in ('active', 'pending')"]

      iex> SqlBuilder.new() |> SqlBuilder.where_in("id", [1, 2, 3]) |> Map.get(:where)
      ["id in (1, 2, 3)"]

      iex> SqlBuilder.new() |> SqlBuilder.where_in(:id, []) |> Map.get(:where)
      ["1 = 0"]
  """
  @spec where_in(t(), String.t() | atom(), [term()]) :: t()
  def where_in(%__MODULE__{} = q, _column, []) do
    # `IN ()` is not valid SQL; an empty list can never match anything.
    %{q | where: q.where ++ ["1 = 0"]}
  end

  def where_in(%__MODULE__{} = q, column, values) when is_list(values) do
    formatted =
      values
      |> Enum.map(&format_value/1)
      |> Enum.join(", ")

    clause = "#{column} in (#{formatted})"
    %{q | where: q.where ++ [clause]}
  end

  @doc """
  Add an ORDER BY clause.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.order_by(:created_at) |> Map.get(:order_by)
      [{"created_at", :asc}]

      iex> SqlBuilder.new() |> SqlBuilder.order_by(:timestamp, :desc) |> Map.get(:order_by)
      [{"timestamp", :desc}]
  """
  @spec order_by(t(), String.t() | atom(), :asc | :desc) :: t()
  def order_by(%__MODULE__{} = q, column, direction \\ :asc)
      when direction in [:asc, :desc] do
    %{q | order_by: q.order_by ++ [{to_string(column), direction}]}
  end

  @doc """
  Add a GROUP BY column.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.group_by(:project) |> Map.get(:group_by)
      ["project"]

      iex> SqlBuilder.new() |> SqlBuilder.group_by(:project) |> SqlBuilder.group_by(:status) |> Map.get(:group_by)
      ["project", "status"]
  """
  @spec group_by(t(), String.t() | atom()) :: t()
  def group_by(%__MODULE__{} = q, column) do
    %{q | group_by: q.group_by ++ [to_string(column)]}
  end

  @doc """
  Set the LIMIT.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.limit(100) |> Map.get(:limit)
      100
  """
  @spec limit(t(), pos_integer()) :: t()
  def limit(%__MODULE__{} = q, n) when is_integer(n) and n > 0, do: %{q | limit: n}

  @doc """
  Add a ClickHouse SETTINGS key-value pair.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.setting(:max_threads, 4) |> Map.get(:settings)
      [max_threads: 4]
  """
  @spec setting(t(), atom() | String.t(), term()) :: t()
  def setting(%__MODULE__{} = q, key, value) do
    %{q | settings: q.settings ++ [{key, value}]}
  end

  @doc """
  Add multiple ClickHouse SETTINGS at once from a keyword list.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.settings(max_threads: 4, timeout: 30) |> Map.get(:settings)
      [max_threads: 4, timeout: 30]
  """
  @spec settings(t(), keyword()) :: t()
  def settings(%__MODULE__{} = q, kv) when is_list(kv) do
    Enum.reduce(kv, q, fn {k, v}, acc -> setting(acc, k, v) end)
  end

  @doc """
  Build and return the final SQL string.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.to_sql()
      "select *"

      iex> SqlBuilder.new() |> SqlBuilder.select([:id, :name]) |> SqlBuilder.from(:users) |> SqlBuilder.to_sql()
      "select\\n id\\n ,name\\nfrom users"

      iex> SqlBuilder.new()
      ...> |> SqlBuilder.select(:id)
      ...> |> SqlBuilder.from(:users)
      ...> |> SqlBuilder.where("active = {active}", active: true)
      ...> |> SqlBuilder.order_by(:id, :desc)
      ...> |> SqlBuilder.limit(10)
      ...> |> SqlBuilder.to_sql()
      "select\\n id\\nfrom users\\nwhere true\\n and active = true\\norder by id desc\\nlimit 10"
  """
  @spec to_sql(t()) :: String.t()
  def to_sql(%__MODULE__{} = q) do
    # Each builder returns nil for an absent clause; nils are dropped so
    # the joined SQL contains no blank lines.
    [
      build_select(q),
      build_from(q),
      build_where(q),
      build_group_by(q),
      build_order_by(q),
      build_limit(q),
      build_settings(q)
    ]
    |> Enum.reject(&is_nil/1)
    |> Enum.join("\n")
  end

  # ── Private helpers ──────────────────────────────────────────────────────────

  defp build_select(%{select: []}), do: "select *"

  defp build_select(%{select: cols}) do
    # Leading-comma style: each extra column starts with " ," on its own line.
    indented = cols |> Enum.join("\n ,")
    "select\n #{indented}"
  end

  defp build_from(%{from: nil}), do: nil
  defp build_from(%{from: table}), do: "from #{table}"

  defp build_where(%{where: []}), do: nil

  defp build_where(%{where: clauses}) do
    # "where true" lets every clause be appended uniformly with "and".
    lines = clauses |> Enum.map(&" and #{&1}") |> Enum.join("\n")
    "where true\n#{lines}"
  end

  defp build_group_by(%{group_by: []}), do: nil
  defp build_group_by(%{group_by: cols}), do: "group by #{Enum.join(cols, ", ")}"

  defp build_order_by(%{order_by: []}), do: nil

  defp build_order_by(%{order_by: cols}) do
    parts = Enum.map(cols, fn {col, dir} -> "#{col} #{dir}" end)
    "order by #{Enum.join(parts, ", ")}"
  end

  defp build_limit(%{limit: nil}), do: nil
  defp build_limit(%{limit: n}), do: "limit #{n}"

  defp build_settings(%{settings: []}), do: nil

  defp build_settings(%{settings: kv}) do
    lines =
      kv
      |> Enum.map(fn {k, v} -> " #{k}=#{v}" end)
      |> Enum.join(",\n")

    "settings\n#{lines}"
  end

  # Replace {param} tokens with formatted values
  defp render_params(clause, []), do: clause

  defp render_params(clause, params) do
    Enum.reduce(params, clause, fn {key, value}, acc ->
      String.replace(acc, "{#{key}}", format_value(value))
    end)
  end

  # Render an Elixir term as a SQL literal. Strings and non-boolean atoms
  # are single-quoted with embedded quotes doubled (the original version
  # did not escape, producing broken SQL for values containing "'").
  defp format_value(v) when is_binary(v), do: "'#{escape_string(v)}'"
  defp format_value(v) when is_boolean(v), do: to_string(v)
  # nil must render as SQL null, not the quoted string 'nil'.
  defp format_value(nil), do: "null"
  defp format_value(v) when is_atom(v), do: "'#{escape_string(to_string(v))}'"
  defp format_value(v), do: to_string(v)

  # Standard SQL escaping: double every single quote inside the literal.
  defp escape_string(s), do: String.replace(s, "'", "''")
end
Doctests
Runs the doctests defined in SqlBuilder to verify the query builder works correctly. All tests should pass before proceeding.
# Start ExUnit without waiting for VM shutdown; ExUnit.run/0 below
# executes the suite immediately inside the notebook.
ExUnit.start(autorun: false)

defmodule SqlBuilderTest do
  use ExUnit.Case, async: true
  # Every iex> example in SqlBuilder's @doc blocks becomes a test case.
  doctest SqlBuilder
end

ExUnit.run()
Projects
Fetches a list of active projects from ClickHouse. Queries the OTEL logs table for projects with events in the last 10 minutes, returning both the highest and lowest activity projects. This list is used by the load tester to randomly select projects for querying.
defmodule Projects do
  @moduledoc """
  Discovers active projects to target during the load test.
  """

  # OTEL logs table for this benchmark environment.
  @table "otel_logs_e2338e81_aed4_4de0_a5a0_358084c77b85"

  @doc """
  Returns project names with events in the last 10 minutes.

  Runs the same aggregate query twice — ordered by event count descending
  and ascending — so the result mixes the busiest and quietest projects.
  `pid` is a `Ch` connection. Raises (via `Ch.query!/2`) on query failure.
  """
  def get_projects(pid) do
    # Kept for the (currently disabled) source_name filter below.
    # Underscore-prefixed so the unused binding no longer warns.
    _sources = ["deno-relay-logs", "deno-subhosting-events"]

    base =
      SqlBuilder.new()
      |> SqlBuilder.select([:project, "count(id) as count"])
      |> SqlBuilder.from(@table)
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 10)
      # |> SqlBuilder.where_in("source_name", _sources)
      |> SqlBuilder.where("project is not null")
      |> SqlBuilder.where("project != ''")
      |> SqlBuilder.group_by(:project)
      |> SqlBuilder.limit(1000)
      |> SqlBuilder.settings(
        query_plan_optimize_lazy_materialization: 1,
        use_skip_indexes_for_top_k: 1,
        use_skip_indexes_on_data_read: 1,
        use_top_k_dynamic_filtering: 1,
        query_plan_max_limit_for_top_k_optimization: 3000
      )

    desc_sql = base |> SqlBuilder.order_by(:count, :desc) |> SqlBuilder.to_sql()
    asc_sql = base |> SqlBuilder.order_by(:count, :asc) |> SqlBuilder.to_sql()

    # Local calls — no need to go through the module name.
    busiest = pid |> Ch.query!(desc_sql) |> map_projects()
    quietest = pid |> Ch.query!(asc_sql) |> map_projects()

    busiest ++ quietest
  end

  @doc "Extracts the first column (the project name) from each result row."
  def map_projects(%Ch.Result{rows: rows}) do
    Enum.map(rows, &hd/1)
  end
end

# Fetch the project list once; the load tester samples from it randomly.
projects = Projects.get_projects(pid)
Logs
Fetches recent log entries for a specific project. Returns the last 100 logs from the past minute, ordered by timestamp descending. This is the query executed repeatedly during load testing.
defmodule Logs do
  @moduledoc """
  Fetches recent log entries for a project — the query under load test.
  """

  # Alternate OTEL logs table queried during the load test.
  @table "otel_logs_alt_e2338e81_aed4_4de0_a5a0_358084c77b85"

  @doc """
  Returns the last 100 log rows for `project` from the past minute,
  newest first, as a `%Ch.Result{}`. Raises (via `Ch.query!/2`) on
  query failure.
  """
  def get_logs!(pid, project \\ "project") do
    # Kept for the (currently disabled) source_name filter below.
    # Underscore-prefixed so the unused binding no longer warns.
    _sources = ["deno-relay-logs", "deno-subhosting-events"]

    sql =
      SqlBuilder.new()
      |> SqlBuilder.select([:timestamp, :project, :id, :event_message])
      |> SqlBuilder.from(@table)
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 1)
      |> SqlBuilder.where("project = {project}", project: project)
      # |> SqlBuilder.where_in("source_name", _sources)
      |> SqlBuilder.order_by(:timestamp, :desc)
      |> SqlBuilder.limit(100)
      |> SqlBuilder.settings(
        query_plan_optimize_lazy_materialization: 1,
        use_skip_indexes_for_top_k: 1,
        use_skip_indexes_on_data_read: 1,
        use_top_k_dynamic_filtering: 1,
        query_plan_max_limit_for_top_k_optimization: 3000
      )
      |> SqlBuilder.to_sql()

    Ch.query!(pid, sql)
  end
end

# Smoke test the query path before starting the load test.
Logs.get_logs!(pid)
Setup GenServer
Defines the load testing GenServer that orchestrates concurrent queries. Spawns @concurrency (7) parallel tasks every @tick (1 second), each querying logs for a random project. Tracks response times, request counts, and total bytes read from ClickHouse. Periodically prints statistics to the console.
defmodule ClickhouseLoadtest do
  @moduledoc """
  GenServer orchestrating the load test.

  Every `@tick` ms it spawns `@concurrency` tasks, each issuing
  `@requests_per_task` log queries against randomly chosen projects, and
  every tick it prints aggregated stats and pushes them to the live charts.
  """
  use GenServer

  @concurrency 7
  @requests_per_task 1
  @tick 1_000

  # ── Client API ───────────────────────────────────────────────────────────

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Average response time (ms) over all recorded requests."
  def get_avg do
    GenServer.call(__MODULE__, :get_avg)
  end

  @doc false
  # Fires `concurrency` concurrent tasks, each running @requests_per_task
  # queries. Only successful requests are recorded.
  def run_requests(pid, concurrency, projects) do
    Enum.each(1..concurrency, fn _n ->
      project = Enum.random(projects)

      Task.start(fn ->
        for _n <- 1..@requests_per_task do
          # 1-100 ms random jitter so tasks don't fire in lockstep.
          :rand.uniform(100) |> Process.sleep()

          t0 = System.monotonic_time(:millisecond)

          # BUG FIX: previously a failed query still flowed into
          # parse_ch_response/1 (no matching clause) and crashed the task;
          # now failures are logged by fetch_logs/2 and simply skipped.
          with {:ok, result} <- fetch_logs(pid, project),
               duration = System.monotonic_time(:millisecond) - t0,
               {:ok, read_bytes} <- parse_ch_response(result) do
            GenServer.cast(__MODULE__, {:record, {duration, read_bytes}})
          else
            _ -> :ok
          end
        end
      end)
    end)
  end

  # ── Server callbacks ─────────────────────────────────────────────────────

  @impl true
  def init(pid: pid, projects: projects, charts: charts) do
    # Schedule the first stats printout and the first request batch.
    puts_state()
    do_requests()

    {:ok,
     %{
       times: [],
       pid: pid,
       times_counter: 0,
       projects: projects,
       total_read_bytes: 0,
       charts: charts,
       start_time: System.monotonic_time(:second)
     }}
  end

  @impl true
  def handle_info(:do_requests, %{pid: pid, projects: projects} = state) do
    run_requests(pid, @concurrency, projects)
    # Re-arm the timer so batches keep firing every @tick.
    do_requests()
    {:noreply, state}
  end

  def handle_info(
        :puts_state,
        %{
          times: times,
          times_counter: counter,
          total_read_bytes: total_read_bytes,
          charts: %{response_time: rt_chart, throughput: tp_chart},
          start_time: start_time
        } = state
      ) do
    # Average over the most recent batch only, so the chart tracks current
    # latency rather than the all-time mean (times is newest-first).
    last = Enum.take(times, @concurrency)
    avg_time = avg(last)
    elapsed = System.monotonic_time(:second) - start_time
    read_gb = bytes_to_gb(total_read_bytes)

    IO.puts("Completed requests: #{counter}")
    IO.puts("Avg response time: #{avg_time}")
    IO.puts("Total read GBs: #{read_gb}")

    # Push aggregated stats to charts
    Kino.VegaLite.push(rt_chart, %{time: elapsed, avg_response_time: avg_time})
    Kino.VegaLite.push(tp_chart, %{time: elapsed, value: counter, metric: "Requests"})

    Kino.VegaLite.push(tp_chart, %{
      time: elapsed,
      value: Float.round(read_gb, 2),
      metric: "GB Read"
    })

    puts_state()
    {:noreply, state}
  end

  @impl true
  def handle_cast(
        {:record, {duration, read_bytes}},
        %{times: times, times_counter: counter, total_read_bytes: total_read_bytes} = state
      ) do
    {:noreply,
     %{
       state
       | times: [duration | times],
         times_counter: counter + 1,
         total_read_bytes: read_bytes + total_read_bytes
     }}
  end

  @impl true
  def handle_call(:get_avg, _from, %{times: times} = state) do
    {:reply, avg(times), state}
  end

  # ── Private helpers ──────────────────────────────────────────────────────

  # Wraps the query so a ClickHouse/pool error becomes :error instead of
  # crashing the task; the error is logged for visibility.
  defp fetch_logs(pid, project) do
    {:ok, Logs.get_logs!(pid, project)}
  rescue
    e ->
      IO.inspect(e, label: "ERROR")
      :error
  end

  # Extracts read_bytes from the x-clickhouse-summary response header.
  # Returns {:ok, integer} or :error when the header is missing or
  # unparsable (the original crashed via hd/1 on a missing header).
  defp parse_ch_response(%Ch.Result{headers: headers}) do
    with {_name, summary} <- List.keyfind(headers, "x-clickhouse-summary", 0, :missing),
         {:ok, %{"read_bytes" => read_bytes}} <- Jason.decode(summary) do
      {:ok, String.to_integer(read_bytes)}
    else
      _ -> :error
    end
  end

  defp avg(times) do
    if times == [], do: 0, else: Enum.sum(times) / length(times)
  end

  # Self-messaging timers that drive the request/report loops.
  defp do_requests() do
    Process.send_after(self(), :do_requests, @tick)
  end

  defp puts_state() do
    Process.send_after(self(), :puts_state, @tick)
  end

  # Decimal gigabytes (GB), matching ClickHouse's own byte accounting.
  defp bytes_to_gb(bytes) do
    bytes / 1000 / 1000 / 1000
  end
end
Graph
Creates live-updating charts that display metrics as the load test runs. The charts update every tick with aggregated statistics: average response time, total requests, and data read.
alias VegaLite, as: Vl

# Line chart of the per-tick average response time (ms vs elapsed seconds).
response_time_chart =
  Vl.new(width: 600, height: 250, title: "Average Response Time (ms)")
  |> Vl.mark(:line, point: true, tooltip: true, color: "#3b82f6")
  |> Vl.encode_field(:x, "time",
    type: :quantitative,
    title: "Elapsed Time (seconds)"
  )
  |> Vl.encode_field(:y, "avg_response_time",
    type: :quantitative,
    title: "Avg Response Time (ms)"
  )
  |> Kino.VegaLite.new()

# Dual-series chart: cumulative request count and GB read, one line per
# "metric" value pushed by the GenServer.
throughput_chart =
  Vl.new(width: 600, height: 250, title: "Cumulative Requests & Data Read")
  |> Vl.mark(:line, point: true, tooltip: true)
  |> Vl.encode_field(:x, "time", type: :quantitative, title: "Elapsed Time (seconds)")
  |> Vl.encode_field(:y, "value", type: :quantitative, title: "Value")
  |> Vl.encode_field(:color, "metric", type: :nominal, title: "Metric")
  |> Kino.VegaLite.new()

# Render both charts stacked in a single column.
Kino.Layout.grid([response_time_chart, throughput_chart], columns: 1)
|> Kino.render()

# Chart handles handed to the load-test GenServer so it can push points.
charts = %{response_time: response_time_chart, throughput: throughput_chart}
Start it
Starts the load test GenServer with the live charts. Once started, it immediately begins executing concurrent queries, printing statistics every second, and pushing metrics to the charts in real-time.
# Kick off the load test; it begins querying immediately and keeps running
# (printing stats and updating the charts every second) until stopped.
ClickhouseLoadtest.start_link(pid: pid, projects: projects, charts: charts)