Powered by AppSignal & Oban Pro

Gust vs Airflow Benchmark

gust_vs_airflow_benchmark.livemd

Gust vs Airflow Benchmark

This Livebook runs the local Docker benchmark used to compare Gust and Apache Airflow.

It is designed to:

  • trigger concurrent workflow runs
  • collect docker stats samples during execution
  • write CSV files with timing and container resource data
# Notebook dependencies: JSON decoding for `docker stats`, charting, CSV output.
Mix.install([
  {:jason, "~> 1.4"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:nimble_csv, "~> 1.2"}
])

Setup Overview

The goal is to compare resource consumption between Gust and Airflow while running the same mock I/O workflow.

The benchmark scenarios are:

  • 50 parallel runs
  • 80 parallel runs
  • 100 parallel runs

In the environment used for the original benchmark, Airflow did not complete the 100-run scenario, while Gust did.

For consistent results:

  • restart the Docker stack between scenarios
  • run each scenario independently
  • collect results from the same host environment

Generated CSV files are written to benchmark_results/.

Benchmark Helpers

defmodule Benchmark do
  @moduledoc """
  Holds the configuration for one benchmark run and writes the collected
  results (docker stats samples and trigger timings) to CSV files under
  `output_dir`.
  """

  defstruct proj_path: nil,
            warmup_time: 10_000,
            output_dir: "benchmark_results",
            concurrency: 1,
            run_id: nil,
            target: nil,
            cmd_args: nil

  # Column order for the two CSV files; also used to pull values out of each row map.
  @stats_fields [
    :run_id,
    :target,
    :captured_at,
    :container_id,
    :container_name,
    :cpu_perc,
    :mem_usage,
    :mem_perc,
    :net_io,
    :block_io,
    :pids
  ]
  @trigger_fields [
    :run_id,
    :target,
    :trigger_number,
    :started_at,
    :finished_at,
    :duration_ms,
    :exit_code,
    :output
  ]

  @doc """
  Prepares a run: ensures `output_dir` exists, derives a unique `run_id`
  from the target and the current UTC timestamp, expands the project path
  (relative to `~`) and splits the command string into argv-style args.
  """
  def setup_run(%__MODULE__{output_dir: output_dir, target: target, proj_path: path} = bm, cmd) do
    File.mkdir_p!(output_dir)

    # `:` and `.` are replaced so the id is safe to use in file names.
    timestamp =
      DateTime.utc_now()
      |> DateTime.to_iso8601()
      |> String.replace([":", "."], "-")

    %{
      bm
      | run_id: "#{target}_#{timestamp}",
        proj_path: Path.expand("~/#{path}/#{target}"),
        cmd_args: String.split(cmd, " ")
    }
  end

  @doc """
  Writes both CSV files for a finished run and returns a summary map.

  `trigger_results` is the list produced by `DockerRunner.trigger_run_stream/2`;
  `stats_samples` is the (already awaited) list of samples from
  `DockerRunner.async_monitor_stats/2`.
  """
  def collect_status(
        %__MODULE__{output_dir: output_dir, target: target, run_id: run_id},
        trigger_results,
        stats_samples
      ) do
    stats_rows = flatten_stats_samples(stats_samples, run_id, target)
    trigger_rows = format_trigger_results(trigger_results, run_id, target)

    stats_csv_path = Path.join(output_dir, "#{run_id}_docker_stats.csv")
    triggers_csv_path = Path.join(output_dir, "#{run_id}_triggers.csv")

    write_docker_stats_csv!(stats_csv_path, stats_rows)
    write_trigger_results_csv!(triggers_csv_path, trigger_rows)

    IO.puts("Done.")
    IO.puts("Docker stats CSV: #{stats_csv_path}")
    IO.puts("Trigger results CSV: #{triggers_csv_path}")

    %{
      run_id: run_id,
      target: target,
      stats_csv_path: stats_csv_path,
      triggers_csv_path: triggers_csv_path,
      stats_samples_count: length(stats_samples),
      trigger_results: trigger_results
    }
  end

  # Turns each stats sample (one `docker stats` snapshot containing many
  # container rows) into flat per-container row maps tagged with run metadata.
  defp flatten_stats_samples(stats_samples, run_id, target_name) do
    Enum.flat_map(stats_samples, fn sample ->
      Enum.map(sample.rows, fn row ->
        %{
          run_id: run_id,
          target: target_name,
          captured_at: sample.captured_at,
          container_id: row["ID"],
          container_name: row["Name"],
          cpu_perc: row["CPUPerc"],
          mem_usage: row["MemUsage"],
          mem_perc: row["MemPerc"],
          net_io: row["NetIO"],
          block_io: row["BlockIO"],
          pids: row["PIDs"]
        }
      end)
    end)
  end

  # Tags each trigger result with run metadata and flattens the output text.
  defp format_trigger_results(trigger_results, run_id, target_name) do
    Enum.map(trigger_results, fn result ->
      %{
        run_id: run_id,
        target: target_name,
        trigger_number: result.trigger_number,
        started_at: result.started_at,
        finished_at: result.finished_at,
        duration_ms: result.duration_ms,
        exit_code: result.exit_code,
        output: clean_output(result.output)
      }
    end)
  end

  defp write_docker_stats_csv!(path, rows), do: write_rows_csv!(path, @stats_fields, rows)

  defp write_trigger_results_csv!(path, rows), do: write_rows_csv!(path, @trigger_fields, rows)

  # Shared writer: header names are the field atoms, values are extracted in
  # the same order (`Map.fetch!` so a missing key fails loudly).
  defp write_rows_csv!(path, fields, rows) do
    headers = Enum.map(fields, &Atom.to_string/1)
    data = Enum.map(rows, fn row -> Enum.map(fields, &Map.fetch!(row, &1)) end)
    write_csv!(path, headers, data)
  end

  defp write_csv!(path, headers, rows) do
    csv = NimbleCSV.RFC4180.dump_to_iodata([headers | rows])
    File.write!(path, csv)
  end

  # Collapses multi-line command output to a single trimmed line for CSV cells.
  defp clean_output(nil), do: ""

  defp clean_output(output) do
    output
    |> String.replace("\n", " ")
    |> String.trim()
  end
