Clickhouse Loadtest
Mix.install([
{:ch, "~> 0.3.2"},
{:kino, "~> 0.16.0"},
{:kino_vega_lite, "~> 0.1.11"},
{:vega_lite, "~> 0.1.10"}
])
Setup
defaults = [
scheme: "https",
hostname: System.get_env("LB_SB_CH_HOSTNAME"),
port: 8443,
database: "default",
settings: [],
pool_size: 25,
timeout: :timer.seconds(15),
username: "logflare",
password: System.get_env("LB_SB_CH_AB6JFB0Q6S_LOGFLARE")
]
# note that starting in ClickHouse 25.1.3.23 `default` user doesn't have
# network access by default in the official Docker images
# see https://github.com/ClickHouse/ClickHouse/pull/75259
{:ok, pid} = Ch.start_link(defaults)
%Ch.Result{} = Ch.query!(pid, "select 1")
defmodule Logs do
def get_logs(pid) do
table = "parsed_log_events_0a645cf3_dd0b_447a_8e37_ed7da0433c02"
sql = """
select timestamp, id, event_message
from #{table}
-- where date(timestamp) = current_date()
order by timestamp desc
limit 100;
"""
Ch.query!(pid, sql)
end
end
Logs.get_logs(pid)
Setup GenServer
defmodule ClickhouseLoadtest do
use GenServer
@concurrency 50
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(pid: pid) do
puts_state()
do_requests()
{:ok, %{times: [], pid: pid, times_counter: 0}}
end
def run_requests(pid, concurrency) do
Enum.each(1..concurrency, fn _n ->
Task.start(fn ->
t0 = System.monotonic_time(:millisecond)
try do
Logs.get_logs(pid)
rescue
e ->
IO.inspect(e, label: "ERROR")
:error
end
t1 = System.monotonic_time(:millisecond)
duration = t1 - t0
GenServer.cast(__MODULE__, {:record_time, duration})
end)
end)
end
def get_avg do
GenServer.call(__MODULE__, :get_avg)
end
def handle_info(:do_requests, %{pid: pid} = state) do
run_requests(pid, @concurrency)
do_requests()
{:noreply, state}
end
def handle_info(:puts_state, %{times: times, times_counter: counter} = state) do
last = Enum.take(times, @concurrency)
avg = avg(last)
IO.puts("Completed requests: #{counter}")
IO.puts("Avg response time: #{avg}")
puts_state()
{:noreply, state}
end
def handle_cast({:record_time, duration}, %{times: times, times_counter: counter} = state) do
{:noreply, %{state | times: [duration | times], times_counter: counter + 1}}
end
def handle_call(:get_avg, _from, %{times: times} = state) do
{:reply, avg(times), state}
end
defp avg(times) do
if times == [], do: 0, else: Enum.sum(times) / length(times)
end
defp do_requests() do
Process.send_after(self(), :do_requests, 1_000)
end
defp puts_state() do
Process.send_after(self(), :puts_state, 1_000)
end
end
ClickhouseLoadtest.start_link(pid: pid)
Graph
# todo
Graph Test
alias VegaLite, as: Vl
alias TinyColor.{Conversions, HSL, RGB}
color = fn i -> HSL.new(i, 0.8, 0.7) |> Conversions.to_rgb() |> RGB.to_string() end
chart =
Vl.new(width: 400, height: 400)
|> Vl.mark(:line, stroke: [signal: "stroke"], stroke_width: 3)
|> Vl.param("xTitle", value: "X at iteration 0")
|> Vl.param("stroke", value: "#000")
|> Vl.encode_field(:x, "x", type: :quantitative)
|> Vl.encode_field(:y, "y", type: :quantitative)
|> Kino.VegaLite.new()
|> Kino.render()
for i <- 1..150 do
point = %{x: i / 10, y: :math.sin(i / 10)}
Kino.VegaLite.push(chart, point)
Process.sleep(25)
end