Powered by AppSignal & Oban Pro

Clickhouse Loadtest

books/clickhouse-loadtest.livemd

Clickhouse Loadtest

Section

A load testing tool for ClickHouse databases. Simulates concurrent query load by repeatedly fetching logs for random projects, measuring response times, and tracking bytes read. Useful for benchmarking query performance and testing ClickHouse cluster capacity.

# Livebook dependencies: ch (ClickHouse client), kino + vega_lite (live
# charts), jason (decodes the x-clickhouse-summary response header).
Mix.install([
  {:ch, "~> 0.3.2"},
  {:kino, "~> 0.16.0"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:vega_lite, "~> 0.1.10"},
  {:jason, "~> 1.4"}
])

Setup

Establishes the ClickHouse connection using credentials from environment variables. Creates a connection pool with 25 connections and a 15-second timeout. The pid returned is used by all subsequent queries.

# ClickHouse connection options. Hostname and password come from Livebook
# secrets (the LB_* environment variables); the user is read-only.
defaults = [
  scheme: "https",
  hostname: System.get_env("LB_SB_CH_HOSTNAME"),
  port: 8443,
  database: "default",
  settings: [],
  pool_size: 25,
  timeout: :timer.seconds(15),
  username: "chase_read_only",
  password: System.get_env("LB_SB_CH_AB6JFB0Q6S_LOGFLARE"),
  queue_target: 1_000
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)
# Smoke test: fails fast here if the connection or credentials are wrong.
%Ch.Result{} = Ch.query!(pid, "select 1")

SqlBuilder

A composable SQL query builder for constructing dynamic ClickHouse queries. Uses the builder pattern with pipe-friendly functions. Supports named parameter substitution ({param} syntax) with automatic quoting for strings.

