ExArrow Tutorial 2: Arrow Flight & Flight SQL
deps = [
{:explorer, "~> 0.11"},
{:kino, "~> 0.19.0"},
{:adbc, "~> 0.12"},
{:gen_stage, "~> 1.3"},
{:nx, "~> 0.12.1"},
{:telemetry, "~> 1.0"},
{:flow, "~> 1.2"},
{:broadway, "~> 1.0"}
]
local? = File.exists?(Path.join(__DIR__, "../native/ex_arrow_native/Cargo.toml"))
{ex_arrow_dep, extra_deps, config} =
if local? do
System.put_env("EX_ARROW_BUILD", "1")
# Force recompile ex_arrow so optional deps are detected at compile time.
ex_arrow_beam = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
if File.dir?(ex_arrow_beam) do
File.rm_rf!(ex_arrow_beam)
end
{
{:ex_arrow, path: Path.expand("..", __DIR__)},
[{:rustler, "~> 0.36", optional: true}],
[adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
}
else
{
{:ex_arrow, "~> 0.7.0"},
[],
[adbc: [drivers: [:sqlite]]]
}
end
Mix.install(deps ++ [ex_arrow_dep] ++ extra_deps, config: config)
1. Overview
Arrow Flight is a gRPC-based protocol for high-throughput Arrow data transfer. ExArrow provides a Flight client, a built-in echo server, and a Flight SQL client for executing SQL queries against remote servers. This notebook covers the Flight API (do_put/do_get, listing flights, metadata, actions) and Flight SQL prepared statements with parameter binding.
| Component | Module | Purpose |
|---|---|---|
| Server | ExArrow.Flight.Server | Built-in echo server: stores the last do_put and serves it on do_get with ticket "echo". |
| Client | ExArrow.Flight.Client | Connect, do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action. |
| SQL Client | ExArrow.FlightSQL.Client | Connect to Flight SQL servers (DuckDB, DataFusion, Dremio). Query, execute DML, prepare statements with parameter binding. |
Connections are plaintext HTTP/2 only (no TLS), so use them only on localhost or trusted networks.
2. Start the echo server
The server binds to a port and stores the last uploaded stream; do_get with ticket "echo" returns that data.
{:ok, server} = ExArrow.Flight.Server.start_link(9999, [])
{:ok, port} = ExArrow.Flight.Server.port(server)
IO.puts("Echo server listening on port #{port}")
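If you re-run the cell above while the server is still up, start_link/2 will most likely fail because port 9999 is already bound. A minimal sketch for checking a port first, using only Erlang's :gen_tcp rather than any ExArrow API. The PortCheck module is hypothetical, and it assumes the server binds the default host 127.0.0.1.

```elixir
defmodule PortCheck do
  # Hypothetical helper: reports whether a TCP port can be bound on 127.0.0.1.
  # It tries to open a listening socket; if that works, the port was free,
  # so the socket is closed again immediately.
  def status(port) do
    case :gen_tcp.listen(port, ip: {127, 0, 0, 1}) do
      {:ok, socket} ->
        :ok = :gen_tcp.close(socket)
        :free

      {:error, :eaddrinuse} ->
        :in_use

      {:error, reason} ->
        {:error, reason}
    end
  end
end

PortCheck.status(9999)
```

While the echo server is running, PortCheck.status(9999) should report :in_use. The check is inherently racy (another process can take the port right after it returns), so treat it as a diagnostic rather than a lock.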
3. Connect and do_put / do_get
Upload schema and batches with do_put, then download the same data with do_get and the ticket "echo".
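# Load ExArrow's built-in IPC test fixture to use as sample data.
{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
# Pull batches until next/1 returns something other than a RecordBatch (end of stream).
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))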
# Connect to the port the server actually reported.
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, [])
# Upload
:ok = ExArrow.Flight.Client.do_put(client, schema, batches)
# Download (same stream API as IPC)
{:ok, down_stream} = ExArrow.Flight.Client.do_get(client, "echo")
{:ok, down_schema} = ExArrow.Stream.schema(down_stream)
first = ExArrow.Stream.next(down_stream)
IO.puts("Downloaded schema fields: #{inspect(ExArrow.Schema.fields(down_schema) |> Enum.map(& &1.name))}")
IO.puts("First batch rows: #{ExArrow.RecordBatch.num_rows(first)}")
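The cell above only inspects the first batch; to count every row you have to pull until the stream ends. A minimal sketch that generalizes the Stream.repeatedly/Enum.take_while pattern used to build batches. StreamDrain is a hypothetical helper that works with any zero-arity pull function.

```elixir
defmodule StreamDrain do
  # Hypothetical helper: calls `next_fun` repeatedly and keeps the results
  # while `batch?` returns true; the first non-matching value ends the drain.
  def drain(next_fun, batch?) do
    next_fun
    |> Stream.repeatedly()
    |> Enum.take_while(batch?)
  end
end

# Usage in this notebook (needs a fresh stream, since down_stream has
# already yielded its first batch):
#   {:ok, s} = ExArrow.Flight.Client.do_get(client, "echo")
#   StreamDrain.drain(fn -> ExArrow.Stream.next(s) end, &is_struct(&1, ExArrow.RecordBatch))
#   |> Enum.map(&ExArrow.RecordBatch.num_rows/1)
#   |> Enum.sum()
```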
4. list_flights
List all flights the server advertises. With the echo server, you’ll see the stored flight (e.g. descriptor {:cmd, "echo"}).
# Empty criteria = list all
{:ok, flights} = ExArrow.Flight.Client.list_flights(client, <<>>)
Enum.each(flights, fn info ->
IO.puts("Descriptor: #{inspect(info.descriptor)}")
IO.puts(" Endpoints: #{length(info.endpoints)}")
IO.puts(" Total records: #{info.total_records}, bytes: #{info.total_bytes}")
end)
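In the Arrow Flight protocol, a server sets total_records and total_bytes to -1 when it does not know them, so printing them raw can be misleading. A small sketch, assuming ExArrow passes -1 through unchanged in the info maps. FlightSummary is a hypothetical helper that relies only on the total_records field used above.

```elixir
defmodule FlightSummary do
  # Hypothetical helper. Assumes -1 means "unknown", as in the Flight spec.
  def format_count(-1), do: "unknown"
  def format_count(n) when is_integer(n) and n >= 0, do: Integer.to_string(n)

  # Sum the record counts that are known, skipping -1 entries.
  def known_records(flights) do
    flights
    |> Enum.map(& &1.total_records)
    |> Enum.filter(&(&1 >= 0))
    |> Enum.sum()
  end
end
```

For example, FlightSummary.known_records(flights) on the list above, or FlightSummary.format_count(info.total_bytes) inside the Enum.each callback.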
5. get_flight_info
Get metadata for a specific flight by descriptor (e.g. command or path).
{:ok, info} = ExArrow.Flight.Client.get_flight_info(client, {:cmd, "echo"})
IO.inspect(info.descriptor, label: "Descriptor")
IO.inspect(length(info.endpoints), label: "Endpoints")
6. get_schema
Retrieve the Arrow schema for a flight without streaming any record batches. This is useful for checking column names before downloading data, or for building a UI.
{:ok, schema} = ExArrow.Flight.Client.get_schema(client, {:cmd, "echo"})
ExArrow.Schema.fields(schema) |> Enum.map(& &1.name)
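Because get_schema/2 returns field names without transferring data, it is a cheap place to validate expectations before calling do_get. A minimal sketch; SchemaCheck is hypothetical and only assumes each field has a name key, as used in Enum.map(& &1.name) above.

```elixir
defmodule SchemaCheck do
  # Hypothetical helper: returns :ok if every required column name is present,
  # otherwise {:error, {:missing_columns, names}} in the order requested.
  def require_columns(fields, required) do
    present = MapSet.new(fields, & &1.name)

    case Enum.reject(required, &MapSet.member?(present, &1)) do
      [] -> :ok
      missing -> {:error, {:missing_columns, missing}}
    end
  end
end
```

Usage: SchemaCheck.require_columns(ExArrow.Schema.fields(schema), ["id"]), where "id" stands in for whatever columns your code needs.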
7. Actions: list_actions and do_action
The echo server supports simple actions: ping (returns ["pong"]) and clear (clears stored data, returns []).
{:ok, action_types} = ExArrow.Flight.Client.list_actions(client)
Enum.each(action_types, fn a -> IO.puts("#{a.type}: #{a.description}") end)
{:ok, result} = ExArrow.Flight.Client.do_action(client, "ping", <<>>)
IO.inspect(result, label: "ping result")
# Clear stored echo data
{:ok, result} = ExArrow.Flight.Client.do_action(client, "clear", <<>>)
IO.inspect(result, label: "clear result")
# After clear, do_get("echo") would have nothing until you do_put again
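The ping action makes a cheap liveness check. A sketch of a hypothetical FlightHealth.alive?/1 that takes the action call as a function, so it can be exercised without a server. It assumes a healthy echo server answers ping with {:ok, ["pong"]}, as described above.

```elixir
defmodule FlightHealth do
  # Hypothetical helper. `do_action` is a 2-arity function (action type, body),
  # e.g. &ExArrow.Flight.Client.do_action(client, &1, &2).
  def alive?(do_action) do
    case do_action.("ping", <<>>) do
      {:ok, ["pong"]} -> true
      _ -> false
    end
  end
end
```

Usage: FlightHealth.alive?(&ExArrow.Flight.Client.do_action(client, &1, &2)).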
8. Optional: connection timeout
You can pass a connection timeout when connecting:
# Example: 5 second connect timeout
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, connect_timeout_ms: 5_000)
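Per-call timeouts (e.g. for do_get) are not yet part of the API. If you need them, run the call inside Task.async/1 and wait with Task.yield(task, timeout_ms) || Task.shutdown(task); note that Task.yield/2 takes the task struct, not a pid.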
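A sketch of that workaround using only the standard Task module; WithTimeout is a hypothetical name. Shutting down the task stops the caller from waiting, but it may not cancel a network call already in progress inside the native client.

```elixir
defmodule WithTimeout do
  # Runs `fun` in a linked task and waits at most `timeout_ms` for its result.
  # Returns {:ok, value} or {:error, :timeout}. On timeout the task is shut
  # down so it does not keep running in the background. Because Task.async/1
  # links the task, a crash inside `fun` still propagates to the caller.
  def run(fun, timeout_ms) do
    task = Task.async(fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task) do
      {:ok, value} -> {:ok, value}
      nil -> {:error, :timeout}
      {:exit, reason} -> {:error, {:exit, reason}}
    end
  end
end
```

Usage: WithTimeout.run(fn -> ExArrow.Flight.Client.do_get(client, "echo") end, 2_000).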
9. Stop the server
:ok = ExArrow.Flight.Server.stop(server)
IO.puts("Server stopped.")
Summary
- Server: Flight.Server.start_link(port, opts), where opts accepts an optional host (default "127.0.0.1").
- Client: connect(host, port, opts), then do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action.
- Results use the same ExArrow.Stream and ExArrow.RecordBatch types as IPC and ADBC.
10. Flight SQL: prepared statements with parameter binding
Flight SQL layers SQL semantics on top of Arrow Flight. Use it to connect to remote query servers (DuckDB over the network, DataFusion, Dremio, InfluxDB v3). For in-process databases, use ADBC instead.
Note: This section requires a running Flight SQL server. Start DuckDB with INSTALL flight_sql; SELECT * FROM duckdb_extensions(); and then launch its Flight SQL listener, or use any other compatible server.
Connect and query
# Uncomment when a Flight SQL server is running:
# {:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
#
# # Simple query (all batches collected into a Result)
# {:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT id, name FROM users LIMIT 10")
# result.num_rows
# result.batches
#
# # Lazy streaming for large results
# {:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM events")
# Enum.to_list(stream)
IO.puts("Uncomment when a Flight SQL server is available.")
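Enum.to_list/1 in the lazy-streaming example above pulls every batch into memory, which defeats the point for large results. A minimal sketch that folds over batches one at a time instead, assuming the value returned by stream_query/2 is an Elixir Enumerable (as Enum.to_list/1 implies). RowCount is hypothetical.

```elixir
defmodule RowCount do
  # Hypothetical helper: sums row counts over any enumerable of batches,
  # reducing one batch at a time instead of building a list first.
  # `num_rows` counts rows in one batch, e.g. &ExArrow.RecordBatch.num_rows/1.
  def total(batches, num_rows) do
    Enum.reduce(batches, 0, fn batch, acc -> acc + num_rows.(batch) end)
  end
end
```

Usage: RowCount.total(stream, &ExArrow.RecordBatch.num_rows/1).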
Prepare, bind, execute, close
# Uncomment when a Flight SQL server is running:
# {:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
#
# # Prepare a parameterized query
# {:ok, stmt} = ExArrow.FlightSQL.Client.prepare(client, "SELECT * FROM users WHERE id = ?")
#
# # Inspect expected parameter schema
# {:ok, schema} = ExArrow.FlightSQL.Statement.parameter_schema(stmt)
# ExArrow.Schema.fields(schema)
#
# # Bind parameters as a RecordBatch
# {:ok, params} = ExArrow.RecordBatch.from_columns(["id"], [<<42::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, params)
#
# # Execute
# {:ok, stream} = ExArrow.FlightSQL.Statement.execute(stmt)
# batches = Enum.to_list(stream)
#
# # Re-bind with a different value (reuses the prepared plan)
# {:ok, other} = ExArrow.RecordBatch.from_columns(["id"], [<<99::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, other)
# {:ok, stream2} = ExArrow.FlightSQL.Statement.execute(stmt)
#
# # Close when done (idempotent)
# :ok = ExArrow.FlightSQL.Statement.close(stmt)
IO.puts("Uncomment when a Flight SQL server is available.")
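Writing parameter binaries by hand gets tedious beyond one value. A sketch of a hypothetical ParamEncode.s64/1 that produces the same little-endian signed 64-bit layout as <<42::little-signed-64>> above, for any list of integers.

```elixir
defmodule ParamEncode do
  # Hypothetical helper: packs integers into consecutive little-endian
  # signed 64-bit values, the layout used for "s64" columns above.
  # Integers outside the signed 64-bit range are silently truncated to
  # their low 64 bits by binary construction, so validate ranges if needed.
  def s64(values) when is_list(values) do
    for v <- values, into: <<>>, do: <<v::little-signed-64>>
  end
end
```

Usage: ExArrow.RecordBatch.from_columns(["id"], [ParamEncode.s64([42])], ["s64"], 1).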
Supported dtypes for from_columns/4
# Fixed-width: "s8"-"s64", "u8"-"u64", "f32", "f64", "bool"
# Date/time: "date32", "date64", "timestamp_seconds"-"timestamp_nanos",
# "duration_seconds"-"duration_nanos"
# Variable-length: "utf8", "large_utf8", "binary", "large_binary"
# (length-prefixed: <<len::little-32, bytes::binary-size(len)>> per element)
# Example: bind two columns (int64 + utf8)
utf8 = <<5::little-32, "hello", 5::little-32, "world">>
# {:ok, params} = ExArrow.RecordBatch.from_columns(
# ["id", "name"],
# [<<1::little-signed-64, 2::little-signed-64>>, utf8],
# ["s64", "utf8"],
# 2
# )
IO.puts("See ExArrow.RecordBatch moduledoc for the full dtype table and wire format.")
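The same length-prefixed layout can be generated from a list of strings instead of written out by hand. A minimal sketch; VarLenEncode is hypothetical, and the prefix is the byte length (byte_size/1), not the character count, which matters for non-ASCII text.

```elixir
defmodule VarLenEncode do
  # Hypothetical helper: <<len::little-32, bytes::binary-size(len)>> per
  # element, concatenated, matching the "utf8"/"binary" layout described above.
  def encode(strings) when is_list(strings) do
    for s <- strings, into: <<>>, do: <<byte_size(s)::little-32, s::binary>>
  end

  # Inverse of encode/1, handy for checking a buffer before binding it.
  # A truncated or malformed buffer raises FunctionClauseError.
  def decode(<<>>), do: []

  def decode(<<len::little-32, value::binary-size(len), rest::binary>>) do
    [value | decode(rest)]
  end
end
```

Usage: VarLenEncode.encode(["hello", "world"]) yields the same binary as the utf8 example above.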
Next notebook: ADBC for querying databases and getting Arrow result streams directly.