bloccs · 03 — Routing: filter, split & merge
Mix.install([
{:bloccs, "~> 0.9"}
])
What this covers
Three routing primitives — how messages are removed, fanned out, and fanned in. None of them needs a special construct; they fall out of what a node’s effect shell returns and how edges are wired.
- Filter — a node returns `:drop`; the message is consumed and emits nothing.
- Split — a node emits to several out-ports in one invocation.
- Merge (fan-in) — several out-ports edge into one in-port.
This notebook assumes the parse → validate → compile → run loop from
01_first_network. The next notebook,
04_aggregation_primitives, covers the stateful ones: batch, join, and rate.
Filter + split in one node
A single gate node does both. Its pure core passes the message through; its
effect shell decides:
- a blank message returns `:drop` — filtered out, nothing emitted (a `[:bloccs, :node, :dropped]` telemetry event fires);
- anything else returns `{:emit, [{:kept, ...}, {:audit, ...}]}` — split to both out-ports, each with a distinct payload.
(blank) ✘ dropped
message ─▶ [gate] ┤
(pure) ├──kept───▶ [keep_sink] ─▶ stored
└──audit──▶ [audit_sink] ─▶ logged
Schemas
# Payload schemas referenced by name from the manifests' `schema = "..."` entries.
# Message@1: the gate's in-port and `kept` out-port, and keep_sink's in-port.
Bloccs.Schema.register("Message@1", id: :string, text: :string)
# Audit@1: the gate's `audit` out-port and audit_sink's in-port.
Bloccs.Schema.register("Audit@1", id: :string, note: :string)
# Receipt@1: the `stored` / `logged` out-ports of the two sinks.
Bloccs.Schema.register("Receipt@1", id: :string, outcome: :string)
# Msg@1: every port in the merge network (left, right, collect).
Bloccs.Schema.register("Msg@1", id: :string)
Manifests
All manifests for this notebook go to one directory so the node modules can reference them by literal path.
# Directory that receives every manifest. The node modules reference the same
# literal path in their `use Bloccs.Node, manifest: ...` option.
dir = "/tmp/bloccs_routing"
File.mkdir_p!(dir)
# File name => manifest text. The first four entries make up the `filtersplit`
# network (gate + two sinks + the network itself); the last four make up the
# `merge` network (two sources + collector + the network itself).
routing = %{
# Gate node: one in-port, two out-ports (`kept`, `audit`).
"gate.bloccs" => """
[node]
id = "gate"
version = "0.1.0"
kind = "transform"
[doc]
intent = "Filter out blank messages (:drop); split the rest to kept + audit."
[ports.in]
message = { schema = "Message@1" }
[ports.out]
kept = { schema = "Message@1" }
audit = { schema = "Audit@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.Gate.transform/2"
effect_shell = "BloccsNotebook.Routing.Gate.execute/2"
""",
# Sink fed by the gate's `kept` port.
"keep_sink.bloccs" => """
[node]
id = "keep_sink"
version = "0.1.0"
kind = "sink"
[ports.in]
item = { schema = "Message@1" }
[ports.out]
stored = { schema = "Receipt@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.KeepSink.transform/2"
effect_shell = "BloccsNotebook.Routing.KeepSink.execute/2"
""",
# Sink fed by the gate's `audit` port.
"audit_sink.bloccs" => """
[node]
id = "audit_sink"
version = "0.1.0"
kind = "sink"
[ports.in]
entry = { schema = "Audit@1" }
[ports.out]
logged = { schema = "Receipt@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.AuditSink.transform/2"
effect_shell = "BloccsNotebook.Routing.AuditSink.execute/2"
""",
# Network wiring gate.kept -> keep_sink.item and gate.audit -> audit_sink.entry.
"filtersplit.bloccs" => """
[network]
id = "filtersplit"
version = "0.1.0"
runtime = "beam"
[nodes]
gate = { use = "gate.bloccs" }
keep_sink = { use = "keep_sink.bloccs" }
audit_sink = { use = "audit_sink.bloccs" }
[[edges]]
from = "gate.kept"
to = "keep_sink.item"
[[edges]]
from = "gate.audit"
to = "audit_sink.entry"
[expose]
in = { message = "gate.message" }
out = { stored = "keep_sink.stored", logged = "audit_sink.logged" }
[supervision]
strategy = "one_for_one"
""",
# First of the two upstream nodes in the merge example.
"left.bloccs" => """
[node]
id = "merge_left"
version = "0.1.0"
kind = "source"
[ports.in]
req = { schema = "Msg@1" }
[ports.out]
out = { schema = "Msg@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.Left.transform/2"
effect_shell = "BloccsNotebook.Routing.Left.execute/2"
""",
# Second upstream node; same shape as left.bloccs.
"right.bloccs" => """
[node]
id = "merge_right"
version = "0.1.0"
kind = "source"
[ports.in]
req = { schema = "Msg@1" }
[ports.out]
out = { schema = "Msg@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.Right.transform/2"
effect_shell = "BloccsNotebook.Routing.Right.execute/2"
""",
# Collector whose single in-port `acc` is the target of both merge edges.
"collect.bloccs" => """
[node]
id = "merge_collect"
version = "0.1.0"
kind = "sink"
[ports.in]
acc = { schema = "Msg@1" }
[ports.out]
done = { schema = "Msg@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Routing.Collect.transform/2"
effect_shell = "BloccsNotebook.Routing.Collect.execute/2"
""",
# Network wiring left.out and right.out into collect.acc.
"merge.bloccs" => """
[network]
id = "merge"
version = "0.1.0"
runtime = "beam"
[nodes]
left = { use = "left.bloccs" }
right = { use = "right.bloccs" }
collect = { use = "collect.bloccs" }
# Fan-in: two distinct out-ports both edge into collect's single in-port.
[[edges]]
from = "left.out"
to = "collect.acc"
[[edges]]
from = "right.out"
to = "collect.acc"
[expose]
in = { left_in = "left.req", right_in = "right.req" }
out = { merged = "collect.done" }
[supervision]
strategy = "one_for_one"
"""
}
# Write each manifest into `dir` under its file name, then list the directory
# so the written files are visible.
for {name, body} <- routing, do: File.write!(Path.join(dir, name), body)
File.ls!(dir)
The nodes
# Gate node: filters blank messages and splits every other message across
# the `kept` and `audit` out-ports declared in gate.bloccs.
defmodule BloccsNotebook.Routing.Gate do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/gate.bloccs"

  # Pure core: hands the message on unchanged.
  def transform(message, _ctx) do
    {:ok, message}
  end

  # Effect shell: returns `:drop` when the text is empty after trimming;
  # otherwise emits the original message on `kept` and a short note carrying
  # the text's length on `audit`.
  def execute(%{text: body} = message, _ctx) do
    case String.trim(body) do
      "" ->
        :drop

      _non_blank ->
        note = "#{String.length(body)} chars"
        {:emit, [kept: message, audit: %{id: message.id, note: note}]}
    end
  end