defmodule SqlBuilder do
  @moduledoc """
  A composable SQL query builder for constructing dynamic queries.

  ## Example

      SqlBuilder.new()
      |> SqlBuilder.select([:timestamp, :project, :id, :event_message])
      |> SqlBuilder.from("my_table")
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 10)
      |> SqlBuilder.where("project = {project}", project: "my_project")
      |> SqlBuilder.where_in("source_name", ["deno-relay-logs", "deno-subhosting-events"])
      |> SqlBuilder.order_by(:timestamp, :desc)
      |> SqlBuilder.limit(100)
      |> SqlBuilder.setting(:query_plan_optimize_lazy_materialization, 1)
      |> SqlBuilder.to_sql()
  """

  @type t :: %__MODULE__{
          select: [String.t()],
          from: String.t() | nil,
          where: [String.t()],
          params: keyword(),
          order_by: [{String.t(), :asc | :desc}],
          group_by: [String.t()],
          limit: pos_integer() | nil,
          settings: keyword()
        }

  defstruct select: [],
            from: nil,
            where: [],
            params: [],
            order_by: [],
            group_by: [],
            limit: nil,
            settings: []

  @doc """
  Create a new empty query builder.

  ## Examples

      iex> SqlBuilder.new()
      %SqlBuilder{select: [], from: nil, where: [], params: [], order_by: [], group_by: [], limit: nil, settings: []}
  """
  @spec new() :: t()
  def new, do: %__MODULE__{}

  @doc """
  Set the SELECT columns. Accepts atoms or strings.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.select(:id) |> Map.get(:select)
      ["id"]

      iex> SqlBuilder.new() |> SqlBuilder.select([:id, :name, "count(*)"]) |> Map.get(:select)
      ["id", "name", "count(*)"]
  """
  @spec select(t(), [atom() | String.t()] | atom() | String.t()) :: t()
  def select(%__MODULE__{} = q, columns) when is_list(columns) do
    cols = Enum.map(columns, &to_string/1)
    %{q | select: q.select ++ cols}
  end

  def select(%__MODULE__{} = q, column), do: select(q, [column])

  @doc """
  Set the FROM table. Accepts a string or atom.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.from(:users) |> Map.get(:from)
      "users"

      iex> SqlBuilder.new() |> SqlBuilder.from("events") |> Map.get(:from)
      "events"
  """
  @spec from(t(), String.t() | atom()) :: t()
  def from(%__MODULE__{} = q, table), do: %{q | from: to_string(table)}

  @doc """
  Add a WHERE clause with optional named parameters.

  Parameters are written as `{param_name}` in the clause string and
  passed as a keyword list. String values are automatically quoted, and
  embedded single quotes are escaped (`'` becomes `''`).

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.where("active = true") |> Map.get(:where)
      ["active = true"]

      iex> SqlBuilder.new() |> SqlBuilder.where("name = {name}", name: "alice") |> Map.get(:where)
      ["name = 'alice'"]

      iex> SqlBuilder.new() |> SqlBuilder.where("name = {name}", name: "o'brien") |> Map.get(:where)
      ["name = 'o''brien'"]

      iex> SqlBuilder.new() |> SqlBuilder.where("count > {n}", n: 10) |> Map.get(:where)
      ["count > 10"]
  """
  @spec where(t(), String.t(), keyword()) :: t()
  def where(%__MODULE__{} = q, clause, params \\ []) do
    rendered = render_params(clause, params)
    %{q | where: q.where ++ [rendered]}
  end

  @doc """
  Add a WHERE col IN (...) clause.

  An empty value list produces a never-matching `false` clause, since
  `col in ()` is not valid SQL.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.where_in(:status, ["active", "pending"]) |> Map.get(:where)
      ["status in ('active', 'pending')"]

      iex> SqlBuilder.new() |> SqlBuilder.where_in("id", [1, 2, 3]) |> Map.get(:where)
      ["id in (1, 2, 3)"]

      iex> SqlBuilder.new() |> SqlBuilder.where_in(:id, []) |> Map.get(:where)
      ["false"]
  """
  @spec where_in(t(), String.t() | atom(), [term()]) :: t()
  def where_in(%__MODULE__{} = q, _column, []) do
    # "col in ()" is a syntax error; an empty list can match nothing.
    %{q | where: q.where ++ ["false"]}
  end

  def where_in(%__MODULE__{} = q, column, values) when is_list(values) do
    formatted = Enum.map_join(values, ", ", &format_value/1)
    clause = "#{column} in (#{formatted})"
    %{q | where: q.where ++ [clause]}
  end

  @doc """
  Add an ORDER BY clause.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.order_by(:created_at) |> Map.get(:order_by)
      [{"created_at", :asc}]

      iex> SqlBuilder.new() |> SqlBuilder.order_by(:timestamp, :desc) |> Map.get(:order_by)
      [{"timestamp", :desc}]
  """
  @spec order_by(t(), String.t() | atom(), :asc | :desc) :: t()
  def order_by(%__MODULE__{} = q, column, direction \\ :asc)
      when direction in [:asc, :desc] do
    %{q | order_by: q.order_by ++ [{to_string(column), direction}]}
  end

  @doc """
  Add a GROUP BY column.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.group_by(:project) |> Map.get(:group_by)
      ["project"]

      iex> SqlBuilder.new() |> SqlBuilder.group_by(:project) |> SqlBuilder.group_by(:status) |> Map.get(:group_by)
      ["project", "status"]
  """
  @spec group_by(t(), String.t() | atom()) :: t()
  def group_by(%__MODULE__{} = q, column) do
    %{q | group_by: q.group_by ++ [to_string(column)]}
  end

  @doc """
  Set the LIMIT.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.limit(100) |> Map.get(:limit)
      100
  """
  @spec limit(t(), pos_integer()) :: t()
  def limit(%__MODULE__{} = q, n) when is_integer(n) and n > 0, do: %{q | limit: n}

  @doc """
  Add a ClickHouse SETTINGS key-value pair.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.setting(:max_threads, 4) |> Map.get(:settings)
      [max_threads: 4]
  """
  @spec setting(t(), atom() | String.t(), term()) :: t()
  def setting(%__MODULE__{} = q, key, value) do
    %{q | settings: q.settings ++ [{key, value}]}
  end

  @doc """
  Add multiple ClickHouse SETTINGS at once from a keyword list.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.settings(max_threads: 4, timeout: 30) |> Map.get(:settings)
      [max_threads: 4, timeout: 30]
  """
  @spec settings(t(), keyword()) :: t()
  def settings(%__MODULE__{} = q, kv) when is_list(kv) do
    Enum.reduce(kv, q, fn {k, v}, acc -> setting(acc, k, v) end)
  end

  @doc """
  Build and return the final SQL string.

  ## Examples

      iex> SqlBuilder.new() |> SqlBuilder.to_sql()
      "select *"

      iex> SqlBuilder.new() |> SqlBuilder.select([:id, :name]) |> SqlBuilder.from(:users) |> SqlBuilder.to_sql()
      "select\\n  id\\n  ,name\\nfrom users"

      iex> SqlBuilder.new()
      ...> |> SqlBuilder.select(:id)
      ...> |> SqlBuilder.from(:users)
      ...> |> SqlBuilder.where("active = {active}", active: true)
      ...> |> SqlBuilder.order_by(:id, :desc)
      ...> |> SqlBuilder.limit(10)
      ...> |> SqlBuilder.to_sql()
      "select\\n  id\\nfrom users\\nwhere true\\n  and active = true\\norder by id desc\\nlimit 10"
  """
  @spec to_sql(t()) :: String.t()
  def to_sql(%__MODULE__{} = q) do
    [
      build_select(q),
      build_from(q),
      build_where(q),
      build_group_by(q),
      build_order_by(q),
      build_limit(q),
      build_settings(q)
    ]
    |> Enum.reject(&is_nil/1)
    |> Enum.join("\n")
  end

  # ── Private helpers ──────────────────────────────────────────────────────────

  defp build_select(%{select: []}), do: "select *"

  defp build_select(%{select: cols}) do
    # Leading-comma style so each column diff is a single line.
    "select\n  #{Enum.join(cols, "\n  ,")}"
  end

  defp build_from(%{from: nil}), do: nil
  defp build_from(%{from: table}), do: "from #{table}"

  defp build_where(%{where: []}), do: nil

  defp build_where(%{where: clauses}) do
    # "where true" lets every real clause be rendered uniformly as "and …".
    "where true\n" <> Enum.map_join(clauses, "\n", &"  and #{&1}")
  end

  defp build_group_by(%{group_by: []}), do: nil
  defp build_group_by(%{group_by: cols}), do: "group by #{Enum.join(cols, ", ")}"

  defp build_order_by(%{order_by: []}), do: nil

  defp build_order_by(%{order_by: cols}) do
    parts = Enum.map_join(cols, ", ", fn {col, dir} -> "#{col} #{dir}" end)
    "order by #{parts}"
  end

  defp build_limit(%{limit: nil}), do: nil
  defp build_limit(%{limit: n}), do: "limit #{n}"

  defp build_settings(%{settings: []}), do: nil

  defp build_settings(%{settings: kv}) do
    "settings\n" <> Enum.map_join(kv, ",\n", fn {k, v} -> "  #{k}=#{v}" end)
  end

  # Replace {param} tokens with formatted values.
  defp render_params(clause, []), do: clause

  defp render_params(clause, params) do
    Enum.reduce(params, clause, fn {key, value}, acc ->
      String.replace(acc, "{#{key}}", format_value(value))
    end)
  end

  # Strings are quoted; embedded single quotes are doubled so a value
  # cannot break out of the literal (standard SQL escaping).
  defp format_value(v) when is_binary(v), do: "'#{String.replace(v, "'", "''")}'"
  # Booleans must come before the atom clause (true/false are atoms).
  defp format_value(v) when is_boolean(v), do: to_string(v)
  defp format_value(v) when is_atom(v), do: "'#{v}'"
  defp format_value(v), do: to_string(v)
