Pi Agent — Coding Agent as a Coloured Petri Net
# Install the notebook's dependencies and set their application config in one call.
Mix.install(
  [
    # The CPN workflow engine, fetched straight from GitHub.
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    # Livebook widgets (frames, buttons, forms, Mermaid) used by the UI cells below.
    {:kino, "~> 0.14.1"}
  ],
  config: [
    coloured_flow: [
      # Select the in-memory storage backend for the runner.
      {ColouredFlow.Runner.Storage, [storage: ColouredFlow.Runner.Storage.InMemory]}
    ]
  ]
)
What we’re building
pi.dev ships a terminal coding agent — a model that reasons,
calls tools (read, grep, bash, …), and keeps a session tree you can
branch from. The execution loop has well-known shapes:
- think → tool → observe → repeat — the ReAct cycle.
- safe vs. risky tool calls — `read` runs unattended; `bash` waits on a human approval.
- multi-provider race — the same prompt fans out to several vendors; fastest answer wins.
- steering — the user injects a follow-up while the agent is mid-flight.
Coloured Petri Nets express exactly these shapes natively: places hold
state, transitions consume/produce tokens with guards, workitems sit in
:enabled until a driver (a model, a human, a scheduler) starts them. The
runner snapshots after every firing, so a crashed agent resumes from the
last token state instead of re-running tool calls.
We’ll model Pi’s loop in three passes:
| Pass | Adds | CPN feature |
|---|---|---|
| A | minimal ReAct cycle | deferred choice, free output bindings, self-driving via action |
| B | risky-tool human gate | union colour set, workitem held in `:enabled` until human fires |
| C | provider race + steering injection | concurrent enabled workitems, external-fed input place |
Mock LLM, mock tools, dialog renderer
These are the only impure pieces — everything CPN-shaped lives in the
workflow modules below. The mock LLM is a deterministic function of how
many tool results the dialog already contains, so the demo is reproducible.
The dialog renderer formats the current markings as markdown and renders it
into the Kino frames that each enactment receives via its `lifecycle_hooks` options.
defmodule PiAgent.MockLLM do
  @moduledoc false

  # Scripted stand-in for a model: the decision depends only on how many
  # `{:tool, _}` turns the dialog already holds (and, for the final answer,
  # on which vendor is "speaking").
  #
  # Returns {:tool, {tool_name, args}} | {:answer, binary}.
  def next_step(dialog, vendor \\ :anthropic) do
    dialog
    |> Enum.reduce(0, fn
      {:tool, _content}, seen -> seen + 1
      {_role, _content}, seen -> seen
    end)
    |> scripted_step(vendor)
  end

  # The fixed script: ls, then read, then bash, then answer.
  defp scripted_step(0, _vendor), do: {:tool, {:ls, "lib/"}}
  defp scripted_step(1, _vendor), do: {:tool, {:read, "lib/coloured_flow.ex"}}
  defp scripted_step(2, _vendor), do: {:tool, {:bash, "mix compile"}}

  defp scripted_step(_more, vendor) do
    {:answer, "[#{vendor}] lib/ has the ColouredFlow umbrella; compiles clean."}
  end

  # Text of the assistant's proposal for a tool call. When the dialog holds
  # a `{:user, _}` message beyond the opening prompt, the most recent one is
  # quoted back so the human can see the model picked it up.
  def proposal_text(dialog, {tool, args}) do
    rendered_call = "#{tool}(#{inspect(args)})"
    phrase_proposal(latest_steering(dialog), rendered_call)
  end

  defp phrase_proposal(nil, rendered_call), do: "proposing #{rendered_call}"

  defp phrase_proposal(note, rendered_call) do
    "based on your note (#{inspect(note)}), proposing #{rendered_call}"
  end

  # Content of the last `{:user, _}` message after the first one, or nil
  # when the dialog has at most one user message. The first user message is
  # the original prompt, so it never counts as a note.
  defp latest_steering(dialog) do
    dialog
    |> Enum.flat_map(fn
      {:user, content} -> [content]
      {_role, _content} -> []
    end)
    |> case do
      [_prompt, _ | _] = user_contents -> List.last(user_contents)
      _none_or_prompt_only -> nil
    end
  end

  # Simulated per-provider response time in milliseconds; Section C sleeps
  # for this long in each racing task.
  def latency_ms(:anthropic), do: 600
  def latency_ms(:openai), do: 900
  def latency_ms(:google), do: 1200
end
defmodule PiAgent.MockTool do
  @moduledoc false

  # Tools that change the workspace or run arbitrary commands; everything
  # else is treated as safe to run unattended.
  @risky_tools [:write, :edit, :bash]

  # Produces the canned output for a `{tool_name, args}` call.
  #
  # Every clause echoes its `args` so each tool turn is identifiable in the
  # rendered dialog. `:read` and `:grep` previously returned "" (and left
  # `args` unused, which triggers a compiler warning), which showed up as an
  # empty code fence in the dialog frame.
  def run({:ls, args}), do: "ls(#{args}): coloured_flow.ex, dsl.ex, runner/, definition/, …"
  def run({:read, args}), do: "read(#{args}): (mock file contents)"
  def run({:grep, args}), do: "grep(#{args}): (mock matches)"
  def run({:write, args}), do: "wrote: #{args}"
  def run({:edit, args}), do: "edited: #{args}"
  def run({:bash, args}), do: "$ #{args}\n"

  # True when the call's tool is one of the risky tools; false for any
  # other term, including values that are not `{tool, args}` tuples.
  def risky?({tool, _args}) when tool in @risky_tools, do: true
  def risky?(_call), do: false