end
# Sink wired to the gate's `kept` port; answers each item with a receipt.
defmodule BloccsNotebook.Routing.KeepSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/keep_sink.bloccs"

  # Pure core: pass-through.
  def transform(item, _ctx) do
    {:ok, item}
  end

  # Effect shell: emits a receipt carrying the item's id on `stored`.
  def execute(item, _ctx) do
    receipt = %{id: item.id, outcome: "stored"}
    {:emit, :stored, receipt}
  end
end
# Sink wired to the gate's `audit` port; answers each entry with a receipt.
defmodule BloccsNotebook.Routing.AuditSink do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/audit_sink.bloccs"

  # Pure core: pass-through.
  def transform(entry, _ctx) do
    {:ok, entry}
  end

  # Effect shell: emits a receipt carrying the entry's id on `logged`.
  def execute(entry, _ctx) do
    receipt = %{id: entry.id, outcome: "audited"}
    {:emit, :logged, receipt}
  end
end
Run it
# Parse → validate → compile → start the `filtersplit` network.
{:ok, fs_net} = Bloccs.Parser.parse_network("/tmp/bloccs_routing/filtersplit.bloccs")
:ok = Bloccs.Validator.validate_network(fs_net)
{:ok, fs_sup} = Bloccs.Compiler.compile_and_load(fs_net)
{:ok, _pid} = fs_sup.start_link([])

