ExArrow Tutorial 1: IPC (Stream and File)


Mix.install([
  {:ex_arrow, path: Path.join(__DIR__, "..")},
  {:explorer, "~> 0.8"}
])

Section

This notebook covers Arrow IPC in ExArrow: reading and writing stream format (binary or file), random-access file format, and how schema and types are exposed. You’ll use the same ExArrow.Stream and ExArrow.RecordBatch handles whether data came from IPC, Flight, or ADBC.


1. Stream vs file format

Format | Use case | API
Stream | Sequential read/write (sockets, HTTP, one-pass files) | Reader.from_binary/1, Reader.from_file/1 → Stream; Writer.to_binary/2, Writer.to_file/3
File | Random access: schema, batch count, get batch by index | IPC.File.from_file/1, IPC.File.from_binary/1 → schema/1, batch_count/1, get_batch/2

Stream format has no footer; you read batches one by one until next/1 returns nil. File format has a footer so you can jump to any batch without reading the whole file.
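Because a stream has no footer, there is no batch count to loop over: you keep calling next/1 until it returns nil. Below is a minimal sketch of that loop. It assumes that a failed read comes back as an `{:error, reason}` tuple, which the notebook does not confirm (it only states that nil marks the end). `BatchDrain` is a hypothetical name, and `next_fun` is injected so the loop can be tried without ExArrow.

```elixir
defmodule BatchDrain do
  # Hypothetical helper (not part of ExArrow): calls next_fun until it returns nil
  # and collects everything it produced, in order.
  def collect(next_fun) when is_function(next_fun, 0), do: collect(next_fun, [])

  defp collect(next_fun, acc) do
    case next_fun.() do
      # nil marks the end of the stream.
      nil -> {:ok, Enum.reverse(acc)}
      # Assumed error shape; stop and surface it instead of silently truncating.
      {:error, _reason} = error -> error
      batch -> collect(next_fun, [batch | acc])
    end
  end
end
```

With ExArrow you would call `BatchDrain.collect(fn -> ExArrow.Stream.next(stream) end)`. Unlike the `take_while` pipeline in section 2, this distinguishes a clean end of stream from an error.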


2. Reading a stream from binary

This is the typical case when the Arrow payload is already in memory (e.g. an HTTP response body or a buffer).

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)

Get the schema without consuming the stream:

{:ok, schema} = ExArrow.Stream.schema(stream)
fields = ExArrow.Schema.fields(schema)
Enum.each(fields, fn f -> IO.puts("#{f.name}: #{f.type}") end)

Consume batches until the stream is exhausted:

batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

total_rows = Enum.reduce(batches, 0, fn b, acc -> acc + ExArrow.RecordBatch.num_rows(b) end)
IO.puts("Batches: #{length(batches)}, total rows: #{total_rows}")
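The cell above keeps every batch in a list before counting. If you only need totals, you can fold over the batches as they arrive instead. This may let earlier batches be garbage-collected sooner. The sketch below makes the same assumptions as the rest of this section (nil ends the stream), and it additionally assumes that an `{:error, _}` tuple should simply stop the count. `BatchStats` is hypothetical, and both functions are injected so it runs without ExArrow.

```elixir
defmodule BatchStats do
  # Hypothetical helper: counts batches and rows in a single pass.
  # In the notebook, next_fun would be fn -> ExArrow.Stream.next(stream) end
  # and rows_fun would be &ExArrow.RecordBatch.num_rows/1.
  def count(next_fun, rows_fun) do
    next_fun
    |> Stream.repeatedly()
    # Stop at end of stream (nil) or on an assumed {:error, _} tuple.
    |> Stream.take_while(fn
      nil -> false
      {:error, _} -> false
      _batch -> true
    end)
    |> Enum.reduce({0, 0}, fn batch, {batches, rows} ->
      {batches + 1, rows + rows_fun.(batch)}
    end)
  end
end
```

The result is a `{batch_count, row_count}` tuple, matching the two numbers printed above.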

3. Reading a stream from a file path

Same API as reading from a binary, except that the native backend reads directly from the path. Use it for files written in stream format, where a single sequential pass is enough and no random access is needed.

# Write a temp file so we can read it back (stream format)
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

path = Path.join(System.tmp_dir!(), "ex_arrow_demo.arrow")
:ok = ExArrow.IPC.Writer.to_file(path, schema, batches)

# Read back from file
{:ok, file_stream} = ExArrow.IPC.Reader.from_file(path)
{:ok, schema2} = ExArrow.Stream.schema(file_stream)
first_batch = ExArrow.Stream.next(file_stream)
IO.puts("Rows in first batch: #{ExArrow.RecordBatch.num_rows(first_batch)}")

File.rm(path)
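The cell above writes to a fixed name in the temp directory and only deletes the file if every step succeeds. Here is a minimal sketch of a more defensive pattern: generate a unique name, and remove the file in an `after` clause so that a failed read does not leave it behind. `TmpArrow.with_tmp_path/1` is a hypothetical helper, not part of ExArrow.

```elixir
defmodule TmpArrow do
  # Hypothetical helper: builds a unique path under System.tmp_dir!/0, passes it
  # to fun, and deletes the file afterwards even if fun raises.
  def with_tmp_path(fun) when is_function(fun, 1) do
    name = "ex_arrow_#{System.unique_integer([:positive])}.arrow"
    path = Path.join(System.tmp_dir!(), name)

    try do
      fun.(path)
    after
      # File.rm/1 returns {:error, :enoent} if fun never created the file; ignored.
      File.rm(path)
    end
  end
end
```

Because the file is deleted as soon as the function returns, consume the stream inside it. For example, inside the function you could call `ExArrow.IPC.Writer.to_file(path, schema, batches)`, then `ExArrow.IPC.Reader.from_file(path)`, and then read the batches you need.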

4. Writing: to binary and to file

You always need a schema and a list (or enumerable) of record batches that share that schema. Usually you get them from a stream you read, or from another source (e.g. Flight/ADBC).

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

# Write to binary (e.g. for HTTP response or socket)
{:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
IO.puts("Wrote #{byte_size(binary)} bytes")

# Write to file (stream format)
out_path = Path.join(System.tmp_dir!(), "ex_arrow_out.arrow")
:ok = ExArrow.IPC.Writer.to_file(out_path, schema, batches)
IO.puts("Wrote #{out_path}")
File.rm(out_path)
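When you send the binary from Writer.to_binary/2 as an HTTP response body, the client needs to know which IPC format it contains. Here is a small sketch that builds the headers, using the IANA-registered media types for the two Arrow IPC formats. `ArrowHTTP` is hypothetical, and the sketch assumes that your HTTP layer takes headers as a list of `{name, value}` string tuples, as many Elixir HTTP libraries do.

```elixir
defmodule ArrowHTTP do
  # Hypothetical helper (not part of ExArrow): response headers for an Arrow IPC
  # payload. Writer.to_binary/2 produces stream format, so use :stream for it.
  @media_types %{
    stream: "application/vnd.apache.arrow.stream",
    file: "application/vnd.apache.arrow.file"
  }

  def headers(payload, format) when is_binary(payload) and is_map_key(@media_types, format) do
    [
      {"content-type", Map.fetch!(@media_types, format)},
      {"content-length", Integer.to_string(byte_size(payload))}
    ]
  end
end
```

An unknown format atom raises `FunctionClauseError`, so a stream payload cannot accidentally be labelled with an arbitrary type.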

5. File format: random access

When the IPC data is in file format (with footer), use ExArrow.IPC.File to open it. You get schema, batch count, and any batch by index without reading the whole file.

# Built-in file-format fixture (same schema: id int64, name utf8; one batch, 2 rows)
{:ok, file_binary} = ExArrow.Native.ipc_test_fixture_file_binary()
{:ok, file} = ExArrow.IPC.File.from_binary(file_binary)

{:ok, schema} = ExArrow.IPC.File.schema(file)
n = ExArrow.IPC.File.batch_count(file)
IO.puts("Batch count: #{n}")

{:ok, batch0} = ExArrow.IPC.File.get_batch(file, 0)
IO.puts("Rows in batch 0: #{ExArrow.RecordBatch.num_rows(batch0)}")
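Because the footer gives you the batch count up front, you can fetch every batch by index. Here is a minimal sketch. It assumes that get_batch/2 returns `{:ok, batch}` on success (as in the cell above) and `{:error, reason}` on failure; the failure shape is an assumption. `FileBatches` is a hypothetical name. One detail is worth noting: with an empty file, a bare `0..(count - 1)` would be the descending range `0..-1` and would visit indices 0 and -1. The explicit `//1` step makes it empty.

```elixir
defmodule FileBatches do
  # Hypothetical helper: fetches all batches of an IPC file in index order.
  # count would come from ExArrow.IPC.File.batch_count(file) and get_fun from
  # fn i -> ExArrow.IPC.File.get_batch(file, i) end.
  def all(count, get_fun) when is_integer(count) and count >= 0 do
    result =
      # //1 keeps the range empty when count is 0.
      Enum.reduce_while(0..(count - 1)//1, [], fn index, acc ->
        case get_fun.(index) do
          {:ok, batch} -> {:cont, [batch | acc]}
          {:error, _reason} = error -> {:halt, error}
        end
      end)

    case result do
      {:error, _} = error -> error
      batches -> {:ok, Enum.reverse(batches)}
    end
  end
end
```

For the fixture above, `FileBatches.all(n, &ExArrow.IPC.File.get_batch(file, &1))` would return a one-element list.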

To create file-format data from Elixir you currently use the low-level NIF ipc_file_writer_to_file/3; a higher-level API may be added later. For most workflows, stream format with Writer.to_file/3 is enough.


6. Schema and field types

ExArrow.Schema.fields/1 returns a list of %ExArrow.Field{} structs with name and type. The type is an atom such as :int64, :utf8, :float64, :boolean, :timestamp, etc.

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)

ExArrow.Schema.fields(schema)

Supported type atoms include :null, :boolean, :int64, :float64, :utf8, :binary, :list, :timestamp, :decimal128, :dictionary, and others. Record batches are opaque in this release; column and array access is planned for a later milestone.
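Since fields/1 returns structs with a name and a type, plain Enum and Map functions are enough to inspect a schema. For example, you can build a name-to-type lookup or pick out the columns of one type. This is a sketch. `SchemaInfo` is hypothetical, and it reads only the `name` and `type` fields described above, so plain maps can stand in for `%ExArrow.Field{}` when you try it without ExArrow.

```elixir
defmodule SchemaInfo do
  # Hypothetical helpers over the list returned by ExArrow.Schema.fields/1.
  # Only f.name and f.type are accessed.

  # Map of column name => type atom.
  def type_map(fields), do: Map.new(fields, fn f -> {f.name, f.type} end)

  # Names of all columns whose type equals the given atom, in schema order.
  def names_of_type(fields, type) do
    for f <- fields, f.type == type, do: f.name
  end
end
```

With the fixture schema, `SchemaInfo.names_of_type(ExArrow.Schema.fields(schema), :utf8)` would list the string columns.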


7. Interop with Explorer (optional)

If you have Explorer in your project, you can move data between ExArrow and Explorer via IPC binary or file.

ExArrow to Explorer: Produce IPC with ExArrow, then load into Explorer:

if Code.ensure_loaded?(Explorer.DataFrame) do
  {:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
  {:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
  {:ok, schema} = ExArrow.Stream.schema(stream)
  batches =
    Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
    |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))
  {:ok, binary} = ExArrow.IPC.Writer.to_binary(schema, batches)
  df = Explorer.DataFrame.load_ipc_stream!(binary)
  df["id"]
else
  "Explorer not loaded; add {:explorer, \"~> 0.8\"} to deps to try this."
end

Explorer to ExArrow: Dump a dataframe to IPC, then read with ExArrow:

if Code.ensure_loaded?(Explorer.DataFrame) do
  df = Explorer.DataFrame.new(x: [1, 2, 3], y: ["a", "b", "c"])
  # Reader.from_binary expects stream format; use dump_ipc_stream!
  binary = Explorer.DataFrame.dump_ipc_stream!(df)
  {:ok, stream} = ExArrow.IPC.Reader.from_binary(binary)
  {:ok, schema} = ExArrow.Stream.schema(stream)
  ExArrow.Schema.fields(schema) |> Enum.map(& &1.name)
else
  "Explorer not loaded."
end

Summary

  • Stream format: Reader.from_binary/1 or Reader.from_file/1, then Stream.schema/1 and Stream.next/1; write with Writer.to_binary/2 or Writer.to_file/3.
  • File format: IPC.File.from_binary/1 or IPC.File.from_file/1, then schema/1, batch_count/1, get_batch/2.
  • Explorer interop: ExArrow → Explorer with Writer.to_binary/2, then Explorer.DataFrame.load_ipc_stream!/1; Explorer → ExArrow with Explorer.DataFrame.dump_ipc_stream!/1 (stream format), then Reader.from_binary/1.
  • Schema and batches are shared across IPC, Flight, and ADBC. Next notebook: Arrow Flight.