end

Docker Wrapper

defmodule Docker do
  @moduledoc """
  Thin wrapper pairing a benchmark configuration with the path to the
  docker executable.
  """

  defstruct bin: nil, benchmark: nil

  @doc """
  Builds a `Docker` struct for the given benchmark, resolving the docker
  binary from `PATH` and falling back to `/usr/local/bin/docker`.
  """
  def new(%Benchmark{} = benchmark) do
    docker_bin = System.find_executable("docker") || "/usr/local/bin/docker"

    %__MODULE__{bin: docker_bin, benchmark: benchmark}
  end
end

Triggering And Monitoring

defmodule DockerRunner do
  @moduledoc """
  Fires concurrent workflow triggers through `docker compose exec` and
  samples `docker stats` while the benchmark is running.
  """

  @doc """
  Starts `concurrency` parallel `docker compose exec` triggers against
  `docker_service`, returning one result map per trigger with
  `:trigger_number`, `:started_at`, `:finished_at`, `:duration_ms`,
  `:output` and `:exit_code`.

  Triggers that hit the 90s timeout are killed and reported with
  `exit_code: -1` (via `on_timeout: :kill_task`) instead of crashing the
  whole stream; a non-zero exit code from docker itself raises.
  """
  def trigger_run_stream(
        %Docker{
          bin: docker_bin,
          benchmark: %Benchmark{
            proj_path: proj_path,
            concurrency: concurrency,
            cmd_args: cmd_args
          }
        },
        docker_service
      ) do
    1..concurrency
    |> Task.async_stream(
      fn i ->
        IO.puts("Trigger: #{i}")

        started_at = DateTime.utc_now()
        started_monotonic = System.monotonic_time(:millisecond)

        cmd = ["compose", "exec", "-T", docker_service] ++ cmd_args

        {output, exit_code} =
          System.cmd(
            docker_bin,
            cmd,
            cd: proj_path,
            stderr_to_stdout: true
          )

        if exit_code != 0, do: raise "Trigger failed: Exit: #{exit_code}; Output: #{output}"

        finished_at = DateTime.utc_now()
        duration_ms = System.monotonic_time(:millisecond) - started_monotonic

        %{
          trigger_number: i,
          started_at: started_at,
          finished_at: finished_at,
          duration_ms: duration_ms,
          output: output,
          exit_code: exit_code
        }
      end,
      ordered: true,
      timeout: 90_000,
      # Without :kill_task a timeout would exit the caller and the
      # {:exit, reason} clause below would never run.
      on_timeout: :kill_task,
      max_concurrency: concurrency
    )
    |> Enum.map(fn
      {:ok, result} ->
        result

      {:exit, reason} ->
        %{
          trigger_number: nil,
          started_at: nil,
          finished_at: DateTime.utc_now(),
          duration_ms: nil,
          output: inspect(reason),
          exit_code: -1
        }
    end)
  end

  @doc """
  Returns a `Task` that samples `docker stats --no-stream` every `every`
  milliseconds and stops once `stop_after` milliseconds have elapsed,
  yielding the collected samples in capture order.

  Each sample is `%{captured_at: DateTime.t(), rows: [decoded_json_map]}`;
  a sample that timed out is kept as an error entry with empty `:rows`.
  """
  def async_monitor_stats(
        %Docker{bin: docker_bin, benchmark: %Benchmark{proj_path: proj_path}},
        %{every: stats_interval, stop_after: stop_after}
      ) do
    Task.async(fn ->
      started_at = System.monotonic_time(:millisecond)

      Stream.interval(stats_interval)
      |> Task.async_stream(
        fn _tick ->
          {output, exit_code} =
            System.cmd(
              docker_bin,
              ["stats", "--no-stream", "--format", "{{json .}}"],
              cd: proj_path,
              stderr_to_stdout: true
            )

          IO.puts("Collecting docker stats...")

          # `--format {{json .}}` emits one JSON object per container per line.
          rows =
            if exit_code == 0 do
              output
              |> String.split("\n", trim: true)
              |> Enum.map(&Jason.decode!/1)
            else
              raise "Docker stats failed: EXIT #{exit_code} OUTPUT: #{output}"
            end

          %{
            captured_at: DateTime.utc_now(),
            rows: rows
          }
        end,
        ordered: true,
        timeout: 90_000,
        # Keep collecting if a single sample times out; see {:exit, _} below.
        on_timeout: :kill_task,
        max_concurrency: 1
      )
      |> Enum.reduce_while([], fn
        {:ok, sample}, acc ->
          elapsed = System.monotonic_time(:millisecond) - started_at

          if elapsed >= stop_after do
            IO.puts("Stopped monitoring stats...")
            {:halt, Enum.reverse([sample | acc])}
          else
            {:cont, [sample | acc]}
          end

        {:exit, reason}, acc ->
          error_sample = %{
            captured_at: DateTime.utc_now(),
            error: inspect(reason),
            rows: []
          }

          {:cont, [error_sample | acc]}
      end)
    end)
  end
