Powered by AppSignal & Oban Pro

bloccs · 04 — Aggregation: batch, join & rate

04_aggregation_primitives.livemd

bloccs · 04 — Aggregation: batch, join & rate

# Install bloccs for this notebook session; every later cell calls into it.
Mix.install([{:bloccs, "~> 0.9"}])

What this covers

Three primitives that hold state or bend time — declared in the manifest, run by the generated Broadway topology, with no bookkeeping in your node code.

  • Batch — [batch] collects messages into a window (by count or time); the pure core receives the list.
  • Join — [join] correlates arrivals across two in-ports by a key; a matched pair emits once, and an unmatched side is dead-lettered after a timeout.
  • Rate + delay — [rate] throttles a node’s producer; [delay] paces it.

Routing primitives (filter, split, merge) are in 03_routing_primitives; the run loop is in 01_first_network.

Shared setup

All schemas and manifests for the three networks, written once.

# Register every payload schema the manifests below reference, in this order:
# batch (Event/Summary), join (Order/Payment/Reconciled/Unmatched), rate (X).
schemas = [
  {"Event@1", [id: :string]},
  {"Summary@1", [count: :integer, ids: {:list, :string}]},
  {"Order@1", [order_id: :string, item: :string]},
  {"Payment@1", [order_id: :string, amount: :integer]},
  {"Reconciled@1", [order_id: :string, amount: :integer]},
  {"Unmatched@1", [key: :string, present: {:list, :string}, payloads: :map]},
  {"X@1", [n: :integer]}
]

for {name, fields} <- schemas, do: Bloccs.Schema.register(name, fields)

# Scratch directory for the manifest files. The node modules below hard-code
# this same path in their `manifest:` option, so keep the two in sync.
dir = "/tmp/bloccs_aggregation"
File.mkdir_p!(dir)

