Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

ClickHouse Loadtest

books/clickhouse-loadtest.livemd

ClickHouse Loadtest

Mix.install([
  {:ch, "~> 0.3.2"},
  {:kino, "~> 0.16.0"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:vega_lite, "~> 0.1.10"}
])

Setup

# Connection options for the ClickHouse HTTPS endpoint.
#
# The hostname and password are read from the environment (presumably Livebook
# secrets, given the `LB_` prefix - confirm). `System.fetch_env!/1` is used so a
# missing variable fails right here with a clear message, instead of passing
# `nil` to the driver and surfacing later as an obscure connection/auth error.
defaults = [
  scheme: "https",
  hostname: System.fetch_env!("LB_SB_CH_HOSTNAME"),
  port: 8443,
  database: "default",
  settings: [],
  pool_size: 25,
  timeout: :timer.seconds(15),
  username: "logflare",
  password: System.fetch_env!("LB_SB_CH_AB6JFB0Q6S_LOGFLARE")
]

# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)

# Sanity check: the match fails loudly if the server cannot be queried.
%Ch.Result{} = Ch.query!(pid, "select 1")
defmodule Logs do
  @moduledoc """
  Query helper used as the unit of work for the load test.
  """

  # Table the query reads from.
  @table "parsed_log_events_0a645cf3_dd0b_447a_8e37_ed7da0433c02"

  @doc """
  Runs the "latest 100 log events" query on the `Ch` connection `pid`.

  Returns whatever `Ch.query!/2` returns and raises if the query fails.
  """
  def get_logs(pid), do: Ch.query!(pid, latest_logs_sql())

  # SQL selecting timestamp, id and message of the 100 newest rows.
  # The date filter is intentionally left commented out inside the statement.
  defp latest_logs_sql do
    """
    select timestamp, id, event_message
    from #{@table}
    -- where date(timestamp) = current_date()
    order by timestamp desc
    limit 100;
    """
  end
end

# Run the query once on the connection opened above; raises if it fails.
Logs.get_logs(pid)

Setup GenServer

defmodule ClickhouseLoadtest do
  @moduledoc """
  Periodic load generator for a ClickHouse connection.

  Once started, the server launches `@concurrency` concurrent
  `Logs.get_logs/1` queries every second, records how long each one took
  (in milliseconds), and once per second prints the number of recorded
  requests together with the average duration of the most recent ones.
  """

  use GenServer

  # Queries launched per tick; also the size of the "recent" window that the
  # periodic report averages over.
  @concurrency 50

  # Delay between ticks (both for launching requests and for reporting), in ms.
  @interval 1_000

  ## Client API

  @doc """
  Starts the load generator, registered under the module name.

  `opts` must contain `:pid`, the `Ch` connection to query; other keys are
  ignored. Starting without `:pid` fails with a `KeyError`.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Returns the average duration (ms) of all requests recorded so far,
  or `0` when nothing has been recorded yet.
  """
  def get_avg do
    GenServer.call(__MODULE__, :get_avg)
  end

  @doc """
  Launches `concurrency` unlinked tasks, each running one `Logs.get_logs/1`
  query on `pid` and reporting its duration to the registered server.

  Failed queries are logged to stdout and are still timed and counted.
  Returns `:ok` immediately; it does not wait for the tasks.
  """
  def run_requests(pid, concurrency) do
    # The explicit `//1` step keeps `concurrency == 0` from turning into the
    # descending range `1..0`, which would launch two tasks instead of none.
    Enum.each(1..concurrency//1, fn _n ->
      Task.start(fn ->
        t0 = System.monotonic_time(:millisecond)

        try do
          Logs.get_logs(pid)
        rescue
          e ->
            IO.inspect(e, label: "ERROR")
            :error
        end

        t1 = System.monotonic_time(:millisecond)
        duration = t1 - t0

        GenServer.cast(__MODULE__, {:record_time, duration})
      end)
    end)
  end

  ## Server callbacks

  @impl true
  def init(opts) do
    # `Keyword.fetch!/2` accepts any option list that contains `:pid` and
    # raises a descriptive `KeyError` otherwise, rather than failing to match
    # a function head.
    pid = Keyword.fetch!(opts, :pid)

    puts_state()
    do_requests()

    # `times` holds only the newest `@concurrency` durations (newest first);
    # `total_time` and `times_counter` carry the all-time aggregate so memory
    # stays bounded no matter how long the test runs.
    {:ok, %{times: [], total_time: 0, pid: pid, times_counter: 0}}
  end

  @impl true
  def handle_info(:do_requests, %{pid: pid} = state) do
    run_requests(pid, @concurrency)
    do_requests()
    {:noreply, state}
  end

  def handle_info(:puts_state, %{times: times, times_counter: counter} = state) do
    avg = avg(times)
    IO.puts("Completed requests: #{counter}")
    IO.puts("Avg response time: #{avg}")

    puts_state()
    {:noreply, state}
  end

  @impl true
  def handle_cast({:record_time, duration}, state) do
    %{times: times, total_time: total, times_counter: counter} = state

    {:noreply,
     %{
       state
       | times: Enum.take([duration | times], @concurrency),
         total_time: total + duration,
         times_counter: counter + 1
     }}
  end

  @impl true
  def handle_call(:get_avg, _from, %{total_time: total, times_counter: counter} = state) do
    {:reply, mean(total, counter), state}
  end

  ## Helpers

  # Mean of a list of durations; `0` for an empty list.
  defp avg(times), do: mean(Enum.sum(times), length(times))

  # Mean from a precomputed sum and count; `0` when there are no samples.
  defp mean(_total, 0), do: 0
  defp mean(total, count), do: total / count

  # Schedules the next batch of requests.
  defp do_requests() do
    Process.send_after(self(), :do_requests, @interval)
  end

  # Schedules the next stats printout.
  defp puts_state() do
    Process.send_after(self(), :puts_state, @interval)
  end
end
# Start the load generator on the connection opened above. It is registered
# under its module name, so starting it a second time returns an error tuple.
ClickhouseLoadtest.start_link(pid: pid)

Graph

# todo

Graph Test

alias VegaLite, as: Vl

alias TinyColor.{Conversions, HSL, RGB}

# Converts `i` into an RGB string via TinyColor's HSL type (the 0.8 / 0.7
# arguments are presumably saturation and lightness - confirm).
# NOTE(review): TinyColor is not listed in the Mix.install at the top of the
# notebook and `color` is not called in this cell - confirm it is still needed.
color = fn i ->
  i
  |> HSL.new(0.8, 0.7)
  |> Conversions.to_rgb()
  |> RGB.to_string()
end

# Build an empty line chart and render it, keeping the live chart in `chart`
# so points can be pushed to it below.
chart =
  [width: 400, height: 400]
  |> Vl.new()
  |> Vl.mark(:line, stroke: [signal: "stroke"], stroke_width: 3)
  |> Vl.param("xTitle", value: "X at iteration 0")
  |> Vl.param("stroke", value: "#000")
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Kino.VegaLite.new()
  |> Kino.render()

# Stream 150 points of a sine wave into the chart, one every 25 ms.
Enum.map(1..150, fn step ->
  x = step / 10
  Kino.VegaLite.push(chart, %{x: x, y: :math.sin(x)})
  Process.sleep(25)
end)