bloccs · 02 — A webhook/event processor

notebooks/02_events_webhook.livemd

Mix.install([
  {:bloccs, "~> 0.9"}
])

What this notebook builds

The flagship bloccs example is a realistic pipeline of the kind every backend grows: receive an event, validate it, enrich it from an HTTP lookup, then branch on type and fan out. It runs on the built-in mock effect backends, so there are no external services to stand up.

                                                 ┌─▶ persist (DB)    → stored
webhook ─▶ ingest ─▶ validate ─▶ enrich ─▶ route ┤   (events table)
          (source)  (pure)     (HTTP,    (router)│
                               retry,            ├─▶ notify (HTTP)  → notified
                               timeout,          │
                               idempotent)       └──(unknown type)─▶ deadletter (DB) → dead

It exercises the whole toolbox in one flow: schema-typed edges, declared HTTP + DB effects, a pure validation gate, retry + timeout, idempotency, branching, fan-out, and structural coverage.

If you have not seen the parse → validate → compile → run loop yet, start with 01_first_network — this notebook assumes it.

1 · Register the schemas

Four payload shapes flow through the network. RawEvent@1 enters; enrich adds an enrichment map to produce EnrichedEvent@1; the persist and deadletter sinks emit StoredEvent@1, and the notify sink emits Notified@1.

Bloccs.Schema.register("RawEvent@1", id: :string, type: :string, payload: :map)

Bloccs.Schema.register("EnrichedEvent@1",
  id: :string,
  type: :string,
  payload: :map,
  enrichment: :map
)

Bloccs.Schema.register("StoredEvent@1", id: :integer, event_id: :string, type: :string)
Bloccs.Schema.register("Notified@1", id: :string, status: :string)
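
To make the idea of a schema-typed edge concrete, here is a minimal sketch in plain Elixir of checking a payload against a field list like the ones registered above. SchemaSketch and conform/2 are hypothetical names for illustration only; this is not how Bloccs.Schema is implemented, and it handles only the three field types used in this notebook.

```elixir
# Hypothetical helper, not part of bloccs: checks a payload against a
# keyword list of field types shaped like the Schema.register arguments.
defmodule SchemaSketch do
  # Returns :ok, or {:error, fields} naming every missing or mistyped field.
  def conform(payload, fields) do
    bad = for {name, type} <- fields, not valid?(Map.get(payload, name), type), do: name
    if bad == [], do: :ok, else: {:error, bad}
  end

  defp valid?(value, :string), do: is_binary(value)
  defp valid?(value, :integer), do: is_integer(value)
  defp valid?(value, :map), do: is_map(value)
  # Any type this sketch does not know about is treated as a failure.
  defp valid?(_value, _unknown_type), do: false
end

raw_event = [id: :string, type: :string, payload: :map]

good = SchemaSketch.conform(%{id: "evt-1", type: "order.created", payload: %{}}, raw_event)
bad = SchemaSketch.conform(%{id: 1, type: "order.created"}, raw_event)
```

Here good is :ok, while bad reports both the integer id and the missing payload.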

2 · Write the manifests

Seven node manifests and one network manifest. Read the [effects] blocks: they are the declared capability surface. enrich may only GET enrichment.local; persist may only insert into the events table. A call outside the allowlist is refused at runtime, and use of an undeclared effect axis is a compile warning. The manifests go to a fixed directory so the node modules below can reference them by literal path.

dir = "/tmp/bloccs_events"
File.mkdir_p!(dir)

manifests = %{
  "ingest.bloccs" => """
  [node]
  id = "ingest"
  version = "0.1.0"
  kind = "source"
  [doc]
  intent = "Accept a raw webhook event and normalize its shape."
  [ports.in]
  received = { schema = "RawEvent@1", buffer = 1000 }
  [ports.out]
  event = { schema = "RawEvent@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Events.Ingest.transform/2"
  effect_shell = "BloccsNotebook.Events.Ingest.execute/2"
  """,
  "validate.bloccs" => """
  [node]
  id = "validate"
  version = "0.1.0"
  kind = "transform"
  [doc]
  intent = "Reject events with an empty payload — a pure rule, no effects."
  [ports.in]
  event = { schema = "RawEvent@1" }
  [ports.out]
  valid = { schema = "RawEvent@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Events.Validate.transform/2"
  effect_shell = "BloccsNotebook.Events.Validate.execute/2"
  """,
  "enrich.bloccs" => """
  [node]
  id = "enrich"
  version = "0.1.0"
  kind = "transform"
  [doc]
  intent = "Enrich an event from an HTTP lookup. Idempotent, retried, time-bounded."
  [ports.in]
  valid = { schema = "RawEvent@1" }
  [ports.out]
  enriched = { schema = "EnrichedEvent@1" }
  [effects]
  http = { allow = ["enrichment.local"], methods = ["GET"] }
  [contract]
  pure_core    = "BloccsNotebook.Events.Enrich.transform/2"
  effect_shell = "BloccsNotebook.Events.Enrich.execute/2"
  timeout_ms   = 3000
  retry        = { strategy = "exponential", max = 2, on = ["timeout"], base_ms = 50 }
  idempotency  = { key = "id" }
  """,
  "route.bloccs" => """
  [node]
  id = "route"
  version = "0.1.0"
  kind = "router"
  [doc]
  intent = "Branch on event type: known types go downstream, the rest are dead-lettered."
  [ports.in]
  enriched = { schema = "EnrichedEvent@1" }
  [ports.out]
  known   = { schema = "EnrichedEvent@1" }
  unknown = { schema = "EnrichedEvent@1" }
  [effects]
  [contract]
  pure_core    = "BloccsNotebook.Events.Route.transform/2"
  effect_shell = "BloccsNotebook.Events.Route.execute/2"
  """,
  "persist.bloccs" => """
  [node]
  id = "persist"
  version = "0.1.0"
  kind = "sink"
  [doc]
  intent = "Persist a known event to the events table."
  [ports.in]
  event = { schema = "EnrichedEvent@1" }
  [ports.out]
  stored = { schema = "StoredEvent@1" }
  [effects]
  db = { allow = ["events:insert"] }
  [contract]
  pure_core    = "BloccsNotebook.Events.Persist.transform/2"
  effect_shell = "BloccsNotebook.Events.Persist.execute/2"
  """,
  "notify.bloccs" => """
  [node]
  id = "notify"
  version = "0.1.0"
  kind = "sink"
  [doc]
  intent = "Notify a downstream webhook that a known event was processed."
  [ports.in]
  event = { schema = "EnrichedEvent@1" }
  [ports.out]
  notified = { schema = "Notified@1" }
  [effects]
  http = { allow = ["hooks.local"], methods = ["POST"] }
  [contract]
  pure_core    = "BloccsNotebook.Events.Notify.transform/2"
  effect_shell = "BloccsNotebook.Events.Notify.execute/2"
  """,
  "deadletter.bloccs" => """
  [node]
  id = "deadletter"
  version = "0.1.0"
  kind = "sink"
  [doc]
  intent = "Record an unknown-type event in the deadletter table."
  [ports.in]
  event = { schema = "EnrichedEvent@1" }
  [ports.out]
  recorded = { schema = "StoredEvent@1" }
  [effects]
  db = { allow = ["deadletter:insert"] }
  [contract]
  pure_core    = "BloccsNotebook.Events.Deadletter.transform/2"
  effect_shell = "BloccsNotebook.Events.Deadletter.execute/2"
  """,
  "events.bloccs" => """
  [network]
  id = "events"
  version = "0.1.0"
  runtime = "beam"

  [nodes]
  ingest     = { use = "ingest.bloccs" }
  validate   = { use = "validate.bloccs" }
  enrich     = { use = "enrich.bloccs" }
  route      = { use = "route.bloccs" }
  persist    = { use = "persist.bloccs" }
  notify     = { use = "notify.bloccs" }
  deadletter = { use = "deadletter.bloccs" }

  [[edges]]
  from = "ingest.event"
  to   = "validate.event"

  [[edges]]
  from = "validate.valid"
  to   = "enrich.valid"

  [[edges]]
  from = "enrich.enriched"
  to   = "route.enriched"

  # Fan-out: a known event is both persisted and notified.
  [[edges]]
  from = "route.known"
  to   = ["persist.event", "notify.event"]

  [[edges]]
  from = "route.unknown"
  to   = "deadletter.event"

  [expose]
  in  = { webhook = "ingest.received" }
  out = { stored = "persist.stored", notified = "notify.notified", dead = "deadletter.recorded" }

  [supervision]
  strategy     = "rest_for_one"
  max_restarts = 5
  max_seconds  = 60

  [deploy]
  concurrency = { enrich = 4, persist = 1 }
  """
}

for {name, body} <- manifests, do: File.write!(Path.join(dir, name), body)
File.ls!(dir)
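
The [effects] entries in the manifests above act as allowlists. As a rough illustration of what such a check involves, the sketch below tests a method and URL against a map shaped like enrich's http entry. AllowlistSketch, check_http/3, and the error tuples are assumptions made for this example, not the bloccs runtime.

```elixir
# Hypothetical sketch of an HTTP allowlist check; names and error shapes
# are illustrative and do not come from bloccs.
defmodule AllowlistSketch do
  # `effects` mirrors one manifest's [effects] table as a plain map.
  def check_http(effects, method, url) do
    host = URI.parse(url).host

    case effects do
      %{http: %{allow: hosts, methods: methods}} ->
        cond do
          host not in hosts -> {:error, {:host_not_allowed, host}}
          method not in methods -> {:error, {:method_not_allowed, method}}
          true -> :ok
        end

      # No http entry at all: the axis was never declared.
      _ ->
        {:error, :http_not_declared}
    end
  end
end

enrich_effects = %{http: %{allow: ["enrichment.local"], methods: ["GET"]}}

allowed = AllowlistSketch.check_http(enrich_effects, "GET", "http://enrichment.local/lookup")
wrong_method = AllowlistSketch.check_http(enrich_effects, "POST", "http://enrichment.local/lookup")
wrong_host = AllowlistSketch.check_http(enrich_effects, "GET", "http://hooks.local/notify")
```

Only the first call passes; the other two fail on the method and the host respectively.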

The network manifest is where the topology lives: five [[edges]] (one of them a fan-out to two targets), the exposed webhook input and three named outputs, a rest_for_one supervision strategy, and per-node concurrency under [deploy]. The pipeline shape is data, not code.
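
Because the topology is data, it can be inspected with ordinary list operations. The sketch below copies the five [[edges]] into plain Elixir maps by hand (it does not use the bloccs parser) and expands the fan-out so that each source-to-target link is explicit.

```elixir
# The five [[edges]] from events.bloccs, transcribed by hand as plain data.
edges = [
  %{from: "ingest.event", to: "validate.event"},
  %{from: "validate.valid", to: "enrich.valid"},
  %{from: "enrich.enriched", to: "route.enriched"},
  %{from: "route.known", to: ["persist.event", "notify.event"]},
  %{from: "route.unknown", to: "deadletter.event"}
]

# Expand each fan-out into one {from, to} pair per target.
links = for %{from: from, to: to} <- edges, target <- List.wrap(to), do: {from, target}

# Group downstream node names by the node that feeds them.
node_of = fn port -> port |> String.split(".") |> hd() end

downstream =
  Enum.group_by(
    links,
    fn {from, _to} -> node_of.(from) end,
    fn {_from, to} -> node_of.(to) end
  )
```

Five edges become six links, and downstream["route"] lists all three sinks.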

3 · Define the nodes

Each node is a pure core plus an effect shell. Notice where the work happens:

  • validate rejects an empty payload in the pure core by returning {:error, :empty_payload}, so no downstream effect ever runs for a bad event.
  • enrich reaches the outside world through ctx.effects.http. The retry, timeout, and idempotency it declared are applied by the runtime around this call; the code stays a plain HTTP.get.
  • persist/deadletter write through ctx.effects.db.
  • route makes a pure branching decision and emits to known or unknown.

defmodule BloccsNotebook.Events.Ingest do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/ingest.bloccs"

  def transform(raw, _ctx) do
    id = raw[:id] || raw["id"]
    type = raw[:type] || raw["type"]
    payload = raw[:payload] || raw["payload"]

    if is_binary(id) and is_binary(type) and is_map(payload) do
      {:ok, %{id: id, type: String.downcase(type), payload: payload}}
    else
      {:error, :missing_id_or_type}
    end
  end

  def execute(event, _ctx), do: {:emit, :event, event}
end

defmodule BloccsNotebook.Events.Validate do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/validate.bloccs"

  def transform(event, _ctx) do
    if map_size(event.payload) > 0, do: {:ok, event}, else: {:error, :empty_payload}
  end

  def execute(event, _ctx), do: {:emit, :valid, event}
end

defmodule BloccsNotebook.Events.Enrich do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/enrich.bloccs"
  alias Bloccs.Effects.HTTP

  @lookup_url "http://enrichment.local/lookup"

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    case HTTP.get(ctx.effects.http, @lookup_url) do
      {:ok, enrichment} -> {:emit, :enriched, Map.put(event, :enrichment, enrichment)}
      {:error, reason} -> {:error, reason}
    end
  end
end

defmodule BloccsNotebook.Events.Route do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/route.bloccs"

  @known ~w(order.created order.updated user.created)

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, _ctx) do
    if event.type in @known, do: {:emit, :known, event}, else: {:emit, :unknown, event}
  end
end

defmodule BloccsNotebook.Events.Persist do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/persist.bloccs"
  alias Bloccs.Effects.DB

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    {:ok, row} = DB.insert(ctx.effects.db, :events, event_id: event.id, type: event.type)
    {:emit, :stored, %{id: row[:id], event_id: event.id, type: event.type}}
  end
end

defmodule BloccsNotebook.Events.Notify do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/notify.bloccs"
  alias Bloccs.Effects.HTTP

  @hook_url "http://hooks.local/notify"

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    status =
      case HTTP.post(ctx.effects.http, @hook_url, %{id: event.id, type: event.type}) do
        {:ok, _} -> "sent"
        {:error, _} -> "failed"
      end

    {:emit, :notified, %{id: event.id, status: status}}
  end
end

defmodule BloccsNotebook.Events.Deadletter do
  use Bloccs.Node, manifest: "/tmp/bloccs_events/deadletter.bloccs"
  alias Bloccs.Effects.DB

  def transform(event, _ctx), do: {:ok, event}

  def execute(event, ctx) do
    {:ok, row} = DB.insert(ctx.effects.db, :deadletter, event_id: event.id, type: event.type)
    {:emit, :recorded, %{id: row[:id], event_id: event.id, type: event.type}}
  end
end
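
The enrich code above stays a plain HTTP.get because its retry and timeout policy lives in the manifest. To give a feel for what a runtime has to do around such a call, here is a minimal sketch of a per-attempt timeout with exponential backoff. RetrySketch is a hypothetical module, and two details are assumptions rather than documented bloccs behavior: max is read as the number of retries after the first attempt, and the delay doubles from base_ms.

```elixir
# Hypothetical sketch, not the bloccs runtime: retry on timeout only.
defmodule RetrySketch do
  # Runs `fun` with a per-attempt timeout. `max` is the number of retries
  # allowed after the first attempt (an assumption about the manifest field).
  def call(fun, opts) do
    attempt(fun, opts[:timeout_ms], opts[:max], opts[:base_ms], 0)
  end

  defp attempt(fun, timeout_ms, max, base_ms, n) do
    task = Task.async(fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} ->
        result

      {:exit, reason} ->
        {:error, {:exit, reason}}

      nil when n < max ->
        # Exponential backoff: base_ms, 2 * base_ms, 4 * base_ms, ...
        Process.sleep(base_ms * Integer.pow(2, n))
        attempt(fun, timeout_ms, max, base_ms, n + 1)

      nil ->
        {:error, :timeout}
    end
  end
end

# A stand-in for a slow lookup: the first two calls exceed the timeout.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
  if n < 2, do: Process.sleep(200)
  {:ok, %{"region" => "us-east"}}
end

result = RetrySketch.call(flaky, timeout_ms: 50, max: 2, base_ms: 10)
```

With max: 2 the third attempt is the last one allowed, so result is the successful lookup; a function that never returns in time would yield {:error, :timeout} after three attempts.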

4 · Wire the mock effect backends

bloccs ships with mock HTTP and DB backends, and they are the default — no config needed. The HTTP mock refuses any URL you have not stubbed (the same discipline the allowlist enforces), so stub the two endpoints the network calls. In production these become the real HTTP.Req and DB.Ecto adapters through config :bloccs, :effect_backends, ... — the manifests and node code do not change.

alias Bloccs.Effects.HTTP.Mock, as: MockHTTP
alias Bloccs.Effects.DB.Mock, as: MockDB

MockHTTP.reset()
MockDB.reset()
Bloccs.Idempotency.reset()

MockHTTP.stub("GET", "http://enrichment.local/lookup", fn _req ->
  %{"region" => "us-east", "tier" => "gold"}
end)

MockHTTP.stub("POST", "http://hooks.local/notify", fn _req -> %{"ok" => true} end)
:ok
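
The "refuse anything not stubbed" discipline is easy to picture as a lookup table keyed by method and URL. The following sketch builds one on top of an Agent; StubSketch and its functions are hypothetical and only mimic the behavior described above, not the actual MockHTTP implementation.

```elixir
# Hypothetical stub registry, for illustration only.
defmodule StubSketch do
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Register a response function for one {method, url} pair.
  def stub(method, url, fun), do: Agent.update(__MODULE__, &Map.put(&1, {method, url}, fun))

  # Answer from the matching stub, or refuse when there is none.
  def request(method, url) do
    case Agent.get(__MODULE__, &Map.fetch(&1, {method, url})) do
      {:ok, fun} -> {:ok, fun.(%{method: method, url: url})}
      :error -> {:error, {:not_stubbed, method, url}}
    end
  end
end

{:ok, _pid} = StubSketch.start_link()
StubSketch.stub("GET", "http://enrichment.local/lookup", fn _req -> %{"tier" => "gold"} end)

stubbed = StubSketch.request("GET", "http://enrichment.local/lookup")
refused = StubSketch.request("GET", "http://elsewhere.local/")
```

The stubbed URL returns its canned body; the unstubbed one is refused rather than silently succeeding.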

5 · Compile, start, and record a trace

Same loop as notebook 01, on a seven-node graph. Trace.record/1 attaches telemetry so we can measure structural coverage afterward.

{:ok, network} = Bloccs.Parser.parse_network("/tmp/bloccs_events/events.bloccs")
:ok = Bloccs.Validator.validate_network(network)
{:ok, sup} = Bloccs.Compiler.compile_and_load(network)
{:ok, _pid} = sup.start_link([])

# Subscribe to all three exposed outputs.
Bloccs.Router.register_sink(:events, :persist, :stored, self())
Bloccs.Router.register_sink(:events, :notify, :notified, self())
Bloccs.Router.register_sink(:events, :deadletter, :recorded, self())

trace = Bloccs.Trace.record(:events)
:started

Push four webhook events

Two known types, one unknown type, and a replay of evt-1. Watch what each produces:

producer = Bloccs.Router.producer_name(:events, :ingest, :received)

events = [
  %{id: "evt-1", type: "order.created", payload: %{"order_id" => 1001}},
  %{id: "evt-2", type: "user.created", payload: %{"user_id" => 77}},
  # Unknown type → dead-lettered, not dropped.
  %{id: "evt-3", type: "subscription.canceled", payload: %{"sub_id" => 9}},
  # Replay of evt-1 → deduped at enrich (idempotency key = id), never reaches a sink.
  %{id: "evt-1", type: "order.created", payload: %{"order_id" => 1001}}
]

for e <- events, do: Bloccs.Producer.push(producer, e)

# 2 known × (persist + notify) + 1 dead-lettered = 5 sink messages.
# The replayed evt-1 is deduped, so there is no 6th.
for _ <- 1..5 do
  receive do
    {:bloccs_sink, :events, :persist, :stored, %{event_id: id}} -> "persisted #{id}"
    {:bloccs_sink, :events, :notify, :notified, %{id: id, status: s}} -> "notified #{id} (#{s})"
    {:bloccs_sink, :events, :deadletter, :recorded, %{event_id: id}} -> "dead-lettered #{id}"
  after
    3_000 -> "(timeout)"
  end
end

Five results: evt-1 and evt-2 are each persisted and notified, and evt-3 is dead-lettered. The duplicate evt-1 produces nothing: the enrich node dedupes it on its idempotency key (id) before the lookup runs, and that check is safe under concurrent delivery.
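
The count of five can be reproduced without the runtime. The sketch below is one way to model concurrency-safe deduplication, using the atomic :ets.insert_new/2 to claim each id exactly once; it is an assumption for illustration and says nothing about how Bloccs.Idempotency stores its keys.

```elixir
known = ~w(order.created order.updated user.created)

events = [
  %{id: "evt-1", type: "order.created"},
  %{id: "evt-2", type: "user.created"},
  %{id: "evt-3", type: "subscription.canceled"},
  %{id: "evt-1", type: "order.created"}
]

# :ets.insert_new/2 is atomic: exactly one caller per key gets `true`.
table = :ets.new(:seen_keys, [:set, :public])
claim = fn id -> :ets.insert_new(table, {id}) end

sink_messages =
  Enum.flat_map(events, fn event ->
    cond do
      # Already claimed: the replay stops here and reaches no sink.
      not claim.(event.id) -> []
      # Known type: fan out to both persist and notify.
      event.type in known -> [{:stored, event.id}, {:notified, event.id}]
      # Unknown type: dead-letter it.
      true -> [{:recorded, event.id}]
    end
  end)

# Twenty concurrent claims on one fresh id: only one of them wins.
winners =
  1..20
  |> Task.async_stream(fn _ -> claim.("evt-9") end)
  |> Enum.count(&(&1 == {:ok, true}))
```

sink_messages holds five entries, and winners is 1 no matter how the twenty tasks interleave.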

Inspect the mock tables

inserts = Enum.reverse(MockDB.inserts())

%{
  events: for({:events, r} <- inserts, do: %{id: r.id, event_id: r.event_id, type: r.type}),
  deadletter: for({:deadletter, r} <- inserts, do: %{id: r.id, event_id: r.event_id, type: r.type})
}

Two rows in events (the known types), one in deadletter (the unknown type). Four events in, three stored, one deduped — the fan-out, the branch, and the idempotency guarantee, all visible in the data.

6 · Structural coverage

Trace.reached/1 turns the recorded telemetry into the set of ports and edges the run actually exercised; Coverage.report/2 measures it against every obligation in the network. This is what mix bloccs.coverage renders from the CLI — here it runs in-process.

reached = Bloccs.Trace.reached(Bloccs.Trace.stop(trace))
report = Bloccs.Coverage.report(network, reached)

IO.puts(Bloccs.Coverage.render(network, report))

The four events between them touch every port and edge — 100.0%. Drop the unknown-type event and re-run from section 5: the route.unknown → deadletter edge goes unreached, and coverage tells you which obligation you stopped testing.
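
The arithmetic behind a coverage figure is set intersection over obligations. The sketch below is a simplified model that counts edges only and treats each fan-out target as its own obligation; both choices are assumptions, and the real report also covers ports.

```elixir
# Edge obligations, transcribed by hand from the network manifest.
edges =
  MapSet.new([
    {"ingest.event", "validate.event"},
    {"validate.valid", "enrich.valid"},
    {"enrich.enriched", "route.enriched"},
    {"route.known", "persist.event"},
    {"route.known", "notify.event"},
    {"route.unknown", "deadletter.event"}
  ])

# Percentage of obligations reached, plus the ones that were missed.
coverage = fn reached ->
  hit = MapSet.intersection(edges, reached)
  percent = Float.round(100 * MapSet.size(hit) / MapSet.size(edges), 1)
  missing = edges |> MapSet.difference(reached) |> MapSet.to_list()
  {percent, missing}
end

full = coverage.(edges)

# Simulate a run without the unknown-type event.
partial = coverage.(MapSet.delete(edges, {"route.unknown", "deadletter.event"}))
```

In this model the full run reaches six of six edges, while the run without the unknown-type event reaches five of six and names the route.unknown → deadletter.event edge as the gap.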

Where next

  • 03_routing_primitives — filter, split, and merge, each as a small runnable network.
  • 04_aggregation_primitives — batch, join, and rate.
  • The events example in the library — the same network as a full Mix project, plus the swap to real HTTP (Req) and a real database (Ecto).
  • The guides — the manifest reference for retry, idempotency, [deploy] concurrency, and the generated supervision tree.