end
defmodule PiAgent.UI do
  @moduledoc false

  alias ColouredFlow.MultiSet
  alias ColouredFlow.Runner.Storage

  # Bindings that add noise to the events panel: :d is the full dialog
  # (already shown in the left frame), :s is the signal unit token.
  @event_noisy_keys [:d, :s]

  # Renders the dialog, the event history, and an optional controls frame.
  # Each frame is looked up in `opts` (`:dialog_frame`, `:events_frame`,
  # `:controls_frame`) and skipped when absent. Always returns :ok.
  #
  # The controls frame is rebuilt on every event so each widget only
  # appears when its target workitem is actually firable: plan-gate
  # buttons show up while `:proposed_call` has a token, the risky-tool
  # buttons show up while `:pending_call` carries a `{:risky, _}`, the
  # steering form is visible the whole run. Once the markings contain an
  # "answer" entry (the workflow's termination condition) the controls
  # frame is cleared.
  def render(opts, event) do
    terminated? = Map.has_key?(event.markings, "answer")

    if frame = opts[:dialog_frame] do
      Kino.Frame.render(frame, Kino.Markdown.new(format_dialog(event.markings, terminated?)))
    end

    if frame = opts[:events_frame] do
      Kino.Frame.render(frame, Kino.Markdown.new(format_events(event.enactment_id)))
    end

    if frame = opts[:controls_frame] do
      if terminated? do
        Kino.Frame.clear(frame)
      else
        Kino.Frame.render(frame, build_controls(opts, event.markings))
      end
    end

    :ok
  end

  # Collects whichever widgets are currently applicable and lays them out
  # in one column; falls back to a placeholder when none apply.
  defp build_controls(opts, markings) do
    parts =
      []
      |> maybe_add_plan_gate(opts, markings)
      |> maybe_add_risky_gate(opts, markings)
      |> maybe_add_steer(opts)

    case parts do
      [] -> Kino.Markdown.new("*waiting for the agent to act…*")
      [single] -> single
      many -> Kino.Layout.grid(many, columns: 1)
    end
  end

  # Approve/Reject plan buttons (plus the optional manual-override form):
  # shown only while `:proposed_call` holds a token and both buttons were
  # supplied in `opts`.
  defp maybe_add_plan_gate(parts, opts, markings) do
    cond do
      not has_token?(markings, "proposed_call") ->
        parts

      is_nil(opts[:approve_plan_btn]) or is_nil(opts[:reject_plan_btn]) ->
        parts

      true ->
        plan_row =
          Kino.Layout.grid([opts[:approve_plan_btn], opts[:reject_plan_btn]], columns: 2)

        manual = if form = opts[:manual_form], do: [form], else: []
        parts ++ [plan_row | manual]
    end
  end

  # Approve/Deny buttons for a risky tool call: shown only while
  # `:pending_call` holds a `{:risky, _}` token and both buttons were supplied.
  defp maybe_add_risky_gate(parts, opts, markings) do
    cond do
      not risky_pending?(markings) ->
        parts

      is_nil(opts[:approve_btn]) or is_nil(opts[:deny_btn]) ->
        parts

      true ->
        parts ++ [Kino.Layout.grid([opts[:approve_btn], opts[:deny_btn]], columns: 2)]
    end
  end

  # The steering form is appended whenever one was supplied.
  defp maybe_add_steer(parts, opts) do
    if form = opts[:steer_form], do: parts ++ [form], else: parts
  end

  # Extracts the dialog (a list of `{role, content}` messages) from the
  # markings map. Returns [] when there is no "dialog" entry or when that
  # entry holds no token.
  def read_dialog(markings) do
    case Map.get(markings, "dialog") do
      nil -> []
      ms -> ms |> MultiSet.to_list() |> List.first() || []
    end
  end

  # Markdown for the dialog frame: a state line, the messages, and a footer
  # once the workflow has ended. Both arguments are required; the previous
  # `terminated? \\ false` default was never used by any caller and only
  # produced an "unused default" compiler warning.
  defp format_dialog(markings, terminated?) do
    state = state_label(markings)
    body = markings |> read_dialog() |> Enum.map_join("\n\n", &format_msg/1)
    footer = if terminated?, do: "\n\n---\n\n**✓ workflow ended.**", else: ""
    "**state**: #{state}\n\n---\n\n#{body}#{footer}"
  end

  # Markdown for the events frame: every stored occurrence of the enactment,
  # numbered from 1 in stream order.
  defp format_events(enactment_id) do
    occurrences =
      enactment_id
      |> Storage.occurrences_stream(0)
      |> Enum.to_list()

    body =
      case occurrences do
        [] ->
          "_no firings yet_"

        list ->
          list
          |> Enum.with_index(1)
          |> Enum.map_join("\n\n", fn {o, i} -> format_occurrence(o, i) end)
      end

    "**events** (#{length(occurrences)})\n\n#{body}"
  end

  # One numbered entry: the transition name plus its bound and free
  # bindings, minus the noisy keys.
  defp format_occurrence(occurrence, idx) do
    name = occurrence.binding_element.transition

    bindings =
      (occurrence.binding_element.binding ++ occurrence.free_binding)
      |> Enum.reject(fn {k, _} -> k in @event_noisy_keys end)

    case bindings do
      [] -> "**#{idx}.** `:#{name}`"
      kvs -> "**#{idx}.** `:#{name}`  \n#{render_bindings(kvs)}"
    end
  end

  defp render_bindings(kvs) do
    Enum.map_join(kvs, "  \n", fn {k, v} ->
      "    `#{k}` = `#{inspect_short(v)}`"
    end)
  end

  # Inspects a value compactly and truncates to 48 characters (47 plus an
  # ellipsis) when the inspected form is longer than 50.
  defp inspect_short(v) do
    s = inspect(v, limit: 3, printable_limit: 30, structs: false)
    if String.length(s) > 50, do: String.slice(s, 0, 47) <> "…", else: s
  end

  # First matching condition wins, so the order below is the priority order.
  defp state_label(markings) do
    cond do
      Map.has_key?(markings, "answer") -> "✅ done"
      has_token?(markings, "proposed_call") -> "🤔 awaiting plan approval"
      risky_pending?(markings) -> "⏸ waiting for human approval"
      has_token?(markings, "pending_call") -> "⚙ tool running"
      has_token?(markings, "steering_queue") -> "📣 steering pending"
      true -> "💭 thinking"
    end
  end

  defp risky_pending?(markings) do
    case Map.get(markings, "pending_call") do
      nil -> false
      ms -> ms |> MultiSet.to_list() |> Enum.any?(&match?({:risky, _}, &1))
    end
  end

  defp has_token?(markings, place) do
    case Map.get(markings, place) do
      nil -> false
      ms -> MultiSet.size(ms) > 0
    end
  end

  defp format_msg({:user, c}), do: "**user** › #{c}"
  defp format_msg({:assistant, c}), do: "**assistant** › #{c}"
  defp format_msg({:tool, c}), do: "```\n#{c}\n```"
