Powered by AppSignal & Oban Pro

bloccs · 03 — Routing: filter, split & merge

notebooks/03_routing_primitives.livemd

bloccs · 03 — Routing: filter, split & merge

# The only dependency this notebook needs: the bloccs runtime (0.9.x series).
Mix.install([{:bloccs, "~> 0.9"}])

What this covers

Three routing primitives — how messages are removed, fanned out, and fanned in. None of them needs a special construct; they fall out of what a node’s effect shell returns and how edges are wired.

  • Filter — a node returns :drop; the message is consumed and emits nothing.
  • Split — a node emits to several out-ports in one invocation.
  • Merge (fan-in) — several out-ports edge into one in-port.

This notebook assumes the parse → validate → compile → run loop from 01_first_network. The next notebook, 04_aggregation_primitives, covers the stateful ones: batch, join, and rate/delay.

Filter + split in one node

A single gate node does both. Its pure core passes the message through; its effect shell decides:

  • a blank message returns :drop — filtered out, nothing emitted (a [:bloccs, :node, :dropped] telemetry event fires);
  • anything else returns {:emit, [{:kept, ...}, {:audit, ...}]} — split to both out-ports, each with a distinct payload.
                  ┌─(blank)─ ✘ dropped
message ─▶ [gate] ┤
         (pure)   ├──kept───▶ [keep_sink]  ─▶ stored
                  └──audit──▶ [audit_sink] ─▶ logged

Schemas

# One schema per payload shape used by the manifests (field name => type):
#   Message@1 — the gate's input and its `kept` output
#   Audit@1   — the gate's `audit` output
#   Receipt@1 — what both sinks emit
#   Msg@1     — the single-field payload of the merge example
Bloccs.Schema.register("Message@1", [id: :string, text: :string])
Bloccs.Schema.register("Audit@1", [id: :string, note: :string])
Bloccs.Schema.register("Receipt@1", [id: :string, outcome: :string])
Bloccs.Schema.register("Msg@1", [id: :string])

Manifests

All manifests for this notebook go to one directory so the node modules can reference them by literal path.

# Shared manifest directory; the node modules hard-code this same literal path.
dir = Path.join("/tmp", "bloccs_routing")
File.mkdir_p!(dir)

# File name => manifest body (TOML).
#
# Node manifests declare their in/out ports with schemas and name the
# Module.fun/arity implementing the pure core and the effect shell.
# Network manifests list their nodes, wire them with [[edges]], and expose
# selected ports under [expose].
#
# The `#` line inside merge.bloccs is a TOML comment and part of the file body.
routing = %{
  # --- Filter + split: gate → keep_sink (kept) and audit_sink (audit) ---
  "gate.bloccs" => """
  [node]
  id = "gate"
  version = "0.1.0"
  kind = "transform"
  [doc]
  intent = "Filter out blank messages (:drop); split the rest to kept + audit."
  [ports.in]
  message = { schema = "Message@1" }
  [ports.out]
  kept  = { schema = "Message@1" }
  audit = { schema = "Audit@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.Gate.transform/2"
  effect_shell = "BloccsNotebook.Routing.Gate.execute/2"
  """,
  "keep_sink.bloccs" => """
  [node]
  id = "keep_sink"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  item = { schema = "Message@1" }
  [ports.out]
  stored = { schema = "Receipt@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.KeepSink.transform/2"
  effect_shell = "BloccsNotebook.Routing.KeepSink.execute/2"
  """,
  "audit_sink.bloccs" => """
  [node]
  id = "audit_sink"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  entry = { schema = "Audit@1" }
  [ports.out]
  logged = { schema = "Receipt@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.AuditSink.transform/2"
  effect_shell = "BloccsNotebook.Routing.AuditSink.execute/2"
  """,
  # Network: one edge per gate out-port; both sink outputs are exposed.
  "filtersplit.bloccs" => """
  [network]
  id = "filtersplit"
  version = "0.1.0"
  runtime = "beam"
  [nodes]
  gate       = { use = "gate.bloccs" }
  keep_sink  = { use = "keep_sink.bloccs" }
  audit_sink = { use = "audit_sink.bloccs" }
  [[edges]]
  from = "gate.kept"
  to   = "keep_sink.item"
  [[edges]]
  from = "gate.audit"
  to   = "audit_sink.entry"
  [expose]
  in  = { message = "gate.message" }
  out = { stored = "keep_sink.stored", logged = "audit_sink.logged" }
  [supervision]
  strategy = "one_for_one"
  """,
  # --- Merge (fan-in): left.out and right.out both feed collect.acc ---
  "left.bloccs" => """
  [node]
  id = "merge_left"
  version = "0.1.0"
  kind = "source"
  [ports.in]
  req = { schema = "Msg@1" }
  [ports.out]
  out = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.Left.transform/2"
  effect_shell = "BloccsNotebook.Routing.Left.execute/2"
  """,
  "right.bloccs" => """
  [node]
  id = "merge_right"
  version = "0.1.0"
  kind = "source"
  [ports.in]
  req = { schema = "Msg@1" }
  [ports.out]
  out = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.Right.transform/2"
  effect_shell = "BloccsNotebook.Routing.Right.execute/2"
  """,
  "collect.bloccs" => """
  [node]
  id = "merge_collect"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  acc = { schema = "Msg@1" }
  [ports.out]
  done = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Routing.Collect.transform/2"
  effect_shell = "BloccsNotebook.Routing.Collect.execute/2"
  """,
  # Network: two [[edges]] share the target "collect.acc"; both source
  # in-ports are exposed separately.
  "merge.bloccs" => """
  [network]
  id = "merge"
  version = "0.1.0"
  runtime = "beam"
  [nodes]
  left    = { use = "left.bloccs" }
  right   = { use = "right.bloccs" }
  collect = { use = "collect.bloccs" }
  # Fan-in: two distinct out-ports both edge into collect's single in-port.
  [[edges]]
  from = "left.out"
  to   = "collect.acc"
  [[edges]]
  from = "right.out"
  to   = "collect.acc"
  [expose]
  in  = { left_in = "left.req", right_in = "right.req" }
  out = { merged = "collect.done" }
  [supervision]
  strategy = "one_for_one"
  """
}

