ExZarr 05.01 — Finance: tick data cube (time × symbol × field)

05_01_tick_data_cube.livemd

We’ll generate synthetic tick data and store it as a 3D array:

shape = {T, S, F}

  • T: ticks (time index)
  • S: symbols
  • F: fields (price, size, bid, ask)

Then we’ll compute OHLC and VWAP using chunk streaming and parallel chunk maps.
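
The cube is stored row-major, so element {ti, si, fi} sits at flat offset ti * S * F + si * F + fi. A tiny sketch of that arithmetic (plain Elixir, nothing ExZarr-specific):

flat_index = fn ti, si, fi, s, f -> ti * s * f + si * f + fi end

# tick 3, symbol 1, field 2 in a {10, 4, 4} cube:
flat_index.(3, 1, 2, 4, 4)
# => 54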


Setup

Mix.install([{:ex_zarr, path: ".."}])

alias ExZarr.Array
alias ExZarr.Gallery.{Pack, Metrics}

1) Generate synthetic tick data

symbols = ["AAPL", "MSFT", "NVDA", "SPY"]
fields = [:price, :size, :bid, :ask]
t = 10_000

s = length(symbols)
f = length(fields)

# seed the RNG so the synthetic data is reproducible
:rand.seed(:exsss, {1, 2, 3})

gen_tick = fn base ->
  price = base + (:rand.normal() * 0.05)
  size = 1.0 + :rand.uniform() * 99.0
  spread = 0.01 + :rand.uniform() * 0.05
  bid = price - spread / 2
  ask = price + spread / 2
  [price, size, bid, ask]
end
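
A single call yields one [price, size, bid, ask] row, with bid and ask sitting half a spread below and above the price:

gen_tick.(100.0)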

# layout: row-major over {t, s, f}
# one gen_tick call per {tick, symbol}, so price, size, bid and ask come from the same draw
ticks =
  for ti <- 0..(t - 1),
      {_sym, si} <- Enum.with_index(symbols) do
    base = 100.0 + si * 10.0 + ti * 0.0001
    gen_tick.(base)
  end

data = List.flatten(ticks)

bin = Pack.pack(data, :float64)

{t, s, f, byte_size(bin)}
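
As a quick sanity check (assuming Pack.pack/2 writes 8 bytes per :float64 value), the packed binary should hold exactly t * s * f elements:

byte_size(bin) == t * s * f * 8
# => true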

2) Store in ExZarr

Chunk by time windows (e.g. 500 ticks per chunk, spanning all symbols and fields). Window analytics then only read the chunks that cover the window.

chunks = {500, s, f}
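
With t = 10_000 ticks and 500-tick windows, the time axis splits into 20 chunks of 500 × s × f float64 values each:

{div(t, 500), 500 * s * f * 8}
# => {20, 64000}  (20 chunks along time, 64 kB per chunk before compression)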

{:ok, cube} =
  Array.create(
    shape: {t, s, f},
    chunks: chunks,
    dtype: :float64,
    compressor: :zstd,
    storage: :memory
  )

:ok =
  Array.set_slice(cube, bin,
    start: {0, 0, 0},
    stop: {t, s, f}
  )

%{shape: cube.shape, chunks: cube.chunks, dtype: cube.dtype}

3) Compute OHLC + VWAP (per symbol) over the whole sample

We’ll stream chunks and aggregate.

Field order: [price, size, bid, ask]

field_idx = %{price: 0, size: 1, bid: 2, ask: 3}

init_stats =
  for sym <- symbols, into: %{} do
    {sym, %{open: nil, high: -1.0e99, low: 1.0e99, close: nil, vwap_num: 0.0, vwap_den: 0.0}}
  end

update_stats = fn stats, row ->
  # row: list length s*f, grouped by symbol then field
  Enum.reduce(Enum.with_index(symbols), stats, fn {sym, si}, acc ->
    base = si * f
    price = Enum.at(row, base + field_idx.price)
    size = Enum.at(row, base + field_idx.size)

    st = Map.fetch!(acc, sym)

    st =
      st
      |> Map.update!(:open, fn o -> o || price end)
      |> Map.update!(:high, fn h -> max(h, price) end)
      |> Map.update!(:low, fn l -> min(l, price) end)
      |> Map.put(:close, price)
      |> Map.update!(:vwap_num, fn n -> n + price * size end)
      |> Map.update!(:vwap_den, fn d -> d + size end)

    Map.put(acc, sym, st)
  end)
end
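
A quick check of update_stats on one hand-built tick row (s symbols × f fields, grouped by symbol):

row = List.flatten(for si <- 0..(s - 1), do: [100.0 + si, 10.0, 99.9 + si, 100.1 + si])
update_stats.(init_stats, row)["AAPL"]
# open/high/low/close are all 100.0; vwap_num is 1000.0 and vwap_den is 10.0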

{final, us} =
  Metrics.time(fn ->
    Array.chunk_stream(cube, parallel: 4, ordered: true)
    |> Enum.reduce(init_stats, fn {_chunk_index, chunk_bin}, stats ->
      floats = Pack.unpack(chunk_bin, :float64)

      floats
      |> Enum.chunk_every(s * f)
      |> Enum.reduce(stats, fn row, acc -> update_stats.(acc, row) end)
    end)
  end)

summary =
  final
  |> Enum.map(fn {sym, st} ->
    vwap = st.vwap_num / max(st.vwap_den, 1.0e-9)
    {sym, %{open: st.open, high: st.high, low: st.low, close: st.close, vwap: vwap}}
  end)
  |> Enum.into(%{})

%{took: Metrics.human_us(us), summary: summary}
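
VWAP here is the usual Σ(price × size) / Σ(size) over the whole sample, so it must fall between each symbol's low and high; a quick sanity check:

Enum.all?(summary, fn {_sym, st} -> st.low <= st.vwap and st.vwap <= st.high end)
# => true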

4) Parallel “chunk analytics” with parallel_chunk_map/3

Compute per-chunk mini-summaries in parallel, then reduce them into a single result. This map/reduce pattern is how you'd scale the same analytics to much bigger datasets.

chunk_summary = fn {_chunk_index, chunk_bin} ->
  floats = Pack.unpack(chunk_bin, :float64)

  # per chunk, compute OHLC per symbol (ignore VWAP to keep it short)
  init =
    for sym <- symbols, into: %{} do
      {sym, %{open: nil, high: -1.0e99, low: 1.0e99, close: nil}}
    end

  floats
  |> Enum.chunk_every(s * f)
  |> Enum.reduce(init, fn row, acc ->
    Enum.reduce(Enum.with_index(symbols), acc, fn {sym, si}, a2 ->
      base = si * f
      price = Enum.at(row, base + field_idx.price)
      st = Map.fetch!(a2, sym)

      st =
        st
        |> Map.update!(:open, fn o -> o || price end)
        |> Map.update!(:high, fn h -> max(h, price) end)
        |> Map.update!(:low, fn l -> min(l, price) end)
        |> Map.put(:close, price)

      Map.put(a2, sym, st)
    end)
  end)
end

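# combine assumes `a` covers earlier ticks than `b`: open comes from the first non-nil side, close from the later one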
combine = fn a, b ->
  for sym <- symbols, into: %{} do
    sa = Map.fetch!(a, sym)
    sb = Map.fetch!(b, sym)

    {
      sym,
      %{
        open: sa.open || sb.open,
        high: max(sa.high, sb.high),
        low: min(sa.low, sb.low),
        close: sb.close || sa.close
      }
    }
  end
end

{chunk_summaries, us} =
  Metrics.time(fn ->
    Array.parallel_chunk_map(cube, chunk_summary, max_concurrency: 8, ordered: true)
    |> Enum.to_list()
  end)

# fold the remaining chunk summaries into the first one
reduced =
  Enum.reduce(tl(chunk_summaries), hd(chunk_summaries), fn s1, acc -> combine.(acc, s1) end)

%{chunks: length(chunk_summaries), took: Metrics.human_us(us), reduced: reduced}
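
Since both passes ran with ordered: true, the map/reduce OHLC should agree with the streaming OHLC from section 3:

Enum.all?(symbols, fn sym ->
  a = Map.fetch!(reduced, sym)
  b = Map.fetch!(summary, sym)
  a.open == b.open and a.high == b.high and a.low == b.low and a.close == b.close
end)
# expected: true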

Next ideas

  • Order book snapshots (4D arrays)
  • Risk grids (scenario × factor × portfolio)
  • Crypto cross-venue OHLCV cubes (venue × symbol × time × field)