# Register this process as the sink for both exposed out-ports, so their
# output can be picked up by the `receive` below.
Enum.each([keep_sink: :stored, audit_sink: :logged], fn {node, port} ->
  Bloccs.Router.register_sink(:filtersplit, node, port, self())
end)

gate = Bloccs.Router.producer_name(:filtersplit, :gate, :message)

# Push in order; m3 is blank → filtered out, produces nothing.
for message <- [
      %{id: "m1", text: "hello"},
      %{id: "m2", text: "world!!"},
      %{id: "m3", text: " "}
    ] do
  Bloccs.Producer.push(gate, message)
end

# Two real messages × (kept + audit) = 4 sink messages; m3 is dropped.
# Arrival order is not fixed, so the labels are sorted for a stable result.
1..4
|> Enum.map(fn _ ->
  receive do
    {:bloccs_sink, :filtersplit, :keep_sink, :stored, receipt} -> "stored #{receipt.id}"
    {:bloccs_sink, :filtersplit, :audit_sink, :logged, receipt} -> "logged #{receipt.id}"
  after
    1_500 -> "(timeout)"
  end
end)
|> Enum.sort()
You get four results — m1 and m2 each stored and logged. m3 produced
nothing: the gate dropped it before it reached either sink. One node both
removed a message from the flow and expanded the others into two streams.
Merge: fan-in to a single in-port
A v0.1 node has one in-port, but a port can be the target of many edges. Wire two sources’ out-ports into one collector in-port and they merge into a single stream — no merge node, no special construct. Delivery is interleaved and unordered.
[left] ─out─┐
├─▶ [collect] ─▶ merged
[right] ─out─┘
# First upstream node of the merge example: forwards its input on `out`.
defmodule BloccsNotebook.Routing.Left do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/left.bloccs"

  # Pure core: pass-through.
  def transform(payload, _ctx) do
    {:ok, payload}
  end

  # Effect shell: emits the payload unchanged on `out`.
  def execute(payload, _ctx) do
    {:emit, :out, payload}
  end
end
# Second upstream node of the merge example: forwards its input on `out`.
defmodule BloccsNotebook.Routing.Right do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/right.bloccs"

  # Pure core: pass-through.
  def transform(payload, _ctx) do
    {:ok, payload}
  end

  # Effect shell: emits the payload unchanged on `out`.
  def execute(payload, _ctx) do
    {:emit, :out, payload}
  end
end
# Collector of the merge example: whatever arrives on its in-port is
# re-emitted on `done`.
defmodule BloccsNotebook.Routing.Collect do
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/collect.bloccs"

  # Pure core: pass-through.
  def transform(payload, _ctx) do
    {:ok, payload}
  end

  # Effect shell: emits the payload unchanged on `done`.
  def execute(payload, _ctx) do
    {:emit, :done, payload}
  end
end
# Parse → validate → compile → start the `merge` network.
{:ok, mg_net} = Bloccs.Parser.parse_network("/tmp/bloccs_routing/merge.bloccs")
:ok = Bloccs.Validator.validate_network(mg_net)
{:ok, mg_sup} = Bloccs.Compiler.compile_and_load(mg_net)
{:ok, _pid} = mg_sup.start_link([])

# Register this process as the sink for the collector's `done` port.
Bloccs.Router.register_sink(:merge, :collect, :done, self())

# One message per source, left first and then right.
for {source, id} <- [left: "from-left", right: "from-right"] do
  producer = Bloccs.Router.producer_name(:merge, source, :req)
  Bloccs.Producer.push(producer, %{"id" => id})
end

# Expect one sink message per source; sorted because arrival order is not fixed.
1..2
|> Enum.map(fn _ ->
  receive do
    {:bloccs_sink, :merge, :collect, :done, merged} -> merged["id"]
  after
    1_500 -> "(timeout)"
  end
end)
|> Enum.sort()
Both sources land at the same collector — ["from-left", "from-right"]. The
collector’s single in-port accumulated both edges; the validator accepts the two
distinct edges into one port, and the router builds one producer that every
upstream edge feeds.
Where next
- `04_aggregation_primitives` — batch (windowing), join (key correlation + deadletter), and rate/delay (throttling). The primitives that hold state or bend time.
- `02_events_webhook` — routing in a realistic flow: the `route` node splits known from unknown, and a known event fans out to two sinks.