end

Doctests

Runs the doctests defined in SqlBuilder to verify the query builder works correctly. All tests should pass before proceeding.

# Start ExUnit without running automatically on VM exit; we invoke the run
# explicitly below so the cell reports results inline.
ExUnit.start(autorun: false)

defmodule SqlBuilderTest do
  use ExUnit.Case, async: true
  # Executes every `iex>` example embedded in SqlBuilder's @doc strings.
  doctest SqlBuilder
end

# Returns a summary map (e.g. %{failures: 0, ...}); all tests should pass
# before proceeding to the load test.
ExUnit.run()

Projects

Fetches a list of active projects from ClickHouse. Queries the OTEL logs table for projects with events in the last 10 minutes, returning both the highest and lowest activity projects. This list is used by the load tester to randomly select projects for querying.

defmodule Projects do
  @moduledoc """
  Discovers projects with recent activity to use as load-test targets.

  Queries the OTEL logs table for projects with events in the last 10
  minutes and returns the busiest projects followed by the quietest ones.
  """

  @table "otel_logs_e2338e81_aed4_4de0_a5a0_358084c77b85"

  @doc """
  Return project names with events in the last 10 minutes: highest-count
  projects first (descending query), then lowest-count (ascending query).

  Raises on query failure (delegates to `Ch.query!/2`).
  """
  def get_projects(pid) do
    base =
      SqlBuilder.new()
      |> SqlBuilder.select([:project, "count(id) as count"])
      |> SqlBuilder.from(@table)
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 10)
      # Optional source filter, currently disabled:
      # |> SqlBuilder.where_in("source_name", ["deno-relay-logs", "deno-subhosting-events"])
      |> SqlBuilder.where("project is not null")
      |> SqlBuilder.where("project != ''")
      |> SqlBuilder.group_by(:project)
      |> SqlBuilder.limit(1000)
      |> SqlBuilder.settings(
        query_plan_optimize_lazy_materialization: 1,
        use_skip_indexes_for_top_k: 1,
        use_skip_indexes_on_data_read: 1,
        use_top_k_dynamic_filtering: 1,
        query_plan_max_limit_for_top_k_optimization: 3000
      )

    # Run the same aggregate twice — once per sort direction — and
    # concatenate, so both hot and cold projects are represented.
    Enum.flat_map([:desc, :asc], fn direction ->
      sql = base |> SqlBuilder.order_by(:count, direction) |> SqlBuilder.to_sql()
      pid |> Ch.query!(sql) |> map_projects()
    end)
  end

  @doc "Extract the first column (the project name) from each result row."
  def map_projects(%Ch.Result{rows: rows}) do
    Enum.map(rows, &hd/1)
  end