# Write every manifest (file name => TOML body) into `dir`, overwriting any
# copy left over from an earlier run. Enum.each: we only want the side effect.
Enum.each(routing, fn {name, body} -> File.write!(Path.join(dir, name), body) end)

# File.ls!/1 returns entries in filesystem-dependent order; sort them so the
# displayed listing is stable between runs and machines.
dir |> File.ls!() |> Enum.sort()

The nodes

defmodule BloccsNotebook.Routing.Gate do
  # Filter + split in one node: the pure core forwards the message untouched,
  # and the effect shell either drops it or fans it out to two out-ports.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/gate.bloccs"

  # Pure core: pass the message through unchanged.
  def transform(message, _ctx), do: {:ok, message}

  # Effect shell:
  #   * text empty after trimming -> :drop (nothing is emitted)
  #   * otherwise -> the original message on `kept` plus an audit entry on `audit`
  def execute(%{text: text} = message, _ctx) do
    case String.trim(text) do
      "" ->
        :drop

      _non_blank ->
        {:emit, [{:kept, message}, {:audit, audit_entry(message.id, text)}]}
    end
  end

  # Audit@1 payload: the message id plus the text length in graphemes.
  defp audit_entry(id, text), do: %{id: id, note: "#{String.length(text)} chars"}
end

defmodule BloccsNotebook.Routing.KeepSink do
  # Terminal node for messages the gate kept.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/keep_sink.bloccs"

  # Pure core: forward the kept message as-is.
  def transform(message, _ctx), do: {:ok, message}

  # Effect shell: acknowledge with a Receipt@1 on the `stored` out-port.
  def execute(message, _ctx) do
    receipt = %{id: message.id, outcome: "stored"}
    {:emit, :stored, receipt}
  end
end

defmodule BloccsNotebook.Routing.AuditSink do
  # Terminal node for the gate's audit entries.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/audit_sink.bloccs"

  # Pure core: forward the audit entry as-is.
  def transform(entry, _ctx), do: {:ok, entry}

  # Effect shell: acknowledge with a Receipt@1 on the `logged` out-port.
  def execute(entry, _ctx) do
    receipt = %{id: entry.id, outcome: "audited"}
    {:emit, :logged, receipt}
  end
end

Run it

# parse → validate → compile → start; each match asserts that its step succeeded.
filtersplit_path = "/tmp/bloccs_routing/filtersplit.bloccs"

{:ok, fs_net} = Bloccs.Parser.parse_network(filtersplit_path)
:ok = Bloccs.Validator.validate_network(fs_net)
{:ok, fs_sup} = Bloccs.Compiler.compile_and_load(fs_net)
# fs_sup is the compiled module; start the network it describes.
{:ok, _started} = fs_sup.start_link([])

# Deliver both exposed outputs of :filtersplit to this process as
# {:bloccs_sink, network, node, port, payload} messages.
sink_pid = self()
Bloccs.Router.register_sink(:filtersplit, :keep_sink, :stored, sink_pid)
Bloccs.Router.register_sink(:filtersplit, :audit_sink, :logged, sink_pid)