end
Shared runtime: storage + supervisor
Started once for the whole notebook. All three enactments share this.
# Start the in-memory storage and the runner supervisor through Kino, keeping
# the pids that Kino.start_child!/1 hands back. `runner_pid` is shown in the
# cell result further down.
storage_pid = Kino.start_child!(ColouredFlow.Runner.Storage.InMemory)
runner_pid = Kino.start_child!(ColouredFlow.Runner.Supervisor)
defmodule PiAgent.Sup do
  @moduledoc false

  use Supervisor

  # Starts the supervisor under its module name. If an instance is already
  # registered it is stopped first, so calling this again never fails with
  # "already started".
  def start_link(_ \\ []) do
    already_running = Process.whereis(__MODULE__)

    if already_running do
      Supervisor.stop(__MODULE__)
    end

    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Single child: the Task.Supervisor registered as PiAgent.TaskSup.
  @impl Supervisor
  def init(_) do
    children = [
      {Task.Supervisor, name: PiAgent.TaskSup}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
# Start the notebook's own supervisor; its init/1 brings up PiAgent.TaskSup,
# which the workflow modules name as their `task_supervisor`.
Kino.start_child!(PiAgent.Sup)
# Cell result: confirms the runner supervisor pid obtained above.
"runtime ready (#{inspect(runner_pid)})"
A. Simple ReAct cycle
Five places, four transitions, one on_markings termination on :answer.
[ready]──┬──> :think_tool ─{tc}──> [pending_call] ──> :run_tool ─{tr}──> [tool_result]
[dialog]─┤                                                                     │
         │                                                                     v
         └──> :think_answer ─{ans}──> [answer]               [dialog] ──> :merge_result ──> [ready],[dialog]
Three transitions emit free output bindings (tc, tr, ans) — the
runner leaves them open and the driver fills them when calling
complete_workitem/2. That is how the LLM’s decision (which tool? which
answer text?) crosses from impure Elixir into the pure CPN state.
:think_tool and :think_answer share an input pattern — classic
deferred choice: both are enabled simultaneously, whichever the driver
fires first wins. The driver here calls MockLLM.next_step/1 and routes
to one or the other.
# Section A workflow: a minimal think → tool → observe loop declared with the
# ColouredFlow DSL, followed by the driver functions that fire its workitems
# using PiAgent.MockLLM and PiAgent.MockTool.
defmodule PiAgent.A do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent A — Simple ReAct"

  # Colour sets: the token types used by the places below.
  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  colset tool_call() :: {tool_name(), binary()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  colset signal() :: {}

  # Variables used in arc expressions. `tc`, `tr` and `ans` appear on output
  # arcs without being bound by an input in some transitions; in those cases
  # the driver supplies them as the free binding when completing the workitem.
  var d :: dialog()
  var tc :: tool_call()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()

  place :dialog, :dialog
  place :ready, :signal
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer

  # The dialog place starts with one token: a list holding the user's prompt.
  initial_marking :dialog,
                  ColouredFlow.MultiSet.new([[{:user, "What's in lib/?"}]])

  initial_marking :ready, ~MS[{}]

  # Consumes :ready and the dialog, appends an assistant line, and emits the
  # tool call chosen by the driver (`tc`, supplied by think/2).
  transition :think_tool do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:assistant, "calling tool"}]}
    output :pending_call, {1, tc}

    action do
      # Read the chosen call back from the occurrence's free binding,
      # refresh the UI, then run the tool.
      tc = Keyword.fetch!(event.occurrence.free_binding, :tc)
      PiAgent.UI.render(options, event)
      PiAgent.A.run_tool(event.enactment_id, tc)
    end
  end

  # Same inputs as :think_tool; emits the final answer (`ans`, supplied by
  # think/2) instead of a tool call.
  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    # Echo the dialog back so the final conversation stays visible
    # in the UI once the workflow terminates on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  # Consumes the pending call and emits its result (`tr`, supplied by run_tool/2).
  transition :run_tool do
    input :pending_call, bind({1, tc})
    output :tool_result, {1, tr}

    action do
      PiAgent.A.merge(event.enactment_id)
    end
  end

  # Appends the tool result to the dialog and restores :ready, which
  # re-enables both think transitions.
  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})
    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.A.think(event.enactment_id, event.markings)
    end
  end

  # The workflow terminates once the markings contain an "answer" entry.
  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  # Kick off the first planning round when the enactment starts.
  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.A.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver — turns mock-LLM/mock-tool decisions into workitem firings.
  # Sleeps simulate LLM/tool latency so the dialog frame visibly steps
  # through each phase. Drop them in production.

  # Reads the dialog from the markings, asks the mock LLM for the next step,
  # and fires :think_tool or :think_answer with that decision as free binding.
  def think(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)
    Process.sleep(700)

    case PiAgent.MockLLM.next_step(dialog) do
      {:tool, tc} -> drive(enactment_id, "think_tool", tc: tc)
      {:answer, ans} -> drive(enactment_id, "think_answer", ans: ans)
    end
  end

  # Runs the mock tool and fires :run_tool with its output as `tr`.
  def run_tool(enactment_id, tc) do
    Process.sleep(900)
    drive(enactment_id, "run_tool", tr: PiAgent.MockTool.run(tc))
  end

  # Fires :merge_result; it needs no free binding.
  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  # Renders the net definition as a Mermaid diagram widget.
  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  # Finds the first live workitem of transition `name` that is in the
  # :enabled state, then starts and completes it with `free_binding`.
  # Returns :ok either way: when no such workitem exists the call is a
  # silent no-op. Raises MatchError if start or complete does not return
  # `{:ok, _}`.
  defp drive(enactment_id, name, free_binding) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == name))
    |> case do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end