end

# Candidate projects (busiest plus quietest) for the load tester to
# randomly select from.
projects = Projects.get_projects(pid)

Logs

Fetches recent log entries for a specific project. Returns the last 100 logs from the past minute, ordered by timestamp descending. This is the query executed repeatedly during load testing.

defmodule Logs do
  @moduledoc """
  Fetches recent log entries for a project. This is the query executed
  repeatedly during load testing.
  """

  @table "otel_logs_alt_e2338e81_aed4_4de0_a5a0_358084c77b85"

  @doc """
  Return the last 100 logs from the past minute for `project`, ordered by
  timestamp descending.

  Raises on query failure (delegates to `Ch.query!/2`).
  """
  def get_logs!(pid, project \\ "project") do
    sql =
      SqlBuilder.new()
      |> SqlBuilder.select([:timestamp, :project, :id, :event_message])
      |> SqlBuilder.from(@table)
      |> SqlBuilder.where("timestamp >= now() - interval {minutes} minute", minutes: 1)
      |> SqlBuilder.where("project = {project}", project: project)
      # Optional source filter, currently disabled:
      # |> SqlBuilder.where_in("source_name", ["deno-relay-logs", "deno-subhosting-events"])
      |> SqlBuilder.order_by(:timestamp, :desc)
      |> SqlBuilder.limit(100)
      |> SqlBuilder.settings(
        query_plan_optimize_lazy_materialization: 1,
        use_skip_indexes_for_top_k: 1,
        use_skip_indexes_on_data_read: 1,
        use_top_k_dynamic_filtering: 1,
        query_plan_max_limit_for_top_k_optimization: 3000
      )
      |> SqlBuilder.to_sql()

    Ch.query!(pid, sql)
  end
