Powered by AppSignal & Oban Pro

Broadway Zarr Pipeline

livebooks/broadway_pipeline.livemd

Broadway Zarr Pipeline

Introduction

This livebook demonstrates fault-tolerant image tile processing using ExZarr and Broadway.

Mix.install([
  {:ex_zarr, path: Path.dirname(__DIR__)},
  {:broadway, "~> 1.0"},
  {:kino, "~> 0.13"}
])

Create Sample Array

alias ExZarr.Array

# In-memory 100x100 uint8 array, stored as a 10x10 grid of 10x10 chunks.
{:ok, array} =
  Array.create(
    shape: {100, 100},
    chunks: {10, 10},
    dtype: :uint8,
    storage: :memory
  )

# Write a gradient pattern: the tile at grid position {x, y} is filled with the
# constant byte x * 10 + y (0..99, so it always fits in a uint8).
for x <- 0..9, y <- 0..9 do
  # One tile is 10 * 10 = 100 single-byte elements, all with the same value.
  data = :binary.copy(<<x * 10 + y>>, 100)

  # Match on :ok so a failed write stops the cell here instead of being silently
  # dropped and leaving later cells to process a partially filled array.
  :ok =
    Array.set_slice(array, data,
      start: {x * 10, y * 10},
      stop: {(x + 1) * 10, (y + 1) * 10}
    )
end

Process Chunks with stream_chunks

# Sum the bytes of every chunk. With `metadata: true` each streamed element is
# matched as a map carrying the chunk `:index` and its raw `:data` binary.
results =
  array
  |> Array.stream_chunks(concurrency: 4, metadata: true)
  |> Enum.map(fn %{index: idx, data: tile} ->
    {idx, Enum.sum(:binary.bin_to_list(tile))}
  end)

# Kino.DataTable renders tabular data (records with named columns). Bare
# {index, sum} tuples are not records, so build explicit rows for display and
# keep `results` itself unchanged. The tuple index is inspected so it renders
# as plain text in its cell.
rows = Enum.map(results, fn {idx, sum} -> %{index: inspect(idx), sum: sum} end)

Kino.DataTable.new(rows)

Broadway Pipeline Pattern

For production pipelines, define a Broadway module:

defmodule TilePipeline do
  @moduledoc """
  Broadway pipeline that replaces each chunk of an ExZarr array with the sum of
  its bytes.

  Chunks are supplied by `ExZarr.Broadway.ChunkProducer` and handled by four
  concurrent processors.
  """

  use Broadway

  @doc """
  Starts the pipeline for `array`, registered under this module's name.

  Returns whatever `Broadway.start_link/2` returns.
  """
  def start_link(array) do
    # A single producer stage reads chunks from the array.
    chunk_source = {ExZarr.Broadway.ChunkProducer, array: array, stream_opts: [concurrency: 1]}

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: chunk_source, concurrency: 1],
      processors: [default: [concurrency: 4, max_demand: 5, min_demand: 2]]
    )
  end

  # Expects message data of the form {index, binary}; the binary is replaced by
  # the sum of its bytes while the index is carried through unchanged.
  @impl true
  def handle_message(_processor, %{data: {chunk_index, tile}} = message, _ctx) do
    byte_total = tile |> :binary.bin_to_list() |> Enum.sum()
    Broadway.Message.put_data(message, {chunk_index, byte_total})
  end
end

# Start the pipeline for the sample array. The assertive match makes the cell
# fail loudly if start-up returns anything other than {:ok, pid}.
# NOTE(review): the pipeline registers under a fixed name, so re-evaluating this
# cell while it is still running presumably fails the match — confirm.
{:ok, _pid} = TilePipeline.start_link(array)

Key Takeaways

  1. Broadway provides supervision and retry semantics for chunk pipelines
  2. ExZarr.Broadway.ChunkProducer handles demand-driven chunk reads
  3. Use stream_chunks directly for simpler pipelines without Broadway