
ExArrow Tutorial 3: ADBC (Arrow Database Connectivity)

livebook/03_adbc.livemd

deps = [
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19.0"},
  {:adbc, "~> 0.12"},
  {:gen_stage, "~> 1.3"},
  {:nx, "~> 0.12.1"},
  {:telemetry, "~> 1.0"},
  {:flow, "~> 1.2"},
  {:broadway, "~> 1.0"}
]

local? = File.exists?(Path.join(__DIR__, "../native/ex_arrow_native/Cargo.toml"))

{ex_arrow_dep, extra_deps, config} =
  if local? do
    System.put_env("EX_ARROW_BUILD", "1")

    # Force recompile ex_arrow so optional deps are detected at compile time.
    ex_arrow_beam = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
    if File.dir?(ex_arrow_beam) do
      File.rm_rf!(ex_arrow_beam)
    end

    {
      {:ex_arrow, path: Path.expand("..", __DIR__)},
      [{:rustler, "~> 0.36", optional: true}],
      [adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
    }
  else
    {
      {:ex_arrow, "~> 0.7.0"},
      [],
      [adbc: [drivers: [:sqlite]]]
    }
  end

Mix.install(deps ++ [ex_arrow_dep] ++ extra_deps, config: config)

Section

ADBC (Arrow Database Connectivity) exposes databases through an Arrow-native API. You open a Database (driver + options), then a Connection, then create and execute a Statement (SQL). The result is an Arrow stream of record batches: the same ExArrow.Stream you use for IPC and Flight.

This notebook covers: opening a database, executing SQL, consuming the result stream, and optional metadata APIs.


1. Concepts

Handle      Module                   Purpose
Database    ExArrow.ADBC.Database    Driver (path or name) + init options (e.g. URI).
Connection  ExArrow.ADBC.Connection  Session from a database.
Statement   ExArrow.ADBC.Statement   Set SQL, execute → returns ExArrow.Stream of record batches.

Flow: Database.open → Connection.open → Statement.new(conn, sql) → execute → Stream (schema + next).
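
The flow above can be sketched as a single `with` chain. This is a minimal sketch, not part of ExArrow: AdbcFlowSketch and first_batch/2 are hypothetical names, the ExArrow calls are the ones used in the cells of this notebook, and their return shapes ({:ok, value} or {:error, message}) are assumed from those cells.

```elixir
defmodule AdbcFlowSketch do
  # Hypothetical helper: walks Database -> Connection -> Statement -> Stream
  # and returns the schema plus whatever the first Stream.next/1 call yields.
  # `db_spec` is whatever Database.open/1 accepts (e.g. :adbc_package).
  def first_batch(db_spec, sql) do
    with {:ok, db} <- ExArrow.ADBC.Database.open(db_spec),
         {:ok, conn} <- ExArrow.ADBC.Connection.open(db),
         {:ok, stmt} <- ExArrow.ADBC.Statement.new(conn, sql),
         {:ok, stream} <- ExArrow.ADBC.Statement.execute(stmt),
         {:ok, schema} <- ExArrow.Stream.schema(stream) do
      {:ok, schema, ExArrow.Stream.next(stream)}
    end
  end
end
```

Any step that returns something other than {:ok, value} short-circuits the chain and is returned unchanged, so the caller sees the first {:error, message} without nested case expressions.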


2. Driver loading

ExArrow supports two ways to reach a database:

  • :adbc_package backend (this notebook) — uses the adbc Hex package to download SQLite and query via supervised Adbc.Database / Adbc.Connection. Results are converted to ExArrow.Stream. No native ADBC .dylib required — works in Livebook.
  • Native C driver — ExArrow’s NIF loads libadbc_driver_sqlite.dylib (or .so) via the ADBC driver manager. Use driver_path: or driver_name: + ADBC_DRIVER. See Installing an ADBC driver.

DriverHelper.ensure_driver_and_open/2 calls Adbc.download_driver/1 then opens by driver_name; that only works when a loadable C driver is on disk. In Livebook, prefer :adbc_package instead.


3. Open database (adbc_package backend)

Configure the backend, download the SQLite driver for the adbc package, start the manager, and open an in-memory database:

:ok = Adbc.download_driver!(:sqlite)
Application.put_env(:adbc, :drivers, [:sqlite])
Application.put_env(:ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:")

unless Process.whereis(ExArrow.ADBC.AdbcPackageManager) do
  {:ok, _pid} = ExArrow.ADBC.AdbcPackageManager.start_link()
end

db =
  case ExArrow.ADBC.Database.open(:adbc_package) do
    {:ok, db} ->
      IO.puts("Database opened (in-memory SQLite via adbc_package)")
      db

    {:error, msg} ->
      IO.puts("Failed to open database: #{msg}")
      nil
  end
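
Outside Livebook, the two Application.put_env/3 calls above would usually live in a Mix project's config file instead. A minimal sketch, assuming a standard config/config.exs (that file is not part of this notebook); the keys and values are the ones set above:

```elixir
# config/config.exs (assumed location in a regular Mix project)
import Config

# Same value as Application.put_env(:adbc, :drivers, [:sqlite])
config :adbc, :drivers, [:sqlite]

# Same value as Application.put_env(:ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:")
config :ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:"
```

Starting the manager and calling Database.open(:adbc_package) would then stay as in the cell above.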

4. Open database (native C driver, optional)

If you have a native ADBC driver installed (built from the Apache Arrow ADBC sources or installed from a system package), open it by name or by path:

# Uncomment when a native driver is available (not needed for cells below):
# {:ok, db} = ExArrow.ADBC.Database.open(driver_name: "adbc_driver_sqlite", uri: ":memory:")
# Or by path:
# {:ok, db} = ExArrow.ADBC.Database.open(driver_path: "/path/to/libadbc_driver_sqlite.dylib", uri: ":memory:")

5. Connection and first query

Open a connection, create a statement, set SQL, execute. You get back an ExArrow.Stream; use Stream.schema/1 and Stream.next/1 as with IPC and Flight.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  {:ok, schema} = ExArrow.Stream.schema(stream)
  IO.puts("Columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")

  batch = ExArrow.Stream.next(stream)
  IO.puts("Rows: #{ExArrow.RecordBatch.num_rows(batch)}")
  # Consume until ExArrow.Stream.next(stream) returns nil
else
  IO.puts("Open a database in the previous cells first.")
end

6. Consume all batches

Typical pattern: loop until next/1 returns nil or {:error, _}.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS x UNION SELECT 2 UNION SELECT 3")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  batches =
    Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
    |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

  total = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
  IO.puts("Batches: #{length(batches)}, total rows: #{total}")
else
  IO.puts("Open a database first.")
end
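
The take_while pattern above stops on nil and on {:error, _} alike, so a failed fetch looks the same as a finished stream. A minimal sketch of a drain loop that keeps the two apart follows. StreamDrainSketch and drain/1 are hypothetical names; the only assumption about ExArrow is the one stated in this section, namely that next/1 returns a batch, nil, or {:error, _}.

```elixir
defmodule StreamDrainSketch do
  # `next_fun` is any zero-arity function that returns a batch, nil (end of
  # stream), or {:error, reason}. For ADBC results that would be:
  #   fn -> ExArrow.Stream.next(stream) end
  def drain(next_fun) when is_function(next_fun, 0), do: do_drain(next_fun, [])

  defp do_drain(next_fun, acc) do
    case next_fun.() do
      # End of stream: return the batches in arrival order.
      nil -> {:ok, Enum.reverse(acc)}
      # Failure: report the reason together with the batches read so far.
      {:error, reason} -> {:error, reason, Enum.reverse(acc)}
      # Anything else is treated as a batch and accumulated.
      batch -> do_drain(next_fun, [batch | acc])
    end
  end
end
```

Taking a function rather than the stream itself keeps the helper independent of ExArrow, so it can be exercised with a fake source before pointing it at a real query.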

7. Metadata APIs (native driver only)

When using a native C driver, you can query catalog metadata without SQL:

  • get_table_types/1: stream of table types (e.g. TABLE, VIEW).
  • get_table_schema/4: Arrow schema for a table; arguments are (conn, catalog, db_schema, table_name). Pass nil for catalog/db_schema where not applicable.
  • get_objects/2: hierarchical view (catalogs, schemas, tables, columns) with optional filters.

The :adbc_package backend does not implement these yet. With the setup above, expect {:error, ...}. DDL such as CREATE TABLE also fails to produce an ExArrow.Stream on this backend — use a native driver (section 4) for metadata and schema setup.

if db do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)

  case ExArrow.ADBC.Connection.get_table_schema(conn, nil, nil, "demo") do
    {:ok, schema} ->
      IO.puts("Table 'demo' columns: #{inspect(ExArrow.Schema.fields(schema) |> Enum.map(& &1.name))}")

    {:error, msg} ->
      IO.puts("get_table_schema: #{msg}")
  end

  case ExArrow.ADBC.Connection.get_objects(conn, depth: "tables") do
    {:ok, stream} ->
      count =
        Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
        |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
        |> length()

      IO.puts("get_objects returned #{count} batch(es)")

    {:error, msg} ->
      IO.puts("get_objects: #{msg}")
  end
else
  IO.puts("Open a database first.")
end

With a native driver, create a table first, then call the metadata APIs:

# Native driver only (after opening via section 4):
# {:ok, conn} = ExArrow.ADBC.Connection.open(db)
# {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "CREATE TABLE demo (id INTEGER, name TEXT)")
# {:ok, _stream} = ExArrow.ADBC.Statement.execute(stmt)
# ExArrow.ADBC.Connection.get_table_schema(conn, nil, nil, "demo")

8. Statement.bind (when supported)

Some drivers support binding a record batch to a statement (e.g. for bulk insert or prepared parameters). Use Statement.new(conn, sql, bind: record_batch) for an initial bind, or Statement.bind(stmt, record_batch) to rebind; unsupported drivers return {:error, message}. Not supported on the :adbc_package backend.

# Example (native driver; support varies):
# {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT * FROM t WHERE id = ?", bind: record_batch)
# {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
# Rebinding:
# :ok = ExArrow.ADBC.Statement.bind(stmt, other_batch)

9. Errors

Driver load failures, execution failures, and unsupported operations all return {:error, message}. Use ExArrow.ADBC.Error.from_message/1 to wrap a string in a struct; ExArrow.ADBC.Error.message/1 accepts both the plain string and the struct.

case ExArrow.ADBC.Database.open(driver_name: "nonexistent_driver", uri: "") do
  {:ok, _} -> IO.puts("Unexpected success")
  {:error, msg} -> IO.puts("Expected error: #{msg}")
end
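
Since an error reason can be a plain string or a wrapped struct, it can help to normalise it in one place. A minimal sketch, assuming (as described above) that ExArrow.ADBC.Error.message/1 accepts both forms and returns a string; AdbcErrorSketch and tag_error/2 are hypothetical names.

```elixir
defmodule AdbcErrorSketch do
  # Passes successful results through untouched and rewrites {:error, reason}
  # into {:error, text}, where text names the step that failed.
  def tag_error(step, result) do
    case result do
      {:error, reason} ->
        # Assumption: message/1 returns a string for both strings and structs.
        {:error, "#{step} failed: #{ExArrow.ADBC.Error.message(reason)}"}

      other ->
        other
    end
  end
end
```

For example, AdbcErrorSketch.tag_error("open", ExArrow.ADBC.Database.open(driver_name: "nonexistent_driver", uri: "")) would yield an {:error, text} tuple whose text starts with "open failed: ".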

10. Roundtrip: ADBC → IPC → Explorer (optional)

A common pattern: query with ADBC, get Arrow batches, write to IPC (binary or file), then load into Explorer for analysis.

if db && Code.ensure_loaded?(Explorer.DataFrame) do
  {:ok, conn} = ExArrow.ADBC.Connection.open(db)
  {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS a, 'x' AS b UNION SELECT 2, 'y'")
  {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)

  {:ok, schema} = ExArrow.Stream.schema(stream)
  batches =
    Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
    |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

  {:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
  df = Explorer.DataFrame.load_ipc_stream!(binary)
  df["a"]
else
  "Need db and Explorer to run this cell."
end

Summary

  • Livebook / quick start: configure :ex_arrow, :adbc_package, then Database.open(:adbc_package) → same Statement/Stream API as IPC and Flight.
  • Production / native driver: Database.open by driver_path or driver_name + uri; metadata and bind when the driver supports them.
  • Connection pooling with the adbc package: see 04_adbc_integration.livemd.

You’ve now seen IPC, Flight, and ADBC. ExArrow keeps Arrow data in native memory and gives you a single stream/batch API across all three—ideal for pipelines, ETL, and interchanging with Explorer or other Arrow consumers.