Powered by AppSignal & Oban Pro

ExArrow Quick Start

livebook/00_quickstart.livemd

ExArrow Quick Start

# Supporting packages used by the notebook cells; ex_arrow itself is
# resolved separately below.
deps = [
  explorer: "~> 0.11",
  kino: "~> 0.19.0",
  adbc: "~> 0.12",
  nimble_pool: "~> 1.1",
  nx: "~> 0.12",
  telemetry: "~> 1.0",
  flow: "~> 1.2",
  gen_stage: "~> 1.2",
  broadway: "~> 1.0"
]

# A Cargo.toml one level up means the notebook was opened from livebook/
# inside the repo, so ex_arrow is taken from local source; otherwise it is
# installed from Hex (precompiled NIF).
local? =
  __DIR__
  |> Path.join("../native/ex_arrow_native/Cargo.toml")
  |> File.exists?()

{ex_arrow_dep, extra_deps, config} =
  case local? do
    true ->
      System.put_env("EX_ARROW_BUILD", "1")

      # Drop any cached ex_arrow build so it is recompiled with the optional
      # deps (Nx, GenStage, etc.) visible at compile time. A build cached by
      # an earlier Mix.install without those deps would contain stub functions.
      stale_ebin = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
      if File.dir?(stale_ebin), do: File.rm_rf!(stale_ebin)

      {
        {:ex_arrow, path: Path.expand("..", __DIR__)},
        [{:rustler, "~> 0.36", optional: true}],
        [adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
      }

    false ->
      {{:ex_arrow, "~> 0.7.0"}, [], [adbc: [drivers: [:sqlite]]]}
  end

Mix.install(deps ++ [ex_arrow_dep | extra_deps], config: config)

Start

Get up and running with ExArrow in a few minutes. This notebook touches all major features: IPC (read/write Arrow streams), Arrow Flight (client/server), Flight SQL (parameterized queries), ADBC (query databases), and data interchange (Explorer/Nx).

Configure adbc to use the SQLite driver (needed for section 5), download the driver, and run a quick test query:

# Register SQLite as the adbc driver, then fetch it.
Application.put_env(:adbc, :drivers, ~w(sqlite)a)
:ok = Adbc.download_driver!(:sqlite)

# Start a database process and a connection to it via Kino.start_child!.
db = Kino.start_child!({Adbc.Database, [driver: :sqlite]})
conn = Kino.start_child!({Adbc.Connection, [database: db]})

# Run a one-row query through the connection; the match asserts it succeeded.
{:ok, result} = Adbc.Connection.query(conn, "SELECT 1 AS n, 'hello' AS msg")

1. Setup

To use ExArrow in your own code, add ex_arrow to your Mix project's dependencies (or run it in IEx). In this notebook the Mix.install cell above has already installed it, so you can run the cells below directly.

# Verify the NIF loaded by asking ExArrow for its native library version.
# NOTE(review): presumably this call fails if the NIF is missing — confirm
# against the ExArrow docs.
ExArrow.native_version()

2. IPC: Read an Arrow stream

ExArrow can read Arrow IPC from a binary (e.g. from a file, socket, or HTTP body). We’ll use a small built-in fixture so you can run this without any external file.

# Load the built-in fixture: a small IPC stream binary
# (schema: id int64, name utf8; 2 rows).
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()

# Wrap the binary in a readable stream.
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)

# Look at the schema first; this does not consume the stream.
{:ok, schema} = ExArrow.Stream.schema(stream)

schema
|> ExArrow.Schema.fields()
|> Enum.map(fn field -> field.name end)

Read batches one at a time until the stream is done:

# First batch: compute its row count when a batch came back (nil otherwise).
batch1 = ExArrow.Stream.next(stream)

if batch1 do
  ExArrow.RecordBatch.num_rows(batch1)
end

# Second call: no more batches
ExArrow.Stream.next(stream)

3. IPC: Write then read (roundtrip)

You can write the same schema and batches back to a binary or file, then read again.

# Open a fresh stream over the fixture and read its schema.
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)

# Drain the stream: keep emitting record batches until `next/1` returns
# anything that is not an ExArrow.RecordBatch.
batches =
  stream
  |> Stream.unfold(fn source ->
    case ExArrow.Stream.next(source) do
      batch when is_struct(batch, ExArrow.RecordBatch) -> {batch, source}
      _done -> nil
    end
  end)
  |> Enum.to_list()

# Serialise schema + batches back into an IPC binary and report its size.
{:ok, out_binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
byte_size(out_binary)

4. Arrow Flight: Echo server and client

Arrow Flight is a gRPC-based protocol for streaming Arrow data. ExArrow includes a small echo server: you upload data with do_put, then download it with do_get using the ticket "echo".

Start the server (one cell), then use the client in the next.

# Start the built-in echo server on port 9999 and read back the port it reports.
{:ok, server} = ExArrow.Flight.Server.start_link(9999, [])
{:ok, port} = ExArrow.Flight.Server.port(server)
port

# Load the fixture data that will be uploaded.
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)

batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

# Connect using the port the server reported instead of repeating the literal,
# so the client always targets the server started above.
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, [])
:ok = ExArrow.Flight.Client.do_put(client, schema, batches)

# Download the same data via do_get
{:ok, down_stream} = ExArrow.Flight.Client.do_get(client, "echo")
{:ok, down_schema} = ExArrow.Stream.schema(down_stream)

first = ExArrow.Stream.next(down_stream)
ExArrow.RecordBatch.num_rows(first)

# Clean up
ExArrow.Flight.Server.stop(server)

5. ADBC: Query a database, get Arrow results

With ADBC you open a database (e.g. SQLite), run SQL, and get an Arrow stream of result batches—same ExArrow.Stream API as IPC and Flight.

This cell uses the :adbc_package backend (see 03 ADBC and 04 ADBC integration): the adbc package downloads the SQLite driver, and ExArrow returns the results as ExArrow.Stream batches without requiring a separately installed native driver library (such as a .dylib). For native C drivers in production, see Installing an ADBC driver.

# Fetch the SQLite driver, then configure both adbc and ExArrow's
# :adbc_package backend to use it with an in-memory database.
:ok = Adbc.download_driver!(:sqlite)
Application.put_env(:adbc, :drivers, [:sqlite])
Application.put_env(:ex_arrow, :adbc_package, driver: :sqlite, uri: ":memory:")

# Start the package manager only when no process is registered under its
# module name yet, so re-running this cell does not start it twice.
if is_nil(Process.whereis(ExArrow.ADBC.AdbcPackageManager)) do
  {:ok, _pid} = ExArrow.ADBC.AdbcPackageManager.start_link()
end

case ExArrow.ADBC.Database.open(:adbc_package) do
  {:ok, db} ->
    {:ok, conn} = ExArrow.ADBC.Connection.open(db)
    {:ok, stmt} = ExArrow.ADBC.Statement.new(conn, "SELECT 1 AS n, 'hello' AS msg")
    {:ok, stream} = ExArrow.ADBC.Statement.execute(stmt)
    {:ok, schema} = ExArrow.Stream.schema(stream)
    batch = ExArrow.Stream.next(stream)
    IO.inspect(ExArrow.Schema.fields(schema), label: "Columns")
    IO.inspect(ExArrow.RecordBatch.num_rows(batch), label: "Rows")

  {:error, reason} ->
    # The error term may not be a string; inspect anything that is not, so
    # reporting the failure cannot itself raise.
    detail = if is_binary(reason), do: reason, else: inspect(reason)
    IO.puts("ADBC query failed: #{detail}")
end

Next steps

  • Notebook 01 — IPC: Stream vs file format, reading from files, writing to files, schema and types.
  • Notebook 02 — Flight: Full server/client API, Flight SQL prepared statements with parameter binding.
  • Notebook 03 — ADBC: Metadata APIs, optional Explorer roundtrip, and production tips.

6. Explorer interchange (v0.6+)

ExArrow can convert between Arrow data and Explorer DataFrames in one call:

# Only run the roundtrip when Explorer is available in this runtime.
case Code.ensure_loaded?(Explorer.DataFrame) do
  true ->
    frame = Explorer.DataFrame.new(id: [1, 2, 3], name: ["alice", "bob", "carol"])

    # DataFrame → Arrow
    {:ok, arrow_batch} = ExArrow.from_dataframe(frame)
    row_count = ExArrow.RecordBatch.num_rows(arrow_batch)
    IO.puts("Batch rows: #{row_count}")

    # Arrow → DataFrame
    {:ok, restored} = ExArrow.to_dataframe(arrow_batch)
    IO.inspect(restored)

  false ->
    "Explorer not loaded; add {:explorer, \"~> 0.11\"} to deps."
end

7. Nx interchange (v0.6+)

ExArrow can convert between Arrow columns and Nx tensors without materialising lists:

# Only run the roundtrip when Nx is available in this runtime.
if Code.ensure_loaded?(Nx) do
  tensor = Nx.tensor([1, 2, 3], type: {:s, 64})

  # Tensor → Arrow
  {:ok, batch} = ExArrow.from_nx(tensor)
  IO.puts("Batch rows: #{ExArrow.RecordBatch.num_rows(batch)}")

  # Arrow → Tensor
  {:ok, back} = ExArrow.to_nx(batch)
  IO.inspect(Nx.to_list(back))
else
  # The suggested requirement matches the Nx version this notebook's setup
  # cell installs.
  "Nx not loaded; add {:nx, \"~> 0.12\"} to deps."
end

