bloccs · 04 — Aggregation: batch, join & rate
Mix.install([
{:bloccs, "~> 0.9"}
])
What this covers
Three primitives that hold state or bend time — declared in the manifest, run by the generated Broadway topology, with no bookkeeping in your node code.
- Batch — [batch] collects messages into a window (by count or time); the pure core receives the list.
- Join — [join] correlates arrivals across two in-ports by a key; a matched pair emits once, an unmatched side is dead-lettered after a timeout.
- Rate + delay — [rate] throttles a node’s producer; [delay] paces it.
Routing primitives (filter, split, merge) are in
03_routing_primitives; the run loop is in
01_first_network.
Shared setup
All schemas and manifests for the three networks, written once.
# Register every payload schema used by the three networks, in declaration
# order. Each entry pairs a versioned schema name with its field spec.
[
  {"Event@1", [id: :string]},
  {"Summary@1", [count: :integer, ids: {:list, :string}]},
  {"Order@1", [order_id: :string, item: :string]},
  {"Payment@1", [order_id: :string, amount: :integer]},
  {"Reconciled@1", [order_id: :string, amount: :integer]},
  {"Unmatched@1", [key: :string, present: {:list, :string}, payloads: :map]},
  {"X@1", [n: :integer]}
]
|> Enum.each(fn {schema_name, fields} -> Bloccs.Schema.register(schema_name, fields) end)
# Directory the manifest files are written into. The node modules and the
# parse_network calls later in this file reference files under this same path.
dir = "/tmp/bloccs_aggregation"
File.mkdir_p!(dir)
# Manifest file name => manifest text. Within each group the node manifests
# come first, followed by the network manifest that wires those nodes together.
agg = %{
# ---- batch ----
# Node "rollup": Event@1 in, Summary@1 out, with a [batch] table
# (size 3, timeout_ms 200).
"rollup.bloccs" => """
[node]
id = "rollup"
version = "0.1.0"
kind = "transform"
[ports.in]
events = { schema = "Event@1" }
[ports.out]
summary = { schema = "Summary@1" }
[effects]
[batch]
size = 3
timeout_ms = 200
[contract]
pure_core = "BloccsNotebook.Agg.Rollup.transform/2"
effect_shell = "BloccsNotebook.Agg.Rollup.execute/2"
""",
# Node "agg_collect": sink with in-port `summary` and out-port `done`,
# both typed Summary@1.
"agg_collect.bloccs" => """
[node]
id = "agg_collect"
version = "0.1.0"
kind = "sink"
[ports.in]
summary = { schema = "Summary@1" }
[ports.out]
done = { schema = "Summary@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Agg.Collect.transform/2"
effect_shell = "BloccsNotebook.Agg.Collect.execute/2"
""",
# Network "aggregate": one edge rollup.summary -> collect.summary; exposes
# rollup.events as `events` and collect.done as `summaries`.
"aggregate.bloccs" => """
[network]
id = "aggregate"
version = "0.1.0"
runtime = "beam"
[nodes]
rollup = { use = "rollup.bloccs" }
collect = { use = "agg_collect.bloccs" }
[[edges]]
from = "rollup.summary"
to = "collect.summary"
[expose]
in = { events = "rollup.events" }
out = { summaries = "collect.done" }
[supervision]
strategy = "one_for_one"
""",
# ---- join ----
# Node "reconcile": in-ports `left` (Order@1) and `right` (Payment@1),
# out-ports `joined` (Reconciled@1) and `deadletter` (Unmatched@1), with a
# [join] table keyed on "order_id" (timeout_ms 200, deadletter port "deadletter").
"reconcile.bloccs" => """
[node]
id = "reconcile"
version = "0.1.0"
kind = "transform"
[ports.in]
left = { schema = "Order@1" }
right = { schema = "Payment@1" }
[ports.out]
joined = { schema = "Reconciled@1" }
deadletter = { schema = "Unmatched@1" }
[effects]
[join]
on = "order_id"
timeout_ms = 200
deadletter = "deadletter"
[contract]
pure_core = "BloccsNotebook.Agg.Reconcile.transform/2"
effect_shell = "BloccsNotebook.Agg.Reconcile.execute/2"
""",
# Node "reconcile_sink": sink with in-port `rec` and out-port `done`,
# both typed Reconciled@1.
"reconcile_sink.bloccs" => """
[node]
id = "reconcile_sink"
version = "0.1.0"
kind = "sink"
[ports.in]
rec = { schema = "Reconciled@1" }
[ports.out]
done = { schema = "Reconciled@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Agg.ReconcileSink.transform/2"
effect_shell = "BloccsNotebook.Agg.ReconcileSink.execute/2"
""",
# Node "dead_sink": sink with in-port `env` and out-port `dead`,
# both typed Unmatched@1.
"dead_sink.bloccs" => """
[node]
id = "dead_sink"
version = "0.1.0"
kind = "sink"
[ports.in]
env = { schema = "Unmatched@1" }
[ports.out]
dead = { schema = "Unmatched@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Agg.DeadSink.transform/2"
effect_shell = "BloccsNotebook.Agg.DeadSink.execute/2"
""",
# Network "join": edges reconcile.joined -> reconcile_sink.rec and
# reconcile.deadletter -> dead_sink.env; exposes both reconcile in-ports and
# the two sink out-ports (`reconciled`, `dead`).
"join.bloccs" => """
[network]
id = "join"
version = "0.1.0"
runtime = "beam"
[nodes]
reconcile = { use = "reconcile.bloccs" }
reconcile_sink = { use = "reconcile_sink.bloccs" }
dead_sink = { use = "dead_sink.bloccs" }
[[edges]]
from = "reconcile.joined"
to = "reconcile_sink.rec"
[[edges]]
from = "reconcile.deadletter"
to = "dead_sink.env"
[expose]
in = { left = "reconcile.left", right = "reconcile.right" }
out = { reconciled = "reconcile_sink.done", dead = "dead_sink.dead" }
[supervision]
strategy = "one_for_one"
""",
# ---- rate ----
# Node "limited": X@1 on in-port `inp` and out-port `out`, with a [rate]
# table (allowed 10 per interval_ms 1000) and a [delay] table (ms 50).
"limited.bloccs" => """
[node]
id = "limited"
version = "0.1.0"
kind = "source"
[ports.in]
inp = { schema = "X@1" }
[ports.out]
out = { schema = "X@1" }
[effects]
[rate]
allowed = 10
interval_ms = 1000
[delay]
ms = 50
[contract]
pure_core = "BloccsNotebook.Agg.Limited.transform/2"
effect_shell = "BloccsNotebook.Agg.Limited.execute/2"
""",
# Node "rate_sink": sink with in-port `inp` and out-port `out`, both typed X@1.
"rate_sink.bloccs" => """
[node]
id = "rate_sink"
version = "0.1.0"
kind = "sink"
[ports.in]
inp = { schema = "X@1" }
[ports.out]
out = { schema = "X@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Agg.RateSink.transform/2"
effect_shell = "BloccsNotebook.Agg.RateSink.execute/2"
""",
# Network "rate": one edge limited.out -> rate_sink.inp; exposes limited.inp
# as `input` and rate_sink.out as `output`.
"rate.bloccs" => """
[network]
id = "rate"
version = "0.1.0"
runtime = "beam"
[nodes]
limited = { use = "limited.bloccs" }
rate_sink = { use = "rate_sink.bloccs" }
[[edges]]
from = "limited.out"
to = "rate_sink.inp"
[expose]
in = { input = "limited.inp" }
out = { output = "rate_sink.out" }
[supervision]
strategy = "one_for_one"
"""
}
# Write each manifest to its own file under `dir` (raises on any write failure).
for {name, body} <- agg, do: File.write!(Path.join(dir, name), body)
# List the directory so the written file names are the cell's result.
File.ls!(dir)
Batch — a windowed aggregate
rollup declares [batch] size = 3, timeout_ms = 200. The runtime collects
events into a window and flushes when it hits 3 events or 200ms,
whichever comes first. The pure core of a batch node receives the whole window —
a list of payloads — and reduces it to one summary.
events ─▶ [rollup] ──summary──▶ [collect] ─▶ summaries
([batch] size 3 (reduces the list
/ 200ms window) to one summary)
defmodule BloccsNotebook.Agg.Rollup do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/rollup.bloccs"

  # Pure core of the [batch] node: it is handed the whole window as a list of
  # payloads and folds it into one summary map carrying the window size and
  # the "id" of every event, in arrival order.
  def transform(events, _ctx) when is_list(events) do
    ids = for event <- events, do: event["id"]
    summary = %{"count" => length(events), "ids" => ids}
    {:ok, summary}
  end

  # Effect shell: emit the summary on the :summary out-port.
  def execute(summary, _ctx) do
    {:emit, :summary, summary}
  end
