Nx Streaming from Zarr
Introduction
Stream Zarr chunks directly into Nx tensors for machine learning workflows.
Mix.install([
  {:ex_zarr, path: Path.dirname(__DIR__)},
  {:nx, "~> 0.7"},
  {:kino, "~> 0.13"}
])
Create Array and Stream to Tensors
alias ExZarr.Array
{:ok, array} =
  Array.create(
    shape: {100, 100},
    chunks: {10, 10},
    dtype: :float32,
    storage: :memory
  )
# 100 little-endian float32 values (1.0 each): exactly one 10x10 block
data = for _ <- 1..100, into: <<>>, do: <<1.0::float-little-32>>
Array.set_slice(array, data, start: {0, 0}, stop: {10, 10})
# Decode the first four chunks; Nx.from_binary/2 returns a flat (1-D) tensor per chunk
tensors =
  array
  |> Array.stream_chunks()
  |> Stream.take(4)
  |> Enum.map(fn {_index, chunk_data} ->
    Nx.from_binary(chunk_data, {:f, 32})
  end)
length(tensors)
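Because `Nx.from_binary/2` always produces a 1-D tensor, a chunk usually needs a reshape before it can be used as a 2-D block. The sketch below is a minimal illustration that does not touch ExZarr: `chunk_data` is a hypothetical stand-in for the binary of one 10x10 float32 chunk, and it assumes Nx is already available from the setup cell. Nx reads binaries in the machine's native byte order, so the stand-in is written with `native` floats; on little-endian hardware this matches the little-endian data used above.

```elixir
# Hypothetical stand-in for one 10x10 float32 chunk: the values 0.0..99.0
chunk_data = for i <- 0..99, into: <<>>, do: <<(i * 1.0)::float-native-32>>

tensor =
  chunk_data
  # Flat tensor of 100 float32 values
  |> Nx.from_binary({:f, 32})
  # Restore the chunk's 2-D layout (row-major order assumed)
  |> Nx.reshape({10, 10})

Nx.shape(tensor)
```

The reshape target must match the chunk shape passed to `Array.create`; a chunk that is stored with a different element order would need a transpose as well.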
Batch Pipeline
batches =
  array
  |> Array.stream_chunks(concurrency: 2)
  |> Stream.map(fn {_index, data} -> Nx.from_binary(data, {:f, 32}) end)
  |> Stream.chunk_every(2)
  |> Enum.take(2)
Kino.inspect(batches |> Enum.map(fn batch -> Enum.map(batch, &Nx.shape/1) end))
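The pipeline above yields lists of tensors. If a single tensor per batch is more convenient, each list can be combined with `Nx.stack/1`. The following is a rough sketch using made-up data rather than a real array: `chunk_binaries` is a hypothetical stand-in for the binaries that a chunk stream would emit, and Nx is assumed to be installed by the setup cell.

```elixir
# Hypothetical stand-ins for four 10x10 float32 chunks, each filled with one value
chunk_binaries =
  for value <- [1.0, 2.0, 3.0, 4.0] do
    for _ <- 1..100, into: <<>>, do: <<value::float-native-32>>
  end

stacked_batches =
  chunk_binaries
  |> Stream.map(fn data ->
    data |> Nx.from_binary({:f, 32}) |> Nx.reshape({10, 10})
  end)
  # Group two chunk tensors per batch
  |> Stream.chunk_every(2)
  # Stack each group along a new leading axis
  |> Enum.map(&Nx.stack/1)

Enum.map(stacked_batches, &Nx.shape/1)
```

Stacking requires every tensor in a batch to have the same shape, which holds here because all chunks are full 10x10 blocks.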
Using DataLoader
For training workflows with shuffling:
array
|> ExZarr.Nx.DataLoader.batch_stream(32)
|> Enum.take(3)
|> Enum.map(fn {:ok, batch} -> Nx.shape(batch) end)
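To make "sample-level batching with shuffling" concrete, here is a small sketch written with plain Nx and `Enum`. It is not how `ExZarr.Nx.DataLoader` is implemented and makes no claim about its options; the `samples` tensor, the feature count, and `batch_size` are all illustrative assumptions.

```elixir
# Illustrative data: 100 samples with 4 features each
samples = Nx.iota({100, 4}, type: {:f, 32})
batch_size = 32

batches =
  0..99
  # Randomize the order in which samples are visited
  |> Enum.shuffle()
  # Split the shuffled sample indices into batches
  |> Enum.chunk_every(batch_size)
  # Gather the selected rows for each batch
  |> Enum.map(fn indices -> Nx.take(samples, Nx.tensor(indices), axis: 0) end)

Enum.map(batches, &Nx.shape/1)
```

With 100 samples and a batch size of 32, the last batch holds only the 4 remaining samples. Training code typically either drops or pads such a partial batch.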
Key Takeaways
- `stream_chunks` composes naturally with `Nx.from_binary/2`
- Use `concurrency` to overlap I/O with tensor construction
- `ExZarr.Nx.DataLoader` handles sample-level batching and shuffling
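The idea of overlapping I/O with tensor construction can be sketched with the standard library's `Task.async_stream/3`. This is only an analogy for what a `concurrency` option might do, not a description of ExZarr's internals: `load_chunk` is a hypothetical function that simulates storage latency with `Process.sleep/1`, and Nx is assumed to be installed by the setup cell.

```elixir
# Hypothetical loader: pretends to fetch a 10x10 float32 chunk from slow storage
load_chunk = fn index ->
  # Stand-in for storage latency
  Process.sleep(10)
  {index, for(_ <- 1..100, into: <<>>, do: <<1.0::float-native-32>>)}
end

tensors =
  0..3
  # Up to two loads run at once; results are emitted in input order
  |> Task.async_stream(load_chunk, max_concurrency: 2)
  # Tensors are built in the caller while later loads are still in flight
  |> Enum.map(fn {:ok, {index, data}} ->
    {index, Nx.from_binary(data, {:f, 32})}
  end)

length(tensors)
```

Whether higher concurrency helps depends on the storage backend. In-memory storage has little latency to hide, so the benefit is more likely to appear with disk or network stores.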