# Manifest filename => TOML body for all three example networks. The next cell
# writes each entry under `dir`; network manifests refer to their node manifests
# by these filenames via `use = "..."`.
agg = %{
  # ---- batch ----
  # rollup: [batch] window of size 3 / timeout_ms 200; consumes Event@1 on
  # `events` and emits one Summary@1 on `summary` per flushed window.
  "rollup.bloccs" => """
  [node]
  id = "rollup"
  version = "0.1.0"
  kind = "transform"
  [ports.in]
  events = { schema = "Event@1" }
  [ports.out]
  summary = { schema = "Summary@1" }
  [effects]
  [batch]
  size       = 3
  timeout_ms = 200
  [contract]
  pure_core    = "BloccsNotebook.Agg.Rollup.transform/2"
  effect_shell = "BloccsNotebook.Agg.Rollup.execute/2"
  """,
  # agg_collect: terminal node; takes a Summary@1 in and re-emits it on `done`.
  "agg_collect.bloccs" => """
  [node]
  id = "agg_collect"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  summary = { schema = "Summary@1" }
  [ports.out]
  done = { schema = "Summary@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Agg.Collect.transform/2"
  effect_shell = "BloccsNotebook.Agg.Collect.execute/2"
  """,
  # aggregate network: rollup.summary -> collect.summary. Exposes rollup.events
  # as input `events` and collect.done as output `summaries`.
  "aggregate.bloccs" => """
  [network]
  id = "aggregate"
  version = "0.1.0"
  runtime = "beam"
  [nodes]
  rollup  = { use = "rollup.bloccs" }
  collect = { use = "agg_collect.bloccs" }
  [[edges]]
  from = "rollup.summary"
  to   = "collect.summary"
  [expose]
  in  = { events = "rollup.events" }
  out = { summaries = "collect.done" }
  [supervision]
  strategy = "one_for_one"
  """,
  # ---- join ----
  # reconcile: [join] on "order_id" across `left` (Order@1) and `right`
  # (Payment@1); a matched pair goes out on `joined`, and an unmatched side is
  # sent to the `deadletter` out-port after timeout_ms = 200.
  "reconcile.bloccs" => """
  [node]
  id = "reconcile"
  version = "0.1.0"
  kind = "transform"
  [ports.in]
  left  = { schema = "Order@1" }
  right = { schema = "Payment@1" }
  [ports.out]
  joined     = { schema = "Reconciled@1" }
  deadletter = { schema = "Unmatched@1" }
  [effects]
  [join]
  on         = "order_id"
  timeout_ms = 200
  deadletter = "deadletter"
  [contract]
  pure_core    = "BloccsNotebook.Agg.Reconcile.transform/2"
  effect_shell = "BloccsNotebook.Agg.Reconcile.execute/2"
  """,
  # reconcile_sink: terminal node for matched records (Reconciled@1 -> `done`).
  "reconcile_sink.bloccs" => """
  [node]
  id = "reconcile_sink"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  rec = { schema = "Reconciled@1" }
  [ports.out]
  done = { schema = "Reconciled@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Agg.ReconcileSink.transform/2"
  effect_shell = "BloccsNotebook.Agg.ReconcileSink.execute/2"
  """,
  # dead_sink: terminal node for dead-lettered envelopes (Unmatched@1 -> `dead`).
  "dead_sink.bloccs" => """
  [node]
  id = "dead_sink"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  env = { schema = "Unmatched@1" }
  [ports.out]
  dead = { schema = "Unmatched@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Agg.DeadSink.transform/2"
  effect_shell = "BloccsNotebook.Agg.DeadSink.execute/2"
  """,
  # join network: reconcile.joined -> reconcile_sink.rec and
  # reconcile.deadletter -> dead_sink.env. Exposes both join inputs and both
  # terminal outputs.
  "join.bloccs" => """
  [network]
  id = "join"
  version = "0.1.0"
  runtime = "beam"
  [nodes]
  reconcile      = { use = "reconcile.bloccs" }
  reconcile_sink = { use = "reconcile_sink.bloccs" }
  dead_sink      = { use = "dead_sink.bloccs" }
  [[edges]]
  from = "reconcile.joined"
  to   = "reconcile_sink.rec"
  [[edges]]
  from = "reconcile.deadletter"
  to   = "dead_sink.env"
  [expose]
  in  = { left = "reconcile.left", right = "reconcile.right" }
  out = { reconciled = "reconcile_sink.done", dead = "dead_sink.dead" }
  [supervision]
  strategy = "one_for_one"
  """,
  # ---- rate ----
  # limited: [rate] allows 10 messages per 1000 ms and [delay] paces its
  # producer by 50 ms; the payload (X@1) is passed through unchanged.
  "limited.bloccs" => """
  [node]
  id = "limited"
  version = "0.1.0"
  kind = "source"
  [ports.in]
  inp = { schema = "X@1" }
  [ports.out]
  out = { schema = "X@1" }
  [effects]
  [rate]
  allowed     = 10
  interval_ms = 1000
  [delay]
  ms = 50
  [contract]
  pure_core    = "BloccsNotebook.Agg.Limited.transform/2"
  effect_shell = "BloccsNotebook.Agg.Limited.execute/2"
  """,
  # rate_sink: terminal node (X@1 in on `inp`, re-emitted on `out`).
  "rate_sink.bloccs" => """
  [node]
  id = "rate_sink"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  inp = { schema = "X@1" }
  [ports.out]
  out = { schema = "X@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Agg.RateSink.transform/2"
  effect_shell = "BloccsNotebook.Agg.RateSink.execute/2"
  """,
  # rate network: limited.out -> rate_sink.inp. Exposes limited.inp as `input`
  # and rate_sink.out as `output`.
  "rate.bloccs" => """
  [network]
  id = "rate"
  version = "0.1.0"
  runtime = "beam"
  [nodes]
  limited   = { use = "limited.bloccs" }
  rate_sink = { use = "rate_sink.bloccs" }
  [[edges]]
  from = "limited.out"
  to   = "rate_sink.inp"
  [expose]
  in  = { input  = "limited.inp" }
  out = { output = "rate_sink.out" }
  [supervision]
  strategy = "one_for_one"
  """
}

# Write every manifest in `agg` under `dir`, then show what is on disk.
# File.ls!/1 returns entries in filesystem order (unspecified), so sort the
# listing to make this cell's output stable across runs and machines.
Enum.each(agg, fn {name, body} -> dir |> Path.join(name) |> File.write!(body) end)
dir |> File.ls!() |> Enum.sort()

Batch — a windowed aggregate

rollup declares [batch] size = 3, timeout_ms = 200. The runtime collects events into a window and flushes when it hits 3 events or 200ms, whichever comes first. The pure core of a batch node receives the whole window — a list of payloads — and reduces it to one summary.

events ─▶ [rollup] ──summary──▶ [collect] ─▶ summaries
        ([batch] size 3        (reduces the list
         / 200ms window)        to one summary)
