Pi Agent — Coding Agent as a Coloured Petri Net

examples/pi_agent.livemd

Mix.install(
  [
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    {:kino, "~> 0.14.1"}
  ],
  config: [
    coloured_flow: [
      {ColouredFlow.Runner.Storage, [storage: ColouredFlow.Runner.Storage.InMemory]}
    ]
  ]
)

What we’re building

pi.dev ships a terminal coding agent — a model that reasons, calls tools (read, grep, bash, …), and keeps a session tree you can branch from. The execution loop has well-known shapes:

  • think → tool → observe → repeat — the ReAct cycle.
  • safe vs. risky tool calls: read runs unattended; bash waits on human approval.
  • multi-provider race — the same prompt fans out to several vendors; fastest answer wins.
  • steering — the user injects a follow-up while the agent is mid-flight.

Coloured Petri Nets express exactly these shapes natively: places hold state, transitions consume/produce tokens with guards, workitems sit in :enabled until a driver (a model, a human, a scheduler) starts them. The runner snapshots after every firing, so a crashed agent resumes from the last token state instead of re-running tool calls.
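As a rough intuition for the token game described above, here is a minimal plain-Elixir sketch. ToyNet and fire/3 are hypothetical names made up for illustration and are not part of ColouredFlow; the real runner also handles guards, workitem states, and snapshots, none of which are modelled here.

```elixir
defmodule ToyNet do
  # Hypothetical illustration only. A marking maps each place to a list of
  # tokens (treated as a multiset, so order carries no meaning).

  # Fire a transition: take one token from every input place, then put the
  # produced tokens on their output places. Returns :not_enabled if any
  # input place is empty.
  def fire(marking, inputs, produce) do
    if Enum.all?(inputs, fn place -> Map.get(marking, place, []) != [] end) do
      {consumed, rest} =
        Enum.reduce(inputs, {%{}, marking}, fn place, {taken, m} ->
          [token | remaining] = Map.fetch!(m, place)
          {Map.put(taken, place, token), Map.put(m, place, remaining)}
        end)

      new_marking =
        Enum.reduce(produce.(consumed), rest, fn {place, token}, m ->
          Map.update(m, place, [token], fn tokens -> [token | tokens] end)
        end)

      {:ok, new_marking}
    else
      :not_enabled
    end
  end
end

marking = %{ready: [{}], dialog: [[{:user, "What's in lib/?"}]]}

# "think": consume :ready and :dialog, produce an extended dialog and a call.
{:ok, after_think} =
  ToyNet.fire(marking, [:ready, :dialog], fn %{dialog: d} ->
    [dialog: d ++ [{:assistant, "calling tool"}], pending_call: {:ls, "lib/"}]
  end)

# :ready is now empty, so a second "think" cannot fire until some other
# transition puts a token back.
:not_enabled = ToyNet.fire(after_think, [:ready, :dialog], fn _ -> [] end)
```

The point of the sketch is that state lives entirely in the marking: whether a step may run is decided by which tokens exist, not by where a loop happens to be in its control flow.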

We’ll model Pi’s loop in three passes:

| Section | Adds | CPN feature |
| --- | --- | --- |
| A | minimal ReAct cycle | deferred choice, free output bindings, self-driving via action |
| B | risky-tool human gate | union colour set, workitem held in :enabled until human fires |
| C | provider race + steering injection | concurrent enabled workitems, external-fed input place |

Mock LLM, mock tools, dialog renderer

These are the only impure pieces — everything CPN-shaped lives in the workflow modules below. The mock LLM is a deterministic function of dialog length so the demo is reproducible. The dialog renderer is a pure markdown function passed into each enactment via lifecycle_hooks options.

defmodule PiAgent.MockLLM do
  @moduledoc false

  # next_step/2: deterministic policy keyed off how many tool turns have run.
  # Returns {:tool, {tool_name, args}} | {:answer, binary}.
  def next_step(dialog, vendor \\ :anthropic) do
    tools = Enum.count(dialog, fn {role, _} -> role == :tool end)

    case {vendor, tools} do
      {_, 0} -> {:tool, {:ls, "lib/"}}
      {_, 1} -> {:tool, {:read, "lib/coloured_flow.ex"}}
      {_, 2} -> {:tool, {:bash, "mix compile"}}
      {v, _} -> {:answer, "[#{v}] lib/ has the ColouredFlow umbrella; compiles clean."}
    end
  end

  # Builds the assistant's proposal message. If a steering / rejection
  # has landed since the agent's last assistant turn, surface it so the
  # human can confirm the model heard them.
  #
  # Pattern in action: after `:reject_proposal` or `:inject_steering`
  # appends a `{:user, _}` to the dialog, the next planning round picks
  # that message up and weaves it into the proposal.
  def proposal_text(dialog, {tool, args}) do
    call = "#{tool}(#{inspect(args)})"

    case latest_steering(dialog) do
      nil -> "proposing #{call}"
      text -> "based on your note (#{inspect(text)}), proposing #{call}"
    end
  end

  defp latest_steering(dialog) do
    # The very first message is the user's original prompt; any
    # subsequent {:user, _} is a steering or rejection note.
    user_msgs =
      dialog
      |> Enum.filter(fn {role, _} -> role == :user end)
      |> Enum.map(fn {:user, c} -> c end)

    case user_msgs do
      [_initial] -> nil
      [_initial | rest] -> List.last(rest)
      [] -> nil
    end
  end

  # Provider-specific latency. Used by Section C to demo races.
  def latency_ms(:anthropic), do: 600
  def latency_ms(:openai), do: 900
  def latency_ms(:google), do: 1200
end

defmodule PiAgent.MockTool do
  @moduledoc false

  def run({:ls, args}), do: "ls(#{args}): coloured_flow.ex, dsl.ex, runner/, definition/, …"
  def run({:read, args}), do: ""
  def run({:grep, args}), do: ""
  def run({:write, args}), do: "wrote: #{args}"
  def run({:edit, args}), do: "edited: #{args}"
  def run({:bash, args}), do: "$ #{args}\n"

  def risky?({tool, _}) when tool in [:write, :edit, :bash], do: true
  def risky?(_), do: false
end

defmodule PiAgent.UI do
  @moduledoc false
  alias ColouredFlow.MultiSet
  alias ColouredFlow.Runner.Storage

  # Renders dialog (left top), event history (right), and an optional
  # controls frame (left bottom) from the lifecycle hook options.
  #
  # The controls frame is rebuilt on every event so each widget only
  # appears when its target workitem is actually firable: plan-gate
  # buttons show up while `:proposed_call` has a token, the risky-tool
  # buttons show up while `:pending_call` carries a `{:risky, _}`, the
  # steering form is visible the whole run. After termination the
  # frame is replaced with a "controls disabled" marker.
  def render(opts, event) do
    terminated? = Map.has_key?(event.markings, "answer")

    if frame = opts[:dialog_frame] do
      Kino.Frame.render(frame, Kino.Markdown.new(format_dialog(event.markings, terminated?)))
    end

    if frame = opts[:events_frame] do
      Kino.Frame.render(frame, Kino.Markdown.new(format_events(event.enactment_id)))
    end

    if frame = opts[:controls_frame] do
      if terminated? do
        Kino.Frame.clear(frame)
      else
        Kino.Frame.render(frame, build_controls(opts, event.markings))
      end
    end

    :ok
  end

  defp build_controls(opts, markings) do
    parts =
      []
      |> maybe_add_plan_gate(opts, markings)
      |> maybe_add_risky_gate(opts, markings)
      |> maybe_add_steer(opts)

    case parts do
      [] -> Kino.Markdown.new("*waiting for the agent to act…*")
      [single] -> single
      many -> Kino.Layout.grid(many, columns: 1)
    end
  end

  defp maybe_add_plan_gate(parts, opts, markings) do
    cond do
      not has_token?(markings, "proposed_call") ->
        parts

      is_nil(opts[:approve_plan_btn]) or is_nil(opts[:reject_plan_btn]) ->
        parts

      true ->
        plan_row =
          Kino.Layout.grid([opts[:approve_plan_btn], opts[:reject_plan_btn]], columns: 2)

        manual = if form = opts[:manual_form], do: [form], else: []
        parts ++ [plan_row | manual]
    end
  end

  defp maybe_add_risky_gate(parts, opts, markings) do
    cond do
      not risky_pending?(markings) ->
        parts

      is_nil(opts[:approve_btn]) or is_nil(opts[:deny_btn]) ->
        parts

      true ->
        parts ++ [Kino.Layout.grid([opts[:approve_btn], opts[:deny_btn]], columns: 2)]
    end
  end

  defp maybe_add_steer(parts, opts) do
    if form = opts[:steer_form], do: parts ++ [form], else: parts
  end

  def read_dialog(markings) do
    case Map.get(markings, "dialog") do
      nil -> []
      ms -> ms |> MultiSet.to_list() |> List.first() || []
    end
  end

  defp format_dialog(markings, terminated? \\ false) do
    state = state_label(markings)
    body = markings |> read_dialog() |> Enum.map_join("\n\n", &format_msg/1)
    footer = if terminated?, do: "\n\n---\n\n**✓ workflow ended.**", else: ""
    "**state**: #{state}\n\n---\n\n#{body}#{footer}"
  end

  # Bindings that add noise to the events panel: :d is the full dialog
  # (already shown in the left frame), :s is the signal unit token.
  @event_noisy_keys [:d, :s]

  defp format_events(enactment_id) do
    occurrences =
      enactment_id
      |> Storage.occurrences_stream(0)
      |> Enum.to_list()

    body =
      case occurrences do
        [] ->
          "_no firings yet_"

        list ->
          list
          |> Enum.with_index(1)
          |> Enum.map_join("\n\n", fn {o, i} -> format_occurrence(o, i) end)
      end

    "**events** (#{length(occurrences)})\n\n#{body}"
  end

  defp format_occurrence(occurrence, idx) do
    name = occurrence.binding_element.transition

    bindings =
      (occurrence.binding_element.binding ++ occurrence.free_binding)
      |> Enum.reject(fn {k, _} -> k in @event_noisy_keys end)

    case bindings do
      [] -> "**#{idx}.** `:#{name}`"
      kvs -> "**#{idx}.** `:#{name}`  \n#{render_bindings(kvs)}"
    end
  end

  defp render_bindings(kvs) do
    Enum.map_join(kvs, "  \n", fn {k, v} ->
      "  `#{k}` = `#{inspect_short(v)}`"
    end)
  end

  defp inspect_short(v) do
    s = inspect(v, limit: 3, printable_limit: 30, structs: false)
    if String.length(s) > 50, do: String.slice(s, 0, 47) <> "…", else: s
  end

  defp state_label(markings) do
    cond do
      Map.has_key?(markings, "answer") -> "✅ done"
      has_token?(markings, "proposed_call") -> "🤔 awaiting plan approval"
      risky_pending?(markings) -> "⏸ waiting for human approval"
      has_token?(markings, "pending_call") -> "⚙ tool running"
      has_token?(markings, "steering_queue") -> "📣 steering pending"
      true -> "💭 thinking"
    end
  end

  defp risky_pending?(markings) do
    case Map.get(markings, "pending_call") do
      nil -> false
      ms -> ms |> MultiSet.to_list() |> Enum.any?(&match?({:risky, _}, &1))
    end
  end

  defp has_token?(markings, place) do
    case Map.get(markings, place) do
      nil -> false
      ms -> MultiSet.size(ms) > 0
    end
  end

  defp format_msg({:user, c}), do: "**user** › #{c}"
  defp format_msg({:assistant, c}), do: "**assistant** › #{c}"
  defp format_msg({:tool, c}), do: "```\n#{c}\n```"
end

Shared runtime: storage + supervisor

Started once for the whole notebook. All three enactments share this.

storage_pid = Kino.start_child!(ColouredFlow.Runner.Storage.InMemory)
runner_pid = Kino.start_child!(ColouredFlow.Runner.Supervisor)

defmodule PiAgent.Sup do
  use Supervisor

  def start_link(_ \\ []) do
    Process.whereis(__MODULE__) && Supervisor.stop(__MODULE__)
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl Supervisor
  def init(_) do
    Supervisor.init(
      [{Task.Supervisor, name: PiAgent.TaskSup}],
      strategy: :one_for_one
    )
  end
end

Kino.start_child!(PiAgent.Sup)
"runtime ready (#{inspect(runner_pid)})"

A. Simple ReAct cycle

Five places, four transitions, one on_markings termination on :answer.

[ready]──┬──> :think_tool ─{tc}──> [pending_call] ──> :run_tool ─{tr}──> [tool_result]
[dialog]─┤                                                                     │
         │                                                                     v
         └──> :think_answer ─{ans}──> [answer]      [dialog] ──> :merge_result ──> [ready],[dialog]

Three transitions emit free output bindings (tc, tr, ans) — the runner leaves them open and the driver fills them when calling complete_workitem/2. That is how the LLM’s decision (which tool? which answer text?) crosses from impure Elixir into the pure CPN state.

:think_tool and :think_answer share an input pattern — classic deferred choice: both are enabled simultaneously, whichever the driver fires first wins. The driver here calls MockLLM.next_step/1 and routes to one or the other.
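The deferred choice can be sketched in plain Elixir as follows. DeferredChoiceSketch is a hypothetical model, not the ColouredFlow API: it tracks token counts only and omits the output arcs, so it shows just the part where firing one transition withdraws the other.

```elixir
defmodule DeferredChoiceSketch do
  # Hypothetical model: a marking is a map of place => token count.
  # Both transitions draw from the same two input places.
  @inputs %{
    "think_tool" => [:ready, :dialog],
    "think_answer" => [:ready, :dialog]
  }

  # A transition is enabled when every input place holds at least one token.
  def enabled(marking) do
    @inputs
    |> Enum.filter(fn {_name, places} ->
      Enum.all?(places, fn place -> Map.get(marking, place, 0) > 0 end)
    end)
    |> Enum.map(fn {name, _places} -> name end)
    |> Enum.sort()
  end

  # Firing consumes the shared inputs. free_binding stands for the driver's
  # decision (for example tc: {:ls, "lib/"}) and is carried with the firing.
  def fire(marking, name, free_binding) do
    if name in enabled(marking) do
      consumed =
        Enum.reduce(Map.fetch!(@inputs, name), marking, fn place, m ->
          Map.update!(m, place, fn count -> count - 1 end)
        end)

      {:ok, consumed, free_binding}
    else
      {:error, :not_enabled}
    end
  end
end

marking = %{ready: 1, dialog: 1}

# Both alternatives are on offer at the same time.
["think_answer", "think_tool"] = DeferredChoiceSketch.enabled(marking)

# The driver picks one and supplies the free binding.
{:ok, after_fire, [tc: {:ls, "lib/"}]} =
  DeferredChoiceSketch.fire(marking, "think_tool", tc: {:ls, "lib/"})

# The shared tokens are gone, so the other alternative is withdrawn.
[] = DeferredChoiceSketch.enabled(after_fire)
```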

defmodule PiAgent.A do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent A — Simple ReAct"

  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  colset tool_call() :: {tool_name(), binary()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  colset signal() :: {}

  var d :: dialog()
  var tc :: tool_call()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()

  place :dialog, :dialog
  place :ready, :signal
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer

  initial_marking :dialog,
    ColouredFlow.MultiSet.new([[{:user, "What's in lib/?"}]])

  initial_marking :ready, ~MS[{}]

  transition :think_tool do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:assistant, "calling tool"}]}
    output :pending_call, {1, tc}

    action do
      tc = Keyword.fetch!(event.occurrence.free_binding, :tc)
      PiAgent.UI.render(options, event)
      PiAgent.A.run_tool(event.enactment_id, tc)
    end
  end

  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    # Echo the dialog back so the final conversation stays visible
    # in the UI after :explicit termination fires on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  transition :run_tool do
    input :pending_call, bind({1, tc})

    output :tool_result, {1, tr}

    action do
      PiAgent.A.merge(event.enactment_id)
    end
  end

  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})

    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.A.think(event.enactment_id, event.markings)
    end
  end

  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.A.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver — turns mock-LLM/mock-tool decisions into workitem firings.
  # Sleeps simulate LLM/tool latency so the dialog frame visibly steps
  # through each phase. Drop them in production.

  def think(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)
    Process.sleep(700)

    case PiAgent.MockLLM.next_step(dialog) do
      {:tool, tc} -> drive(enactment_id, "think_tool", tc: tc)
      {:answer, ans} -> drive(enactment_id, "think_answer", ans: ans)
    end
  end

  def run_tool(enactment_id, tc) do
    Process.sleep(900)
    drive(enactment_id, "run_tool", tr: PiAgent.MockTool.run(tc))
  end

  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  defp drive(enactment_id, name, free_binding) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == name))
    |> case do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end

PiAgent.A.to_mermaid()
import ColouredFlow.Runner.Storage.InMemory, only: :macros
alias ColouredFlow.Runner.Storage.InMemory

flow_a = InMemory.insert_flow!(PiAgent.A.cpnet())
{:ok, e_a} = PiAgent.A.insert_enactment(flow(flow_a, :id))
enactment_a = enactment(e_a, :id)

dialog_a = Kino.Frame.new()
events_a = Kino.Frame.new()

{:ok, _} =
  PiAgent.A.start_enactment(enactment_a,
    lifecycle_hooks: {PiAgent.A, [dialog_frame: dialog_a, events_frame: events_a]}
  )

Kino.Layout.grid([dialog_a, events_a], columns: 2, boxed: true)

B. Risky-tool human gate

Same loop — but the model now sometimes asks for a :bash call. We do not trust a model with a shell unattended. The fix is one extra hop: the :think_tool output flows into a union-typed :pending_call place, tagged {:safe, op} or {:risky, op}. Two consumers compete for it:

  • :run_safe matches {:safe, op} — driver auto-fires it.
  • :run_risky matches {:risky, op} — workitem stays in :enabled until a human clicks an approve button. The runner is happy to wait days.

The crucial CPN trick is that pattern-matching on a token’s shape does the work of a guard: no extra guard clause is needed, because bind({1, {:safe, op}}) itself is the discriminator.
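The same idea can be shown in plain Elixir. TagRouting below is a hypothetical stand-in for the two input patterns, assuming only that a transition is a candidate when its pattern matches the token; it is not how ColouredFlow evaluates bindings internally.

```elixir
defmodule TagRouting do
  # Hypothetical stand-ins for the two input patterns in the net:
  # the shape of the token alone decides which consumer is a candidate.
  def enabled_consumers(token) do
    [
      run_safe: match?({:safe, _op}, token),
      run_risky: match?({:risky, _op}, token)
    ]
    |> Enum.filter(fn {_name, matches?} -> matches? end)
    |> Keyword.keys()
  end
end

[:run_safe] = TagRouting.enabled_consumers({:safe, {:read, "lib/coloured_flow.ex"}})
[:run_risky] = TagRouting.enabled_consumers({:risky, {:bash, "mix compile"}})
```

Because each token carries exactly one tag, at most one consumer is ever a candidate for it, which is why the two transitions never actually race for the same call.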

defmodule PiAgent.B do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent B — Permission Gate"

  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  colset tool_op() :: {tool_name(), binary()}
  colset tool_call() :: {:safe, tool_op()} | {:risky, tool_op()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  colset signal() :: {}

  var d :: dialog()
  var tc :: tool_call()
  var op :: tool_op()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()

  place :dialog, :dialog
  place :ready, :signal
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer

  initial_marking :dialog,
    ColouredFlow.MultiSet.new([[{:user, "Update lib/foo.ex via bash."}]])

  initial_marking :ready, ~MS[{}]

  transition :think_tool do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:assistant, "calling tool"}]}
    output :pending_call, {1, tc}

    action do
      tc = Keyword.fetch!(event.occurrence.free_binding, :tc)
      PiAgent.UI.render(options, event)
      PiAgent.B.dispatch_call(event.enactment_id, tc)
    end
  end

  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    # Echo the dialog back so the final conversation stays visible
    # in the UI after :explicit termination fires on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  transition :run_safe do
    input :pending_call, bind({1, {:safe, op}})

    output :tool_result, {1, tr}

    action do
      PiAgent.B.merge(event.enactment_id)
    end
  end

  transition :run_risky do
    input :pending_call, bind({1, {:risky, op}})

    output :tool_result, {1, tr}

    action do
      PiAgent.B.merge(event.enactment_id)
    end
  end

  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})

    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.B.think(event.enactment_id, event.markings)
    end
  end

  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.B.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver

  def think(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)
    Process.sleep(700)

    case PiAgent.MockLLM.next_step(dialog) do
      {:tool, op} ->
        tagged = if PiAgent.MockTool.risky?(op), do: {:risky, op}, else: {:safe, op}
        drive(enactment_id, "think_tool", tc: tagged)

      {:answer, ans} ->
        drive(enactment_id, "think_answer", ans: ans)
    end
  end

  # After :think_tool fires, route based on the tag — auto-run safe,
  # leave risky workitem dangling for human approval.
  def dispatch_call(enactment_id, {:safe, op}) do
    Process.sleep(900)
    drive(enactment_id, "run_safe", tr: PiAgent.MockTool.run(op))
  end

  def dispatch_call(_enactment_id, {:risky, _op}), do: :ok

  def approve(enactment_id) do
    case find_risky(enactment_id) do
      nil ->
        :no_pending

      %{id: wid, binding_element: %{binding: binding}} ->
        op = Keyword.fetch!(binding, :op)
        Process.sleep(100)
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)

        {:ok, _} =
          WorkitemTransition.complete_workitem(
            enactment_id,
            {wid, [tr: PiAgent.MockTool.run(op)]}
          )

        :ok
    end
  end

  # Compensation pattern: fire :run_risky with a synthetic "denied" result
  # so the agent's loop continues with the rejection in the dialog.
  def deny(enactment_id) do
    case find_risky(enactment_id) do
      nil ->
        :no_pending

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)

        {:ok, _} =
          WorkitemTransition.complete_workitem(
            enactment_id,
            {wid, [tr: "[denied by user]"]}
          )

        :denied
    end
  end

  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  defp find_risky(enactment_id) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == "run_risky"))
  end

  defp drive(enactment_id, name, free_binding) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == name))
    |> case do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end

PiAgent.B.to_mermaid()
flow_b = InMemory.insert_flow!(PiAgent.B.cpnet())
{:ok, e_b} = PiAgent.B.insert_enactment(flow(flow_b, :id))
enactment_b = enactment(e_b, :id)

dialog_b = Kino.Frame.new()
events_b = Kino.Frame.new()
controls_b = Kino.Frame.new()
approve_btn = Kino.Control.button("Approve risky tool")
deny_btn = Kino.Control.button("Deny")

Kino.listen(approve_btn, fn _ -> PiAgent.B.approve(enactment_b) end)
Kino.listen(deny_btn, fn _ -> PiAgent.B.deny(enactment_b) end)

{:ok, _} =
  PiAgent.B.start_enactment(enactment_b,
    lifecycle_hooks:
      {PiAgent.B,
       [
         dialog_frame: dialog_b,
         events_frame: events_b,
         controls_frame: controls_b,
         approve_btn: approve_btn,
         deny_btn: deny_btn
       ]}
  )

left_b = Kino.Layout.grid([dialog_b, controls_b], columns: 1)
Kino.Layout.grid([left_b, events_b], columns: 2, boxed: true)

The agent will run :ls and :read automatically (both safe), then stop on a bash call. The dialog renders ⏸ waiting for human approval until you click Approve — the workitem fires, the result merges, the loop resumes.

C. Multi-provider race + steering + plan-then-act

Three upgrades over Section B:

  1. Provider race. :propose_call and :think_answer are both eligible the moment (:ready, :dialog) align. Three independent driver tasks call Anthropic / OpenAI / Google mocks in parallel; the first to finish wins the workitem completion (CPN consumes the input tokens, the rest no-op when their drive call finds an empty workitem list).
  2. Steering queue. A new :steering_queue place plus an :append_steering transition lets the user push a follow-up message while the agent is mid-flight. :inject_steering drains the queue into :dialog before the next round.
  3. Plan-then-act gate with three branches. Section B let safe tools auto-fire and only gated risky ones. Section C drops the safe/risky tag and routes every tool call through a human review step: :propose_call → [:proposed_call] → one of three exits. Each exit appends a {:user, _} line to the dialog so the next planning round can quote it back via MockLLM.proposal_text/2:
    • :approve_proposal: call moves to :pending_call, :run_tool executes it; dialog gains {:user, "approve"}.
    • :reject_proposal: drops the call, restores :ready; dialog gains {:user, "reject: " <> note}.
    • :manual_input: drops the call, restores :ready; dialog gains the human’s literal {:user, note} (a free-form override that the next race round picks up via proposal_text/2).
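The "first to finish wins, the rest no-op" behaviour of the provider race can be sketched without the runner. This is a hypothetical model in which an Agent stands in for the single enabled workitem; the latencies are scaled-down assumptions that keep the same ordering as the mock's latency_ms/1.

```elixir
# One enabled workitem, modelled as an Agent that holds :enabled until claimed.
{:ok, workitem} = Agent.start_link(fn -> :enabled end)

# Atomic claim: only the first caller sees :enabled; later callers get :noop,
# much like a drive call that finds no enabled workitem left.
claim = fn vendor ->
  Agent.get_and_update(workitem, fn
    :enabled -> {{:won, vendor}, {:completed, vendor}}
    done -> {:noop, done}
  end)
end

# Assumed, scaled-down latencies in milliseconds.
latencies = [anthropic: 60, openai: 90, google: 120]

results =
  latencies
  |> Enum.map(fn {vendor, ms} ->
    Task.async(fn ->
      Process.sleep(ms)
      claim.(vendor)
    end)
  end)
  |> Task.await_many()

# Exactly one task wins; the other two return :noop.
IO.inspect(results)
```

Whichever task wins depends on timing, but the claim is serialised through one process, so there is never more than one winner. In the notebook that serialisation comes from the enactment consuming the :ready and :dialog tokens.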

:append_steering consumes a :steering_capacity token (a counting semaphore that bounds how many follow-ups the user may push) and produces a msg() token in the queue. This is the canonical CPN pattern for externally fed input: there is no “inject token” runner API; you declare a transition whose only job is to accept a value via a free binding. The same pattern gates the plan-then-act review: :reject_proposal and :manual_input each take the human’s note as a free binding, while :approve_proposal needs no free binding and simply waits for a human to fire it.
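A minimal sketch of the capacity-bounded queue, in plain Elixir. SteeringSketch is a hypothetical model: capacity is a count of unit tokens and the queue is an ordered list, whereas a real place holds an unordered multiset.

```elixir
defmodule SteeringSketch do
  # Hypothetical model: capacity counts the unit tokens left in
  # :steering_capacity; queue holds the queued msg() tokens.
  def new(slots) when is_integer(slots) and slots >= 0 do
    %{capacity: slots, queue: []}
  end

  # Mirrors :append_steering. With no capacity token left, the transition
  # is simply not enabled.
  def append(%{capacity: 0}, _msg), do: {:error, :not_enabled}

  # Consume one capacity token, produce one message token. The message is
  # the value the caller supplies, playing the role of the free binding.
  def append(%{capacity: n, queue: q} = state, {:user, text} = msg) when is_binary(text) do
    {:ok, %{state | capacity: n - 1, queue: q ++ [msg]}}
  end
end

state = SteeringSketch.new(2)
{:ok, state} = SteeringSketch.append(state, {:user, "Also count the files."})
{:ok, state} = SteeringSketch.append(state, {:user, "Skip test files."})

# Both slots are used up, so a third follow-up is refused.
{:error, :not_enabled} = SteeringSketch.append(state, {:user, "One more thing."})
```

In the net as written, no transition puts a token back into :steering_capacity, so the five slots bound the total number of follow-ups per enactment rather than the number queued at any one moment.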

defmodule PiAgent.C do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent C — Race & Steering & Plan-then-act"

  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  colset tool_call() :: {tool_name(), binary()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  colset note() :: binary()
  colset proposal() :: binary()
  colset signal() :: {}

  var d :: dialog()
  var tc :: tool_call()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()
  var m :: msg()
  var note :: note()
  var proposal :: proposal()

  place :dialog, :dialog
  place :ready, :signal
  place :proposed_call, :tool_call
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer
  place :steering_queue, :msg
  place :steering_capacity, :signal

  initial_marking :dialog,
    ColouredFlow.MultiSet.new([[{:user, "What's in lib/?"}]])

  initial_marking :ready, ~MS[{}]
  # 5 user-steering slots: each :append_steering consumes one.
  initial_marking :steering_capacity, ~MS[5**{}]

  # Pre-seed a steering message so the demo always shows steering even
  # without typing into the live form.
  initial_marking :steering_queue,
    ColouredFlow.MultiSet.new([{:user, "Also count the files."}])

  # Race producers fire :propose_call instead of running the tool. The
  # call sits in :proposed_call until a human approves or rejects it.
  # `proposal` is a free binding the driver fills with the assistant's
  # planning message — it can quote the latest steering / rejection
  # so the human sees the model react to their note.
  transition :propose_call do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:assistant, proposal}]}
    output :proposed_call, {1, tc}

    action do
      PiAgent.UI.render(options, event)
    end
  end

  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})

    # Echo the dialog back so the final conversation stays visible
    # in the UI after :explicit termination fires on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  # Human-driven option 1: approve the plan. The proposed call moves to
  # :pending_call and runs; an "approve" note lands in the dialog so the
  # next round sees the human assented.
  transition :approve_proposal do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:user, "approve"}]}
    output :pending_call, {1, tc}

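    action do
      # Read the bound call from the occurrence explicitly, mirroring how
      # :think_tool reads its free binding in Sections A and B.
      tc = Keyword.fetch!(event.occurrence.binding_element.binding, :tc)
      PiAgent.UI.render(options, event)
      PiAgent.C.run_tool(event.enactment_id, tc)
    end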
  end

  # Human-driven option 2: reject the plan. Drops :proposed_call, appends
  # a `"reject: " <> note` line to the dialog, and restores :ready.
  transition :reject_proposal do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:user, "reject: " <> note}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  # Human-driven option 3: override the plan with a manual instruction.
  # The dropped proposal yields to a fresh `{:user, note}` line; the
  # next race round picks the new instruction up via `proposal_text/2`.
  transition :manual_input do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})

    output :dialog, {1, d ++ [{:user, note}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  transition :run_tool do
    input :pending_call, bind({1, tc})

    output :tool_result, {1, tr}

    action do
      PiAgent.C.merge(event.enactment_id)
    end
  end

  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})

    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  transition :append_steering do
    input :steering_capacity, bind({1, s})

    output :steering_queue, {1, m}
  end

  transition :inject_steering do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    input :steering_queue, bind({1, m})

    output :dialog, {1, d ++ [m]}
    output :ready, {1, s}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.C.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver

  def think(enactment_id, markings) do
    # Steering takes precedence — drain the queue first.
    if drain_steering(enactment_id) == :drained do
      :ok
    else
      race_providers(enactment_id, markings)
    end
  end

  def race_providers(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)

    Enum.each([:anthropic, :openai, :google], fn vendor ->
      Task.Supervisor.start_child(PiAgent.TaskSup, fn ->
        Process.sleep(PiAgent.MockLLM.latency_ms(vendor))

        case PiAgent.MockLLM.next_step(dialog, vendor) do
          {:tool, op} ->
            text = PiAgent.MockLLM.proposal_text(dialog, op)
            drive(enactment_id, "propose_call", tc: op, proposal: text)

          {:answer, ans} ->
            drive(enactment_id, "think_answer", ans: ans)
        end
      end)
    end)
  end

  def run_tool(enactment_id, tc) do
    Process.sleep(900)
    drive(enactment_id, "run_tool", tr: PiAgent.MockTool.run(tc))
  end

  def approve(enactment_id) do
    drive(enactment_id, "approve_proposal", [])
  end

  # Reject the pending plan. Default note kept short so dialog stays
  # readable; the form-driven path passes a custom note.
  def reject(enactment_id, note \\ "try a different tool")
      when is_binary(note) do
    drive(enactment_id, "reject_proposal", note: note)
  end

  # Override the pending plan with a manual instruction. Drops the
  # proposal and lets the next planning round read the new note.
  def override(enactment_id, note) when is_binary(note) and note != "" do
    drive(enactment_id, "manual_input", note: note)
  end

  def steer(enactment_id, content) when is_binary(content) do
    drive(enactment_id, "append_steering", m: {:user, content})
  end

  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  defp drain_steering(enactment_id) do
    case find_workitem(enactment_id, "inject_steering") do
      nil ->
        :empty

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, []})
        :drained
    end
  end

  defp find_workitem(enactment_id, transition_name) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == transition_name))
  end

  defp drive(enactment_id, name, free_binding) do
    case find_workitem(enactment_id, name) do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end

PiAgent.C.to_mermaid()
flow_c = InMemory.insert_flow!(PiAgent.C.cpnet())
{:ok, e_c} = PiAgent.C.insert_enactment(flow(flow_c, :id))
enactment_c = enactment(e_c, :id)

dialog_c = Kino.Frame.new()
events_c = Kino.Frame.new()
controls_c = Kino.Frame.new()

steer_input =
  Kino.Control.form([msg: Kino.Input.text("Steering")],
    submit: "Send steering",
    reset_on_submit: [:msg]
  )

manual_input =
  Kino.Control.form([note: Kino.Input.text("Manual instruction")],
    submit: "Override plan",
    reset_on_submit: [:note]
  )

approve_plan_btn = Kino.Control.button("Approve plan")
reject_plan_btn = Kino.Control.button("Reject plan")

Kino.listen(steer_input, fn %{data: %{msg: text}} ->
  if text != "", do: PiAgent.C.steer(enactment_c, text)
end)

Kino.listen(manual_input, fn %{data: %{note: text}} ->
  if text != "", do: PiAgent.C.override(enactment_c, text)
end)

Kino.listen(approve_plan_btn, fn _ -> PiAgent.C.approve(enactment_c) end)
Kino.listen(reject_plan_btn, fn _ -> PiAgent.C.reject(enactment_c) end)

{:ok, _} =
  PiAgent.C.start_enactment(enactment_c,
    lifecycle_hooks:
      {PiAgent.C,
       [
         dialog_frame: dialog_c,
         events_frame: events_c,
         controls_frame: controls_c,
         approve_plan_btn: approve_plan_btn,
         reject_plan_btn: reject_plan_btn,
         manual_form: manual_input,
         steer_form: steer_input
       ]}
  )

left_c = Kino.Layout.grid([dialog_c, controls_c], columns: 1)
Kino.Layout.grid([left_c, events_c], columns: 2, boxed: true)

How to test the section

Four interactive controls. Each one lands a {:user, _} message in the dialog, and the next planning round picks the latest such message up via MockLLM.proposal_text/2 and quotes it back in the proposal so the human can verify the model heard them.

  1. Approve plan (button). Fires :approve_proposal. Dialog gains {:user, "approve"}; tool runs; agent re-plans for next round.
  2. Reject plan (button). Fires :reject_proposal with a fixed default note. Dialog gains {:user, "reject: try a different tool"}; the proposal is dropped; agent re-plans, and the next propose_call quotes the rejection back to you.
  3. Manual instruction (form). Type a custom note and click Override plan. Fires :manual_input with your text as the note free binding. Dialog gains {:user, note}; proposal dropped; agent re-plans with your override as the new context.
  4. Steering (form). Type a follow-up and click Send steering. Fires :append_steering (queues the message). The next time :ready and :dialog align — typically right after the current tool merges or the current proposal resolves — :inject_steering drains the queue into the dialog.

Pre-seeded steering: :steering_queue starts with one token ({:user, "Also count the files."}). The very first think call drains it, so the first proposal already references it. You’ll see this on every run with no input from you.
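To see what that first proposal reads like, here is a condensed, self-contained copy of the quoting logic. QuoteBack is a hypothetical module name; its behaviour is meant to mirror latest_steering/1 and proposal_text/2 from PiAgent.MockLLM.

```elixir
defmodule QuoteBack do
  # The first {:user, _} message is the original prompt; any later one is
  # treated as a steering or rejection note.
  def latest_steering(dialog) do
    case for({:user, text} <- dialog, do: text) do
      [_initial | [_ | _] = rest] -> List.last(rest)
      _ -> nil
    end
  end

  def proposal_text(dialog, {tool, args}) do
    call = "#{tool}(#{inspect(args)})"

    case latest_steering(dialog) do
      nil -> "proposing #{call}"
      text -> "based on your note (#{inspect(text)}), proposing #{call}"
    end
  end
end

# The dialog as it stands once the pre-seeded steering has been drained.
dialog = [{:user, "What's in lib/?"}, {:user, "Also count the files."}]

IO.puts(QuoteBack.proposal_text(dialog, {:ls, "lib/"}))
# prints: based on your note ("Also count the files."), proposing ls("lib/")
```

Note that after an approval the latest user message is "approve", so the following proposal quotes that instead.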

If you click any control after the agent terminates (✅ done), nothing happens — drive/3 finds no enabled workitem and returns silently. Re-evaluate the C run cell to start a fresh enactment.

What this buys you

The same module is also:

  • Crash-safe. Kill the runner mid-bash and restart — the runner replays Occurrences from the latest snapshot, the workitem state comes back exactly where it was, no duplicate tool calls.
  • Time-travelable. Every firing is a stored event; an audit UI gets the agent’s full reasoning trace for free.
  • Branchable. Insert a fresh enactment from a snapshot of an existing one and you have a session fork — the same primitive Pi exposes via /fork and /clone, but earned automatically by the event-source runner.
  • Provider-agnostic. The CPN cares about token shapes, not vendors. Add a new provider in Section C by adding one line to race_providers/2; the structure of the net does not change.
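The crash-safety and branching claims both rest on replay: a marking is rebuilt by folding recorded occurrences over a snapshot. A minimal sketch of that idea follows, assuming a made-up event shape with consumed and produced tokens per place; this is not the ColouredFlow storage schema.

```elixir
defmodule ReplaySketch do
  # Hypothetical event shape: %{consumed: [place: token], produced: [place: token]}.
  # A marking maps each place to a list of tokens.

  # Rebuild the current marking by applying occurrences in order.
  def replay(snapshot, occurrences) do
    Enum.reduce(occurrences, snapshot, fn occurrence, marking ->
      apply_occurrence(marking, occurrence)
    end)
  end

  defp apply_occurrence(marking, %{consumed: consumed, produced: produced}) do
    marking
    |> remove(consumed)
    |> add(produced)
  end

  defp remove(marking, tokens) do
    Enum.reduce(tokens, marking, fn {place, token}, m ->
      Map.update!(m, place, fn held -> List.delete(held, token) end)
    end)
  end

  defp add(marking, tokens) do
    Enum.reduce(tokens, marking, fn {place, token}, m ->
      Map.update(m, place, [token], fn held -> [token | held] end)
    end)
  end
end

snapshot = %{ready: [{}], dialog: [[{:user, "hi"}]]}

occurrences = [
  %{
    consumed: [ready: {}, dialog: [{:user, "hi"}]],
    produced: [
      dialog: [{:user, "hi"}, {:assistant, "calling tool"}],
      pending_call: {:ls, "lib/"}
    ]
  },
  # The tool result is stored in the event, so replay never re-runs the tool.
  %{consumed: [pending_call: {:ls, "lib/"}], produced: [tool_result: "ls output"]}
]

restored = ReplaySketch.replay(snapshot, occurrences)
```

Replaying a prefix of the same list yields the marking at an earlier point, which is the primitive a session fork would start from.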

The roughly 280 lines of PiAgent.C give you a saga-style coding agent with human-in-the-loop review, race semantics, mid-flight steering, and durable replay. Retrofitting those properties onto a hand-written ReAct loop typically takes far more code.