
ExArrow ADBC Integration



deps = [
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19.0"},
  {:adbc, "~> 0.12"},
  {:gen_stage, "~> 1.3"},
  {:nx, "~> 0.12.1"},
  {:telemetry, "~> 1.0"},
  {:flow, "~> 1.2"},
  {:broadway, "~> 1.0"},
  # Required for the connection pool demo in section 4
  {:nimble_pool, "~> 1.1"}
]

local? = File.exists?(Path.join(__DIR__, "../native/ex_arrow_native/Cargo.toml"))

{ex_arrow_dep, extra_deps, config} =
  if local? do
    System.put_env("EX_ARROW_BUILD", "1")

    # Force recompile ex_arrow so optional deps are detected at compile time.
    ex_arrow_beam = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
    if File.dir?(ex_arrow_beam) do
      File.rm_rf!(ex_arrow_beam)
    end

    {
      {:ex_arrow, path: Path.expand("..", __DIR__)},
      [{:rustler, "~> 0.36", optional: true}],
      [adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
    }
  else
    {
      {:ex_arrow, "~> 0.7.0"},
      [],
      [adbc: [drivers: [:sqlite]]]
    }
  end

Mix.install(deps ++ [ex_arrow_dep] ++ extra_deps, config: config)

Section

This notebook demonstrates the adbc_package backend. It is a Livebook-friendly integration that runs queries through the adbc package and returns ExArrow.Stream results. The adbc package downloads and loads the driver, so ExArrow does not need to load a native ADBC driver itself. When nimble_pool is available, the backend can also pool connections, and the last section demonstrates this.


Setup

Run this cell first, after the dependency cell above. It does four things:

- downloads the SQLite driver
- configures the adbc_package backend (in-memory SQLite)
- enables a small connection pool
- starts the backend manager if the ExArrow application started before this configuration was set (as happens in Livebook)

# Ensure the SQLite driver is available (downloads if needed; required before opening :adbc_package)
Adbc.download_driver!(:sqlite)

# ADBC: use SQLite driver
Application.put_env(:adbc, :drivers, [:sqlite])

# ExArrow adbc_package backend: driver + URI (in-memory SQLite)
Application.put_env(:ex_arrow, :adbc_package, [driver: :sqlite, uri: ":memory:"])

# Optional: use a pool of connections (requires nimble_pool)
Application.put_env(:ex_arrow, :adbc_package_pool_size, 4)

# Start the manager if it wasn't started by the application (e.g. config was set after app start)
unless Process.whereis(ExArrow.ADBC.AdbcPackageManager) do
  {:ok, _pid} = ExArrow.ADBC.AdbcPackageManager.start_link()
end

IO.puts("Setup complete. adbc_package backend ready (pool size: #{Application.get_env(:ex_arrow, :adbc_package_pool_size, 1)}).")
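In a Mix project (rather than Livebook), the same settings would typically go in config/config.exs. That way they are already in place when the ExArrow application boots. The cell above only starts the manager by hand because it sets the configuration after boot. This is a sketch, assuming ExArrow reads these keys at startup the way the comment in the cell above suggests. The keys and values are copied from that cell:

```elixir
# config/config.exs (sketch; assumes ExArrow reads these keys when its application starts)
import Config

# ADBC: use the SQLite driver
config :adbc, :drivers, [:sqlite]

# ExArrow adbc_package backend: driver + URI (in-memory SQLite)
config :ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:"

# Optional: pool of 4 connections (only used when nimble_pool is a dependency)
config :ex_arrow, :adbc_package_pool_size, 4
```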

1. Open the adbc_package backend and run a query

Open the database with Database.open(:adbc_package) and open a connection on it. Then create a statement with the SQL text and execute it. The result is an ExArrow.Stream of Arrow record batches.

{:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
{:ok, conn} = ExArrow.ADBC.Connection.open(db)
{:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
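Each of the four calls above returns {:ok, value}. You may want a failure at any step to come back as a value instead of raising a MatchError. In that case, the calls can be chained with with.

This is a minimal sketch. AdbcQuery and run/1 are hypothetical names, not part of ExArrow. The sketch also assumes the returned stream stays usable after the helper's local connection binding goes out of scope, as the concurrent cell in section 4 also assumes:

```elixir
defmodule AdbcQuery do
  # Hypothetical helper, not part of ExArrow. Chains the calls from the cell
  # above; the first step that does not return {:ok, _} short-circuits and
  # its return value is passed through unchanged.
  def run(sql) do
    with {:ok, db} <- ExArrow.ADBC.Database.open(:adbc_package),
         {:ok, conn} <- ExArrow.ADBC.Connection.open(db),
         {:ok, stmt} <- ExArrow.ADBC.Statement.new(conn, sql) do
      # Whatever execute/1 returns, e.g. {:ok, stream}
      ExArrow.ADBC.Statement.execute(stmt)
    end
  end
end
```

Usage: `{:ok, stream} = AdbcQuery.run("SELECT 1 AS n, 'hello' AS msg")`.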

2. Consume the result stream

Use the same ExArrow.Stream API as for IPC and Flight: schema/1 and next/1.

{:ok, schema} = ExArrow.Stream.schema(stream)
IO.puts("Columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")

batch = ExArrow.Stream.next(stream)
IO.puts("Rows: #{ExArrow.RecordBatch.num_rows(batch)}")

# No more batches
ExArrow.Stream.next(stream)

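Display as a table (optional). In Livebook you can load the results into Explorer to render a table. The stream above has already been consumed, so the cell below works in four steps:

1. executes the statement again
2. collects its batches
3. writes them to an Arrow IPC stream binary
4. loads that binary into an Explorer DataFrame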

{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
{:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
# to_binary produces stream format; use load_ipc_stream!
Explorer.DataFrame.load_ipc_stream!(binary)

3. Another query (multiple rows)

Run a query that returns several rows and consume all batches.

{:ok, stmt2} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS x UNION ALL SELECT 2 UNION ALL SELECT 3")
{:ok, stream2} = ExArrow.ADBC.Statement.execute(stmt2)

batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream2) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

total = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
IO.puts("Batches: #{length(batches)}, total rows: #{total}")
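The Stream.repeatedly/1 plus Enum.take_while/2 pattern above can also be packaged with Stream.resource/3. This gives a lazy stream that you can pipe into Enum or Stream functions and stop early.

This is a sketch. BatchStream.from_pull/3 is a hypothetical helper. So that the demo runs without a database, it uses an Agent holding plain lists as a stand-in for an ExArrow stream:

```elixir
defmodule BatchStream do
  # Hypothetical helper, not part of ExArrow. Wraps a pull-based source in a
  # lazy Stream: next_fun.(source) is called once per element, and the stream
  # halts at the first value for which batch?.(value) is false.
  def from_pull(source, next_fun, batch?) do
    Stream.resource(
      fn -> source end,
      fn src ->
        item = next_fun.(src)
        if batch?.(item), do: {[item], src}, else: {:halt, src}
      end,
      fn _src -> :ok end
    )
  end
end

# Stand-in source: an Agent holding "batches" (plain lists); nil marks the end.
{:ok, fake} = Agent.start_link(fn -> [[1, 2], [3], [4, 5, 6]] end)

fake_next = fn agent ->
  Agent.get_and_update(agent, fn
    [batch | rest] -> {batch, rest}
    [] -> {nil, []}
  end)
end

total_rows =
  BatchStream.from_pull(fake, fake_next, &is_list/1)
  |> Enum.map(&length/1)
  |> Enum.sum()

IO.puts("Total rows: #{total_rows}")
# prints "Total rows: 6"
```

With the real result stream, the call would presumably be `BatchStream.from_pull(stream2, &ExArrow.Stream.next/1, &is_struct(&1, ExArrow.RecordBatch))`, reusing the end-of-stream check from the cell above. Every enumeration pulls from the same underlying stream, so a given execute result can only be consumed once.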

4. Connection pooling

When :adbc_package_pool_size is greater than 1 and the nimble_pool dependency is available, the adbc_package backend keeps a NimblePool of connections. Each execute checks out a connection from the pool, runs the query, and returns the connection to the pool. As a result, up to pool-size queries can run in parallel.
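To make the checkout, run, return cycle concrete, here is a toy pool built on GenServer from the standard library. It is for illustration only: it is not NimblePool and not how ExArrow implements its pool. The workers are atoms standing in for connections. The pool also ignores callers that crash or time out while waiting:

```elixir
defmodule ToyPool do
  # Toy checkout/checkin pool for illustration only. Not NimblePool and not
  # ExArrow's implementation; it ignores callers that crash or time out while
  # waiting (their worker would be handed to a dead process and lost).
  use GenServer

  # workers: list of terms standing in for open connections
  def start_link(workers), do: GenServer.start_link(__MODULE__, workers)

  # Check out a worker, run fun with it, and always check it back in.
  def run(pool, fun, timeout \\ 5_000) do
    worker = GenServer.call(pool, :checkout, timeout)

    try do
      fun.(worker)
    after
      GenServer.cast(pool, {:checkin, worker})
    end
  end

  @impl true
  def init(workers), do: {:ok, %{idle: workers, waiting: :queue.new()}}

  @impl true
  def handle_call(:checkout, _from, %{idle: [worker | rest]} = state) do
    {:reply, worker, %{state | idle: rest}}
  end

  def handle_call(:checkout, from, %{idle: []} = state) do
    # Every worker is busy: park the caller until one is checked in.
    {:noreply, %{state | waiting: :queue.in(from, state.waiting)}}
  end

  @impl true
  def handle_cast({:checkin, worker}, state) do
    case :queue.out(state.waiting) do
      {{:value, from}, rest} ->
        # Hand the worker straight to the longest-waiting caller.
        GenServer.reply(from, worker)
        {:noreply, %{state | waiting: rest}}

      {:empty, _} ->
        {:noreply, %{state | idle: [worker | state.idle]}}
    end
  end
end

{:ok, pool} = ToyPool.start_link([:conn_a, :conn_b])
ToyPool.run(pool, fn conn -> IO.puts("query ran on #{conn}") end)
```

Because each worker is held by at most one caller at a time, the number of functions running concurrently can never exceed the number of workers. That is the same guarantee the pool size gives the queries in the cell below.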

Below we run eight queries concurrently with Task.async/1 and Task.await_many/2. With a pool size of 4, at most 4 of them can run at once.

# Run 8 queries concurrently; the pool (size 4) serves them
tasks =
  for i <- 1..8 do
    Task.async(fn ->
      {:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
      {:ok, conn} = ExArrow.ADBC.Connection.open(db)
      {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT #{i} AS id, 'query_#{i}' AS label")
      {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
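      batch = ExArrow.Stream.next(stream)
      # Same end-of-stream check as in sections 2 and 3: count rows only if we got a batch
      rows = if is_struct(batch, ExArrow.RecordBatch), do: ExArrow.RecordBatch.num_rows(batch), else: 0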
      {i, rows}
    end)
  end

results = Task.await_many(tasks, 15_000)
IO.inspect(results, label: "Concurrent query results")
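The cell above starts all eight tasks at once, so the pool, not the caller, decides how many run at the same time. You can also cap concurrency on the caller side. Task.async_stream/3 with :max_concurrency set to the pool size does this and keeps results in input order.

This is a sketch with a hypothetical BoundedQueries module. The demo passes a stand-in function that sleeps briefly instead of running a real query:

```elixir
defmodule BoundedQueries do
  # Hypothetical helper: runs query_fun once per id with at most `limit`
  # running at a time. Task.async_stream/3 is ordered by default, so results
  # come back in the same order as ids.
  def run(ids, limit, query_fun) do
    ids
    |> Task.async_stream(query_fun, max_concurrency: limit, timeout: 15_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Stand-in for one query; in the notebook this body would be the
# Database.open/Connection.open/Statement.new/execute sequence from the cell above.
fake_query = fn i ->
  Process.sleep(20)
  {i, 1}
end

BoundedQueries.run(1..8, 4, fake_query)
|> IO.inspect(label: "Bounded concurrent results")
```

Suppose limit equals :adbc_package_pool_size and nothing else is using the pool at the same time. Then no task should have to wait for a connection. The trade-off is that the limit has to be kept in sync with the pool configuration.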

Summary

  • Setup: Configure :ex_arrow, :adbc_package (e.g. [driver: :sqlite, uri: ":memory:"]) and optionally :adbc_package_pool_size. If the manager wasn’t started by the application (e.g. in Livebook), start it with AdbcPackageManager.start_link/0.
  • Flow: Database.open(:adbc_package) → Connection.open(db) → Statement.new(conn, sql) → Statement.execute(stmt) → ExArrow.Stream (schema/1 + next/1), the same as in Tutorial 3: ADBC.
  • Pooling: With :adbc_package_pool_size > 1 and nimble_pool in deps, the backend uses a connection pool, so up to that many concurrent executes can run in parallel.