end

# Smoke-test a single log query before starting the load test.
Logs.get_logs!(pid)

Setup GenServer

Defines the load testing GenServer that orchestrates concurrent queries. Spawns @concurrency (7) parallel tasks every @tick (1 second), each querying logs for a random project. Tracks response times, request counts, and total bytes read from ClickHouse. Periodically prints statistics to the console.

defmodule ClickhouseLoadtest do
  @moduledoc """
  Load-test orchestrator.

  Every `@tick` milliseconds it spawns `@concurrency` tasks, each of which
  runs `@requests_per_task` log queries against a randomly chosen project
  and reports the duration plus the bytes ClickHouse read. Aggregated
  stats are printed and pushed to the live charts once per tick.
  """

  use GenServer

  @concurrency 7
  @requests_per_task 1
  @tick 1_000

  # ── Client API ─────────────────────────────────────────────────────────────

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Average response time (ms) over all requests recorded so far."
  def get_avg do
    GenServer.call(__MODULE__, :get_avg)
  end

  @doc """
  Spawn `concurrency` tasks, each performing `@requests_per_task` queries
  against a randomly chosen project and casting the measurements back to
  this GenServer.
  """
  def run_requests(pid, concurrency, projects) do
    Enum.each(1..concurrency, fn _n ->
      project = Enum.random(projects)

      Task.start(fn ->
        for _i <- 1..@requests_per_task do
          # Random jitter (<= 100ms) so tasks don't all fire at once.
          Process.sleep(:rand.uniform(100))
          t0 = System.monotonic_time(:millisecond)

          result =
            try do
              {:ok, Logs.get_logs!(pid, project)}
            rescue
              e ->
                IO.inspect(e, label: "ERROR")
                :error
            end

          duration = System.monotonic_time(:millisecond) - t0

          case result do
            {:ok, resp} ->
              GenServer.cast(__MODULE__, {:record, {duration, read_bytes(resp)}})

            :error ->
              # Failed request: record nothing. (Previously the task crashed
              # in the response parser, so the sample was lost anyway.)
              :ok
          end
        end
      end)
    end)
  end

  # ── Server callbacks ───────────────────────────────────────────────────────

  @impl true
  def init(pid: pid, projects: projects, charts: charts) do
    schedule_puts_state()
    schedule_requests()

    {:ok,
     %{
       times: [],
       pid: pid,
       times_counter: 0,
       projects: projects,
       total_read_bytes: 0,
       charts: charts,
       start_time: System.monotonic_time(:second)
     }}
  end

  @impl true
  def handle_info(:do_requests, %{pid: pid, projects: projects} = state) do
    run_requests(pid, @concurrency, projects)
    schedule_requests()
    {:noreply, state}
  end

  def handle_info(
        :puts_state,
        %{
          times: times,
          times_counter: counter,
          total_read_bytes: total_read_bytes,
          charts: %{response_time: rt_chart, throughput: tp_chart},
          start_time: start_time
        } = state
      ) do
    # `times` is newest-first, so this averages the most recent batch only.
    avg_time = times |> Enum.take(@concurrency) |> avg()
    elapsed = System.monotonic_time(:second) - start_time
    read_gb = bytes_to_gb(total_read_bytes)

    IO.puts("Completed requests: #{counter}")
    IO.puts("Avg response time: #{avg_time}")
    IO.puts("Total read GBs: #{read_gb}")

    # Push aggregated stats to charts
    Kino.VegaLite.push(rt_chart, %{time: elapsed, avg_response_time: avg_time})

    Kino.VegaLite.push(tp_chart, %{time: elapsed, value: counter, metric: "Requests"})

    Kino.VegaLite.push(tp_chart, %{
      time: elapsed,
      value: Float.round(read_gb, 2),
      metric: "GB Read"
    })

    schedule_puts_state()
    {:noreply, state}
  end

  @impl true
  def handle_cast(
        {:record, {duration, read_bytes}},
        %{times: times, times_counter: counter, total_read_bytes: total_read_bytes} = state
      ) do
    {:noreply,
     %{
       state
       | times: [duration | times],
         times_counter: counter + 1,
         total_read_bytes: read_bytes + total_read_bytes
     }}
  end

  @impl true
  def handle_call(:get_avg, _from, %{times: times} = state) do
    {:reply, avg(times), state}
  end

  # ── Private helpers ────────────────────────────────────────────────────────

  # Extract read_bytes from the x-clickhouse-summary response header.
  # Returns 0 when the header is missing or cannot be decoded, instead of
  # crashing the reporting task.
  defp read_bytes(%Ch.Result{headers: headers}) do
    with {_name, summary} <- List.keyfind(headers, "x-clickhouse-summary", 0),
         {:ok, %{"read_bytes" => bytes}} <- Jason.decode(summary) do
      String.to_integer(bytes)
    else
      _ -> 0
    end
  end

  defp avg([]), do: 0
  defp avg(times), do: Enum.sum(times) / length(times)

  defp schedule_requests, do: Process.send_after(self(), :do_requests, @tick)

  defp schedule_puts_state, do: Process.send_after(self(), :puts_state, @tick)

  # Decimal gigabytes (GB), matching ClickHouse's own byte accounting.
  defp bytes_to_gb(bytes), do: bytes / 1_000 / 1_000 / 1_000
