ExArrow Tutorial 1: IPC (Stream and File)

deps = [
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19.0"},
  {:adbc, "~> 0.12"},
  {:gen_stage, "~> 1.3"},
  {:nx, "~> 0.12.1"},
  {:telemetry, "~> 1.0"},
  {:flow, "~> 1.2"},
  {:broadway, "~> 1.0"}
]

local? = File.exists?(Path.join(__DIR__, "../native/ex_arrow_native/Cargo.toml"))

{ex_arrow_dep, extra_deps, config} =
  if local? do
    System.put_env("EX_ARROW_BUILD", "1")

    # Force recompile ex_arrow so optional deps are detected at compile time.
    ex_arrow_beam = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
    if File.dir?(ex_arrow_beam) do
      File.rm_rf!(ex_arrow_beam)
    end

    {
      {:ex_arrow, path: Path.expand("..", __DIR__)},
      [{:rustler, "~> 0.36", optional: true}],
      [adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
    }
  else
    {
      {:ex_arrow, "~> 0.7.0"},
      [],
      [adbc: [drivers: [:sqlite]]]
    }
  end

Mix.install(deps ++ [ex_arrow_dep] ++ extra_deps, config: config)

Section

This notebook covers Arrow IPC in ExArrow: reading and writing stream format (binary or file), random-access file format, and how schema and types are exposed. You’ll use the same ExArrow.Stream and ExArrow.RecordBatch handles whether data came from IPC, Flight, or ADBC.


1. Stream vs file format

| Format | Use case | API |
|---|---|---|
| Stream | Sequential read/write (sockets, HTTP, one-pass files) | Reader.from_binary/1, from_file/1 → Stream; Writer.to_binary/2, to_file/3 |
| File | Random access: schema, batch count, get batch by index | IPC.File.from_file/1, from_binary/1 → schema/1, batch_count/1, get_batch/2 |

Stream format has no footer: you read batches one by one until next/1 returns nil. File format ends with a footer that records the schema and the location of every record batch, so you can jump to any batch without reading the whole file.
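The two layouts can also be told apart from their first bytes. Per the Arrow columnar format specification, a file-format payload begins (and ends) with the 6-byte magic string ARROW1, while a stream written by Arrow 0.15 or later begins with the 0xFFFFFFFF continuation marker. Below is a minimal plain-Elixir sketch; ArrowFormat is a hypothetical helper, not part of ExArrow, and legacy streams without the marker are reported as :unknown:

```elixir
defmodule ArrowFormat do
  # Hypothetical helper (not part of ExArrow): classify an Arrow IPC payload
  # by its leading bytes, following the Arrow columnar format specification.

  # File format: starts with the 6-byte magic "ARROW1" (then 2 padding bytes).
  def detect("ARROW1" <> _rest), do: :file

  # Stream format (Arrow >= 0.15): each message starts with the 0xFFFFFFFF
  # continuation marker.
  def detect(<<0xFF, 0xFF, 0xFF, 0xFF, _rest::binary>>), do: :stream

  # Anything else: empty input, legacy streams, or non-Arrow data.
  def detect(_other), do: :unknown

  # Read only the first 8 bytes of a file on disk and classify them.
  # Returns {:ok, format} or {:error, reason} if the file cannot be opened.
  def detect_path(path) do
    with {:ok, head} <- File.open(path, [:read, :binary], &IO.binread(&1, 8)) do
      # IO.binread/2 returns :eof for an empty file.
      if is_binary(head), do: {:ok, detect(head)}, else: {:ok, :unknown}
    end
  end
end
```

Calling ArrowFormat.detect/1 on a payload before choosing between ExArrow.IPC.Reader and ExArrow.IPC.File is one way to handle .arrow files of unknown origin. The check only inspects the header; it does not validate the rest of the payload.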


2. Reading a stream from binary

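This is typical when the Arrow payload is already in memory (e.g. an HTTP response body or a buffer). Here the built-in fixture ExArrow.Native.ipc_test_fixture_binary/0 stands in for such a payload.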

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)

Get the schema without consuming the stream:

{:ok, schema} = ExArrow.Stream.schema(stream)
fields = ExArrow.Schema.fields(schema)
Enum.each(fields, fn f -> IO.puts("#{f.name}: #{f.type}") end)

Consume batches until the stream is exhausted:

batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

total_rows = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
IO.puts("Batches: #{length(batches)}, total rows: #{total_rows}")
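Enum.take_while/2 collects every batch into a list. If you only need an aggregate such as the row count, a lazy variant built on Stream.take_while/2 processes one batch at a time. LazyBatches is a hypothetical helper, not an ExArrow API; its default predicate is the same is_struct check used above:

```elixir
defmodule LazyBatches do
  # Hypothetical helper (not part of ExArrow): wrap a zero-arity "next"
  # function in a lazy Stream that ends at the first value failing `batch?`
  # (e.g. nil once the IPC stream is exhausted).
  def from_next(next_fun, batch? \\ &is_struct(&1, ExArrow.RecordBatch))
      when is_function(next_fun, 0) and is_function(batch?, 1) do
    next_fun
    |> Stream.repeatedly()
    |> Stream.take_while(batch?)
  end
end
```

With the stream from this section, the row count becomes `LazyBatches.from_next(fn -> ExArrow.Stream.next(stream) end) |> Stream.map(&ExArrow.RecordBatch.num_rows/1) |> Enum.sum()`. Because ExArrow.Stream.next/1 advances the underlying reader, enumerate the lazy stream only once; a second pass does not rewind it.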

3. Reading a stream from a file path

The API mirrors from_binary/1, but the native backend reads directly from the path. Use it for .arrow files written in stream format, where no random access is needed.

# Write a temp file so we can read it back (stream format)
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

path = Path.join(System.tmp_dir!(), "ex_arrow_demo.arrow")
:ok = ExArrow.IPC.Writer.to_file(path, schema, batches)

# Read back from file
{:ok, file_stream} = ExArrow.IPC.Reader.from_file(path)
{:ok, schema2} = ExArrow.Stream.schema(file_stream)
IO.inspect(Enum.map(ExArrow.Schema.fields(schema2), & &1.name), label: "Fields read back")
first_batch = ExArrow.Stream.next(file_stream)
IO.puts("Rows in first batch: #{ExArrow.RecordBatch.num_rows(first_batch)}")

File.rm(path)
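The cell above deletes the temp file only if every earlier line succeeds; a failed match leaves the file behind. A small hypothetical helper (plain Elixir, not part of ExArrow) uses try/after so the file is removed even when the body raises, and gives each run a unique file name:

```elixir
defmodule TmpArrow do
  # Hypothetical helper (not part of ExArrow): call `fun` with a unique path
  # in the system temp dir and always delete that file afterwards, even if
  # `fun` raises. Returns whatever `fun` returns.
  def with_path(basename, fun) when is_function(fun, 1) do
    unique = System.unique_integer([:positive])
    path = Path.join(System.tmp_dir!(), "#{unique}_#{basename}")

    try do
      fun.(path)
    after
      # File.rm/1 returns {:error, :enoent} if fun never created the file.
      File.rm(path)
    end
  end
end
```

For example, wrap the write and read-back steps of this section in `TmpArrow.with_path("ex_arrow_demo.arrow", fn path -> ... end)`. Do all reading inside the function, since the file is gone once it returns.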

4. Writing: to binary and to file

You always need a schema and a list (or enumerable) of record batches that share that schema. Usually you get them from a stream you read, or from another source (e.g. Flight/ADBC).

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

# Write to binary (e.g. for HTTP response or socket)
{:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
IO.puts("Written #{byte_size(binary)} bytes")

# Write to file (stream format)
out_path = Path.join(System.tmp_dir!(), "ex_arrow_out.arrow")
:ok = ExArrow.IPC.Writer.to_file(out_path, schema, batches)
IO.puts("Written to #{out_path}")
File.rm(out_path)
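To check that Writer.to_binary/2 produced a readable stream, you can read the bytes back and compare total row counts. This sketch uses only the ExArrow calls shown in this notebook; RoundTrip is a hypothetical module name, and check/0 returns :skipped when ExArrow is not loaded:

```elixir
defmodule RoundTrip do
  # Sketch: write the fixture's batches with Writer.to_binary/2, read them
  # back, and compare total row counts. Returns {:ok, boolean} or :skipped.
  def check do
    if Code.ensure_loaded?(ExArrow.IPC.Reader) do
      {:ok, original} = ExArrow.Native.ipc_test_fixture_binary()
      {schema, batches} = read_all(original)
      {:ok, rewritten} = ExArrow.IPC.Writer.to_binary(schema, batches)
      {_schema, batches_back} = read_all(rewritten)
      {:ok, total_rows(batches) == total_rows(batches_back)}
    else
      :skipped
    end
  end

  # Read the schema and every batch from a stream-format binary.
  defp read_all(bytes) do
    {:ok, stream} = ExArrow.IPC.Reader.from_binary(bytes)
    {:ok, schema} = ExArrow.Stream.schema(stream)

    batches =
      Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
      |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

    {schema, batches}
  end

  defp total_rows(batches) do
    batches |> Enum.map(&ExArrow.RecordBatch.num_rows/1) |> Enum.sum()
  end
end

RoundTrip.check()
```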

5. File format: random access

When the IPC data is in file format (with a footer), open it with ExArrow.IPC.File. You can then read the schema, the batch count, and any batch by index without scanning the whole file.

# Built-in file-format fixture (same schema: id int64, name utf8; one batch, 2 rows)
{:ok, file_binary} = ExArrow.Native.ipc_test_fixture_file_binary()
{:ok, file} = ExArrow.IPC.File.from_binary(file_binary)

{:ok, schema} = ExArrow.IPC.File.schema(file)
IO.inspect(ExArrow.Schema.fields(schema), label: "Fields")
n = ExArrow.IPC.File.batch_count(file)
IO.puts("Batch count: #{n}")

{:ok, batch0} = ExArrow.IPC.File.get_batch(file, 0)
IO.puts("Rows in batch 0: #{ExArrow.RecordBatch.num_rows(batch0)}")
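Because the footer gives the batch count up front, reading every batch reduces to a loop over indices. Below is a sketch of a hypothetical helper (not part of ExArrow); the count and fetch functions are passed in so the logic can be exercised without a real file:

```elixir
defmodule IPCFileBatches do
  # Hypothetical helper (not part of ExArrow): fetch every batch by index.
  # With ExArrow you would pass &ExArrow.IPC.File.batch_count/1 and
  # &ExArrow.IPC.File.get_batch/2. Stops at the first non-{:ok, _} result.
  def fetch_all(file, count_fun, get_fun) do
    n = count_fun.(file)

    # 0..(n - 1)//1 is empty when n == 0; a bare 0..-1 would count down
    # and request indices 0 and -1.
    indices = 0..(n - 1)//1

    indices
    |> Enum.reduce_while({:ok, []}, fn i, {:ok, acc} ->
      case get_fun.(file, i) do
        {:ok, batch} -> {:cont, {:ok, [batch | acc]}}
        error -> {:halt, error}
      end
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      error -> error
    end
  end
end
```

With the handle above: `IPCFileBatches.fetch_all(file, &ExArrow.IPC.File.batch_count/1, &ExArrow.IPC.File.get_batch/2)`. To read only the last batch (when n > 0), call `ExArrow.IPC.File.get_batch(file, n - 1)` directly; that is the random access the footer enables.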

To create file-format data from Elixir you currently use the low-level NIF ipc_file_writer_to_file/3; a higher-level API may be added later. For most workflows, stream format with Writer.to_file/3 is enough.


6. Schema and field types

ExArrow.Schema.fields/1 returns a list of %ExArrow.Field{} structs with name and type. The type is an atom such as :int64, :utf8, :float64, :boolean, :timestamp, etc.

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)

ExArrow.Schema.fields(schema)

Supported type atoms include :null, :boolean, :int64, :float64, :utf8, :binary, :list, :timestamp, :decimal128, :dictionary, and others. Record batches are opaque in this release: direct column/array access is planned for a later milestone. To inspect values today, convert a batch to an Explorer DataFrame or an Nx tensor (sections 7 and 8).
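Since each field carries a name and a type atom, you can check that incoming data has the columns you expect before processing it. A sketch of a hypothetical SchemaCheck helper (not part of ExArrow); it assumes field names are strings and types are plain atoms like those listed above:

```elixir
defmodule SchemaCheck do
  # Hypothetical helper (not part of ExArrow): compare a list of fields
  # (anything with :name and :type keys, such as %ExArrow.Field{}) against
  # an expected map of name => type atom. Extra columns are ignored.
  def validate(fields, expected) when is_map(expected) do
    actual = Map.new(fields, fn f -> {f.name, f.type} end)

    mismatches =
      for {name, type} <- expected, Map.get(actual, name) != type do
        # got: nil means the column is missing entirely.
        {name, %{expected: type, got: Map.get(actual, name)}}
      end

    if mismatches == [], do: :ok, else: {:error, mismatches}
  end
end
```

For the fixture (id int64, name utf8), `SchemaCheck.validate(ExArrow.Schema.fields(schema), %{"id" => :int64, "name" => :utf8})` should return :ok if the field names are strings; otherwise it returns {:error, mismatches} listing each offending column.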


7. Interop with Explorer (v0.6+)

ExArrow.from_dataframe/1 and ExArrow.to_dataframe/1 convert between Explorer DataFrames and Arrow RecordBatches in one call (requires {:explorer, "~> 0.11"}):

if Code.ensure_loaded?(Explorer.DataFrame) do
  df = Explorer.DataFrame.new(x: [1, 2, 3], y: ["a", "b", "c"])

  # DataFrame → Arrow
  {:ok, batch} = ExArrow.from_dataframe(df)
  IO.puts("from_dataframe rows: #{ExArrow.RecordBatch.num_rows(batch)}")

  # Arrow → DataFrame
  {:ok, df2} = ExArrow.to_dataframe(batch)
  IO.inspect(df2)
else
  "Explorer not loaded; add {:explorer, \"~> 0.11\"} to deps."
end

8. Interop with Nx (v0.6+)

ExArrow.from_nx/1 and ExArrow.to_nx/1 convert between Nx tensors and Arrow RecordBatches by sharing raw byte buffers (requires {:nx, "~> 0.9"}):

if Code.ensure_loaded?(Nx) do
  # Rank-1 tensor
  tensor = Nx.tensor([1, 2, 3], type: {:s, 64})
  {:ok, batch} = ExArrow.from_nx(tensor)
  IO.puts("from_nx rows: #{ExArrow.RecordBatch.num_rows(batch)}")

  {:ok, back} = ExArrow.to_nx(batch)
  IO.inspect(Nx.to_list(back))
else
  "Nx not loaded; add {:nx, \"~> 0.9\"} to deps."
end

9. Building parameter batches with from_columns/4 (v0.6+)

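ExArrow.RecordBatch.from_columns/4 creates a batch from column-oriented binary data. Its arguments are a list of column names, a list of column binaries (one per column), a list of type strings such as "s64", "f64", or "utf8", and the number of rows. This is the primary way to build parameter batches for Flight SQL prepared statement binding: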

# Single int64 column with one row
{:ok, batch} = ExArrow.RecordBatch.from_columns(["id"], [<<42::little-signed-64>>], ["s64"], 1)
IO.puts("from_columns rows: #{ExArrow.RecordBatch.num_rows(batch)}")

# Mixed primitives (int64 + float64)
{:ok, batch2} = ExArrow.RecordBatch.from_columns(
  ["id", "score"],
  [<<1::little-signed-64>>, <<3.14::little-float-64>>],
  ["s64", "f64"],
  1
)
IO.puts("from_columns (mixed) rows: #{ExArrow.RecordBatch.num_rows(batch2)}")

# utf8 column: each value is prefixed by its byte length (little-endian 32-bit)
utf8 = <<5::little-32, "hello", 5::little-32, "world">>
{:ok, batch3} = ExArrow.RecordBatch.from_columns(["name"], [utf8], ["utf8"], 2)
IO.puts("from_columns (utf8) rows: #{ExArrow.RecordBatch.num_rows(batch3)}")
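Writing these binaries by hand gets tedious beyond a row or two. The hypothetical helpers below (plain Elixir, not part of ExArrow) produce the same layouts the calls above use: little-endian 64-bit values for "s64" and "f64", and a little-endian 32-bit byte-length prefix before each "utf8" value:

```elixir
defmodule ColumnEncoder do
  # Hypothetical helpers (not part of ExArrow): build from_columns/4 column
  # binaries from plain Elixir lists, matching the layouts used above.

  # "s64": each value as a little-endian signed 64-bit integer.
  def s64(values), do: for(v <- values, into: <<>>, do: <<v::little-signed-64>>)

  # "f64": each value as a little-endian 64-bit float.
  def f64(values), do: for(v <- values, into: <<>>, do: <<v::little-float-64>>)

  # "utf8": each string prefixed by its byte length as a little-endian u32.
  # byte_size/1 (not String.length/1) matters for non-ASCII text.
  def utf8(values) do
    for v <- values, into: <<>>, do: <<byte_size(v)::little-32, v::binary>>
  end

  # Inverse of s64/1, handy for checking a buffer by eye.
  def decode_s64(bin), do: for(<<v::little-signed-64 <- bin>>, do: v)
end
```

For example, with `names = ["hello", "world"]`, the call `ExArrow.RecordBatch.from_columns(["name"], [ColumnEncoder.utf8(names)], ["utf8"], length(names))` passes the same bytes as batch3 above. Note that the prefix counts bytes, not characters: "é" is encoded with length 2.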

Summary

  • Stream: Reader.from_binary/1 or from_file/1 then Stream.schema/1, Stream.next/1; write with Writer.to_binary/2 or to_file/3.
  • File format: IPC.File.from_binary/1 or from_file/1 then schema/1, batch_count/1, get_batch/2.
  • Explorer interchange (v0.6+): ExArrow.from_dataframe/1 / to_dataframe/1 for one-call conversion. Also ExArrow.DataFrame.from_arrow/1 / to_arrow/1.
  • Nx interchange (v0.6+): ExArrow.from_nx/1 / to_nx/1 for zero-copy tensor conversion.
  • Parameter batches (v0.6+): ExArrow.RecordBatch.from_columns/4 builds batches from raw binary column data, used for Flight SQL prepared statement binding.
  • Schema and batches are shared across IPC, Flight, Flight SQL, and ADBC. Next notebook: Flight & Flight SQL.