ExArrow ADBC Integration
Mix.install([
  {:ex_arrow, path: Path.join(__DIR__, "..")},
  {:adbc, "~> 0.7"},
  {:explorer, "~> 0.8"},
  {:nimble_pool, "~> 1.1"},
  {:rustler, "~> 0.32.0"}
])
Section
This notebook demonstrates the adbc_package backend, a Livebook-friendly integration built on the adbc package (with optional NimblePool support). It lets you query a database and get ExArrow.Stream results without loading a native ADBC driver by hand, because the adbc package downloads and manages the driver. It also demonstrates connection pooling when nimble_pool is available.
Setup
Run this cell first. It downloads the SQLite driver if needed, configures the adbc_package backend (in-memory SQLite), sets a connection pool size of 4, and starts the backend manager if the ExArrow application started before this configuration was set (as happens in Livebook).
# Ensure the SQLite driver is available (downloads if needed; required before opening :adbc_package)
Adbc.download_driver!(:sqlite)
# ADBC: use SQLite driver
Application.put_env(:adbc, :drivers, [:sqlite])
# ExArrow adbc_package backend: driver + URI (in-memory SQLite)
Application.put_env(:ex_arrow, :adbc_package, [driver: :sqlite, uri: ":memory:"])
# Optional: use a pool of connections (requires nimble_pool)
Application.put_env(:ex_arrow, :adbc_package_pool_size, 4)
# Start the manager if it wasn't started by the application (e.g. config was set after app start)
unless Process.whereis(ExArrow.ADBC.AdbcPackageManager) do
  {:ok, _pid} = ExArrow.ADBC.AdbcPackageManager.start_link()
end
IO.puts("Setup complete. adbc_package backend ready (pool size: #{Application.get_env(:ex_arrow, :adbc_package_pool_size, 1)}).")
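Outside Livebook, the same settings could plausibly live in a Mix project's config file so that they are in place before the ExArrow application starts. The fragment below is only a sketch: the keys and values are copied from the cell above, while the file location is the usual Mix convention and the assumption that the backend reads these keys at application start is inferred from the note about starting the manager late.

```elixir
# config/config.exs (hypothetical placement; keys mirror the
# Application.put_env/3 calls in the setup cell)
import Config

# ADBC: use the SQLite driver
config :adbc, :drivers, [:sqlite]

# ExArrow adbc_package backend: in-memory SQLite with a pool of 4 connections
config :ex_arrow,
  adbc_package: [driver: :sqlite, uri: ":memory:"],
  adbc_package_pool_size: 4
```

With configuration set this way, the manual start of AdbcPackageManager in the setup cell may be unnecessary, but that depends on how the application's supervision tree is defined.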
1. Open the adbc_package backend and run a query
Open the database with Database.open(:adbc_package), then open a connection, create a statement from a SQL string, and execute it. The result is an ExArrow.Stream of Arrow record batches.
{:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
{:ok, conn} = ExArrow.ADBC.Connection.open(db)
{:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
2. Consume the result stream
Use the same ExArrow.Stream API as for IPC and Flight: schema/1 and next/1.
{:ok, schema} = ExArrow.Stream.schema(stream)
IO.puts("Columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")
batch = ExArrow.Stream.next(stream)
IO.puts("Rows: #{ExArrow.RecordBatch.num_rows(batch)}")
# No more batches
ExArrow.Stream.next(stream)
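Display as a table (optional): in Livebook you can load the stream into Explorer to render a table. Run this after the query cell. It executes the statement again to get a fresh stream, because the previous one has already been consumed.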
{:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
{:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
# to_binary produces stream format; use load_ipc_stream!
Explorer.DataFrame.load_ipc_stream!(binary)
3. Another query (multiple rows)
Run a query that returns several rows and consume all batches.
{:ok, stmt2} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS x UNION SELECT 2 UNION SELECT 3")
{:ok, stream2} = ExArrow.ADBC.Statement.execute(stmt2)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream2) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
total = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
IO.puts("Batches: #{length(batches)}, total rows: #{total}")
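The "call next until no batch comes back" pattern appears twice in this notebook, so it may be worth factoring out. The sketch below shows one way to do that. DrainSketch and its drain/2 function are hypothetical names, not part of ExArrow, and the Agent is only a stand-in for a result stream so the example runs without a database.

```elixir
defmodule DrainSketch do
  # Hypothetical helper (not an ExArrow function): calls `next_fun` repeatedly
  # and collects results for as long as `batch?` returns true for them.
  def drain(next_fun, batch?) when is_function(next_fun, 0) and is_function(batch?, 1) do
    Stream.repeatedly(next_fun)
    |> Enum.take_while(batch?)
  end
end

# Stand-in for a result stream: an Agent holding the remaining "batches",
# each represented as a plain list of rows.
{:ok, fake_stream} = Agent.start_link(fn -> [[1, 2], [3]] end)

next = fn ->
  Agent.get_and_update(fake_stream, fn
    [] -> {nil, []}
    [batch | rest] -> {batch, rest}
  end)
end

batches = DrainSketch.drain(next, &is_list/1)
total_rows = batches |> Enum.map(&length/1) |> Enum.sum()
IO.puts("Batches: #{length(batches)}, total rows: #{total_rows}")
```

Against a real result stream, the call would presumably look like DrainSketch.drain(fn -> ExArrow.Stream.next(stream2) end, &is_struct(&1, ExArrow.RecordBatch)), which matches the inline pipelines used in the cells above.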
4. Connection pooling
When :adbc_package_pool_size is greater than 1 and the nimble_pool dependency is available, the adbc_package backend uses a NimblePool of connections. Each execute checks out a connection from the pool, runs the query, and returns the connection to the pool. This allows concurrent queries to run in parallel.
Below we run several queries concurrently with Task.async and Task.await_many. With a pool size of 4, up to 4 of these can run at once.
# Run 8 queries concurrently; the pool (size 4) serves them
tasks =
  for i <- 1..8 do
    Task.async(fn ->
      {:ok, db} = ExArrow.ADBC.Database.open(:adbc_package)
      {:ok, conn} = ExArrow.ADBC.Connection.open(db)
      {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT #{i} AS id, 'query_#{i}' AS label")
      {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
      batch = ExArrow.Stream.next(stream)
      rows = if batch, do: ExArrow.RecordBatch.num_rows(batch), else: 0
      {i, rows}
    end)
  end
results = Task.await_many(tasks, 15_000)
IO.inspect(results, label: "Concurrent query results")
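The cell above starts all 8 tasks at once and relies on the pool to serve them. An alternative is to cap concurrency on the caller's side so that no more tasks run than there are pooled connections. The sketch below does this with Task.async_stream/3 from the standard library. The run_query function is a hypothetical stand-in for the open/execute/next sequence, so the example runs without a database. How the backend behaves when more callers than connections arrive is not covered in this notebook.

```elixir
# Match the caller-side concurrency limit to the configured pool size.
pool_size = 4

# Hypothetical stand-in for one query: sleeps briefly, then returns
# {id, row_count} in the same shape as the tasks above.
run_query = fn i ->
  Process.sleep(50)
  {i, 1}
end

results =
  1..8
  |> Task.async_stream(run_query, max_concurrency: pool_size, timeout: 15_000)
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(results, label: "Bounded concurrent results")
```

Task.async_stream/3 keeps results in input order by default, so the output lines up with the ids 1 through 8 even though the queries finish in groups.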
Summary
- Setup: Configure :ex_arrow, :adbc_package (e.g. [driver: :sqlite, uri: ":memory:"]) and optionally :adbc_package_pool_size. If the manager wasn’t started by the application (e.g. in Livebook), start it with AdbcPackageManager.start_link/0.
- Flow: Database.open(:adbc_package) → Connection.open(db) → Statement.new(conn, sql) → execute → ExArrow.Stream (schema + next), same as in Tutorial 3: ADBC.
- Pooling: With pool_size > 1 and nimble_pool in deps, the backend uses a connection pool so multiple concurrent executes can run in parallel.