end

Run Helper

defmodule Runner do
  @moduledoc """
  Convenience entry point: wires up a `Benchmark`, starts the stats
  monitor, fires the triggers, then collects results into CSV files.
  """

  @doc """
  Runs one benchmark scenario.

  Expects a map with `:cmd` (command string executed inside the
  container), `:proj_path` (path under `~`), `:target` (e.g. `:gust` or
  `:airflow`) and `:service` (compose service name). Returns the summary
  map from `Benchmark.collect_status/3`.
  """
  def run(%{cmd: cmd, proj_path: proj_path, target: target, service: service}) do
    # NOTE: the original had a trailing comma inside the struct literal,
    # which is a syntax error in Elixir.
    bm =
      %Benchmark{proj_path: proj_path, target: target}
      |> Benchmark.setup_run(cmd)

    docker = Docker.new(bm)

    # Start sampling before triggering so ramp-up is captured too.
    monitor_task = DockerRunner.async_monitor_stats(docker, %{every: 1_000, stop_after: 10_000})
    trigger_results = DockerRunner.trigger_run_stream(docker, service)
    stats_samples = Task.await(monitor_task, :infinity)

    Benchmark.collect_status(bm, trigger_results, stats_samples)
  end
end

Example Invocations

Uncomment the target you want to run. In the sample below, the Airflow invocation is enabled and the Gust example remains commented out.

# Runner.run(%{
#   proj_path: "Projects/gust-benchmark/",
#   target: :gust,
#   cmd: "/app/bin/gust-cli trigger_run benchmark_mock_io",
#   service: "gust"
# })

# Runs the Airflow scenario; CSV results land in benchmark_results/.
# Requires the docker compose stack for the target to already be running.
# NOTE(review): concurrency is not set here, so the Benchmark default of 1
# applies — raise it in the Benchmark struct for the 50/80/100-run scenarios.
Runner.run(%{
  proj_path: "Projects/gust-benchmark/",
  target: :airflow,
  cmd: "airflow dags trigger benchmark_mock_io",
  service: "airflow-scheduler"
})