# Show the net as a Mermaid diagram.
PiAgent.A.to_mermaid()

# NOTE(review): `flow/2` and `enactment/2` used below presumably come from this
# macro import (record-style field accessors) — confirm against InMemory.
# The import and the alias are also relied on by the Section B and C cells.
import ColouredFlow.Runner.Storage.InMemory, only: :macros
alias ColouredFlow.Runner.Storage.InMemory

# Store the flow definition, create an enactment for it, and keep its id.
flow_a = InMemory.insert_flow!(PiAgent.A.cpnet())
{:ok, e_a} = PiAgent.A.insert_enactment(flow(flow_a, :id))
enactment_a = enactment(e_a, :id)

# Output frames that PiAgent.UI.render/2 fills on every lifecycle hook.
dialog_a = Kino.Frame.new()
events_a = Kino.Frame.new()

# Start the enactment; the keyword list reaches the hooks as `options`.
{:ok, _} =
  PiAgent.A.start_enactment(enactment_a,
    lifecycle_hooks: {PiAgent.A, [dialog_frame: dialog_a, events_frame: events_a]}
  )

# Dialog on the left, event history on the right.
Kino.Layout.grid([dialog_a, events_a], columns: 2, boxed: true)
B. Risky-tool human gate
Same loop — but the model now sometimes asks for a :bash call. We do
not trust a model with a shell unattended. The fix is one extra hop: the
:think_tool output flows into a union-typed :pending_call place,
tagged {:safe, op} or {:risky, op}. Two consumers compete for it:
- `:run_safe` matches `{:safe, op}` — driver auto-fires it.
- `:run_risky` matches `{:risky, op}` — workitem stays in `:enabled` until a human clicks an approve button. The runner is happy to wait days.
The crucial CPN trick is that pattern-matching on a token’s shape is
how guards work — no extra guard clause needed; bind({1, {:safe, op}})
itself is the discriminator.
# Section B workflow: the Section A loop with tool calls tagged `{:safe, op}`
# or `{:risky, op}`. Safe calls are fired by the driver; risky calls wait
# until approve/1 or deny/1 is called (wired to buttons in the run cell).
defmodule PiAgent.B do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent B — Permission Gate"

  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  # The raw operation, and the tagged union that travels through :pending_call.
  colset tool_op() :: {tool_name(), binary()}
  colset tool_call() :: {:safe, tool_op()} | {:risky, tool_op()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  colset signal() :: {}

  var d :: dialog()
  var tc :: tool_call()
  var op :: tool_op()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()

  place :dialog, :dialog
  place :ready, :signal
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer

  initial_marking :dialog,
                  ColouredFlow.MultiSet.new([[{:user, "Update lib/foo.ex via bash."}]])

  initial_marking :ready, ~MS[{}]

  # Emits the tagged call chosen by the driver (`tc`, supplied by think/2).
  transition :think_tool do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:assistant, "calling tool"}]}
    output :pending_call, {1, tc}

    action do
      # Read the tagged call back from the free binding and route on its tag.
      tc = Keyword.fetch!(event.occurrence.free_binding, :tc)
      PiAgent.UI.render(options, event)
      PiAgent.B.dispatch_call(event.enactment_id, tc)
    end
  end

  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    # Echo the dialog back so the final conversation stays visible
    # in the UI once the workflow terminates on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  # Consumes only `{:safe, op}` tokens; `tr` is supplied by dispatch_call/2.
  transition :run_safe do
    input :pending_call, bind({1, {:safe, op}})
    output :tool_result, {1, tr}

    action do
      PiAgent.B.merge(event.enactment_id)
    end
  end

  # Consumes only `{:risky, op}` tokens; `tr` is supplied by approve/1 or deny/1.
  transition :run_risky do
    input :pending_call, bind({1, {:risky, op}})
    output :tool_result, {1, tr}

    action do
      PiAgent.B.merge(event.enactment_id)
    end
  end

  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})
    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.B.think(event.enactment_id, event.markings)
    end
  end

  # The workflow terminates once the markings contain an "answer" entry.
  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.B.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver

  # Asks the mock LLM for the next step. Tool operations are tagged via
  # MockTool.risky?/1 before being handed to :think_tool as `tc`.
  def think(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)
    Process.sleep(700)

    case PiAgent.MockLLM.next_step(dialog) do
      {:tool, op} ->
        tagged = if PiAgent.MockTool.risky?(op), do: {:risky, op}, else: {:safe, op}
        drive(enactment_id, "think_tool", tc: tagged)

      {:answer, ans} ->
        drive(enactment_id, "think_answer", ans: ans)
    end
  end

  # After :think_tool fires, route based on the tag — auto-run safe,
  # leave risky workitem dangling for human approval.
  def dispatch_call(enactment_id, {:safe, op}) do
    Process.sleep(900)
    drive(enactment_id, "run_safe", tr: PiAgent.MockTool.run(op))
  end

  def dispatch_call(_enactment_id, {:risky, _op}), do: :ok

  # Human approval: runs the mock tool on the `op` bound in the waiting
  # :run_risky workitem and completes it with the output.
  # Returns :ok, or :no_pending when no :run_risky workitem is enabled.
  def approve(enactment_id) do
    case find_risky(enactment_id) do
      nil ->
        :no_pending

      %{id: wid, binding_element: %{binding: binding}} ->
        op = Keyword.fetch!(binding, :op)
        Process.sleep(100)
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)

        {:ok, _} =
          WorkitemTransition.complete_workitem(
            enactment_id,
            {wid, [tr: PiAgent.MockTool.run(op)]}
          )

        :ok
    end
  end

  # Compensation pattern: fire :run_risky with a synthetic "denied" result
  # so the agent's loop continues with the rejection in the dialog.
  # Returns :denied, or :no_pending when no :run_risky workitem is enabled.
  def deny(enactment_id) do
    case find_risky(enactment_id) do
      nil ->
        :no_pending

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)

        {:ok, _} =
          WorkitemTransition.complete_workitem(
            enactment_id,
            {wid, [tr: "[denied by user]"]}
          )

        :denied
    end
  end

  # Fires :merge_result; it needs no free binding.
  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  # Renders the net definition as a Mermaid diagram widget.
  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  # The first enabled :run_risky workitem, or nil.
  defp find_risky(enactment_id) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == "run_risky"))
  end

  # Starts and completes the first enabled workitem of transition `name`
  # with `free_binding`. Returns :ok either way: when no such workitem
  # exists the call is a silent no-op. Raises MatchError if start or
  # complete does not return `{:ok, _}`.
  defp drive(enactment_id, name, free_binding) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == name))
    |> case do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end
