Broadway Zarr Pipeline
Introduction
This livebook demonstrates fault-tolerant image tile processing using ExZarr and Broadway.
Mix.install([
{:ex_zarr, path: Path.dirname(__DIR__)},
{:broadway, "~> 1.0"},
{:kino, "~> 0.13"}
])
Create Sample Array
alias ExZarr.Array
# Create a 100x100 in-memory uint8 array stored as 10x10 chunks.
create_opts = [shape: {100, 100}, chunks: {10, 10}, dtype: :uint8, storage: :memory]
{:ok, array} = Array.create(create_opts)

# Write gradient pattern: each 10x10 tile is filled with the single byte value
# row * 10 + col, so the fill value steps from 0 to 99 across the tile grid.
for row <- 0..9, col <- 0..9 do
  tile_bytes = :binary.copy(<<row * 10 + col>>, 100)
  origin = {row * 10, col * 10}
  extent = {(row + 1) * 10, (col + 1) * 10}

  Array.set_slice(array, tile_bytes, start: origin, stop: extent)
end
Process Chunks with stream_chunks
# Reduce one streamed chunk to {index, sum of its bytes}. Elements are matched
# as maps carrying :index and :data keys.
summarize = fn %{index: idx, data: tile} ->
  total = tile |> :binary.bin_to_list() |> Enum.sum()
  {idx, total}
end

chunk_stream = Array.stream_chunks(array, concurrency: 4, metadata: true)
results = Enum.map(chunk_stream, summarize)

# Render the per-chunk sums as an interactive table.
Kino.DataTable.new(results)
Broadway Pipeline Pattern
For production pipelines, define a Broadway module:
defmodule TilePipeline do
  @moduledoc """
  Broadway pipeline that reduces every chunk emitted by
  `ExZarr.Broadway.ChunkProducer` to the sum of its bytes.
  """

  use Broadway

  @doc """
  Starts the pipeline, registered under this module's name, reading chunks from `array`.

  One producer process runs `ExZarr.Broadway.ChunkProducer`; four `:default`
  processors consume its messages.
  """
  def start_link(array) do
    producer_module =
      {ExZarr.Broadway.ChunkProducer, array: array, stream_opts: [concurrency: 1]}

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: producer_module, concurrency: 1],
      processors: [default: [concurrency: 4, max_demand: 5, min_demand: 2]]
    )
  end

  # Expects the message data to be an {index, binary} tuple and replaces it
  # with {index, sum_of_bytes}.
  @impl Broadway
  def handle_message(_processor, %{data: {index, bytes}} = message, _ctx) do
    total = bytes |> :binary.bin_to_list() |> Enum.sum()
    Broadway.Message.put_data(message, {index, total})
  end
end
# Start the pipeline for `array`; the match raises if start_link/1 does not return {:ok, pid}.
{:ok, _pid} = TilePipeline.start_link(array)
Key Takeaways
- Broadway provides supervision and retry semantics for chunk pipelines
- ExZarr.Broadway.ChunkProducer handles demand-driven chunk reads
- Use stream_chunks directly for simpler pipelines without Broadway