ExArrow Tutorial 2: Arrow Flight & Flight SQL

deps = [
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19.0"},
  {:adbc, "~> 0.12"},
  {:gen_stage, "~> 1.3"},
  {:nx, "~> 0.12.1"},
  {:telemetry, "~> 1.0"},
  {:flow, "~> 1.2"},
  {:broadway, "~> 1.0"}
]

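# When this notebook runs from inside an ExArrow checkout, build from the local
# source tree; otherwise install the published Hex package.
local? = File.exists?(Path.join(__DIR__, "../native/ex_arrow_native/Cargo.toml"))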

{ex_arrow_dep, extra_deps, config} =
  if local? do
    System.put_env("EX_ARROW_BUILD", "1")

    # Force recompile ex_arrow so optional deps are detected at compile time.
    ex_arrow_beam = Path.join(__DIR__, "../_build/dev/lib/ex_arrow/ebin")
    if File.dir?(ex_arrow_beam) do
      File.rm_rf!(ex_arrow_beam)
    end

    {
      {:ex_arrow, path: Path.expand("..", __DIR__)},
      [{:rustler, "~> 0.36", optional: true}],
      [adbc: [drivers: [:sqlite]], rustler_precompiled: [force_build: [ex_arrow: true]]]
    }
  else
    {
      {:ex_arrow, "~> 0.7.0"},
      [],
      [adbc: [drivers: [:sqlite]]]
    }
  end

Mix.install(deps ++ [ex_arrow_dep] ++ extra_deps, config: config)

1. Overview

Arrow Flight is a gRPC-based protocol for high-throughput Arrow data transfer. ExArrow provides a Flight client, a built-in echo server, and a Flight SQL client for executing SQL queries against remote servers. This notebook covers the Flight API (do_put/do_get, listing flights, metadata, actions) and Flight SQL prepared statements with parameter binding.


Component  | Module                   | Purpose
---------- | ------------------------ | -------
Server     | ExArrow.Flight.Server    | Built-in echo server: stores the last do_put and serves it on do_get with ticket "echo".
Client     | ExArrow.Flight.Client    | connect, do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action.
SQL Client | ExArrow.FlightSQL.Client | Connects to Flight SQL servers (DuckDB, DataFusion, Dremio). Runs queries, executes DML, and prepares statements with parameter binding.

Connections use plaintext HTTP/2 only (no TLS). Use them only on localhost or on trusted networks.


2. Start the echo server

The server binds to a port and stores the last uploaded stream; do_get with ticket "echo" returns that data.

{:ok, server} = ExArrow.Flight.Server.start_link(9999, [])
{:ok, port} = ExArrow.Flight.Server.port(server)
IO.puts("Echo server listening on port #{port}")

3. Connect and do_put / do_get

Upload schema and batches with do_put, then download the same data with do_get and the ticket "echo".

{:ok, ipc_bytes} = ExArrow.Native.ipc_test_fixture_binary()
{:ok, stream} = ExArrow.IPC.Reader.from_binary(ipc_bytes)
{:ok, schema} = ExArrow.Stream.schema(stream)
# Pull batches until next/1 returns something other than a RecordBatch (end of stream or an error)
batches =
  Stream.repeatedly(fn -> ExArrow.Stream.next(stream) end)
  |> Enum.take_while(&is_struct(&1, ExArrow.RecordBatch))

# Connect to the port the server reported instead of repeating the literal 9999
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, [])

# Upload
:ok = ExArrow.Flight.Client.do_put(client, schema, batches)

# Download (same stream API as IPC)
{:ok, down_stream} = ExArrow.Flight.Client.do_get(client, "echo")
{:ok, down_schema} = ExArrow.Stream.schema(down_stream)
first = ExArrow.Stream.next(down_stream)
IO.puts("Downloaded schema fields: #{inspect(ExArrow.Schema.fields(down_schema) |> Enum.map(& &1.name))}")
IO.puts("First batch rows: #{ExArrow.RecordBatch.num_rows(first)}")
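The batch-collection line in this cell pulls from the stream until next/1 returns a value that is not a RecordBatch. The sketch below runs the same Stream.repeatedly/1 + Enum.take_while/2 idiom in plain Elixir, so it works without a server. A hypothetical FakeBatch struct and an Agent-backed function stand in for ExArrow.RecordBatch and ExArrow.Stream. The end-of-stream value used here, nil, is an assumption; the idiom stops at any non-batch value.

```elixir
# Hypothetical stand-in for ExArrow.RecordBatch.
defmodule FakeBatch do
  defstruct [:rows]
end

defmodule PullDemo do
  # Build a stateful "next" function over a fixed list. It mimics a pull-based
  # stream that returns nil once it is exhausted.
  def next_fun(items) do
    {:ok, agent} = Agent.start_link(fn -> items end)

    fn ->
      Agent.get_and_update(agent, fn
        [] -> {nil, []}
        [head | tail] -> {head, tail}
      end)
    end
  end

  # Same idiom as the notebook: call next repeatedly and stop at the first
  # value that is not a batch struct.
  def collect(next) do
    Stream.repeatedly(next)
    |> Enum.take_while(&is_struct(&1, FakeBatch))
  end
end

next = PullDemo.next_fun([%FakeBatch{rows: 3}, %FakeBatch{rows: 2}])
batches = PullDemo.collect(next)
Enum.map(batches, & &1.rows)
```

take_while stops silently at the first non-batch value. If next/1 can return an error tuple mid-stream, the collected list simply ends early, so check for errors explicitly when they matter.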

4. list_flights

List all flights the server advertises. With the echo server, you’ll see the stored flight (e.g. descriptor {:cmd, "echo"}).

# Empty criteria = list all
{:ok, flights} = ExArrow.Flight.Client.list_flights(client, <<>>)

Enum.each(flights, fn info ->
  IO.puts("Descriptor: #{inspect(info.descriptor)}")
  IO.puts("  Endpoints: #{length(info.endpoints)}")
  IO.puts("  Total records: #{info.total_records}, bytes: #{info.total_bytes}")
end)
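In the Arrow Flight protocol, FlightInfo's total_records and total_bytes are set to -1 when the server does not know them. The sketch below sums those counters across several flights and counts the unknowns separately. It uses plain maps with the field names the loop above prints, and the flight data is made up. It assumes, without confirmation from the source, that ExArrow passes -1 through unchanged. FlightTotals is a hypothetical helper.

```elixir
defmodule FlightTotals do
  # Sum a numeric field across flight infos, skipping the -1 "unknown" marker.
  # Returns {sum_of_known_values, number_of_unknown_values}.
  def sum(infos, field) do
    Enum.reduce(infos, {0, 0}, fn info, {sum, unknown} ->
      case Map.fetch!(info, field) do
        -1 -> {sum, unknown + 1}
        n when is_integer(n) and n >= 0 -> {sum + n, unknown}
      end
    end)
  end
end

# Made-up flight infos shaped like the ones printed above.
infos = [
  %{descriptor: {:cmd, "echo"}, endpoints: [], total_records: 6, total_bytes: 512},
  %{descriptor: {:cmd, "other"}, endpoints: [], total_records: -1, total_bytes: -1}
]

FlightTotals.sum(infos, :total_records)
```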

5. get_flight_info

Get metadata for a specific flight by descriptor (e.g. command or path).

{:ok, info} = ExArrow.Flight.Client.get_flight_info(client, {:cmd, "echo"})
IO.inspect(info.descriptor, label: "Descriptor")
IO.inspect(length(info.endpoints), label: "Endpoints")

6. get_schema

Retrieve the Arrow schema for a flight without streaming any record batches. This is useful for checking column names and types, or for showing them in a UI, before downloading the data.

{:ok, schema} = ExArrow.Flight.Client.get_schema(client, {:cmd, "echo"})
ExArrow.Schema.fields(schema) |> Enum.map(& &1.name)

7. Actions: list_actions and do_action

The echo server supports simple actions: ping (returns ["pong"]) and clear (clears stored data, returns []).

{:ok, action_types} = ExArrow.Flight.Client.list_actions(client)
Enum.each(action_types, fn a -> IO.puts("#{a.type}: #{a.description}") end)
{:ok, result} = ExArrow.Flight.Client.do_action(client, "ping", <<>>)
IO.inspect(result, label: "ping result")
# Clear stored echo data
{:ok, result} = ExArrow.Flight.Client.do_action(client, "clear", <<>>)
IO.inspect(result, label: "clear result")

# After clear, do_get("echo") would have nothing until you do_put again
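The echo server's behavior can be modeled in a few lines. In the description above, do_put replaces the stored stream and do_get with ticket "echo" returns it. The ping action answers ["pong"], and clear empties the store and returns []. The toy in-memory model below uses an Agent. It is not ExArrow's implementation: EchoModel and its functions are hypothetical. The source does not say exactly what the real do_get returns after clear, so the model returns :empty.

```elixir
defmodule EchoModel do
  # Toy model of the echo server's described behavior; not ExArrow's code.
  def start_link, do: Agent.start_link(fn -> nil end)

  # do_put: replace whatever was stored with the new schema and batches.
  def put(pid, schema, batches), do: Agent.update(pid, fn _ -> {schema, batches} end)

  # do_get: only the "echo" ticket is served; :empty means nothing is stored.
  def get(pid, "echo") do
    case Agent.get(pid, & &1) do
      nil -> :empty
      {schema, batches} -> {:ok, schema, batches}
    end
  end

  def get(_pid, _other_ticket), do: {:error, :unknown_ticket}

  # do_action: "ping" answers ["pong"]; "clear" drops stored data and returns [].
  def action(_pid, "ping"), do: {:ok, ["pong"]}

  def action(pid, "clear") do
    Agent.update(pid, fn _ -> nil end)
    {:ok, []}
  end
end

{:ok, echo} = EchoModel.start_link()
:ok = EchoModel.put(echo, :schema_a, [:batch1, :batch2])
# A second put replaces the first: only the last upload is kept.
:ok = EchoModel.put(echo, :schema_b, [:batch3])
{:ok, :schema_b, [:batch3]} = EchoModel.get(echo, "echo")
{:ok, []} = EchoModel.action(echo, "clear")
EchoModel.get(echo, "echo")
```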

8. Optional: connection timeout

You can pass a connection timeout when connecting:

# Example: 5 second connect timeout
{:ok, client} = ExArrow.Flight.Client.connect("localhost", port, connect_timeout_ms: 5_000)

Per-call timeouts (for example, for do_get) are not yet part of the API. If you need them, run the call in Task.async/1 and wait with Task.yield(task, timeout_ms), calling Task.shutdown/2 when yield returns nil.
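The Task-based workaround can be wrapped in a small helper. It runs the function in a task, waits up to timeout_ms, and kills the task if no reply arrived. CallTimeout.with_timeout/2 is a hypothetical name, not part of ExArrow. Killing the task only stops the caller from waiting. Whether the underlying native call is also cancelled depends on ExArrow, and this sketch does not assume either way.

```elixir
defmodule CallTimeout do
  # Hypothetical helper: run `fun` in a Task and give up after timeout_ms.
  # Returns {:ok, value} or {:error, :timeout}.
  def with_timeout(fun, timeout_ms) when is_function(fun, 0) do
    task = Task.async(fun)

    # yield returns nil on timeout; shutdown then kills the task, but still
    # returns {:ok, value} if the reply arrived in the meantime.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end

# With the Flight client, a successful call yields {:ok, {:ok, stream}}:
# CallTimeout.with_timeout(fn -> ExArrow.Flight.Client.do_get(client, "echo") end, 5_000)

CallTimeout.with_timeout(fn -> Process.sleep(200) end, 50)
```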


9. Stop the server

:ok = ExArrow.Flight.Server.stop(server)
IO.puts("Server stopped.")

Summary

  • Server: Flight.Server.start_link(port, opts); opts may set host (default "127.0.0.1").
  • Client: connect(host, port, opts), then do_put, do_get, list_flights, get_flight_info, get_schema, list_actions, do_action.
  • Same ExArrow.Stream and ExArrow.RecordBatch as IPC and ADBC.

10. Flight SQL: prepared statements with parameter binding

Flight SQL layers SQL semantics on top of Arrow Flight. Use it to connect to remote query servers (DuckDB over the network, DataFusion, Dremio, InfluxDB v3). For in-process databases, use ADBC instead.

Note: This section requires a running Flight SQL server. With DuckDB, run INSTALL flight_sql; (SELECT * FROM duckdb_extensions(); lists installed extensions so you can confirm it), then launch its Flight SQL listener. Any other compatible server also works.

Connect and query

# Uncomment when a Flight SQL server is running:
# {:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
#
# # Simple query (all batches collected into a Result)
# {:ok, result} = ExArrow.FlightSQL.Client.query(client, "SELECT id, name FROM users LIMIT 10")
# result.num_rows
# result.batches
#
# # Lazy streaming for large results
# {:ok, stream} = ExArrow.FlightSQL.Client.stream_query(client, "SELECT * FROM events")
# Enum.to_list(stream)
IO.puts("Uncomment when a Flight SQL server is available.")

Prepare, bind, execute, close

# Uncomment when a Flight SQL server is running:
# {:ok, client} = ExArrow.FlightSQL.Client.connect("localhost:32010")
#
# # Prepare a parameterized query
# {:ok, stmt} = ExArrow.FlightSQL.Client.prepare(client, "SELECT * FROM users WHERE id = ?")
#
# # Inspect expected parameter schema
# {:ok, schema} = ExArrow.FlightSQL.Statement.parameter_schema(stmt)
# ExArrow.Schema.fields(schema)
#
# # Bind parameters as a RecordBatch
# {:ok, params} = ExArrow.RecordBatch.from_columns(["id"], [<<42::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, params)
#
# # Execute
# {:ok, stream} = ExArrow.FlightSQL.Statement.execute(stmt)
# batches = Enum.to_list(stream)
#
# # Re-bind with a different value (reuses the prepared plan)
# {:ok, other} = ExArrow.RecordBatch.from_columns(["id"], [<<99::little-signed-64>>], ["s64"], 1)
# :ok = ExArrow.FlightSQL.Statement.bind(stmt, other)
# {:ok, stream2} = ExArrow.FlightSQL.Statement.execute(stmt)
#
# # Close when done (idempotent)
# :ok = ExArrow.FlightSQL.Statement.close(stmt)
IO.puts("Uncomment when a Flight SQL server is available.")
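The cell above hand-writes a single little-endian 64-bit value per column buffer. For several rows, the buffer is the values concatenated. The hypothetical helper below builds an "s64" buffer from a list of integers and decodes it back as a sanity check. It assumes, based only on the examples here, that from_columns/4 expects a plain concatenation of little-endian values with no prefix or validity bitmap.

```elixir
defmodule S64Column do
  # Encode integers as a contiguous little-endian signed 64-bit buffer.
  def encode(ints) when is_list(ints) do
    for i <- ints, into: <<>>, do: <<i::little-signed-64>>
  end

  # Decode such a buffer back into a list of integers.
  def decode(buffer) when is_binary(buffer) do
    for <<i::little-signed-64 <- buffer>>, do: i
  end
end

# The row count passed to from_columns/4 must match the number of values:
# {:ok, params} = ExArrow.RecordBatch.from_columns(["id"], [S64Column.encode([42, 99])], ["s64"], 2)

buffer = S64Column.encode([42, 99, -1])
S64Column.decode(buffer)
```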

Supported dtypes for from_columns/4

# Fixed-width: "s8"-"s64", "u8"-"u64", "f32", "f64", "bool"
# Date/time: "date32", "date64", "timestamp_seconds"-"timestamp_nanos",
#            "duration_seconds"-"duration_nanos"
# Variable-length: "utf8", "large_utf8", "binary", "large_binary"
#   (length-prefixed: <<len::little-32, bytes::binary-size(len)>> per element)

# Example: bind two columns (int64 + utf8)
# Two utf8 values; each prefix is the value's byte size (5 for "hello" and "world")
utf8 = <<5::little-32, "hello", 5::little-32, "world">>
# {:ok, params} = ExArrow.RecordBatch.from_columns(
#   ["id", "name"],
#   [<<1::little-signed-64, 2::little-signed-64>>, utf8],
#   ["s64", "utf8"],
#   2
# )
IO.puts("See ExArrow.RecordBatch moduledoc for the full dtype table and wire format.")
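Writing length prefixes by hand is error-prone for non-ASCII text, because the prefix is the byte size, not the character count. The hypothetical VarColumn helper below produces the length-prefixed layout from the comment above. Date32Column covers "date32", which Arrow defines as days since 1970-01-01 in a signed 32-bit integer. It assumes ExArrow expects that value little-endian, like the other fixed-width examples; the source does not spell out the byte layout for dates.

```elixir
defmodule VarColumn do
  # Length-prefixed layout: <<len::little-32, bytes::binary-size(len)>> per element.
  # len is byte_size/1, so "héllo" gets 6, not 5.
  def encode(strings) when is_list(strings) do
    for s <- strings, into: <<>>, do: <<byte_size(s)::little-32, s::binary>>
  end

  # Decode the same layout back into a list of binaries.
  def decode(<<>>), do: []

  def decode(<<len::little-32, s::binary-size(len), rest::binary>>) do
    [s | decode(rest)]
  end
end

defmodule Date32Column do
  # Arrow date32: days since the UNIX epoch as a signed 32-bit integer.
  # Little-endian encoding is an assumption, matching the fixed-width examples.
  @epoch ~D[1970-01-01]

  def encode(dates) when is_list(dates) do
    for d <- dates, into: <<>>, do: <<Date.diff(d, @epoch)::little-signed-32>>
  end
end

VarColumn.encode(["hello", "world"]) == <<5::little-32, "hello", 5::little-32, "world">>
```

VarColumn.encode(["hello", "world"]) produces exactly the utf8 binary built by hand in the cell above.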

Next notebook: ADBC for querying databases and getting Arrow result streams directly.