# Producer that feeds the gate's `message` in-port.
gate = Bloccs.Router.producer_name(:filtersplit, :gate, :message)

# Non-blank text → split to both kept and audit.
Bloccs.Producer.push(gate, %{id: "m1", text: "hello"})
Bloccs.Producer.push(gate, %{id: "m2", text: "world!!"})
# Whitespace only → the gate returns :drop, so nothing reaches either sink.
Bloccs.Producer.push(gate, %{id: "m3", text: "   "})

# Two real messages × (kept + audit) = 4 sink messages; m3 is dropped.
# Each wait gives up after 1.5 s and records "(timeout)" in that slot instead.
await_result = fn ->
  receive do
    {:bloccs_sink, :filtersplit, :keep_sink, :stored, receipt} -> "stored #{receipt.id}"
    {:bloccs_sink, :filtersplit, :audit_sink, :logged, receipt} -> "logged #{receipt.id}"
  after
    1_500 -> "(timeout)"
  end
end

# Arrival order across the two sinks is not fixed, so sort for display.
1..4
|> Enum.map(fn _ -> await_result.() end)
|> Enum.sort()

You get four results — m1 and m2 each stored and logged. m3 produced nothing: the gate dropped it before it reached either sink. A single node both removed a message from the flow and split every other message into two streams.

Merge: fan-in to a single in-port

A v0.1 node has one in-port, but a port can be the target of many edges. Wire two sources’ out-ports into one collector in-port and they merge into a single stream — no merge node, no special construct. Delivery is interleaved and unordered.

[left]  ─out─┐
             ├─▶ [collect] ─▶ merged
[right] ─out─┘
defmodule BloccsNotebook.Routing.Left do
  # First of the two fan-in sources; forwards every request to `out`.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/left.bloccs"

  # Pure core: identity.
  def transform(msg, _ctx), do: {:ok, msg}

  # Effect shell: re-emit the message unchanged on the `out` port.
  def execute(msg, _ctx) do
    {:emit, :out, msg}
  end
end

defmodule BloccsNotebook.Routing.Right do
  # Second of the two fan-in sources; forwards every request to `out`.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/right.bloccs"

  # Pure core: identity.
  def transform(msg, _ctx), do: {:ok, msg}

  # Effect shell: re-emit the message unchanged on the `out` port.
  def execute(msg, _ctx) do
    {:emit, :out, msg}
  end
end

defmodule BloccsNotebook.Routing.Collect do
  # Fan-in target: its single `acc` in-port is fed by both Left and Right.
  use Bloccs.Node, manifest: "/tmp/bloccs_routing/collect.bloccs"

  # Pure core: identity.
  def transform(msg, _ctx), do: {:ok, msg}

  # Effect shell: pass each merged message through to the `done` port.
  def execute(msg, _ctx) do
    {:emit, :done, msg}
  end
end
# Same parse → validate → compile → start sequence, for the fan-in network.
merge_path = "/tmp/bloccs_routing/merge.bloccs"

{:ok, mg_net} = Bloccs.Parser.parse_network(merge_path)
:ok = Bloccs.Validator.validate_network(mg_net)
{:ok, mg_sup} = Bloccs.Compiler.compile_and_load(mg_net)
# mg_sup is the compiled module; start the network it describes.
{:ok, _started} = mg_sup.start_link([])

# Merged output arrives at this process as
# {:bloccs_sink, :merge, :collect, :done, payload} messages.
Bloccs.Router.register_sink(:merge, :collect, :done, self())

# Atom-keyed payloads, matching the Msg@1 schema (registered as `id: :string`)
# and the atom-keyed messages used in the filter/split example.
left_in = Bloccs.Router.producer_name(:merge, :left, :req)
right_in = Bloccs.Router.producer_name(:merge, :right, :req)
Bloccs.Producer.push(left_in, %{id: "from-left"})
Bloccs.Producer.push(right_in, %{id: "from-right"})

# One message per source. Fan-in gives no ordering guarantee, so sort the ids.
for _ <- 1..2 do
  receive do
    {:bloccs_sink, :merge, :collect, :done, m} -> m.id
  after
    1_500 -> "(timeout)"
  end
end
|> Enum.sort()

Both sources land at the same collector — ["from-left", "from-right"] once sorted (fan-in delivery is unordered, so the cell sorts before displaying). The collector's single in-port is fed by both edges: the validator accepts two distinct edges into one port, and the router builds one producer that every upstream edge feeds.

Where next

  • 04_aggregation_primitives — batch (windowing), join (key correlation + deadletter), and rate/delay (throttling). The primitives that hold state or bend time.
  • 02_events_webhook — routing in a realistic flow: the route node splits known from unknown, and a known event fans out to two sinks.