Nx Streaming from Zarr

livebooks/nx_streaming.livemd


Introduction

Stream Zarr chunks directly into Nx tensors for machine learning workflows.

Mix.install([
  {:ex_zarr, path: Path.dirname(__DIR__)},
  {:nx, "~> 0.7"},
  {:kino, "~> 0.13"}
])

Create Array and Stream to Tensors

alias ExZarr.Array

{:ok, array} =
  Array.create(
    shape: {100, 100},
    chunks: {10, 10},
    dtype: :float32,
    storage: :memory
  )

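# 100 float32 values of 1.0, packed as a little-endian binary (one 10x10 chunk)
data = for _ <- 1..100, into: <<>>, do: <<1.0::float-little-32>>
# Write the values into the top-left 10x10 region of the array
Array.set_slice(array, data, start: {0, 0}, stop: {10, 10})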

tensors =
  array
  |> Array.stream_chunks()
  |> Stream.take(4)
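  |> Enum.map(fn {_index, chunk_data} ->
    # Nx.from_binary/2 returns a flat (rank-1) tensor; use Nx.reshape/2 to restore the chunk shape if needed
    Nx.from_binary(chunk_data, {:f, 32})
  end)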

length(tensors)
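
To make the binary layout concrete, here is a minimal sketch in plain Elixir (no ExZarr or Nx calls) of the bytes that `Nx.from_binary/2` is asked to interpret. It assumes each chunk arrives as raw little-endian float32 values, as in the `data` binary above; Nx is assumed here to read the bytes in the host's byte order, which is little-endian on most common hardware.

```elixir
# Same layout as `data` above: 100 float32 values of 1.0, little-endian
data = for _ <- 1..100, into: <<>>, do: <<1.0::float-little-32>>

# Each float32 occupies 4 bytes, so a 10x10 chunk is 10 * 10 * 4 bytes
chunk_bytes = byte_size(data)

# Decode the binary back into Elixir floats with a binary comprehension
values = for <<x::float-little-32 <- data>>, do: x
value_count = length(values)
```

With a 100x100 array and 10x10 chunks there are 100 such chunks in total, so `Stream.take(4)` above touches only a small part of the array.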

Batch Pipeline

batches =
  array
  |> Array.stream_chunks(concurrency: 2)
  |> Stream.map(fn {_index, data} -> Nx.from_binary(data, {:f, 32}) end)
  |> Stream.chunk_every(2)
  |> Enum.take(2)

Kino.inspect(batches |> Enum.map(fn batch -> Enum.map(batch, &Nx.shape/1) end))
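
The shape of the result may be easier to see with a stand-in stream. The sketch below uses only the Elixir standard library; `fake_chunks` is a hypothetical replacement for `Array.stream_chunks/1`, and the `{row, col}` index format is an assumption made for illustration.

```elixir
# Hypothetical stand-in for Array.stream_chunks/1: a lazy stream of {index, binary} tuples
fake_chunks =
  Stream.map(0..99, fn i ->
    {{div(i, 10), rem(i, 10)}, :binary.copy(<<1.0::float-little-32>>, 100)}
  end)

# Same pipeline structure as above, with byte_size/1 standing in for tensor construction
batches =
  fake_chunks
  |> Stream.map(fn {_index, data} -> byte_size(data) end)
  |> Stream.chunk_every(2)
  |> Enum.take(2)
```

`batches` is a list of two batches, each holding two per-chunk results. Because every stage before `Enum.take/2` is a `Stream`, the remaining chunks are never materialized.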

Using DataLoader

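For training workflows that need sample-level batching and shuffling, use ExZarr.Nx.DataLoader. The example below requests batches of 32 samples: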

array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.take(3)
|> Enum.map(fn {:ok, batch} -> Nx.shape(batch) end)
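
As an illustration of what sample-level batching with shuffling means, the following standard-library sketch shuffles 100 row indices and groups them into batches of 32. It is not how `ExZarr.Nx.DataLoader` is implemented; in particular, whether the loader keeps or drops the final partial batch is not stated here.

```elixir
# Illustration only: shuffle the sample (row) indices, then group them into batches of 32
sample_indices = Enum.shuffle(0..99)

batch_sizes =
  sample_indices
  |> Enum.chunk_every(32)
  |> Enum.map(&length/1)
```

With 100 samples and a batch size of 32, this grouping produces three full batches and one partial batch of 4.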

Key Takeaways

  1. stream_chunks yields {index, binary} tuples that feed directly into Nx.from_binary/2
  2. The concurrency option of stream_chunks overlaps chunk I/O with tensor construction
  3. ExZarr.Nx.DataLoader handles sample-level batching and shuffling
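
The second takeaway can be sketched with `Task.async_stream/3` from the standard library. This is a generic illustration of overlapping slow reads, not a description of how `stream_chunks` implements its `concurrency` option; `load_chunk` is a hypothetical loader whose sleep stands in for storage latency.

```elixir
# Hypothetical loader: the sleep stands in for storage I/O latency
load_chunk = fn i ->
  Process.sleep(20)
  {i, :binary.copy(<<1.0::float-little-32>>, 100)}
end

# Up to two loads run at once; results are still delivered in input order
results =
  0..3
  |> Task.async_stream(load_chunk, max_concurrency: 2)
  |> Enum.map(fn {:ok, {i, bin}} -> {i, byte_size(bin)} end)
```

While one task waits on its simulated read, the consumer can already process a chunk that has finished loading.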