
bloccs · 05 — Composing networks as subgraphs

notebooks/05_subgraph_composition.livemd

Mix.install([
  {:bloccs, "~> 0.9"}
])

What this covers

A [nodes] entry can use a whole network instead of a single node. The parser flattens it at parse time into namespaced leaf nodes, rewrites the edges across the seam, and the compiler builds one Broadway tree over the flat graph. A sub-network is a reusable unit — wire it in like any node, and lineage, coverage, and supervision treat the composition as one graph.

app:
  start ─▶ [head] ──go──▶ [ pipe ]──▶ done
                          │ entry → up → down → exit │
                          └──────── sub-network ─────┘

flattened ⇒  head ─▶ pipe.up ─▶ pipe.down

The parse → validate → compile → run loop itself is covered in 01_first_network.

The pieces

Three pass-through nodes (head, up, down), one sub-network (pipe = up → down), and one parent network (app = head → pipe). Each node forwards its payload unchanged, so a single push is easy to follow through head → pipe.up → pipe.down.

Bloccs.Schema.register("Msg@1", id: :string)
dir = "/tmp/bloccs_subgraph"
File.mkdir_p!(dir)

subgraph = %{
  "head.bloccs" => """
  [node]
  id = "sg_head"
  version = "0.1.0"
  kind = "transform"
  [ports.in]
  start = { schema = "Msg@1" }
  [ports.out]
  go = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Sub.Head.transform/2"
  effect_shell = "BloccsNotebook.Sub.Head.execute/2"
  """,
  "up.bloccs" => """
  [node]
  id = "sg_up"
  version = "0.1.0"
  kind = "transform"
  [ports.in]
  input = { schema = "Msg@1" }
  [ports.out]
  mid = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Sub.Up.transform/2"
  effect_shell = "BloccsNotebook.Sub.Up.execute/2"
  """,
  "down.bloccs" => """
  [node]
  id = "sg_down"
  version = "0.1.0"
  kind = "sink"
  [ports.in]
  mid = { schema = "Msg@1" }
  [ports.out]
  output = { schema = "Msg@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Sub.Down.transform/2"
  effect_shell = "BloccsNotebook.Sub.Down.execute/2"
  """,
  # The sub-network: a self-contained pipeline with its own exposed ports.
  "pipe.bloccs" => """
  [network]
  id = "pipe"
  version = "0.1.0"
  [nodes]
  up   = { use = "up.bloccs" }
  down = { use = "down.bloccs" }
  [[edges]]
  from = "up.mid"
  to   = "down.mid"
  [expose]
  in  = { entry = "up.input" }
  out = { exit = "down.output" }
  """,
  # The parent: `pipe` is used exactly like a node, by its exposed ports.
  "app.bloccs" => """
  [network]
  id = "app"
  version = "0.1.0"
  [nodes]
  head = { use = "head.bloccs" }
  pipe = { use = "pipe.bloccs" }
  [[edges]]
  from = "head.go"
  to   = "pipe.entry"
  [expose]
  in  = { start = "head.start" }
  out = { done = "pipe.exit" }
  [supervision]
  strategy = "one_for_one"
  """
}

for {name, body} <- subgraph, do: File.write!(Path.join(dir, name), body)
File.ls!(dir)

Notice that app wires head.go → pipe.entry and exposes pipe.exit. It refers to the sub-network only through pipe’s exposed ports (entry/exit); the sub-network’s internals (up, down) are private to pipe.
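To make the encapsulation rule concrete, here is a minimal sketch in plain Elixir of resolving a parent's reference through an exposed-port table. `ExposeSketch` and its error tuple are hypothetical names used only for illustration; whether and how bloccs itself reports a reference to a private port is not shown in this notebook.

```elixir
defmodule ExposeSketch do
  # Hypothetical table: the ports `pipe` exposes, copied from pipe.bloccs.
  @exposed %{"entry" => "up.input", "exit" => "down.output"}

  # A parent reference "pipe.<port>" resolves only when <port> is exposed.
  def resolve("pipe." <> port) do
    case Map.fetch(@exposed, port) do
      {:ok, target} -> {:ok, "pipe." <> target}
      :error -> {:error, {:not_exposed, port}}
    end
  end
end

exposed_ref = ExposeSketch.resolve("pipe.entry")
private_ref = ExposeSketch.resolve("pipe.up.input")
```

In this sketch the exposed name resolves to the internal port it stands for, while a direct reference to an internal port comes back as an error tuple.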

defmodule BloccsNotebook.Sub.Head do
  use Bloccs.Node, manifest: "/tmp/bloccs_subgraph/head.bloccs"
  def transform(d, _ctx), do: {:ok, d}
  def execute(d, _ctx), do: {:emit, :go, d}
end

defmodule BloccsNotebook.Sub.Up do
  use Bloccs.Node, manifest: "/tmp/bloccs_subgraph/up.bloccs"
  def transform(d, _ctx), do: {:ok, d}
  def execute(d, _ctx), do: {:emit, :mid, d}
end

defmodule BloccsNotebook.Sub.Down do
  use Bloccs.Node, manifest: "/tmp/bloccs_subgraph/down.bloccs"
  def transform(d, _ctx), do: {:ok, d}
  def execute(d, _ctx), do: {:emit, :output, d}
end

Parse the parent — watch it flatten

Parsing app pulls in pipe and flattens it. The sub-network’s nodes become namespaced leaf nodes under the instance name: up and down become pipe.up and pipe.down. There is no nesting at runtime — one flat graph.

{:ok, app_net} = Bloccs.Parser.parse_network("/tmp/bloccs_subgraph/app.bloccs")

%{
  node_ids: app_net.nodes |> Map.keys() |> Enum.sort(),
  edges: Enum.map(app_net.edges, &{&1.from, &1.to})
}

The node ids are [:head, :"pipe.down", :"pipe.up"]. The edges show the seam was rewritten: head.go now points at the sub-network’s exposed entry (pipe.up.input), and pipe’s internal up.mid → down.mid is namespaced to pipe.up.mid → pipe.down.mid. The composition boundary is gone by the time the compiler sees it.
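The rewrite described above can be sketched in plain Elixir, independent of bloccs. `FlattenSketch` and the map shapes below are assumptions made for illustration (ports as "node.port" strings, one level of nesting); the real parser's data structures are not shown here and may differ.

```elixir
defmodule FlattenSketch do
  # Hypothetical helper, not part of bloccs: inlines one sub-network
  # instance into its parent.
  def flatten(parent, instance, sub) do
    prefix = instance <> "."

    # Sub-network nodes become namespaced leaf nodes: "up" -> "pipe.up".
    sub_nodes = Enum.map(sub.nodes, &(prefix <> &1))

    # Internal edges keep their shape but gain the instance prefix.
    sub_edges =
      Enum.map(sub.edges, fn {from, to} -> {prefix <> from, prefix <> to} end)

    # Exposed ports form the seam: "pipe.entry" -> "pipe.up.input".
    seam =
      for {name, target} <- Map.merge(sub.expose_in, sub.expose_out), into: %{} do
        {prefix <> name, prefix <> target}
      end

    # Parent edges that touch the instance are rewritten through the seam.
    parent_edges =
      Enum.map(parent.edges, fn {from, to} ->
        {Map.get(seam, from, from), Map.get(seam, to, to)}
      end)

    %{
      nodes: Enum.sort(List.delete(parent.nodes, instance) ++ sub_nodes),
      edges: parent_edges ++ sub_edges
    }
  end
end

pipe = %{
  nodes: ["up", "down"],
  edges: [{"up.mid", "down.mid"}],
  expose_in: %{"entry" => "up.input"},
  expose_out: %{"exit" => "down.output"}
}

app = %{nodes: ["head", "pipe"], edges: [{"head.go", "pipe.entry"}]}

flat = FlattenSketch.flatten(app, "pipe", pipe)
```

With the inputs above, `flat.nodes` holds the three leaf ids and `flat.edges` holds the rewritten seam edge followed by the namespaced internal edge, mirroring the parser output shown earlier.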

Compile, run, and trace across the seam

:ok = Bloccs.Validator.validate_network(app_net)
{:ok, app_sup} = Bloccs.Compiler.compile_and_load(app_net)
{:ok, _} = app_sup.start_link([])

# The exposed `done` resolves to the flattened pipe.down.output.
Bloccs.Router.register_sink(:app, :"pipe.down", :output, self())

trace = Bloccs.Trace.record(:app)

Bloccs.Producer.push(Bloccs.Router.producer_name(:app, :head, :start), %{"id" => "msg-1"})

receive do
  {:bloccs_sink, :app, :"pipe.down", :output, m} -> m
after
  2_000 -> "(timeout)"
end

The message pushed at head.start flowed head → pipe.up → pipe.down and emerged at the exposed done port as %{"id" => "msg-1"}. It crossed the composition boundary without the parent wiring any of the sub-network’s internals.
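When a cell like the receive above is reused in tests, it can help to return a tagged result so a timeout is never mistaken for a payload. The sketch below assumes only the sink message shape used in this notebook; `SinkSketch.await/4` is a hypothetical helper, and the `send/2` call simulates delivery so the snippet runs without a live network.

```elixir
defmodule SinkSketch do
  # Hypothetical helper: waits for one sink message addressed to the given
  # network, node, and port, and tags the outcome.
  def await(network, node, port, timeout \\ 2_000) do
    receive do
      {:bloccs_sink, ^network, ^node, ^port, payload} -> {:ok, payload}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulated delivery, standing in for the registered sink.
send(self(), {:bloccs_sink, :app, :"pipe.down", :output, %{"id" => "msg-1"}})

result = SinkSketch.await(:app, :"pipe.down", :output)
missed = SinkSketch.await(:app, :"pipe.down", :output, 10)
```

The first call consumes the simulated message; the second finds the mailbox empty and times out after 10 ms.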

Coverage spans the composed graph

Coverage measures the flattened graph, so it accounts for the sub-network’s ports and edges too — one happy path here reaches everything.

reached = Bloccs.Trace.reached(Bloccs.Trace.stop(trace))
report = Bloccs.Coverage.report(app_net, reached)

IO.puts(Bloccs.Coverage.render(app_net, report))

Eight obligations, all reached — including the seam edge head.go → pipe.up.input and the sub-network’s internal pipe.up.mid → pipe.down.mid. Compose networks freely; the tooling still sees one graph.
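One way to arrive at a count of eight is to treat every port and every edge of the flattened graph as an obligation: six ports plus two edges. That definition is an assumption made for this sketch, not something confirmed by `Bloccs.Coverage`; the data below is hand-written from the manifests rather than read from `app_net`.

```elixir
# Flattened nodes with their ports, transcribed from the manifests above.
nodes = %{
  "head" => %{in: ["start"], out: ["go"]},
  "pipe.up" => %{in: ["input"], out: ["mid"]},
  "pipe.down" => %{in: ["mid"], out: ["output"]}
}

edges = [{"head.go", "pipe.up.input"}, {"pipe.up.mid", "pipe.down.mid"}]

# Assumed rule: one obligation per port, one per edge.
ports =
  for {id, %{in: ins, out: outs}} <- nodes, port <- ins ++ outs do
    {:port, id <> "." <> port}
  end

obligations = MapSet.new(ports ++ Enum.map(edges, &{:edge, &1}))

# Simulate a run that never crossed the sub-network's internal edge.
reached = MapSet.delete(obligations, {:edge, {"pipe.up.mid", "pipe.down.mid"}})
missing = MapSet.difference(obligations, reached)

{MapSet.size(obligations), MapSet.to_list(missing)}
```

Because the obligations are computed over the flattened graph, a gap inside the sub-network shows up under its namespaced ids, exactly as a gap in a hand-written flat network would.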

Why this matters

  • Reuse — a sub-network (an auth check, an enrichment pipeline, a fan-out) is defined once and dropped into many parents by its exposed ports.
  • Encapsulation — a parent depends only on a sub-network’s [expose] contract, not its internals; refactor the inside freely.
  • No runtime cost — flattening happens at parse time, so composition is a source-level convenience, not a layer of indirection at runtime.

Where next

  • 02_events_webhook — a flat network you could factor into sub-networks (e.g. an enrich + route pipeline reused across apps).
  • The guides — the manifest reference for [expose] and the subgraph flattening rules (including cycle detection across use).
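As a rough picture of what cycle detection across use has to catch, the sketch below walks use references depth-first and reports a cycle when a manifest reappears on its own path. `UseCycleSketch` and the `uses` map are hypothetical; the guides describe the actual rules.

```elixir
defmodule UseCycleSketch do
  # Hypothetical: `uses` maps a manifest to the manifests its [nodes] reference.
  def cycle?(uses, start), do: visit(uses, start, [])

  # A manifest that is already on the current path closes a cycle.
  defp visit(uses, file, path) do
    if file in path do
      true
    else
      uses
      |> Map.get(file, [])
      |> Enum.any?(&visit(uses, &1, [file | path]))
    end
  end
end

uses = %{
  "app.bloccs" => ["head.bloccs", "pipe.bloccs"],
  "pipe.bloccs" => ["up.bloccs", "down.bloccs"]
}

acyclic = UseCycleSketch.cycle?(uses, "app.bloccs")

# Pretend up.bloccs were itself a network that used pipe.bloccs.
cyclic = UseCycleSketch.cycle?(Map.put(uses, "up.bloccs", ["pipe.bloccs"]), "app.bloccs")
```

Tracking the current path rather than a global visited set means a manifest used from two different parents is not flagged; only a manifest that transitively uses itself is.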