# Show the net as a Mermaid diagram.
PiAgent.B.to_mermaid()

# Store the flow definition, create an enactment for it, and keep its id.
# `InMemory`, `flow/2` and `enactment/2` come from the alias/import in the
# Section A cell.
flow_b = InMemory.insert_flow!(PiAgent.B.cpnet())
{:ok, e_b} = PiAgent.B.insert_enactment(flow(flow_b, :id))
enactment_b = enactment(e_b, :id)

# Output frames; `controls_b` hosts the approve/deny buttons while a risky
# call is pending (see PiAgent.UI.render/2).
dialog_b = Kino.Frame.new()
events_b = Kino.Frame.new()
controls_b = Kino.Frame.new()

approve_btn = Kino.Control.button("Approve risky tool")
deny_btn = Kino.Control.button("Deny")

# Each click resolves the waiting :run_risky workitem, if there is one.
Kino.listen(approve_btn, fn _ -> PiAgent.B.approve(enactment_b) end)
Kino.listen(deny_btn, fn _ -> PiAgent.B.deny(enactment_b) end)

# Start the enactment; the keyword list reaches the hooks as `options`.
{:ok, _} =
  PiAgent.B.start_enactment(enactment_b,
    lifecycle_hooks:
      {PiAgent.B,
       [
         dialog_frame: dialog_b,
         events_frame: events_b,
         controls_frame: controls_b,
         approve_btn: approve_btn,
         deny_btn: deny_btn
       ]}
  )