end
defmodule BloccsNotebook.Agg.Collect do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/agg_collect.bloccs"

  # Pure core: pass the incoming summary through unchanged.
  def transform(summary, _ctx) do
    {:ok, summary}
  end

  # Effect shell: emit the summary on the :done out-port.
  def execute(summary, _ctx) do
    {:emit, :done, summary}
  end
end
# Parse, validate, compile and start the "aggregate" network. Every step is
# matched assertively so a failure surfaces immediately.
{:ok, agg_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/aggregate.bloccs")
:ok = Bloccs.Validator.validate_network(agg_net)
{:ok, agg_sup} = Bloccs.Compiler.compile_and_load(agg_net)
{:ok, _agg_pid} = agg_sup.start_link([])

# Have the network's exposed sink deliver its output to this process.
Bloccs.Router.register_sink(:aggregate, :collect, :done, self())
events = Bloccs.Router.producer_name(:aggregate, :rollup, :events)

# Push three events: exactly enough to fill the size-3 batch window.
Enum.each(~w(e1 e2 e3), &Bloccs.Producer.push(events, %{"id" => &1}))

# Wait for the single summary the window produces; give up after 1.5s.
receive do
  {:bloccs_sink, :aggregate, :collect, :done, batch_summary} ->
    batch_summary
after
  1_500 ->
    "(timeout)"
end
Three events in, one summary out: %{"count" => 3, "ids" => ["e1", "e2", "e3"]}. The count window filled and flushed. Push only two and the 200ms
timeout flushes a partial window instead — no batch is ever stranded.
Join — correlate two inputs by key
reconcile declares [join] on = "order_id" with two typed in-ports (left: Order@1, right: Payment@1). The runtime buffers arrivals and correlates them
by order_id. A matched pair flows into the pure core as %{left: ..., right: ...}; an unmatched side is dead-lettered after timeout_ms.
left (Order) ─┐
├─[reconcile]─joined──────▶ [reconcile_sink] ─▶ reconciled
right (Payment)─┘ ([join] on order_id)
└─deadletter (unmatched after 200ms)──▶ [dead_sink] ─▶ dead
defmodule BloccsNotebook.Agg.Reconcile do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/reconcile.bloccs"

  # Pure core of the [join] node: the input is the correlated set keyed by
  # in-port (:left is the order, :right is the payment). The result takes
  # the order's "order_id" and the payment's "amount".
  def transform(%{left: order, right: payment}, _ctx) do
    reconciled = %{
      "order_id" => order["order_id"],
      "amount" => payment["amount"]
    }

    {:ok, reconciled}
  end

  # Effect shell: emit the reconciled record on the :joined out-port.
  def execute(reconciled, _ctx) do
    {:emit, :joined, reconciled}
  end
end
defmodule BloccsNotebook.Agg.ReconcileSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/reconcile_sink.bloccs"

  # Pure core: pass the reconciled record through unchanged.
  def transform(record, _ctx) do
    {:ok, record}
  end

  # Effect shell: emit the record on the :done out-port.
  def execute(record, _ctx) do
    {:emit, :done, record}
  end
end
defmodule BloccsNotebook.Agg.DeadSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/dead_sink.bloccs"

  # Pure core: pass the dead-letter envelope through unchanged.
  def transform(envelope, _ctx) do
    {:ok, envelope}
  end

  # Effect shell: emit the envelope on the :dead out-port.
  def execute(envelope, _ctx) do
    {:emit, :dead, envelope}
  end
end
# Parse, validate, compile and start the "join" network.
{:ok, join_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/join.bloccs")
:ok = Bloccs.Validator.validate_network(join_net)
{:ok, join_sup} = Bloccs.Compiler.compile_and_load(join_net)
{:ok, _join_pid} = join_sup.start_link([])
Bloccs.Join.reset()

# Both exposed sinks report to this process.
Bloccs.Router.register_sink(:join, :reconcile_sink, :done, self())
Bloccs.Router.register_sink(:join, :dead_sink, :dead, self())

left = Bloccs.Router.producer_name(:join, :reconcile, :left)
right = Bloccs.Router.producer_name(:join, :reconcile, :right)

# Wait up to 1.5s for a payload from the given sink/port of the :join
# network; yield the "(timeout)" marker if nothing arrives.
await_sink = fn sink, port ->
  receive do
    {:bloccs_sink, :join, ^sink, ^port, payload} -> payload
  after
    1_500 -> "(timeout)"
  end
end

# A matched pair (same order_id) → one joined record.
Bloccs.Producer.push(left, %{"order_id" => "o1", "item" => "book"})
Bloccs.Producer.push(right, %{"order_id" => "o1", "amount" => 30})
matched = await_sink.(:reconcile_sink, :done)

# An unmatched left with no partner → dead-lettered after the join timeout.
# Sleep past the 200ms join timeout, then force the sweep.
Bloccs.Producer.push(left, %{"order_id" => "o2", "item" => "pen"})
Process.sleep(250)
Bloccs.Join.sweep_now()
deadlettered = await_sink.(:dead_sink, :dead)

%{matched: matched, deadlettered: deadlettered}
The matched pair emits %{"order_id" => "o1", "amount" => 30} — left and
right correlated into one record (out-of-order arrivals match too; the buffer
does not care which side lands first). The unmatched o2 produced an envelope
%{"key" => "o2", "present" => ["left"], ...} on the deadletter port — nothing
silently lost. Bloccs.Join.sweep_now/0 forces the timeout sweep here so the
notebook does not wait on the background timer.
Rate + delay — throttle a node
limited declares [rate] allowed = 10, interval_ms = 1000 (at most 10
messages/second) and [delay] ms = 50 (pace the producer). These compile into
the generated Broadway producer — [rate] as rate_limiting, [delay] as a
producer delay — so throttling is configuration, not code in the node.
defmodule BloccsNotebook.Agg.Limited do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/limited.bloccs"

  # Pure core: pass the payload through unchanged. The node body contains no
  # throttling logic; [rate] and [delay] are declared in the manifest.
  def transform(payload, _ctx) do
    {:ok, payload}
  end

  # Effect shell: emit the payload on the :out out-port.
  def execute(payload, _ctx) do
    {:emit, :out, payload}
  end
end
defmodule BloccsNotebook.Agg.RateSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/rate_sink.bloccs"

  # Pure core: pass the payload through unchanged.
  def transform(payload, _ctx) do
    {:ok, payload}
  end

  # Effect shell: emit the payload on the :out out-port.
  def execute(payload, _ctx) do
    {:emit, :out, payload}
  end
end
# Parse, validate, compile and start the "rate" network.
{:ok, rate_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/rate.bloccs")
:ok = Bloccs.Validator.validate_network(rate_net)
{:ok, rate_sup} = Bloccs.Compiler.compile_and_load(rate_net)
{:ok, _rate_pid} = rate_sup.start_link([])

# The [rate]/[delay] blocks parsed straight onto the node manifest:
# look up the :limited node and read its throttle settings back.
{_id, limited} =
  Enum.find(rate_net.nodes, fn {node_id, _node} -> node_id == :limited end)

config = %{
  rate: limited.manifest.rate,
  delay_ms: limited.manifest.delay_ms
}

# Route the sink's output to this process, then push one message through.
Bloccs.Router.register_sink(:rate, :rate_sink, :out, self())

:rate
|> Bloccs.Router.producer_name(:limited, :inp)
|> Bloccs.Producer.push(%{"n" => 1})

# Wait up to 1.5s for the message to come out the far end.
flowed =
  receive do
    {:bloccs_sink, :rate, :rate_sink, :out, message} ->
      message
  after
    1_500 ->
      "(timeout)"
  end

%{parsed_config: config, flowed: flowed}
The manifest’s [rate] parsed to %Bloccs.Manifest.Rate{allowed: 10, interval_ms: 1000} and [delay] to delay_ms: 50 — the throttle the generated
producer enforces. The message still flows through; the cap shapes how fast,
not whether.
Where next
- 02_events_webhook — enrich pairs these ideas with retry, timeout, and idempotency in one realistic node.
- The guides — the manifest reference for [batch], [join], [rate], and [delay], plus how each maps onto the Broadway topology.