8. Flight SQL prepared statements with parameter binding (v0.6+)

Connect to a Flight SQL server, prepare a query, bind parameters, and execute:

# Flight SQL walkthrough, left commented out because it needs an external
# server. The sketch connects, prepares a parameterised query, reads the
# parameter schema, binds a one-row batch, executes, then re-binds a second
# value, executes again, and closes the statement.
#
# Requires a running Flight SQL server (e.g. DuckDB with flight_sql extension)
# {:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
#
# {:ok, stmt} = ExArrow.FlightSQL.Client.prepare(client, "SELECT * FROM users WHERE id = ?")
#
# {:ok, schema} = ExArrow.FlightSQL.Statement.parameter_schema(stmt)
# # => [%ExArrow.Field{name: "id", type: :int64, nullable: false}]
#
# {:ok, params} = ExArrow.RecordBatch.from_columns(["id"], [<<42::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, params)
# {:ok, stream} = ExArrow.FlightSQL.Statement.execute(stmt)
# batches = Enum.to_list(stream)
#
# # Re-bind with a different parameter value
# {:ok, other} = ExArrow.RecordBatch.from_columns(["id"], [<<99::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, other)
# {:ok, stream2} = ExArrow.FlightSQL.Statement.execute(stmt)
#
# :ok = ExArrow.FlightSQL.Statement.close(stmt)

# The only live statement in this cell: a reminder printed when it runs.
IO.puts("Uncomment the cells above when a Flight SQL server is available.")

9. Streaming pipelines (v0.7.0)

v0.7.0 introduces first-class streaming and a pipeline DSL. The unit of execution is the Arrow RecordBatch.

Stream constructors — one entry point per source:

# From an IPC binary (built-in fixture): fetch the fixture bytes and open them
# with the ExArrow.Stream.from_ipc/1 constructor.
# `ipc_bytes` and `stream` are reused by the cells that follow.
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.Stream.from_ipc(ipc_bytes)
# Read the stream's schema and list its field names (the last expression).
{:ok, schema} = ExArrow.Stream.schema(stream)
ExArrow.Schema.field_names(schema)

Batch operations — lightweight, Arrow-native transforms:

# Pull the next batch from `stream`, which is bound in an earlier cell.
batch = ExArrow.Stream.next(stream)

# Select only the "id" column, then list the resulting column names.
{:ok, slim} = ExArrow.Batch.select(batch, ["id"])
ExArrow.RecordBatch.column_names(slim)

# Rename "id" to "user_id" and list the column names again.
{:ok, renamed} = ExArrow.Batch.rename(slim, %{"id" => "user_id"})
ExArrow.RecordBatch.column_names(renamed)

# Take 1 row and report the row count of the result.
# NOTE(review): presumably take/2 keeps the first row — confirm in the
# ExArrow.Batch docs.
{:ok, first1} = ExArrow.Batch.take(renamed, 1)
ExArrow.RecordBatch.num_rows(first1)

Pipeline DSL — lazy map_batches + sink:

# Open a new stream over the fixture bytes (the earlier one was consumed).
{:ok, stream2} = ExArrow.Stream.from_ipc(ipc_bytes)

# Destination Parquet file inside the system temp directory.
out_path =
  System.tmp_dir!()
  |> Path.join("ex_arrow_quickstart_pipeline.parquet")

# Per-batch transform: keep only the "id" column.
only_id = fn record_batch ->
  {:ok, selected} = ExArrow.Batch.select(record_batch, ["id"])
  selected
end

# Build the mapping stage, then hand it to the Parquet sink.
mapped = ExArrow.Pipeline.map_batches({:ok, stream2}, only_id)
ExArrow.Pipeline.write_parquet(mapped, out_path)

File.exists?(out_path)

Telemetry — attach a handler and observe every batch:

# Handler lives in a module so it can be attached as a remote function
# capture; telemetry logs a warning about a performance penalty when the
# handler is an anonymous (local) function.
defmodule QuickstartTelemetry do
  @moduledoc false

  # Prints one line per [:ex_arrow, :stream, :batch] event, using the :rows
  # measurement and the :source metadata entry.
  def handle_batch(_event, measurements, metadata, _config) do
    IO.puts("batch: #{measurements[:rows]} rows from #{inspect(metadata[:source])}")
  end
end

:telemetry.attach(
  "ex-arrow-quickstart",
  [:ex_arrow, :stream, :batch],
  &QuickstartTelemetry.handle_batch/4,
  nil
)

# Consume a fresh stream so the handler sees its batch events.
{:ok, stream3} = ExArrow.Stream.from_ipc(ipc_bytes)
_ = ExArrow.Stream.to_list(stream3)

# Remove the handler so later cells are not affected.
:telemetry.detach("ex-arrow-quickstart")

Docs: hexdocs.pm/ex_arrow.