
ExArrow Tutorial 3: ADBC (Arrow Database Connectivity)

ADBC (Arrow Database Connectivity) exposes databases through an Arrow-native API. You open a Database (driver + options), then a Connection, then create and execute a Statement (SQL). The result is an Arrow stream of record batches: the same ExArrow.Stream you use for IPC and Flight.

This notebook covers opening a database (optionally with the adbc package), executing SQL, consuming the result stream, and the optional metadata APIs.


1. Concepts

| Handle | Module | Purpose |
| --- | --- | --- |
| Database | ExArrow.ADBC.Database | Driver (path or name) + init options (e.g. URI). |
| Connection | ExArrow.ADBC.Connection | Session from a database. |
| Statement | ExArrow.ADBC.Statement | Set SQL, execute → returns ExArrow.Stream of record batches. |

Flow: Database.open → Connection.open → Statement.new(conn, sql) → execute → Stream (schema + next).


2. Driver loading

ExArrow does not install or download ADBC drivers. You point it at a driver that is already installed, in one of two ways:

  • By path: Pass the path to the driver shared library (e.g. libadbc_driver_sqlite.so).
  • By name: Pass driver_name: "adbc_driver_sqlite" and optionally uri: ":memory:". The ADBC driver manager looks up the library by name (environment or system path).

If the adbc package is available, you can use it to download drivers on demand and then open with ExArrow.
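
The two loading routes can be combined into a fallback: try the driver manager by name first, then an explicit path. The sketch below is a minimal illustration under stated assumptions, not part of ExArrow. `DriverFallback` and `first_ok/1` are hypothetical names, and the shared-library path in the usage comment is a placeholder for wherever your driver actually lives. The `Database.open` calls in the comment follow the by-name and by-path forms described above.

```elixir
defmodule DriverFallback do
  @moduledoc "Hypothetical helper for illustration; not part of ExArrow."

  # Calls each zero-arity opener in order. Returns the first {:ok, value},
  # or {:error, reasons} listing every failure in attempt order.
  def first_ok(openers) when is_list(openers) do
    Enum.reduce_while(openers, {:error, []}, fn opener, {:error, reasons} ->
      case opener.() do
        {:ok, value} -> {:halt, {:ok, value}}
        {:error, reason} -> {:cont, {:error, reasons ++ [reason]}}
      end
    end)
  end
end

# Usage (uncomment with ExArrow loaded; the path is a placeholder):
# DriverFallback.first_ok([
#   fn -> ExArrow.ADBC.Database.open(driver_name: "adbc_driver_sqlite", uri: ":memory:") end,
#   fn -> ExArrow.ADBC.Database.open("/path/to/libadbc_driver_sqlite.so") end
# ])
```

Collecting every failure reason, rather than only the last, makes it easier to see whether the name lookup or the path was at fault.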


3. Open database (with adbc package)

When adbc is in your deps, use DriverHelper.ensure_driver_and_open/2 to ensure the driver is present and open the database:

# Ensures the SQLite driver is available (downloads if needed), then opens a :memory: DB.
# Bind the result of the case expression: a variable assigned inside a clause
# is not visible to later cells.
db =
  case ExArrow.ADBC.DriverHelper.ensure_driver_and_open(:sqlite, ":memory:") do
    {:ok, db} ->
      IO.puts("Database opened (in-memory SQLite)")
      # Returned so the next cells can use db
      db

    {:error, msg} ->
      IO.puts("Driver not available: #{msg}")
      IO.puts("Add {:adbc, \"~> 0.7\"} to deps to use DriverHelper, or install an ADBC driver and open by path/name.")
      nil
  end

4. Open database (by name, no adbc)

If you have the driver installed (e.g. via system package or manual install), open by name:

# Uncomment and run if you don't use DriverHelper above
# {:ok, db} = ExArrow.ADBC.Database.open(driver_name: "adbc_driver_sqlite", uri: ":memory:")
# Or with a file: uri: "file:my.db"

5. Connection and first query

Open a connection, create a statement, set SQL, execute. You get back an ExArrow.Stream; use Stream.schema/1 and Stream.next/1 as with IPC and Flight.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  {:ok, schema} = ExArrow.Stream.schema(stream)
  IO.puts("Columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")

  batch = ExArrow.Stream.next(stream)
  IO.puts("Rows: #{ExArrow.RecordBatch.num_rows(batch)}")
  # Consume until ExArrow.Stream.next(stream) returns nil
else
  IO.puts("Open a database in the previous cells first.")
end
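
The cell above pattern-matches on `{:ok, _}` at every step, so any failure raises a MatchError. Where you would rather get the error back as a value, the same flow can be written with `with`. This is a sketch only: `AdbcQuery` and `first_batch/2` are hypothetical names, and it assumes each step returns `{:ok, value}` or `{:error, message}` and that `Stream.next/1` returns a batch, nil, or `{:error, _}`, as described in this notebook.

```elixir
defmodule AdbcQuery do
  @moduledoc "Hypothetical wrapper for illustration; not part of ExArrow."

  # Runs sql on an open database handle and returns {:ok, schema, batch_or_nil}.
  # The first step that does not return {:ok, _} is returned unchanged.
  def first_batch(db, sql) do
    with {:ok, conn} <- ExArrow.ADBC.Connection.open(db),
         {:ok, stmt} <- ExArrow.ADBC.Statement.new(conn, sql),
         {:ok, stream} <- ExArrow.ADBC.Statement.execute(stmt),
         {:ok, schema} <- ExArrow.Stream.schema(stream) do
      case ExArrow.Stream.next(stream) do
        {:error, _} = error -> error
        batch_or_nil -> {:ok, schema, batch_or_nil}
      end
    end
  end
end

# Usage (with ExArrow loaded and db opened as in the earlier cells):
# AdbcQuery.first_batch(db, "SELECT 1 AS n, 'hello' AS msg")
```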

6. Consume all batches

Typical pattern: loop until next/1 returns nil or {:error, _}.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS x UNION SELECT 2 UNION SELECT 3")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  batches =
    Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
    |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

  total = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
  IO.puts("Batches: #{length(batches)}, total rows: #{total}")
else
  IO.puts("Open a database first.")
end
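
The take_while pattern above stops at the first value that is not a record batch, so an `{:error, _}` from next/1 looks the same as the end of the stream. If you need to tell the two apart, an explicit loop can surface the error. This is a minimal sketch: `BatchDrain` and `drain/1` are hypothetical names, and the helper takes a zero-arity function so it does not depend on any particular stream type.

```elixir
defmodule BatchDrain do
  @moduledoc "Hypothetical helper for illustration; not part of ExArrow."

  # next_fun is a zero-arity function returning a batch, nil (end of stream),
  # or {:error, reason}. Returns {:ok, batches} on a clean end, or
  # {:error, reason, batches_so_far} if the stream fails part-way.
  def drain(next_fun) when is_function(next_fun, 0), do: do_drain(next_fun, [])

  defp do_drain(next_fun, acc) do
    case next_fun.() do
      nil -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason, Enum.reverse(acc)}
      batch -> do_drain(next_fun, [batch | acc])
    end
  end
end

# Usage (with ExArrow loaded and a stream from Statement.execute/1):
# BatchDrain.drain(fn -> ExArrow.Stream.next(stream) end)
```

Returning the batches read so far alongside the error lets the caller decide whether a partial result is still useful.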

7. Metadata APIs (when supported by driver)

When the driver supports them, you can query catalog metadata without SQL.

  • get_table_types/1: stream of table types (e.g. TABLE, VIEW).
  • get_table_schema/4: Arrow schema for a table, called as (conn, catalog, db_schema, table_name); use nil for catalog/db_schema where not applicable.
  • get_objects/2: hierarchical view (catalogs, schemas, tables, columns) with optional filters.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)

  # Create a table so we can ask for its schema
  {:ok, stmt_create} = ExArrow.ADBC.Statement.new(conn, "CREATE TABLE demo (id INTEGER, name TEXT)")
  {:ok, _} = ExArrow.ADBC.Statement.execute(stmt_create)

  # Get table schema (SQLite: catalog/db_schema often nil)
  case ExArrow.ADBC.Connection.get_table_schema(conn, nil, nil, "demo") do
    {:ok, schema} ->
      IO.puts("Table 'demo' columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")
    {:error, msg} ->
      IO.puts("get_table_schema not supported or failed: #{msg}")
  end

  # get_objects (depth and optional filters)
  case ExArrow.ADBC.Connection.get_objects(conn, depth: "tables") do
    {:ok, stream} ->
      count =
        Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
        |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
        |> length()
      IO.puts("get_objects returned #{count} batch(es)")
    {:error, msg} ->
      IO.puts("get_objects: #{msg}")
  end
else
  IO.puts("Open a database first.")
end

8. Statement.bind (when supported)

Some drivers support binding a record batch to a statement (e.g. for bulk insert or prepared parameters). Use Statement.new(conn, sql, bind: record_batch) for an initial bind, or Statement.bind(stmt, record_batch) to rebind; unsupported drivers return {:error, message}.

# Example (driver support varies):
# {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT * FROM t WHERE id = ?", bind: record_batch)
# {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
# Rebinding:
# :ok = ExArrow.ADBC.Statement.bind(stmt, other_batch)

9. Errors

Driver load failures, execute failures, and unsupported operations all return {:error, message}. Use ExArrow.ADBC.Error.from_message/1 to wrap a message string in a struct; ExArrow.ADBC.Error.message/1 accepts either the string or the struct.

case ExArrow.ADBC.Database.open(driver_name: "nonexistent_driver", uri: "") do
  {:ok, _} -> IO.puts("Unexpected success")
  {:error, msg} -> IO.puts("Expected error: #{msg}")
end

10. Roundtrip: ADBC → IPC → Explorer (optional)

A common pattern: query with ADBC, get Arrow batches, write to IPC (binary or file), then load into Explorer for analysis.

if db && Code.ensure_loaded?(Explorer.DataFrame) do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS a, 'x' AS b UNION SELECT 2, 'y'")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  {:ok, schema} = ExArrow.Stream.schema(stream)
  batches =
    Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
    |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

  {:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
  df = Explorer.DataFrame.load_ipc_stream!(binary)
  df["a"]
else
  "Need db and Explorer to run this cell."
end

Summary

  • Database.open: by path (string) or by driver_name + optional uri. Use DriverHelper.ensure_driver_and_open/2 when the adbc package is available.
  • Connection.open → Statement.new(conn, sql) → execute → Stream: same schema/1 and next/1 as IPC and Flight.
  • Metadata: get_table_types, get_table_schema, get_objects when the driver supports them.
  • Statement.bind: optional; driver-dependent.

You’ve now seen IPC, Flight, and ADBC. ExArrow keeps Arrow data in native memory and gives you a single stream/batch API across all three, which suits pipelines, ETL, and data exchange with Explorer or other Arrow consumers.