# Left column: dialog above controls. Right column: event history.
left_b = Kino.Layout.grid([dialog_b, controls_b], columns: 1)
Kino.Layout.grid([left_b, events_b], columns: 2, boxed: true)
The agent will run :ls and :read automatically (both safe), then stop
on a bash call. The dialog renders ⏸ waiting for human approval until
you click Approve — the workitem fires, the result merges, the loop
resumes.
C. Multi-provider race + steering + plan-then-act
Three upgrades over Section B:
- Provider race. `:propose_call` and `:think_answer` are both eligible the moment `(:ready, :dialog)` align. Three independent driver tasks call Anthropic / OpenAI / Google mocks in parallel; the first to finish wins the workitem completion (CPN consumes the input tokens, the rest no-op when their drive call finds no matching enabled workitem).
- Steering queue. A new `:steering_queue` place plus an `:append_steering` transition lets the user push a follow-up message while the agent is mid-flight. `:inject_steering` drains the queue into `:dialog` before the next round.
- Plan-then-act gate with three branches. Section B let safe tools auto-fire and only gated risky ones. Section C drops the safe/risky tag and routes every tool call through a human review step: `:propose_call → [:proposed_call] →` one of three exits. Each exit appends a `{:user, _}` line to the dialog so the next planning round can quote it back via `MockLLM.proposal_text/2`:
  - `:approve_proposal` — call moves to `:pending_call`, `:run_tool` executes it; dialog gains `{:user, "approve"}`.
  - `:reject_proposal` — drops the call, restores `:ready`; dialog gains `{:user, "reject: " <> note}`.
  - `:manual_input` — drops the call, restores `:ready`; dialog gains the human's literal `{:user, note}` (a free-form override that the next race round picks up via `proposal_text/2`).
