Gust vs Airflow Benchmark
This Livebook runs the local Docker benchmark used to compare Gust and Apache Airflow.
It is designed to:
- trigger concurrent workflow runs
- collect docker stats samples during execution
- write CSV files with timing and container resource data
Mix.install([
{:jason, "~> 1.4"},
{:kino_vega_lite, "~> 0.1.11"},
{:nimble_csv, "~> 1.2"}
])
Setup Overview
The goal is to compare resource consumption between Gust and Airflow while running the same mock I/O workflow.
The benchmark scenarios are:
- 50 parallel runs
- 80 parallel runs
- 100 parallel runs
In the environment used for the original benchmark, Airflow did not complete the 100-run scenario, while Gust did.
For consistent results:
- restart the Docker stack between scenarios
- run each scenario independently
- collect results from the same host environment
Generated CSV files are written to benchmark_results/.
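The stack restart between scenarios can be scripted as well. Here is a minimal sketch, assuming Compose v2 (for the --wait flag) and that each target's directory under the expanded project path contains its compose stack; StackReset is a hypothetical helper, not part of the benchmark code:

defmodule StackReset do
  # Hypothetical helper: cold-start the compose stack between scenarios so
  # each run begins from a clean state.
  def reset!(stack_path) do
    docker = System.find_executable("docker") || "/usr/local/bin/docker"
    {_out, 0} = System.cmd(docker, ["compose", "down"], cd: stack_path, stderr_to_stdout: true)
    {_out, 0} = System.cmd(docker, ["compose", "up", "-d", "--wait"], cd: stack_path, stderr_to_stdout: true)
    :ok
  end
end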
Benchmark Helpers
defmodule Benchmark do
defstruct proj_path: nil,
warmup_time: 10_000,
output_dir: "benchmark_results",
concurrency: 1,
run_id: nil,
target: nil,
cmd_args: nil
def setup_run(%__MODULE__{output_dir: output_dir, target: target, proj_path: path} = bm, cmd) do
File.mkdir_p!(output_dir)
run_id = "#{target}_#{DateTime.utc_now() |> DateTime.to_iso8601() |> String.replace([":", "."], "-")}"
%{bm | run_id: run_id, proj_path: Path.expand("~/#{path}/#{target}"), cmd_args: cmd |> String.split(" ")}
end
  def collect_status(
        %__MODULE__{output_dir: output_dir, target: target, run_id: run_id},
        trigger_results,
        stats_samples
      ) do
    stats_rows = flatten_stats_samples(stats_samples, run_id, target)
    trigger_rows = format_trigger_results(trigger_results, run_id, target)
stats_csv_path = Path.join(output_dir, "#{run_id}_docker_stats.csv")
triggers_csv_path = Path.join(output_dir, "#{run_id}_triggers.csv")
write_docker_stats_csv!(stats_csv_path, stats_rows)
write_trigger_results_csv!(triggers_csv_path, trigger_rows)
IO.puts("Done.")
IO.puts("Docker stats CSV: #{stats_csv_path}")
IO.puts("Trigger results CSV: #{triggers_csv_path}")
%{
run_id: run_id,
target: target,
stats_csv_path: stats_csv_path,
triggers_csv_path: triggers_csv_path,
      stats_samples_count: length(stats_samples),
      trigger_results: trigger_results
}
end
defp flatten_stats_samples(stats_samples, run_id, target_name) do
stats_samples
|> Enum.flat_map(fn sample ->
Enum.map(sample.rows, fn row ->
%{
run_id: run_id,
target: target_name,
captured_at: sample.captured_at,
container_id: row["ID"],
container_name: row["Name"],
cpu_perc: row["CPUPerc"],
mem_usage: row["MemUsage"],
mem_perc: row["MemPerc"],
net_io: row["NetIO"],
block_io: row["BlockIO"],
pids: row["PIDs"]
}
end)
end)
end
defp format_trigger_results(trigger_results, run_id, target_name) do
Enum.map(trigger_results, fn result ->
%{
run_id: run_id,
target: target_name,
trigger_number: result.trigger_number,
started_at: result.started_at,
finished_at: result.finished_at,
duration_ms: result.duration_ms,
exit_code: result.exit_code,
output: clean_output(result.output)
}
end)
end
defp write_docker_stats_csv!(path, rows) do
headers = [
"run_id",
"target",
"captured_at",
"container_id",
"container_name",
"cpu_perc",
"mem_usage",
"mem_perc",
"net_io",
"block_io",
"pids"
]
data =
Enum.map(rows, fn row ->
[
row.run_id,
row.target,
row.captured_at,
row.container_id,
row.container_name,
row.cpu_perc,
row.mem_usage,
row.mem_perc,
row.net_io,
row.block_io,
row.pids
]
end)
write_csv!(path, headers, data)
end
defp write_trigger_results_csv!(path, rows) do
headers = [
"run_id",
"target",
"trigger_number",
"started_at",
"finished_at",
"duration_ms",
"exit_code",
"output"
]
data =
Enum.map(rows, fn row ->
[
row.run_id,
row.target,
row.trigger_number,
row.started_at,
row.finished_at,
row.duration_ms,
row.exit_code,
row.output
]
end)
write_csv!(path, headers, data)
end
defp write_csv!(path, headers, rows) do
csv =
[headers | rows]
|> NimbleCSV.RFC4180.dump_to_iodata()
File.write!(path, csv)
end
defp clean_output(nil), do: ""
defp clean_output(output) do
output
|> String.replace("\n", " ")
|> String.trim()
end
end
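As a quick orientation before wiring things together: setup_run expands proj_path to ~/<proj_path>/<target>, derives a timestamped run_id, and splits the trigger command into argv form. The values below are illustrative, not output from a real run:

bm =
  %Benchmark{proj_path: "Projects/gust-benchmark/", target: :gust, concurrency: 50}
  |> Benchmark.setup_run("/app/bin/gust-cli trigger_run benchmark_mock_io")

bm.proj_path # expands to ~/Projects/gust-benchmark/gust
bm.run_id    # e.g. "gust_2026-01-01T12-00-00-000000Z"
bm.cmd_args  # ["/app/bin/gust-cli", "trigger_run", "benchmark_mock_io"]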
Docker Wrapper
defmodule Docker do
defstruct bin: nil, benchmark: nil
  def new(%Benchmark{} = bm) do
    %__MODULE__{
      benchmark: bm,
      # Resolve the docker binary from PATH, with a fallback location in
      # case the lookup fails in the Livebook environment.
      bin: System.find_executable("docker") || "/usr/local/bin/docker"
    }
  end
end
Triggering and Monitoring
defmodule DockerRunner do
def trigger_run_stream(%Docker{
bin: docker_bin,
benchmark: %Benchmark{
proj_path: proj_path,
concurrency: concurrency,
cmd_args: cmd_args
}}, docker_service) do
1..concurrency
|> Task.async_stream(
fn i ->
IO.puts("Trigger: #{i}")
started_at = DateTime.utc_now()
started_monotonic = System.monotonic_time(:millisecond)
cmd = ["compose", "exec", "-T", docker_service] ++ cmd_args
{output, exit_code} =
System.cmd(
docker_bin,
cmd,
cd: proj_path,
stderr_to_stdout: true
)
        # Record the failure instead of raising: a raise inside the task would
        # crash the whole stream and discard the other trigger results, and the
        # exit code is written to the triggers CSV either way.
        if exit_code != 0 do
          IO.puts("Trigger #{i} failed (exit #{exit_code}): #{output}")
        end
finished_at = DateTime.utc_now()
duration_ms = System.monotonic_time(:millisecond) - started_monotonic
%{
trigger_number: i,
started_at: started_at,
finished_at: finished_at,
duration_ms: duration_ms,
output: output,
exit_code: exit_code
}
end,
      ordered: true,
      timeout: 90_000,
      # :kill_task turns a per-trigger timeout into an {:exit, :timeout}
      # result handled below, instead of exiting the caller.
      on_timeout: :kill_task,
      max_concurrency: concurrency
)
|> Enum.map(fn
{:ok, result} ->
result
{:exit, reason} ->
%{
trigger_number: nil,
started_at: nil,
finished_at: DateTime.utc_now(),
duration_ms: nil,
output: inspect(reason),
exit_code: -1
}
end)
end
def async_monitor_stats(%Docker{bin: docker_bin, benchmark: %Benchmark{proj_path: proj_path}}, %{every: stats_interval, stop_after: stop_after}) do
Task.async(fn ->
started_at = System.monotonic_time(:millisecond)
Stream.interval(stats_interval)
|> Task.async_stream(
        fn _tick ->
          IO.puts("Collecting docker stats...")

          {output, exit_code} =
            System.cmd(
              docker_bin,
              ["stats", "--no-stream", "--format", "{{json .}}"],
              cd: proj_path,
              stderr_to_stdout: true
            )

          # Tolerate transient failures: an error sample keeps the monitor
          # alive, whereas raising would crash the whole monitoring task.
          if exit_code == 0 do
            rows =
              output
              |> String.split("\n", trim: true)
              |> Enum.map(&Jason.decode!/1)

            %{captured_at: DateTime.utc_now(), rows: rows}
          else
            %{
              captured_at: DateTime.utc_now(),
              error: "docker stats exited with #{exit_code}: #{output}",
              rows: []
            }
          end
end,
        ordered: true,
        timeout: 90_000,
        # Without :kill_task a timeout would exit the caller; with it, the
        # {:exit, reason} clause below receives the failure.
        on_timeout: :kill_task,
        max_concurrency: 1
)
|> Enum.reduce_while([], fn
{:ok, sample}, acc ->
elapsed = System.monotonic_time(:millisecond) - started_at
if elapsed >= stop_after do
IO.puts("Stoped monitoring stats...")
{:halt, Enum.reverse([sample | acc])}
else
{:cont, [sample | acc]}
end
{:exit, reason}, acc ->
error_sample = %{
captured_at: DateTime.utc_now(),
error: inspect(reason),
rows: []
}
{:cont, [error_sample | acc]}
end)
end)
end
end
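For reference, each line printed by docker stats --no-stream --format "{{json .}}" decodes into a map like the one below (values are illustrative). These are exactly the keys flatten_stats_samples/3 picks out of each row:

# One decoded docker stats line (abridged; values illustrative).
%{
  "ID" => "a1b2c3d4e5f6",
  "Name" => "gust-benchmark-gust-1",
  "CPUPerc" => "153.22%",
  "MemUsage" => "512.3MiB / 7.667GiB",
  "MemPerc" => "6.52%",
  "NetIO" => "1.2MB / 800kB",
  "BlockIO" => "0B / 4.1MB",
  "PIDs" => "42"
}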
Run Helper
defmodule Runner do
  def run(%{cmd: cmd, proj_path: proj_path, target: target, service: service} = params) do
    bm =
      %Benchmark{
        proj_path: proj_path,
        target: target,
        # Scenario size (50, 80, or 100 parallel runs); defaults to 1.
        concurrency: Map.get(params, :concurrency, 1)
      }
      |> Benchmark.setup_run(cmd)

    docker = Docker.new(bm)

    # Start the stats monitor first so sampling is live while triggers run.
    monitor_task = docker |> DockerRunner.async_monitor_stats(%{every: 1_000, stop_after: 10_000})
    trigger_results = docker |> DockerRunner.trigger_run_stream(service)
    stats_samples = Task.await(monitor_task, :infinity)

    bm |> Benchmark.collect_status(trigger_results, stats_samples)
  end
end
Example Invocations
Uncomment the target you want to run and set concurrency to the scenario size (50, 80, or 100). In the sample below, the Airflow invocation is enabled and the Gust example is commented out.
# Runner.run(%{
#   proj_path: "Projects/gust-benchmark/",
#   target: :gust,
#   cmd: "/app/bin/gust-cli trigger_run benchmark_mock_io",
#   service: "gust",
#   concurrency: 50
# })
Runner.run(%{
  proj_path: "Projects/gust-benchmark/",
  target: :airflow,
  cmd: "airflow dags trigger benchmark_mock_io",
  service: "airflow-scheduler",
  concurrency: 50
})
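To script all three scenarios for a single target, a loop along the following lines could work. It reuses the hypothetical StackReset helper sketched in the setup section; the original numbers were collected by running scenarios one at a time with a manual stack restart, so treat this as a convenience, not the reference methodology:

for concurrency <- [50, 80, 100] do
  # Cold-start the stack so scenarios don't contaminate each other.
  StackReset.reset!(Path.expand("~/Projects/gust-benchmark/airflow"))

  Runner.run(%{
    proj_path: "Projects/gust-benchmark/",
    target: :airflow,
    cmd: "airflow dags trigger benchmark_mock_io",
    service: "airflow-scheduler",
    concurrency: concurrency
  })
end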