ExArrow Tutorial 2: Arrow Flight

livebook/02_flight.livemd


Arrow Flight is a gRPC-based protocol for high-throughput Arrow data transfer. ExArrow provides a client and a built-in echo server. This notebook walks through the full Flight API: do_put/do_get, listing flights, metadata, and actions.


1. Overview

| Component | Module | Purpose |
|---|---|---|
| Server | ExArrow.Flight.Server | Built-in echo server: stores the last do_put stream and serves it on do_get with ticket "echo". |
| Client | ExArrow.Flight.Client | connect, do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action. |

Connections use plaintext HTTP/2 only (no TLS). Use them only on localhost or on trusted networks.


2. Start the echo server

The server binds to a port and stores the last uploaded stream; do_get with ticket "echo" returns that data.

```elixir
{:ok, server} = ExArrow.Flight.Server.start_link(9999, [])
port = ExArrow.Flight.Server.port(server)
IO.puts("Echo server listening on port #{port}")
```
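You may start the server from another process or notebook. In that case, a client can first confirm that the port is accepting connections before it calls connect. The sketch below uses only Erlang's :gen_tcp. PortProbe is a hypothetical helper, not part of ExArrow. It only checks that some TCP listener is present; it does not verify that the listener speaks Flight.

```elixir
defmodule PortProbe do
  # Hypothetical helper: returns true if something accepts a TCP connection
  # on host:port within timeout_ms, false otherwise.
  def listening?(host, port, timeout_ms \\ 500) do
    case :gen_tcp.connect(String.to_charlist(host), port, [:binary, active: false], timeout_ms) do
      {:ok, socket} ->
        # Close immediately; we only wanted to know the port is open.
        :gen_tcp.close(socket)
        true

      {:error, _reason} ->
        false
    end
  end
end
```

For example, PortProbe.listening?("127.0.0.1", port) should return true while the echo server above is running.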

3. Connect and do_put / do_get

Upload schema and batches with do_put, then download the same data with do_get and the ticket "echo".

```elixir
# Read a schema and record batches from the bundled IPC test fixture
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)

# Keep calling next/1 until it stops returning a RecordBatch (end of stream)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

# Connect to the port reported by the server in the previous step
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, [])

# Upload
:ok = ExArrow.Flight.Client.do_put(client, schema, batches)

# Download (same stream API as IPC)
{:ok, down_stream} = ExArrow.Flight.Client.do_get(client, "echo")
{:ok, down_schema} = ExArrow.Stream.schema(down_stream)
first = ExArrow.Stream.next(down_stream)

field_names = ExArrow.Schema.fields(down_schema) |> Enum.map(& &1.name)
IO.puts("Downloaded schema fields: #{inspect(field_names)}")
IO.puts("First batch rows: #{ExArrow.RecordBatch.num_rows(first)}")
```
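To check the round trip end to end, drain the rest of the downloaded stream with the same Stream.repeatedly/Enum.take_while pattern. Then compare the total row counts of the uploaded and downloaded batches. RoundTrip is a hypothetical helper. It takes the next-batch function and the row-count function as arguments, so you can exercise it without a running server.

```elixir
defmodule RoundTrip do
  # Hypothetical helper: call next_fun repeatedly and keep results
  # while keep?.(result) is true (i.e. until the stream signals its end).
  def drain(next_fun, keep?) do
    next_fun
    |> Stream.repeatedly()
    |> Enum.take_while(keep?)
  end

  # Sum the row counts of a list of batches with the supplied counting function.
  def total_rows(batches, num_rows_fun) do
    batches
    |> Enum.map(num_rows_fun)
    |> Enum.sum()
  end
end
```

In the notebook, pass fn -> ExArrow.Stream.next(down_stream) end as next_fun and &ExArrow.RecordBatch.num_rows/1 as the counting function. The first batch was already taken from down_stream, so prepend first to the drained list before you compare the totals.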

4. list_flights

List all flights the server advertises. With the echo server, you’ll see the stored flight (e.g. descriptor {:cmd, "echo"}).

```elixir
# Empty criteria = list all
{:ok, flights} = ExArrow.Flight.Client.list_flights(client, <<>>)

Enum.each(flights, fn info ->
  IO.puts("Descriptor: #{inspect(info.descriptor)}")
  IO.puts("  Endpoints: #{length(info.endpoints)}")
  IO.puts("  Total records: #{info.total_records}, bytes: #{info.total_bytes}")
end)
```
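The Arrow Flight protocol lets a server report -1 for total_records or total_bytes when it does not know them. Assuming ExArrow passes that sentinel through unchanged, a small formatter keeps the listing readable. FlightSummary is a hypothetical helper. It works on any map that has the four fields used in the loop above.

```elixir
defmodule FlightSummary do
  # Flight uses -1 as "unknown" for total_records / total_bytes.
  def format_count(-1), do: "unknown"
  def format_count(n) when is_integer(n) and n >= 0, do: Integer.to_string(n)
  # Anything unexpected is shown as-is rather than crashing the listing.
  def format_count(other), do: inspect(other)

  # One summary line per FlightInfo-like map.
  def lines(flights) do
    Enum.map(flights, fn info ->
      "#{inspect(info.descriptor)}: #{length(info.endpoints)} endpoint(s), " <>
        "#{format_count(info.total_records)} records, #{format_count(info.total_bytes)} bytes"
    end)
  end
end
```

For example, FlightSummary.lines(flights) |> Enum.each(&IO.puts/1) prints one line per advertised flight.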

5. get_flight_info

Get metadata for a specific flight by descriptor (e.g. command or path).

```elixir
{:ok, info} = ExArrow.Flight.Client.get_flight_info(client, {:cmd, "echo"})
IO.inspect(info.descriptor, label: "Descriptor")
IO.inspect(length(info.endpoints), label: "Endpoints")
```
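get_flight_info takes the same descriptor that list_flights reports. You can therefore also look a flight up locally in a list you have already fetched, without another round trip. FlightLookup is a hypothetical helper. It relies only on the descriptor field shown above and on plain term equality.

```elixir
defmodule FlightLookup do
  # Hypothetical helper: find the FlightInfo-like map whose descriptor
  # equals the given one, e.g. {:cmd, "echo"}.
  def find(flights, descriptor) do
    case Enum.find(flights, &(&1.descriptor == descriptor)) do
      nil -> {:error, :not_found}
      info -> {:ok, info}
    end
  end
end
```

For example, FlightLookup.find(flights, {:cmd, "echo"}) against the list from section 4 should return the same flight that get_flight_info describes.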

6. get_schema

Retrieve the Arrow schema for a flight without streaming any record batches. This is useful for validating the expected columns, or for showing them in a UI, before you download the data.

```elixir
{:ok, flight_schema} = ExArrow.Flight.Client.get_schema(client, {:cmd, "echo"})
ExArrow.Schema.fields(flight_schema) |> Enum.map(& &1.name)
```
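A typical validation step is to fail early when required columns are missing. SchemaCheck is a hypothetical helper. It compares the field-name list from the cell above against a list of names you choose. Any column names you pass are your own; the fixture's actual field names are whatever the cell above prints.

```elixir
defmodule SchemaCheck do
  # Hypothetical helper: :ok when every required name is present,
  # otherwise {:error, {:missing_fields, names}} listing what is absent.
  def require_fields(field_names, required) do
    case required -- field_names do
      [] -> :ok
      missing -> {:error, {:missing_fields, missing}}
    end
  end
end
```

In the notebook, pass ExArrow.Schema.fields(flight_schema) |> Enum.map(& &1.name) as field_names.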

7. Actions: list_actions and do_action

The echo server supports simple actions: ping (returns ["pong"]) and clear (clears stored data, returns []).

```elixir
# List the actions the server supports
{:ok, action_types} = ExArrow.Flight.Client.list_actions(client)
Enum.each(action_types, fn a -> IO.puts("#{a.type}: #{a.description}") end)

# ping returns ["pong"]
{:ok, result} = ExArrow.Flight.Client.do_action(client, "ping", <<>>)
IO.inspect(result, label: "ping result")

# Clear stored echo data
{:ok, result} = ExArrow.Flight.Client.do_action(client, "clear", <<>>)
IO.inspect(result, label: "clear result")

# After clear, do_get(client, "echo") has nothing to return until you do_put again
```
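Because ping answers ["pong"], it doubles as a cheap health check. FlightHealth is a hypothetical helper. The action call is passed in as a function, so the sketch does not depend on a live server. supports_action?/2 assumes the type field shown in the list_actions loop above.

```elixir
defmodule FlightHealth do
  # Hypothetical helper: action_fun should behave like
  # fn -> ExArrow.Flight.Client.do_action(client, "ping", <<>>) end
  def healthy?(action_fun) when is_function(action_fun, 0) do
    match?({:ok, ["pong"]}, action_fun.())
  end

  # True if any advertised action has the given type name.
  def supports_action?(action_types, name) do
    Enum.any?(action_types, &(&1.type == name))
  end
end
```

In the notebook: FlightHealth.healthy?(fn -> ExArrow.Flight.Client.do_action(client, "ping", <<>>) end).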

8. Optional: connection timeout

You can pass a connection timeout when connecting:

```elixir
# Example: 5 second connect timeout
# {:ok, client} = ExArrow.Flight.Client.connect("localhost", 9999, connect_timeout_ms: 5_000)
```

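Per-call timeouts (e.g. for do_get) are not yet part of the API. If you need them, run the call with Task.async/1 and wait with Task.yield(task, timeout_ms); if that returns nil, stop the task with Task.shutdown/2. Task.async returns a %Task{} struct, not a pid.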
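The following is a minimal sketch of that workaround, using only Task from the standard library. FlightTimeout is a hypothetical helper. It returns the call's own result, or {:error, :timeout} if the deadline passes. Killing the task stops the Elixir side from waiting, but it may not cancel work already under way in the native gRPC layer. It also bounds only the call itself, not later ExArrow.Stream.next reads.

```elixir
defmodule FlightTimeout do
  # Runs fun in a separate task and waits at most timeout_ms for its result.
  # Task.async links the task to the caller, so an exception inside fun
  # still propagates to the calling process.
  def call_with_timeout(fun, timeout_ms) when is_function(fun, 0) do
    task = Task.async(fun)

    # Task.yield returns nil on timeout; Task.shutdown then kills the task,
    # but still returns {:ok, result} if the reply arrived in the meantime.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

For example: FlightTimeout.call_with_timeout(fn -> ExArrow.Flight.Client.do_get(client, "echo") end, 5_000).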


9. Stop the server

```elixir
:ok = ExArrow.Flight.Server.stop(server)
IO.puts("Server stopped.")
```

Summary

  • Server: Flight.Server.start_link(port, opts); opts can set an optional host (default "127.0.0.1").
  • Client: connect(host, port, opts), then do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action.
  • Flight results use the same ExArrow.Stream and ExArrow.RecordBatch types as IPC and ADBC. Next notebook: ADBC for querying databases and getting Arrow result streams.