end

Graph

Creates live-updating charts that display metrics as the load test runs. The charts update every tick with aggregated statistics: average response time, total requests, and data read.

alias VegaLite, as: Vl

# Line chart of the per-tick average response time (ms) vs elapsed seconds.
response_time_chart =
  Vl.new(width: 600, height: 250, title: "Average Response Time (ms)")
  |> Vl.mark(:line, point: true, tooltip: true, color: "#3b82f6")
  |> Vl.encode_field(:x, "time",
    type: :quantitative,
    title: "Elapsed Time (seconds)"
  )
  |> Vl.encode_field(:y, "avg_response_time",
    type: :quantitative,
    title: "Avg Response Time (ms)"
  )
  |> Kino.VegaLite.new()

# Two cumulative series on one chart, distinguished by the "metric" field:
# total completed requests and total GB read.
throughput_chart =
  Vl.new(width: 600, height: 250, title: "Cumulative Requests & Data Read")
  |> Vl.mark(:line, point: true, tooltip: true)
  |> Vl.encode_field(:x, "time", type: :quantitative, title: "Elapsed Time (seconds)")
  |> Vl.encode_field(:y, "value", type: :quantitative, title: "Value")
  |> Vl.encode_field(:color, "metric", type: :nominal, title: "Metric")
  |> Kino.VegaLite.new()

# Render both charts stacked in a single column.
Kino.Layout.grid([response_time_chart, throughput_chart], columns: 1)
|> Kino.render()

# Handles passed to the GenServer so it can push data points each tick.
charts = %{response_time: response_time_chart, throughput: throughput_chart}

Start it

Starts the load test GenServer with the live charts. Once started, it immediately begins executing concurrent queries, printing statistics every second, and pushing metrics to the charts in real-time.

# Start the load test; the first batch of queries fires within one tick (1s).
ClickhouseLoadtest.start_link(pid: pid, projects: projects, charts: charts)