`:append_steering` consumes a `:steering_capacity` token (a counting
semaphore — bounds how many follow-ups the user may push) and produces
a `msg()` token in the queue. This is the canonical CPN pattern for
externally-fed input: there is no “inject token” runner API; you
declare a transition whose only job is to accept a value via a free
binding. The same pattern gates the plan-then-act review:
`:reject_proposal` and `:manual_input` each take the human's `note` as a
free binding, while `:approve_proposal` needs no free binding and is
simply fired by the driver.
# Section C workflow: every tool call is first proposed and must be approved,
# rejected, or overridden by a human; three mock providers race to produce each
# proposal; user steering messages are queued and injected into the dialog
# before the next planning round.
defmodule PiAgent.C do
  use ColouredFlow.DSL, task_supervisor: PiAgent.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "Pi Agent C — Race & Steering & Plan-then-act"

  colset role() :: :user | :assistant | :tool
  colset msg() :: {role(), binary()}
  colset dialog() :: list(msg())
  colset tool_name() :: :read | :grep | :ls | :write | :edit | :bash
  colset tool_call() :: {tool_name(), binary()}
  colset tool_result() :: binary()
  colset answer() :: binary()
  # Human-supplied text for :reject_proposal / :manual_input.
  colset note() :: binary()
  # Assistant's planning message for :propose_call.
  colset proposal() :: binary()
  colset signal() :: {}

  var d :: dialog()
  var tc :: tool_call()
  var tr :: tool_result()
  var ans :: answer()
  var s :: signal()
  var m :: msg()
  var note :: note()
  var proposal :: proposal()

  place :dialog, :dialog
  place :ready, :signal
  place :proposed_call, :tool_call
  place :pending_call, :tool_call
  place :tool_result, :tool_result
  place :answer, :answer
  place :steering_queue, :msg
  place :steering_capacity, :signal

  initial_marking :dialog,
                  ColouredFlow.MultiSet.new([[{:user, "What's in lib/?"}]])

  initial_marking :ready, ~MS[{}]
  # 5 user-steering slots: each :append_steering consumes one.
  initial_marking :steering_capacity, ~MS[5**{}]

  # Pre-seed a steering message so the demo always shows steering even
  # without typing into the live form.
  initial_marking :steering_queue,
                  ColouredFlow.MultiSet.new([{:user, "Also count the files."}])

  # Race producers fire :propose_call instead of running the tool. The
  # call sits in :proposed_call until a human approves or rejects it.
  # `proposal` is a free binding the driver fills with the assistant's
  # planning message — it can quote the latest steering / rejection
  # so the human sees the model react to their note.
  # `tc` is likewise supplied by the driver (see race_providers/2).
  transition :propose_call do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:assistant, proposal}]}
    output :proposed_call, {1, tc}

    action do
      PiAgent.UI.render(options, event)
    end
  end

  transition :think_answer do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    # Echo the dialog back so the final conversation stays visible
    # in the UI once the workflow terminates on `:answer`.
    output :dialog, {1, d}
    output :answer, {1, ans}
  end

  # Human-driven option 1: approve the plan. The proposed call moves to
  # :pending_call and runs; an "approve" note lands in the dialog so the
  # next round sees the human assented.
  transition :approve_proposal do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:user, "approve"}]}
    output :pending_call, {1, tc}

    action do
      PiAgent.UI.render(options, event)
      # NOTE(review): `tc` is used here without being read from `event`;
      # presumably the DSL exposes input-bound variables inside `action` — confirm.
      PiAgent.C.run_tool(event.enactment_id, tc)
    end
  end

  # Human-driven option 2: reject the plan. Drops :proposed_call, appends
  # a `"reject: " <> note` line to the dialog, and restores :ready.
  transition :reject_proposal do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:user, "reject: " <> note}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  # Human-driven option 3: override the plan with a manual instruction.
  # The dropped proposal yields to a fresh `{:user, note}` line; the
  # next race round picks the new instruction up via `proposal_text/2`.
  transition :manual_input do
    input :proposed_call, bind({1, tc})
    input :dialog, bind({1, d})
    output :dialog, {1, d ++ [{:user, note}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  # Consumes the approved call and emits its result (`tr`, supplied by run_tool/2).
  transition :run_tool do
    input :pending_call, bind({1, tc})
    output :tool_result, {1, tr}

    action do
      PiAgent.C.merge(event.enactment_id)
    end
  end

  transition :merge_result do
    input :dialog, bind({1, d})
    input :tool_result, bind({1, tr})
    output :dialog, {1, d ++ [{:tool, tr}]}
    output :ready, {1, {}}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  # Externally-fed input: spends one capacity token and queues the message
  # `m`, which the driver supplies as a free binding (see steer/2).
  transition :append_steering do
    input :steering_capacity, bind({1, s})
    output :steering_queue, {1, m}
  end

  # Moves one queued message into the dialog. :ready is consumed and put
  # straight back, so injection only happens between planning rounds.
  transition :inject_steering do
    input :ready, bind({1, s})
    input :dialog, bind({1, d})
    input :steering_queue, bind({1, m})
    output :dialog, {1, d ++ [m]}
    output :ready, {1, s}

    action do
      PiAgent.UI.render(options, event)
      PiAgent.C.think(event.enactment_id, event.markings)
    end
  end

  # The workflow terminates once the markings contain an "answer" entry.
  termination do
    on_markings do
      match?(%{"answer" => _}, markings)
    end
  end

  on_enactment_start do
    PiAgent.UI.render(options, event)
    PiAgent.C.think(event.enactment_id, event.markings)
  end

  on_enactment_terminate do
    PiAgent.UI.render(options, event)
  end

  ## Driver

  # One planning round. If a steering message could be injected, stop here:
  # the :inject_steering action calls think/2 again with the updated
  # markings. Otherwise start the provider race.
  def think(enactment_id, markings) do
    # Steering takes precedence — drain the queue first.
    if drain_steering(enactment_id) == :drained do
      :ok
    else
      race_providers(enactment_id, markings)
    end
  end

  # Starts one task per vendor under PiAgent.TaskSup. Each task sleeps for
  # the vendor's mock latency, computes the next step from the dialog read
  # at call time, and tries to fire :propose_call or :think_answer. A task
  # that finds no enabled workitem does nothing (see drive/3).
  def race_providers(enactment_id, markings) do
    dialog = PiAgent.UI.read_dialog(markings)

    Enum.each([:anthropic, :openai, :google], fn vendor ->
      Task.Supervisor.start_child(PiAgent.TaskSup, fn ->
        Process.sleep(PiAgent.MockLLM.latency_ms(vendor))

        case PiAgent.MockLLM.next_step(dialog, vendor) do
          {:tool, op} ->
            text = PiAgent.MockLLM.proposal_text(dialog, op)
            drive(enactment_id, "propose_call", tc: op, proposal: text)

          {:answer, ans} ->
            drive(enactment_id, "think_answer", ans: ans)
        end
      end)
    end)
  end

  # Runs the mock tool and fires :run_tool with its output as `tr`.
  def run_tool(enactment_id, tc) do
    Process.sleep(900)
    drive(enactment_id, "run_tool", tr: PiAgent.MockTool.run(tc))
  end

  # Approve the pending plan; :approve_proposal needs no free binding.
  def approve(enactment_id) do
    drive(enactment_id, "approve_proposal", [])
  end

  # Reject the pending plan. Default note kept short so dialog stays
  # readable; the form-driven path passes a custom note.
  def reject(enactment_id, note \\ "try a different tool")
      when is_binary(note) do
    drive(enactment_id, "reject_proposal", note: note)
  end

  # Override the pending plan with a manual instruction. Drops the
  # proposal and lets the next planning round read the new note.
  # Only accepts a non-empty binary.
  def override(enactment_id, note) when is_binary(note) and note != "" do
    drive(enactment_id, "manual_input", note: note)
  end

  # Queue a user follow-up: fires :append_steering with `{:user, content}`.
  def steer(enactment_id, content) when is_binary(content) do
    drive(enactment_id, "append_steering", m: {:user, content})
  end

  # Fires :merge_result; it needs no free binding.
  def merge(enactment_id) do
    Process.sleep(400)
    drive(enactment_id, "merge_result", [])
  end

  # Renders the net definition as a Mermaid diagram widget.
  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  # Fires one enabled :inject_steering workitem if there is one.
  # Returns :drained when it fired, :empty otherwise.
  defp drain_steering(enactment_id) do
    case find_workitem(enactment_id, "inject_steering") do
      nil ->
        :empty

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, []})
        :drained
    end
  end

  # The first live workitem of the given transition in the :enabled state, or nil.
  defp find_workitem(enactment_id, transition_name) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(&(&1.state == :enabled and &1.binding_element.transition == transition_name))
  end

  # Starts and completes the first enabled workitem of transition `name`
  # with `free_binding`. Returns :ok either way: when no such workitem
  # exists the call is a silent no-op. Raises MatchError if start or
  # complete does not return `{:ok, _}`.
  defp drive(enactment_id, name, free_binding) do
    case find_workitem(enactment_id, name) do
      nil ->
        :ok

      %{id: wid} ->
        {:ok, _} = WorkitemTransition.start_workitem(enactment_id, wid)
        {:ok, _} = WorkitemTransition.complete_workitem(enactment_id, {wid, free_binding})
        :ok
    end
  end
