bloccs · 02 — A webhook/event processor
Mix.install([
  {:bloccs, "~> 0.9"}
])
What this notebook builds
The flagship bloccs example: a real-shaped pipeline of the kind every backend grows — receive an event, validate it, enrich it from an HTTP lookup, then branch on type and fan out. It runs on the built-in mock effect backends, so there are no external services to stand up.
                                                 ┌─▶ persist (DB) → stored
webhook ─▶ ingest ─▶ validate ─▶ enrich ─▶ route ┤   (events table)
           (source)  (pure)      (HTTP,  (router)│
                                  retry,         ├─▶ notify (HTTP) → notified
                                  timeout,       │
                                  idempotent)    └──(unknown type)─▶ deadletter (DB) → dead
It exercises the whole toolbox in one flow: schema-typed edges, declared HTTP + DB effects, a pure validation gate, retry + timeout, idempotency, branching, fan-out, and structural coverage.
If you have not seen the parse → validate → compile → run loop yet, start with
01_first_network — this notebook assumes it.
1 · Register the schemas
Four payload shapes flow through the network. RawEvent@1 enters; enrich adds
an enrichment map to produce EnrichedEvent@1; of the three sinks, persist and
deadletter emit StoredEvent@1 and notify emits Notified@1.
Bloccs.Schema.register("RawEvent@1", id: :string, type: :string, payload: :map)
Bloccs.Schema.register("EnrichedEvent@1",
  id: :string,
  type: :string,
  payload: :map,
  enrichment: :map
)
Bloccs.Schema.register("StoredEvent@1", id: :integer, event_id: :string, type: :string)
Bloccs.Schema.register("Notified@1", id: :string, status: :string)
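To make "schema-typed" concrete, here is a minimal sketch of what checking a payload against a field list could look like. SchemaSketch and its check/2 function are hypothetical names invented for illustration; this is not how Bloccs.Schema is implemented, only the general idea under the assumption that each field maps to a simple type test.

```elixir
defmodule SchemaSketch do
  # Hypothetical checker, not Bloccs.Schema: returns :ok, or the list of
  # fields whose value is missing or has the wrong type.
  def check(fields, payload) do
    bad = for {name, type} <- fields, not ok?(type, Map.get(payload, name)), do: name
    if bad == [], do: :ok, else: {:error, bad}
  end

  defp ok?(:string, value), do: is_binary(value)
  defp ok?(:integer, value), do: is_integer(value)
  defp ok?(:map, value), do: is_map(value)
end

# Same field list as the RawEvent@1 registration above.
raw_event = [id: :string, type: :string, payload: :map]

IO.inspect(SchemaSketch.check(raw_event, %{id: "evt-1", type: "order.created", payload: %{}}))
IO.inspect(SchemaSketch.check(raw_event, %{id: 1, type: "order.created"}))
```

The second call reports both the mistyped id and the missing payload, which is the kind of feedback a typed edge can give before any effect runs.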
2 · Write the manifests
Seven node manifests and one network manifest. Read the [effects] blocks:
they are the declared capability surface. enrich may only GET
enrichment.local; persist may only insert into the events table. A call
outside the allowlist is refused at runtime, and an undeclared axis is a compile
warning. The manifests go to a fixed directory so the node modules below can
reference them by literal path.
dir = "/tmp/bloccs_events"
File.mkdir_p!(dir)
manifests = %{
"ingest.bloccs" => """
[node]
id = "ingest"
version = "0.1.0"
kind = "source"
[doc]
intent = "Accept a raw webhook event and normalize its shape."
[ports.in]
received = { schema = "RawEvent@1", buffer = 1000 }
[ports.out]
event = { schema = "RawEvent@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Events.Ingest.transform/2"
effect_shell = "BloccsNotebook.Events.Ingest.execute/2"
""",
"validate.bloccs" => """
[node]
id = "validate"
version = "0.1.0"
kind = "transform"
[doc]
intent = "Reject events with an empty payload — a pure rule, no effects."
[ports.in]
event = { schema = "RawEvent@1" }
[ports.out]
valid = { schema = "RawEvent@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Events.Validate.transform/2"
effect_shell = "BloccsNotebook.Events.Validate.execute/2"
""",
"enrich.bloccs" => """
[node]
id = "enrich"
version = "0.1.0"
kind = "transform"
[doc]
intent = "Enrich an event from an HTTP lookup. Idempotent, retried, time-bounded."
[ports.in]
valid = { schema = "RawEvent@1" }
[ports.out]
enriched = { schema = "EnrichedEvent@1" }
[effects]
http = { allow = ["enrichment.local"], methods = ["GET"] }
[contract]
pure_core = "BloccsNotebook.Events.Enrich.transform/2"
effect_shell = "BloccsNotebook.Events.Enrich.execute/2"
timeout_ms = 3000
retry = { strategy = "exponential", max = 2, on = ["timeout"], base_ms = 50 }
idempotency = { key = "id" }
""",
"route.bloccs" => """
[node]
id = "route"
version = "0.1.0"
kind = "router"
[doc]
intent = "Branch on event type: known types go downstream, the rest are dead-lettered."
[ports.in]
enriched = { schema = "EnrichedEvent@1" }
[ports.out]
known = { schema = "EnrichedEvent@1" }
unknown = { schema = "EnrichedEvent@1" }
[effects]
[contract]
pure_core = "BloccsNotebook.Events.Route.transform/2"
effect_shell = "BloccsNotebook.Events.Route.execute/2"
""",
"persist.bloccs" => """
[node]
id = "persist"
version = "0.1.0"
kind = "sink"
[doc]
intent = "Persist a known event to the events table."
[ports.in]
event = { schema = "EnrichedEvent@1" }
[ports.out]
stored = { schema = "StoredEvent@1" }
[effects]
db = { allow = ["events:insert"] }
[contract]
pure_core = "BloccsNotebook.Events.Persist.transform/2"
effect_shell = "BloccsNotebook.Events.Persist.execute/2"
""",
"notify.bloccs" => """
[node]
id = "notify"
version = "0.1.0"
kind = "sink"
[doc]
intent = "Notify a downstream webhook that a known event was processed."
[ports.in]
event = { schema = "EnrichedEvent@1" }
[ports.out]
notified = { schema = "Notified@1" }
[effects]
http = { allow = ["hooks.local"], methods = ["POST"] }
[contract]
pure_core = "BloccsNotebook.Events.Notify.transform/2"
effect_shell = "BloccsNotebook.Events.Notify.execute/2"
""",
"deadletter.bloccs" => """
[node]
id = "deadletter"
version = "0.1.0"
kind = "sink"
[doc]
intent = "Record an unknown-type event in the deadletter table."
[ports.in]
event = { schema = "EnrichedEvent@1" }
[ports.out]
recorded = { schema = "StoredEvent@1" }
[effects]
db = { allow = ["deadletter:insert"] }
[contract]
pure_core = "BloccsNotebook.Events.Deadletter.transform/2"
effect_shell = "BloccsNotebook.Events.Deadletter.execute/2"
""",
"events.bloccs" => """
[network]
id = "events"
version = "0.1.0"
runtime = "beam"
[nodes]
ingest = { use = "ingest.bloccs" }
validate = { use = "validate.bloccs" }
enrich = { use = "enrich.bloccs" }
route = { use = "route.bloccs" }
persist = { use = "persist.bloccs" }
notify = { use = "notify.bloccs" }
deadletter = { use = "deadletter.bloccs" }
[[edges]]
from = "ingest.event"
to = "validate.event"
[[edges]]
from = "validate.valid"
to = "enrich.valid"
[[edges]]
from = "enrich.enriched"
to = "route.enriched"
# Fan-out: a known event is both persisted and notified.
[[edges]]
from = "route.known"
to = ["persist.event", "notify.event"]
[[edges]]
from = "route.unknown"
to = "deadletter.event"
[expose]
in = { webhook = "ingest.received" }
out = { stored = "persist.stored", notified = "notify.notified", dead = "deadletter.recorded" }
[supervision]
strategy = "rest_for_one"
max_restarts = 5
max_seconds = 60
[deploy]
concurrency = { enrich = 4, persist = 1 }
"""
}
for {name, body} <- manifests, do: File.write!(Path.join(dir, name), body)
File.ls!(dir)
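The [effects] allowlist in the manifests above can be pictured as a simple host-and-method check. The sketch below is an assumption about the shape of that check, not the bloccs implementation; AllowlistSketch and permitted?/3 are hypothetical names, and the map mirrors the http entry declared in enrich.bloccs.

```elixir
defmodule AllowlistSketch do
  # Hypothetical helper, not part of bloccs: decides whether a request fits
  # a declared `http = { allow = [...], methods = [...] }` entry.
  def permitted?(%{allow: hosts, methods: methods}, method, url) do
    host = URI.parse(url).host
    method in methods and host in hosts
  end
end

# Mirrors the http entry in enrich.bloccs.
enrich_http = %{allow: ["enrichment.local"], methods: ["GET"]}

IO.inspect(AllowlistSketch.permitted?(enrich_http, "GET", "http://enrichment.local/lookup"))
IO.inspect(AllowlistSketch.permitted?(enrich_http, "POST", "http://enrichment.local/lookup"))
IO.inspect(AllowlistSketch.permitted?(enrich_http, "GET", "http://hooks.local/notify"))
```

Only the first request passes: the second uses an undeclared method and the third an undeclared host.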
The network manifest is where the topology lives: five [[edges]] (one of them
a fan-out to two targets), the exposed webhook input and three named outputs,
a rest_for_one supervision strategy, and per-node concurrency under
[deploy]. The pipeline shape is data, not code.
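One consequence of the topology being data is that ordinary code can query it. As a rough illustration, the five edges can be copied into a plain Elixir list and inspected; the targets and fan_outs names are made up for this sketch, and the parsed network struct that bloccs produces may be shaped differently.

```elixir
# The five [[edges]] from events.bloccs, copied by hand as plain tuples.
edges = [
  {"ingest.event", ["validate.event"]},
  {"validate.valid", ["enrich.valid"]},
  {"enrich.enriched", ["route.enriched"]},
  {"route.known", ["persist.event", "notify.event"]},
  {"route.unknown", ["deadletter.event"]}
]

# Hypothetical helper: which input ports does a given output port feed?
targets = fn port ->
  for {^port, tos} <- edges, to <- tos, do: to
end

# Output ports that feed more than one target.
fan_outs = for {from, tos} <- edges, length(tos) > 1, do: from

IO.inspect(targets.("route.known"))
IO.inspect(fan_outs)
```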
3 · Define the nodes
Each node is a pure core plus an effect shell. Notice where the work happens:
- validate rejects an empty payload in the pure core ({:error, :empty_payload}), so no downstream effect ever runs for a bad event.
- enrich reaches the outside world through ctx.effects.http. The retry, timeout, and idempotency it declared are applied by the runtime around this call; the code stays a plain HTTP.get.
- persist / deadletter write through ctx.effects.db.
- route makes a pure branching decision and emits to known or unknown.
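To give a feel for what "applied by the runtime around this call" can mean for retry, here is a minimal sketch of a retry wrapper. It assumes that strategy = "exponential" doubles the delay starting from base_ms and that max counts extra attempts after the first; both are assumptions, and RetrySketch is a hypothetical module, not bloccs code.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not bloccs code. Runs `fun` and retries only on
  # {:error, :timeout}, at most `max` extra times.
  def run(fun, opts) do
    max = Keyword.fetch!(opts, :max)
    base_ms = Keyword.fetch!(opts, :base_ms)
    attempt(fun, 0, max, base_ms)
  end

  defp attempt(fun, tries, max, base_ms) do
    case fun.() do
      {:error, :timeout} when tries < max ->
        # Assumed schedule: base_ms, 2 * base_ms, 4 * base_ms, ...
        Process.sleep(base_ms * Integer.pow(2, tries))
        attempt(fun, tries + 1, max, base_ms)

      other ->
        other
    end
  end
end

# A lookup that times out twice and then succeeds, counted by an Agent.
{:ok, calls} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  n = Agent.get_and_update(calls, fn n -> {n + 1, n + 1} end)
  if n < 3, do: {:error, :timeout}, else: {:ok, %{"tier" => "gold"}}
end

result = RetrySketch.run(flaky, max: 2, base_ms: 50)
IO.inspect(result)
```

With max: 2 the third call is the last one allowed, so a lookup that fails three times in a row would surface {:error, :timeout} to the caller.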
defmodule BloccsNotebook.Events.Ingest do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/ingest.bloccs"

  def transform(raw, _ctx) do
    id = raw[:id] || raw["id"]
    type = raw[:type] || raw["type"]
    payload = raw[:payload] || raw["payload"]

    if is_binary(id) and is_binary(type) and is_map(payload) do
      {:ok, %{id: id, type: String.downcase(type), payload: payload}}
    else
      {:error, :missing_id_or_type}
    end
  end

  def execute(event, _ctx), do: {:emit, :event, event}
end
defmodule BloccsNotebook.Events.Validate do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/validate.bloccs"

  def transform(event, _ctx) do
    if map_size(event.payload) > 0, do: {:ok, event}, else: {:error, :empty_payload}
  end

  def execute(event, _ctx), do: {:emit, :valid, event}
end
defmodule BloccsNotebook.Events.Enrich do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/enrich.bloccs"
  alias Bloccs.Effects.HTTP

  @lookup_url "http://enrichment.local/lookup"

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    case HTTP.get(ctx.effects.http, @lookup_url) do
      {:ok, enrichment} -> {:emit, :enriched, Map.put(event, :enrichment, enrichment)}
      {:error, reason} -> {:error, reason}
    end
  end
end
defmodule BloccsNotebook.Events.Route do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/route.bloccs"

  @known ~w(order.created order.updated user.created)

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, _ctx) do
    if event.type in @known, do: {:emit, :known, event}, else: {:emit, :unknown, event}
  end
end
defmodule BloccsNotebook.Events.Persist do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/persist.bloccs"
  alias Bloccs.Effects.DB

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    {:ok, row} = DB.insert(ctx.effects.db, :events, event_id: event.id, type: event.type)
    {:emit, :stored, %{id: row[:id], event_id: event.id, type: event.type}}
  end
end
defmodule BloccsNotebook.Events.Notify do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/notify.bloccs"
  alias Bloccs.Effects.HTTP

  @hook_url "http://hooks.local/notify"

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    status =
      case HTTP.post(ctx.effects.http, @hook_url, %{id: event.id, type: event.type}) do
        {:ok, _} -> "sent"
        {:error, _} -> "failed"
      end

    {:emit, :notified, %{id: event.id, status: status}}
  end
end
defmodule BloccsNotebook.Events.Deadletter do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/deadletter.bloccs"
  alias Bloccs.Effects.DB

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    {:ok, row} = DB.insert(ctx.effects.db, :deadletter, event_id: event.id, type: event.type)
    {:emit, :recorded, %{id: row[:id], event_id: event.id, type: event.type}}
  end
end
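The split between pure core and effect shell in the modules above can be pictured with a toy runner. This is only a sketch of the idea: ShellSketch, ValidateSketch, and the {:rejected, reason} return value are invented for illustration, and how the real bloccs runtime sequences transform/2 and execute/2 or reports a rejected message is an assumption here.

```elixir
defmodule ShellSketch do
  # Hypothetical runner, not the bloccs runtime: call the pure core first and
  # hand the value to the effect shell only when the core returns {:ok, _}.
  def run(mod, input, ctx) do
    case mod.transform(input, ctx) do
      {:ok, value} -> mod.execute(value, ctx)
      {:error, reason} -> {:rejected, reason}
    end
  end
end

defmodule ValidateSketch do
  # Same rule as the Validate node above, without `use Bloccs.Node`.
  def transform(event, _ctx) do
    if map_size(event.payload) > 0, do: {:ok, event}, else: {:error, :empty_payload}
  end

  def execute(event, _ctx), do: {:emit, :valid, event}
end

IO.inspect(ShellSketch.run(ValidateSketch, %{id: "evt-9", payload: %{}}, %{}))
IO.inspect(ShellSketch.run(ValidateSketch, %{id: "evt-1", payload: %{"order_id" => 1001}}, %{}))
```

Because the empty-payload event stops in transform/2, execute/2 is never called for it, which is why a pure gate placed early keeps bad events away from every effect further down.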
4 · Wire the mock effect backends
bloccs ships with mock HTTP and DB backends, and they are the default — no
config needed. The HTTP mock refuses any URL you have not stubbed (the same
discipline the allowlist enforces), so stub the two endpoints the network calls.
In production these become the real HTTP.Req and DB.Ecto adapters through
config :bloccs, :effect_backends, ... — the manifests and node code do not
change.
alias Bloccs.Effects.HTTP.Mock, as: MockHTTP
alias Bloccs.Effects.DB.Mock, as: MockDB
MockHTTP.reset()
MockDB.reset()
Bloccs.Idempotency.reset()
MockHTTP.stub("GET", "http://enrichment.local/lookup", fn _req ->
  %{"region" => "us-east", "tier" => "gold"}
end)
MockHTTP.stub("POST", "http://hooks.local/notify", fn _req -> %{"ok" => true} end)
:ok
5 · Compile, start, and record a trace
Same loop as notebook 01, on a seven-node graph. Trace.record/1 attaches
telemetry so we can measure structural coverage afterward.
{:ok, network} = Bloccs.Parser.parse_network("/tmp/bloccs_events/events.bloccs")
:ok = Bloccs.Validator.validate_network(network)
{:ok, sup} = Bloccs.Compiler.compile_and_load(network)
{:ok, _pid} = sup.start_link([])
# Subscribe to all three exposed outputs.
Bloccs.Router.register_sink(:events, :persist, :stored, self())
Bloccs.Router.register_sink(:events, :notify, :notified, self())
Bloccs.Router.register_sink(:events, :deadletter, :recorded, self())
trace = Bloccs.Trace.record(:events)
:started
Push four webhook events
Two known types, one unknown type, and a replay of evt-1. Watch what each
produces:
producer = Bloccs.Router.producer_name(:events, :ingest, :received)

events = [
  %{id: "evt-1", type: "order.created", payload: %{"order_id" => 1001}},
  %{id: "evt-2", type: "user.created", payload: %{"user_id" => 77}},
  # Unknown type → dead-lettered, not dropped.
  %{id: "evt-3", type: "subscription.canceled", payload: %{"sub_id" => 9}},
  # Replay of evt-1 → deduped at enrich (idempotency key = id), never reaches a sink.
  %{id: "evt-1", type: "order.created", payload: %{"order_id" => 1001}}
]

for e <- events, do: Bloccs.Producer.push(producer, e)

# 2 known × (persist + notify) + 1 dead-lettered = 5 sink messages.
# The replayed evt-1 is deduped, so there is no 6th.
for _ <- 1..5 do
  receive do
    {:bloccs_sink, :events, :persist, :stored, %{event_id: id}} -> "persisted #{id}"
    {:bloccs_sink, :events, :notify, :notified, %{id: id, status: s}} -> "notified #{id} (#{s})"
    {:bloccs_sink, :events, :deadletter, :recorded, %{event_id: id}} -> "dead-lettered #{id}"
  after
    3_000 -> "(timeout)"
  end
end
Five results: evt-1 and evt-2 each persisted and notified, evt-3
dead-lettered. The duplicate evt-1 produced nothing: it was deduped at the
enrich node before the lookup ran, and the dedup is concurrency-safe, so it
holds even with enrich declared at concurrency 4.
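One common way to get a concurrency-safe dedup on the BEAM is an atomic insert into an ETS table, sketched below. This is not how Bloccs.Idempotency is implemented (the notebook does not show that); DedupSketch and claim/2 are hypothetical names, and the only library fact relied on is that :ets.insert_new/2 is atomic and returns false when the key already exists.

```elixir
defmodule DedupSketch do
  # Hypothetical helper, not Bloccs.Idempotency. A public ETS set records the
  # keys already seen; :ets.insert_new/2 is atomic, so one caller wins per key.
  def new, do: :ets.new(:seen_keys, [:set, :public])

  def claim(table, key) do
    if :ets.insert_new(table, {key}), do: :first, else: :duplicate
  end
end

table = DedupSketch.new()

# The same four ids the notebook pushes, in order.
results = Enum.map(["evt-1", "evt-2", "evt-3", "evt-1"], &DedupSketch.claim(table, &1))
IO.inspect(results)

# Fifty concurrent claims on one key: exactly one of them should win.
winners =
  1..50
  |> Task.async_stream(fn _ -> DedupSketch.claim(table, "evt-7") end, max_concurrency: 4)
  |> Enum.count(&(&1 == {:ok, :first}))

IO.inspect(winners)
```

A check-then-insert in two steps would let two workers both see "not present" and both proceed; doing it in one atomic call closes that window.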
Inspect the mock tables
inserts = Enum.reverse(MockDB.inserts())

%{
  events: for({:events, r} <- inserts, do: %{id: r.id, event_id: r.event_id, type: r.type}),
  deadletter: for({:deadletter, r} <- inserts, do: %{id: r.id, event_id: r.event_id, type: r.type})
}
Two rows in events (the known types), one in deadletter (the unknown type).
Four events in, three stored, one deduped — the fan-out, the branch, and the
idempotency guarantee, all visible in the data.
6 · Structural coverage
Trace.reached/1 turns the recorded telemetry into the set of ports and edges
the run actually exercised; Coverage.report/2 measures it against every
obligation in the network. This is what mix bloccs.coverage renders from the
CLI — here it runs in-process.
reached = Bloccs.Trace.reached(Bloccs.Trace.stop(trace))
report = Bloccs.Coverage.report(network, reached)
IO.puts(Bloccs.Coverage.render(network, report))
The four events between them touch every port and edge — 100.0%. Drop the
unknown-type event and re-run from section 5: the route.unknown → deadletter
edge goes unreached, and coverage tells you which obligation you stopped
testing.
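The arithmetic behind that kind of report can be sketched with plain sets. The example below counts only edge targets as obligations, which is a simplification: the real report also covers ports, so its percentages will differ. The variable names are invented for this sketch and nothing here calls Bloccs.Coverage.

```elixir
# Every edge target in events.bloccs, written as {from, to} pairs.
obligations =
  MapSet.new([
    {"ingest.event", "validate.event"},
    {"validate.valid", "enrich.valid"},
    {"enrich.enriched", "route.enriched"},
    {"route.known", "persist.event"},
    {"route.known", "notify.event"},
    {"route.unknown", "deadletter.event"}
  ])

# A run with only known event types never takes the unknown branch.
reached = MapSet.delete(obligations, {"route.unknown", "deadletter.event"})

missed = obligations |> MapSet.difference(reached) |> MapSet.to_list()
percent = Float.round(MapSet.size(reached) / MapSet.size(obligations) * 100, 1)

IO.inspect(missed)
IO.inspect(percent)
```

The set difference is the useful part: it names the untested edge rather than only reporting that the number dropped.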
Where next
- 03_routing_primitives: filter, split, and merge, each as a small runnable network.
- 04_aggregation_primitives: batch, join, and rate.
- The events example in the library: the same network as a full Mix project, plus the swap to real HTTP (Req) and a real database (Ecto).
- The guides: the manifest reference for retry, idempotency, [deploy] concurrency, and the generated supervision tree.