
ExArrow ADBC Integration

livebook/04_adbc_integration.livemd


Mix.install([
  {:ex_arrow, path: Path.join(__DIR__, "..")},
  {:adbc, "~> 0.7"},
  {:explorer, "~> 0.8"},
  {:nimble_pool, "~> 1.1"},
  {:rustler, "~> 0.32.0"}
])

Section

This notebook demonstrates the adbc_package backend. It is a Livebook-friendly integration that uses the adbc package (and, optionally, NimblePool) to query a database and return results as an ExArrow.Stream, without ExArrow loading a native ADBC driver itself; the adbc package manages the driver. It also demonstrates connection pooling when nimble_pool is available.


Setup

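Run this cell first, after the Mix.install cell above. It downloads the SQLite driver, configures the adbc_package backend (in-memory SQLite), and enables a small connection pool. It also starts the backend manager if the ExArrow application started without this config, which is the case in Livebook because the config is set after the application has already started.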

# Ensure the SQLite driver is available (downloads if needed; required before opening :adbc_package)
Adbc.download_driver!(:sqlite)

# ADBC: use SQLite driver
Application.put_env(:adbc, :drivers, [:sqlite])

# ExArrow adbc_package backend: driver + URI (in-memory SQLite)
Application.put_env(:ex_arrow, :adbc_package, [driver: :sqlite, uri: ":memory:"])

# Optional: use a pool of connections (requires nimble_pool)
Application.put_env(:ex_arrow, :adbc_package_pool_size, 4)

# Start the manager if it wasn't started by the application (e.g. config was set after app start)
unless Process.whereis(ExArrow.ADBC.AdbcPackageManager) do
  {:ok, _pid} = ExArrow.ADBC.AdbcPackageManager.start_link()
end

IO.puts("Setup complete. adbc_package backend ready (pool size: #{Application.get_env(:ex_arrow, :adbc_package_pool_size, 1)}).")
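Outside Livebook, these settings would normally live in the project's config. That way they are present when the ExArrow application starts, and the Process.whereis/1 guard in the setup cell has nothing to do. The sketch below assumes a standard Mix project layout. It also assumes that the adbc package downloads the drivers listed under :drivers when it is compiled, as its README describes. The :ex_arrow keys are the same ones the setup cell sets at runtime.

```elixir
# config/config.exs (sketch for a Mix project, not part of this notebook)
import Config

# Same driver setting as the setup cell; in a Mix project the adbc package
# is expected to download the listed drivers at compile time (assumption).
config :adbc, :drivers, [:sqlite]

# ExArrow adbc_package backend: driver + URI (in-memory SQLite)
config :ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:"

# Optional connection pool (requires nimble_pool in deps)
config :ex_arrow, :adbc_package_pool_size, 4
```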

1. Open the adbc_package backend and run a query

Open the database with Database.open(:adbc_package), open a connection, create a statement from the SQL text, and execute it. The result is an ExArrow.Stream of Arrow record batches.

{:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
{:ok, conn} = ExArrow.ADBC.Connection.open(db)
{:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

2. Consume the result stream

Use the same ExArrow.Stream API as for IPC and Flight: schema/1 and next/1.

{:ok, schema} = ExArrow.Stream.schema(stream)
IO.puts("Columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")

batch = ExArrow.Stream.next(stream)
IO.puts("Rows: #{ExArrow.RecordBatch.num_rows(batch)}")

# No more batches
ExArrow.Stream.next(stream)

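Display as a table (optional). In Livebook you can load the result into Explorer to render a table. The stream above has already been consumed, so this cell executes the statement again to get a fresh stream. It then collects the batches, serializes them to Arrow IPC, and loads the binary into an Explorer DataFrame: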

# Re-execute: the previous stream has already been drained
{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
{:ok, schema} = ExArrow.Stream.schema(stream)

# Pull batches until next/1 stops returning record batches
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

{:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
# to_binary produces the IPC stream format, so load it with load_ipc_stream!
Explorer.DataFrame.load_ipc_stream!(binary)

3. Another query (multiple rows)

Run a query that returns several rows and consume all batches.

{:ok, stmt2} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS x UNION SELECT 2 UNION SELECT 3")
{:ok, stream2} = ExArrow.ADBC.Statement.execute(stmt2)

batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream2) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

total = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
IO.puts("Batches: #{length(batches)}, total rows: #{total}")
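Enum.take_while/2 above collects every batch into a list before counting. To fold over the batches one at a time instead, you can keep the same pull loop lazy with Stream.take_while/2. Here is a small sketch. BatchStream is a hypothetical helper, not an ExArrow module, and the caller supplies both the "is this a batch?" predicate and the row-count function.

```elixir
defmodule BatchStream do
  # Hypothetical helper (not part of ExArrow): turns a pull-style `next_fun`
  # into a lazy Elixir Stream. The first pulled value for which `batch?`
  # returns false (e.g. the end-of-stream marker) halts the stream, and
  # `next_fun` is not called again after that.
  @spec from((-> term()), (term() -> boolean())) :: Enumerable.t()
  def from(next_fun, batch?) do
    next_fun
    |> Stream.repeatedly()
    |> Stream.take_while(batch?)
  end

  # Sums row counts while visiting one batch at a time (no intermediate list).
  @spec total_rows(Enumerable.t(), (term() -> non_neg_integer())) :: non_neg_integer()
  def total_rows(batches, num_rows) do
    Enum.reduce(batches, 0, fn batch, acc -> acc + num_rows.(batch) end)
  end
end
```

In the notebook you would run it on a freshly executed stream, e.g. `BatchStream.from(fn -> ExArrow.Stream.next(stream2) end, &is_struct(&1, ExArrow.RecordBatch)) |> BatchStream.total_rows(&ExArrow.RecordBatch.num_rows/1)`.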

4. Connection pooling

When :adbc_package_pool_size is greater than 1 and the nimble_pool dependency is available, the adbc_package backend uses a NimblePool of connections. Each execute checks out a connection from the pool, runs the query, and checks the connection back in. Up to pool-size queries can therefore run concurrently, and further callers wait until a connection is returned.
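To make the checkout → run → checkin cycle concrete, here is a deliberately simplified pool built on GenServer. TinyPool is a hypothetical stand-in and not ExArrow's implementation (which, per this notebook, uses NimblePool). It omits things a real pool handles, such as callers that time out or crash while waiting for or holding a connection. Workers are plain terms here; in the backend they would be connections.

```elixir
defmodule TinyPool do
  # Hypothetical, simplified pool (not ExArrow's): hands out idle workers and
  # queues callers when none are free. Does NOT handle callers that time out
  # or die while waiting/holding a worker; NimblePool handles those cases.
  use GenServer

  def start_link(workers) when is_list(workers),
    do: GenServer.start_link(__MODULE__, workers)

  # Check out a worker, run `fun` with it, and always check it back in.
  def run(pool, fun, timeout \\ 5_000) do
    worker = GenServer.call(pool, :checkout, timeout)

    try do
      fun.(worker)
    after
      GenServer.cast(pool, {:checkin, worker})
    end
  end

  @impl true
  def init(workers), do: {:ok, %{idle: workers, waiting: :queue.new()}}

  @impl true
  # A worker is idle: hand it out immediately.
  def handle_call(:checkout, _from, %{idle: [worker | rest]} = state),
    do: {:reply, worker, %{state | idle: rest}}

  # All workers busy: park the caller until a checkin.
  def handle_call(:checkout, from, %{idle: []} = state),
    do: {:noreply, %{state | waiting: :queue.in(from, state.waiting)}}

  @impl true
  def handle_cast({:checkin, worker}, state) do
    case :queue.out(state.waiting) do
      # Give the returned worker straight to the longest-waiting caller.
      {{:value, from}, rest} ->
        GenServer.reply(from, worker)
        {:noreply, %{state | waiting: rest}}

      # Nobody waiting: the worker becomes idle again.
      {:empty, _} ->
        {:noreply, %{state | idle: [worker | state.idle]}}
    end
  end
end
```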

Below we run several queries concurrently with Task.async/1 and Task.await_many/2. With a pool size of 4, up to 4 of these can run at once.

# Run 8 queries concurrently; the pool (size 4) serves them
tasks =
  for i <- 1..8 do
    Task.async(fn ->
      {:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
      {:ok, conn} = ExArrow.ADBC.Connection.open(db)
      {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT #{i} AS id, 'query_#{i}' AS label")
      {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
      batch = ExArrow.Stream.next(stream)
      rows = if batch, do: ExArrow.RecordBatch.num_rows(batch), else: 0
      {i, rows}
    end)
  end

results = Task.await_many(tasks, 15_000)
IO.inspect(results, label: "Concurrent query results")
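The pool bounds concurrency on the backend side. If you also want the caller to cap how many queries are in flight, Task.async_stream/3 with :max_concurrency does that directly. This is a different mechanism from the pool, shown here as a sketch. BoundedRunner is a hypothetical helper, and query_fun stands in for the body of each Task.async call above (taking i and returning {i, rows}).

```elixir
defmodule BoundedRunner do
  # Hypothetical helper (not part of ExArrow): runs `query_fun` once per input
  # with at most `max_concurrency` calls in flight. Results keep input order
  # (Task.async_stream/3 is ordered by default).
  @spec run(Enumerable.t(), (term() -> term()), pos_integer(), timeout()) :: [term()]
  def run(inputs, query_fun, max_concurrency, timeout \\ 15_000) do
    inputs
    |> Task.async_stream(query_fun, max_concurrency: max_concurrency, timeout: timeout)
    # With the default options a timeout exits the caller and a crashing task
    # propagates through its link, so only {:ok, result} tuples arrive here.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Usage in the notebook would look like `BoundedRunner.run(1..8, query_fun, 4)`. Setting max_concurrency equal to :adbc_package_pool_size keeps callers from queuing on the pool at all.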

Summary

  • Setup: Configure :ex_arrow, :adbc_package (e.g. [driver: :sqlite, uri: ":memory:"]) and optionally :adbc_package_pool_size. If the manager wasn’t started by the application (e.g. in Livebook), start it with AdbcPackageManager.start_link/0.
  • Flow: Database.open(:adbc_package) → Connection.open(db) → Statement.new(conn, sql) → execute → ExArrow.Stream (schema + next), same as in Tutorial 3: ADBC.
  • Pooling: With :adbc_package_pool_size > 1 and nimble_pool in deps, the backend uses a connection pool, so up to that many executes can run concurrently.