ExZarr 05.01 — Finance: tick data cube (time × symbol × field)
We’ll generate synthetic tick data and store it as a 3D array:
shape = {T, S, F}
- T: ticks (time index)
- S: symbols
- F: fields (price, size, bid, ask)
Then we’ll compute OHLC and VWAP using chunk streaming and parallel chunk maps.
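Everything below assumes a flat, row-major layout: fields vary fastest, then symbols, then ticks. A tiny illustrative helper (not part of the ExZarr API) makes the mapping concrete for the 4 symbols × 4 fields used below:

flat_index = fn ti, si, fi, s, f -> ti * s * f + si * f + fi end
flat_index.(3, 1, 0, 4, 4)
# => 52: tick 3, symbol index 1, field 0 (:price)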
Setup
Mix.install([{:ex_zarr, path: ".."}])
alias ExZarr.Array
alias ExZarr.Gallery.{Pack, Metrics}
1) Generate synthetic tick data
symbols = ["AAPL", "MSFT", "NVDA", "SPY"]
fields = [:price, :size, :bid, :ask]
t = 10_000
s = length(symbols)
f = length(fields)
# seed the RNG so the generated data is reproducible across runs
:rand.seed(:exsss, {1, 2, 3})
gen_tick = fn base ->
  price = base + :rand.normal() * 0.05
  size = 1.0 + :rand.uniform() * 99.0
  spread = 0.01 + :rand.uniform() * 0.05
  bid = price - spread / 2
  ask = price + spread / 2
  [price, size, bid, ask]
end
# layout: row-major over {t, s, f}; each (tick, symbol) contributes one
# consistent [price, size, bid, ask] row, generated in a single RNG draw
data =
  Enum.flat_map(0..(t - 1), fn ti ->
    Enum.flat_map(Enum.with_index(symbols), fn {_sym, si} ->
      base = 100.0 + si * 10.0 + ti * 0.0001
      gen_tick.(base)
    end)
  end)
bin = Pack.pack(data, :float64)
{t, s, f, byte_size(bin)}
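As a sanity check on the packing (assuming Pack.pack/2 writes raw 8-byte float64 values with no header), the binary should be exactly t × s × f × 8 bytes:

byte_size(bin) == t * s * f * 8
# 10_000 * 4 * 4 * 8 = 1_280_000 bytes => true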
2) Store in ExZarr
Chunk by time windows (e.g. 500 ticks). This makes window analytics efficient.
chunks = {500, s, f}

{:ok, cube} =
  Array.create(
    shape: {t, s, f},
    chunks: chunks,
    dtype: :float64,
    compressor: :zstd,
    storage: :memory
  )

:ok =
  Array.set_slice(cube, bin,
    start: {0, 0, 0},
    stop: {t, s, f}
  )

%{shape: cube.shape, chunks: cube.chunks, dtype: cube.dtype}
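To see why the 500-tick chunking helps: an aligned 500-tick window maps to exactly one chunk along the time axis, so a windowed read decompresses a single chunk instead of the whole cube. This is just index arithmetic, no extra API needed:

# 20 chunks along time; ticks 3_500..3_999 all fall into time-chunk index 7
{div(t, elem(chunks, 0)), div(3_500, 500), div(3_999, 500)}
# => {20, 7, 7}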
3) Compute OHLC + VWAP (per symbol) over the whole sample
We’ll stream chunks and aggregate.
Field order: [price, size, bid, ask]
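VWAP is the size-weighted average trade price, Σ(price × size) / Σ(size). For example, two ticks at 100.0 (size 10) and 102.0 (size 30) give (100 × 10 + 102 × 30) / (10 + 30) = 4060 / 40 = 101.5, which is exactly what the vwap_num / vwap_den accumulators below compute incrementally.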
field_idx = %{price: 0, size: 1, bid: 2, ask: 3}

init_stats =
  for sym <- symbols, into: %{} do
    {sym, %{open: nil, high: -1.0e99, low: 1.0e99, close: nil, vwap_num: 0.0, vwap_den: 0.0}}
  end
update_stats = fn stats, row ->
  # row: list of length s * f, grouped by symbol then field
  Enum.reduce(Enum.with_index(symbols), stats, fn {sym, si}, acc ->
    base = si * f
    price = Enum.at(row, base + field_idx.price)
    size = Enum.at(row, base + field_idx.size)

    st = Map.fetch!(acc, sym)

    st =
      st
      |> Map.update!(:open, fn o -> o || price end)
      |> Map.update!(:high, fn h -> max(h, price) end)
      |> Map.update!(:low, fn l -> min(l, price) end)
      |> Map.put(:close, price)
      |> Map.update!(:vwap_num, fn n -> n + price * size end)
      |> Map.update!(:vwap_den, fn d -> d + size end)

    Map.put(acc, sym, st)
  end)
end
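To make the accumulator shape concrete, here is one hand-built row pushed through update_stats; the numbers are made up for illustration (every symbol trades at 100.0 with size 10.0):

row = List.duplicate([100.0, 10.0, 99.99, 100.01], s) |> List.flatten()
update_stats.(init_stats, row)["AAPL"]
# => open/high/low/close all 100.0, vwap_num 1000.0, vwap_den 10.0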
{final, us} =
  Metrics.time(fn ->
    Array.chunk_stream(cube, parallel: 4, ordered: true)
    |> Enum.reduce(init_stats, fn {_chunk_index, chunk_bin}, stats ->
      floats = Pack.unpack(chunk_bin, :float64)

      floats
      |> Enum.chunk_every(s * f)
      |> Enum.reduce(stats, fn row, acc -> update_stats.(acc, row) end)
    end)
  end)
summary =
  final
  |> Enum.map(fn {sym, st} ->
    vwap = st.vwap_num / max(st.vwap_den, 1.0e-9)
    {sym, %{open: st.open, high: st.high, low: st.low, close: st.close, vwap: vwap}}
  end)
  |> Enum.into(%{})

%{took: Metrics.human_us(us), summary: summary}
4) Parallel “chunk analytics” with parallel_chunk_map/3
Compute per-chunk mini-summaries in parallel, then reduce them into one result. This map-reduce shape is how you'd scale the same analytics to much larger datasets.
chunk_summary = fn {_chunk_index, chunk_bin} ->
  floats = Pack.unpack(chunk_bin, :float64)

  # per chunk, compute OHLC per symbol (ignore VWAP to keep it short)
  init =
    for sym <- symbols, into: %{} do
      {sym, %{open: nil, high: -1.0e99, low: 1.0e99, close: nil}}
    end

  floats
  |> Enum.chunk_every(s * f)
  |> Enum.reduce(init, fn row, acc ->
    Enum.reduce(Enum.with_index(symbols), acc, fn {sym, si}, a2 ->
      base = si * f
      price = Enum.at(row, base + field_idx.price)

      st = Map.fetch!(a2, sym)

      st =
        st
        |> Map.update!(:open, fn o -> o || price end)
        |> Map.update!(:high, fn h -> max(h, price) end)
        |> Map.update!(:low, fn l -> min(l, price) end)
        |> Map.put(:close, price)

      Map.put(a2, sym, st)
    end)
  end)
end
combine = fn a, b ->
  for sym <- symbols, into: %{} do
    sa = Map.fetch!(a, sym)
    sb = Map.fetch!(b, sym)

    {
      sym,
      %{
        open: sa.open || sb.open,
        high: max(sa.high, sb.high),
        low: min(sa.low, sb.low),
        close: sb.close || sa.close
      }
    }
  end
end
{chunk_summaries, us} =
  Metrics.time(fn ->
    Array.parallel_chunk_map(cube, chunk_summary, max_concurrency: 8, ordered: true)
    |> Enum.to_list()
  end)
# Combine per-chunk summaries in stream order (ordered: true preserves time order,
# which open/close depend on). Enum.reduce/2 seeds the fold with the first summary
# instead of combining it with itself.
reduced =
  Enum.reduce(chunk_summaries, fn summary_i, acc -> combine.(acc, summary_i) end)

%{chunks: length(chunk_summaries), took: Metrics.human_us(us), reduced: reduced}
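As a quick cross-check (illustrative only; it assumes both passes saw the same data in the same order), the map-reduce result should agree with the single-pass streaming result from 3) on the order-sensitive fields:

reduced["AAPL"].open == summary["AAPL"].open and
  reduced["AAPL"].close == summary["AAPL"].close
# => true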
Next ideas
- Order book snapshots (4D arrays; see the sketch below)
- Risk grids (scenario × factor × portfolio)
- Crypto cross-venue OHLCV cubes (venue × symbol × time × field)
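As a taste of the first idea, a 4D order book snapshot cube can reuse exactly the Array.create options shown above. The shape and chunking here are illustrative assumptions (10 book levels, four values per level), a sketch rather than a worked example:

{:ok, _book} =
  Array.create(
    # time × symbol × book level × {bid_px, bid_sz, ask_px, ask_sz}
    shape: {10_000, 4, 10, 4},
    chunks: {500, 4, 10, 4},
    dtype: :float64,
    compressor: :zstd,
    storage: :memory
  )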