end
# Show the net as a Mermaid diagram.
PiAgent.C.to_mermaid()

# Store the flow definition, create an enactment for it, and keep its id.
# `InMemory`, `flow/2` and `enactment/2` come from the alias/import in the
# Section A cell.
flow_c = InMemory.insert_flow!(PiAgent.C.cpnet())
{:ok, e_c} = PiAgent.C.insert_enactment(flow(flow_c, :id))
enactment_c = enactment(e_c, :id)

# Output frames; `controls_c` is rebuilt by PiAgent.UI.render/2 on every hook.
dialog_c = Kino.Frame.new()
events_c = Kino.Frame.new()
controls_c = Kino.Frame.new()

# Form for queuing a steering message; the field is cleared after submit.
steer_input =
  Kino.Control.form([msg: Kino.Input.text("Steering")],
    submit: "Send steering",
    reset_on_submit: [:msg]
  )

# Form for overriding the pending plan with a manual instruction.
manual_input =
  Kino.Control.form([note: Kino.Input.text("Manual instruction")],
    submit: "Override plan",
    reset_on_submit: [:note]
  )

approve_plan_btn = Kino.Control.button("Approve plan")
reject_plan_btn = Kino.Control.button("Reject plan")

# Empty submissions are ignored. PiAgent.C.override/2 also refuses "" in its
# guard, so the check here keeps an empty submit from raising.
Kino.listen(steer_input, fn %{data: %{msg: text}} ->
  if text != "", do: PiAgent.C.steer(enactment_c, text)
end)

Kino.listen(manual_input, fn %{data: %{note: text}} ->
  if text != "", do: PiAgent.C.override(enactment_c, text)
end)

Kino.listen(approve_plan_btn, fn _ -> PiAgent.C.approve(enactment_c) end)
# Reject uses the default note defined by PiAgent.C.reject/2.
Kino.listen(reject_plan_btn, fn _ -> PiAgent.C.reject(enactment_c) end)

# Start the enactment; the keyword list reaches the hooks as `options`, and
# PiAgent.UI reads the widgets from it when building the controls frame.
{:ok, _} =
  PiAgent.C.start_enactment(enactment_c,
    lifecycle_hooks:
      {PiAgent.C,
       [
         dialog_frame: dialog_c,
         events_frame: events_c,
         controls_frame: controls_c,
         approve_plan_btn: approve_plan_btn,
         reject_plan_btn: reject_plan_btn,
         manual_form: manual_input,
         steer_form: steer_input
       ]}
  )

# Left column: dialog above controls. Right column: event history.
left_c = Kino.Layout.grid([dialog_c, controls_c], columns: 1)
Kino.Layout.grid([left_c, events_c], columns: 2, boxed: true)
How to test the section
Four interactive controls. Each one lands a {:user, _} message in
the dialog, and the next planning round picks the latest such message
up via MockLLM.proposal_text/2 and quotes it back in the proposal so
the human can verify the model heard them.
- Approve plan (button). Fires `:approve_proposal`. Dialog gains `{:user, "approve"}`; tool runs; agent re-plans for next round.
- Reject plan (button). Fires `:reject_proposal` with a fixed default note. Dialog gains `{:user, "reject: try a different tool"}`; the proposal is dropped; agent re-plans, and the next `propose_call` quotes the rejection back to you.
- Manual instruction (form). Type a custom note and click Override plan. Fires `:manual_input` with your text as the `note` free binding. Dialog gains `{:user, note}`; proposal dropped; agent re-plans with your override as the new context.
- Steering (form). Type a follow-up and click Send steering. Fires `:append_steering` (queues the message). The next time `:ready` and `:dialog` align — typically right after the current tool merges or the current proposal resolves — `:inject_steering` drains the queue into the dialog.
Pre-seeded steering: :steering_queue starts with one token
({:user, "Also count the files."}). The very first think call
drains it, so the first proposal already references it. You’ll see
this on every run with no input from you.
If you click any control after the agent terminates (✅ done),
nothing happens — drive/3 finds no enabled workitem and returns
silently. Re-evaluate the C run cell to start a fresh enactment.
What this buys you
The same module is also:
- Crash-safe. Kill the runner mid-`bash` and restart — the runner replays `Occurrence`s from the latest snapshot, the workitem state comes back exactly where it was, no duplicate tool calls.
- Time-travelable. Every firing is a stored event; an audit UI gets the agent’s full reasoning trace for free.
- Branchable. Insert a fresh enactment from a snapshot of an existing one and you have a session fork — the same primitive Pi exposes via `/fork` and `/clone`, but earned automatically by the event-sourced runner.
- Provider-agnostic. The CPN cares about token shapes, not vendors. Add a new provider in Section C by adding one line to `race_providers/2`; the structure of the net does not change.
The 200 lines of PiAgent.C give you a saga-style coding agent with
human-in-loop, race semantics, mid-flight steering, and durable replay —
properties that take thousands of lines to retrofit onto a hand-written
ReAct loop.