defmodule BloccsNotebook.Agg.Rollup do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/rollup.bloccs"

  # Pure core of a [batch] node: the runtime hands over the flushed window as a
  # list of payloads, and it is reduced here to a single summary map holding the
  # window size and the ids in arrival order.
  def transform(window, _ctx) when is_list(window) do
    ids = for event <- window, do: event["id"]
    summary = %{"count" => Enum.count(window), "ids" => ids}
    {:ok, summary}
  end

  # Effect shell: publish the summary on the `summary` out-port.
  def execute(summary, _ctx) do
    {:emit, :summary, summary}
  end
end

defmodule BloccsNotebook.Agg.Collect do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/agg_collect.bloccs"

  # Pass-through terminal node: the summary is accepted unchanged...
  def transform(summary, _ctx) do
    {:ok, summary}
  end

  # ...and re-emitted on `done`, the port the network exposes as `summaries`.
  def execute(summary, _ctx) do
    {:emit, :done, summary}
  end
end
# Parse -> validate -> compile -> start; each step asserts success by matching.
{:ok, agg_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/aggregate.bloccs")
:ok = Bloccs.Validator.validate_network(agg_net)
{:ok, agg_sup} = Bloccs.Compiler.compile_and_load(agg_net)
{:ok, _} = agg_sup.start_link([])

# Route collect.done back to this process, then feed rollup's `events` producer
# three events: exactly one full [batch] window.
Bloccs.Router.register_sink(:aggregate, :collect, :done, self())

events = Bloccs.Router.producer_name(:aggregate, :rollup, :events)
Enum.each(~w(e1 e2 e3), &Bloccs.Producer.push(events, %{"id" => &1}))

# Wait up to 1.5 s for the one summary the window flushes into.
receive do
  {:bloccs_sink, :aggregate, :collect, :done, summary} -> summary
after
  1_500 -> "(timeout)"
end

Three events in, one summary out: %{"count" => 3, "ids" => ["e1", "e2", "e3"]}. The count window filled and flushed. Push only two and the 200ms timeout flushes a partial window instead — no batch is ever stranded.

Join — correlate two inputs by key

reconcile declares [join] on = "order_id" with two typed in-ports (left: Order@1, right: Payment@1). The runtime buffers arrivals and correlates them by order_id. A matched pair flows into the pure core as %{left: ..., right: ...}; an unmatched side is dead-lettered after timeout_ms.

left  (Order)  ─┐
                ├─[reconcile]─joined──────▶ [reconcile_sink] ─▶ reconciled
right (Payment)─┘ ([join] on order_id)
                       └─deadletter (unmatched after 200ms)──▶ [dead_sink] ─▶ dead
defmodule BloccsNotebook.Agg.Reconcile do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/reconcile.bloccs"

  # Pure core of a [join] node: it receives the correlated pair keyed by in-port
  # name (:left carries the order, :right the payment) and folds the two into
  # one reconciled record.
  def transform(%{left: order, right: payment}, _ctx) do
    order_id = order["order_id"]
    amount = payment["amount"]
    {:ok, %{"order_id" => order_id, "amount" => amount}}
  end

  # Effect shell: publish the record on the `joined` out-port.
  def execute(reconciled, _ctx) do
    {:emit, :joined, reconciled}
  end
end

defmodule BloccsNotebook.Agg.ReconcileSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/reconcile_sink.bloccs"

  # Terminal node for matched records: accept as-is...
  def transform(record, _ctx) do
    {:ok, record}
  end

  # ...and forward on `done` (exposed by the network as `reconciled`).
  def execute(record, _ctx) do
    {:emit, :done, record}
  end
end

defmodule BloccsNotebook.Agg.DeadSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/dead_sink.bloccs"

  # Terminal node for dead-lettered envelopes: accept unchanged...
  def transform(envelope, _ctx) do
    {:ok, envelope}
  end

  # ...and forward on `dead` (exposed by the network under the same name).
  def execute(envelope, _ctx) do
    {:emit, :dead, envelope}
  end
end
# Parse -> validate -> compile -> start the join network; each step asserts
# success by pattern matching, so any failure stops the cell immediately.
{:ok, join_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/join.bloccs")
:ok = Bloccs.Validator.validate_network(join_net)
{:ok, join_sup} = Bloccs.Compiler.compile_and_load(join_net)
{:ok, _} = join_sup.start_link([])

# NOTE(review): presumably clears any join buffers left over from an earlier
# evaluation so this run starts clean — confirm against Bloccs.Join docs.
Bloccs.Join.reset()
# Deliver both terminal ports to this process as
# {:bloccs_sink, network, node, port, payload} messages (matched below).
Bloccs.Router.register_sink(:join, :reconcile_sink, :done, self())
Bloccs.Router.register_sink(:join, :dead_sink, :dead, self())

# Producer handles for reconcile's two join in-ports.
left = Bloccs.Router.producer_name(:join, :reconcile, :left)
right = Bloccs.Router.producer_name(:join, :reconcile, :right)

# A matched pair (same order_id) → one joined record.
Bloccs.Producer.push(left, %{"order_id" => "o1", "item" => "book"})
Bloccs.Producer.push(right, %{"order_id" => "o1", "amount" => 30})

matched =
  receive do
    {:bloccs_sink, :join, :reconcile_sink, :done, rec} -> rec
  after
    1_500 -> "(timeout)"
  end

# An unmatched left with no partner → dead-lettered after the join timeout.
# Sleep just past reconcile's timeout_ms (200) so the entry is expired, then
# force the sweep instead of waiting for the background timer.
Bloccs.Producer.push(left, %{"order_id" => "o2", "item" => "pen"})
Process.sleep(250)
Bloccs.Join.sweep_now()

deadlettered =
  receive do
    {:bloccs_sink, :join, :dead_sink, :dead, env} -> env
  after
    1_500 -> "(timeout)"
  end

# Cell result: both outcomes side by side ("(timeout)" marks a missing one).
%{matched: matched, deadlettered: deadlettered}

The matched pair emits %{"order_id" => "o1", "amount" => 30} — left and right correlated into one record (out-of-order arrivals match too; the buffer does not care which side lands first). The unmatched o2 produced an envelope %{"key" => "o2", "present" => ["left"], ...} on the deadletter port — nothing is silently lost. Bloccs.Join.sweep_now/0 forces the timeout sweep here so the notebook does not wait on the background timer.

Rate + delay — throttle a node

limited declares [rate] allowed = 10, interval_ms = 1000 (at most 10 messages/second) and [delay] ms = 50 (pace the producer). These compile into the generated Broadway producer — [rate] as rate_limiting, [delay] as a producer delay — so throttling is configuration, not code in the node.

defmodule BloccsNotebook.Agg.Limited do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/limited.bloccs"

  # Identity core: throttling comes from the manifest's [rate]/[delay]
  # sections, not from code here.
  def transform(msg, _ctx) do
    {:ok, msg}
  end

  # Effect shell: forward on the `out` port.
  def execute(msg, _ctx) do
    {:emit, :out, msg}
  end
end

defmodule BloccsNotebook.Agg.RateSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_aggregation/rate_sink.bloccs"

  # Terminal node: accept the message unchanged...
  def transform(msg, _ctx) do
    {:ok, msg}
  end

  # ...and forward on `out` (exposed by the network as `output`).
  def execute(msg, _ctx) do
    {:emit, :out, msg}
  end
end
# Parse -> validate -> compile -> start; each step asserts success by matching.
{:ok, rate_net} = Bloccs.Parser.parse_network("/tmp/bloccs_aggregation/rate.bloccs")
:ok = Bloccs.Validator.validate_network(rate_net)
{:ok, rate_sup} = Bloccs.Compiler.compile_and_load(rate_net)
{:ok, _} = rate_sup.start_link([])

# The [rate]/[delay] blocks parsed straight onto the node manifest:
{_id, limited} = Enum.find(rate_net.nodes, &match?({:limited, _}, &1))
manifest = limited.manifest
config = %{rate: manifest.rate, delay_ms: manifest.delay_ms}

# Route rate_sink.out back here, then push a single message through the
# throttled producer.
Bloccs.Router.register_sink(:rate, :rate_sink, :out, self())

:rate
|> Bloccs.Router.producer_name(:limited, :inp)
|> Bloccs.Producer.push(%{"n" => 1})

flowed =
  receive do
    {:bloccs_sink, :rate, :rate_sink, :out, m} -> m
  after
    1_500 -> "(timeout)"
  end

%{parsed_config: config, flowed: flowed}

The manifest’s [rate] parsed to %Bloccs.Manifest.Rate{allowed: 10, interval_ms: 1000} and [delay] to delay_ms: 50 — the throttle the generated producer enforces. The message still flows through; the cap shapes how fast, not whether.

Where next

  • 02_events_webhook — enrich pairs these ideas with retry, timeout, and idempotency in one realistic node.
  • The guides — the manifest reference for [batch], [join], [rate], and [delay], plus how each maps onto the Broadway topology.