Choreo Workflow: Comprehensive Walkthrough
Section
```elixir
Mix.install([
  {:choreo, "~> 0.6"},
  {:kino_vizjs, "~> 0.5.0"}
])
```
> Rendering diagrams: This livebook uses `Kino.VizJS` to render DOT diagrams inline. You can also copy DOT output into PlantText or run `dot -Tpng diagram.dot -o diagram.png` locally.
What is Choreo.Workflow?
Choreo.Workflow models business process orchestration and Saga transactions as directed graphs. Unlike drawing tools where boxes and arrows are just pictures, Choreo workflows are executable blueprints you can analyse for latency, parallelism, failure coverage, and structural correctness.
Common use cases:
- E-commerce order processing with rollback handlers
- CI/CD pipeline orchestration
- Multi-step approval flows
- Distributed Saga transactions
Node Types
| Type | Shape | Purpose |
|---|---|---|
| `start` | ● circle | Entry point; triggers the workflow |
| `end` | ◎ doublecircle | Terminal: successful completion |
| `task` | 📦 box3d | Automated step with timeout and retry |
| `decision` | 💎 diamond | Conditional branching (yes/no, approved/rejected) |
| `fork` | ▽ invhouse | Splits execution into parallel branches |
| `join` | △ house | Waits for all parallel branches to complete |
| `compensation` | 📝 note | Saga rollback handler (dashed, red) |
| `event` | ☁️ cloud | External trigger, timer, or signal |
Edge Types
| Type | Style | When to use |
|---|---|---|
| `:sequence` | solid | Normal flow |
| `:compensation` | dashed red | Rollback path |
| `:retry` | dashed orange | Retry loop |
| `:failure` | dotted red | Error handler |
| `:timeout` | dotted orange | Timeout handler |
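The two tables above work as a lookup from node or edge type to Graphviz attributes. The sketch below shows that mapping and builds a small DOT string from it. It is not Choreo's implementation: the module name `DotSketch` and the `{id, type}` / `{from, to, edge_type}` data shapes are hypothetical, and `Workflow.to_dot/2` may build its DOT differently.

```elixir
# Hypothetical sketch: map the node and edge types from the tables above to
# Graphviz attributes and emit a DOT string. Not Choreo's to_dot/2.
defmodule DotSketch do
  @node_shapes %{
    start: "circle",
    end: "doublecircle",
    task: "box3d",
    decision: "diamond",
    fork: "invhouse",
    join: "house",
    compensation: "note",
    event: "cloud"
  }

  @edge_styles %{
    sequence: ~s(style=solid),
    compensation: ~s(style=dashed, color=red),
    retry: ~s(style=dashed, color=orange),
    failure: ~s(style=dotted, color=red),
    timeout: ~s(style=dotted, color=orange)
  }

  # nodes: [{id, type}], edges: [{from, to, edge_type}]
  def to_dot(nodes, edges) do
    node_lines =
      for {id, type} <- nodes do
        shape = Map.fetch!(@node_shapes, type)
        ~s(  "#{id}" [shape=#{shape}];)
      end

    edge_lines =
      for {from, to, type} <- edges do
        style = Map.fetch!(@edge_styles, type)
        ~s(  "#{from}" -> "#{to}" [#{style}];)
      end

    Enum.join(["digraph workflow {"] ++ node_lines ++ edge_lines ++ ["}"], "\n")
  end
end

DotSketch.to_dot(
  [{:order_received, :start}, {:charge_card, :task}, {:refund_payment, :compensation}],
  [{:order_received, :charge_card, :sequence}, {:charge_card, :refund_payment, :compensation}]
)
|> IO.puts()
```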
```elixir
alias Choreo.Workflow
alias Choreo.Workflow.Analysis

legend =
  Workflow.new()
  |> Workflow.add_start(:start)
  |> Workflow.add_end(:end)
  |> Workflow.add_task(:task)
  |> Workflow.add_decision(:decision)
  |> Workflow.add_fork(:fork)
  |> Workflow.add_join(:join)
  |> Workflow.add_compensation(:compensation, for: :task)
  |> Workflow.add_event(:event)

Kino.VizJS.render(Workflow.to_dot(legend))
```
Example 1: E-Commerce Saga (The Classic)
A customer places an order. We charge their card, reserve inventory, pack, and ship. If anything fails, we roll back previous steps.
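The usual saga runtime rule can be sketched in plain Elixir. Steps run in order. If one fails, the compensations of the steps that already succeeded run in reverse order. This is a generic illustration, not Choreo's runtime; the `SagaSketch` module and the `{name, action, compensation}` step shape are hypothetical. The diagram below triggers both compensations from the stock decision instead. To make the undo order visible, each compensation here simply returns its own name.

```elixir
# Minimal saga sketch (plain Elixir, not Choreo): run steps in order and, on
# failure, run the compensations of already-completed steps in reverse order.
defmodule SagaSketch do
  # steps: [{name, action_fun, compensation_fun}]
  def run(steps) do
    result =
      Enum.reduce_while(steps, {:ok, []}, fn {name, action, compensate}, {:ok, done} ->
        case action.() do
          :ok ->
            # Prepend, so `done` is always most-recent-first.
            {:cont, {:ok, [{name, compensate} | done]}}

          {:error, reason} ->
            # Because `done` is most-recent-first, this undoes in reverse order.
            undone = Enum.map(done, fn {_name, comp} -> comp.() end)
            {:halt, {:error, name, reason, undone}}
        end
      end)

    case result do
      {:ok, done} -> {:ok, done |> Enum.reverse() |> Enum.map(fn {name, _comp} -> name end)}
      error -> error
    end
  end
end

steps = [
  {:charge_card, fn -> :ok end, fn -> :refund_payment end},
  {:reserve_inventory, fn -> {:error, :insufficient_stock} end, fn -> :release_inventory end},
  {:pack_items, fn -> :ok end, fn -> :unpack_items end}
]

SagaSketch.run(steps)
# => {:error, :reserve_inventory, :insufficient_stock, [:refund_payment]}
```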
```elixir
order_saga =
  Workflow.new()
  |> Workflow.add_start(:order_received)
  |> Workflow.add_task(:charge_card,
    label: "Charge Card",
    timeout_ms: 5000,
    retry: 3,
    retry_backoff_ms: 1000
  )
  |> Workflow.add_task(:reserve_inventory,
    label: "Reserve Inventory",
    timeout_ms: 3000
  )
  |> Workflow.add_decision(:sufficient_stock,
    label: "Stock OK?"
  )
  |> Workflow.add_task(:pack_items,
    label: "Pack Items",
    timeout_ms: 10_000
  )
  |> Workflow.add_task(:ship_order,
    label: "Ship Order",
    timeout_ms: 5000
  )
  |> Workflow.add_compensation(:refund_payment,
    label: "Refund Payment",
    for: :charge_card
  )
  |> Workflow.add_compensation(:release_inventory,
    label: "Release Inventory",
    for: :reserve_inventory
  )
  |> Workflow.add_end(:done)
  # Happy path
  |> Workflow.connect(:order_received, :charge_card)
  |> Workflow.connect(:charge_card, :reserve_inventory)
  |> Workflow.connect(:reserve_inventory, :sufficient_stock)
  |> Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
  |> Workflow.connect(:pack_items, :ship_order)
  |> Workflow.connect(:ship_order, :done)
  # Compensation paths
  |> Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:sufficient_stock, :release_inventory, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:refund_payment, :done, edge_type: :compensation)

Kino.VizJS.render(Workflow.to_dot(order_saga))
```
Critical Path Analysis
The critical path is the longest-latency chain from start to end. It is computed from each task's `timeout_ms`, so it tells you how long a failure-free, retry-free order takes when every step runs up to its timeout.
```elixir
{:ok, path, latency_ms} = Analysis.critical_path(order_saga)
IO.puts("Critical path: #{Enum.join(path, " → ")}")
IO.puts("Total latency: #{latency_ms}ms")
```
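The idea behind the critical path can be sketched without Choreo as a longest-path search over the DAG. It is not Choreo's algorithm. The sketch makes these assumptions:

- `CriticalPathSketch` and its `latency` map / `{from, to}` edge list are hypothetical.
- Task latencies are the `timeout_ms` values from Example 1.
- Every non-task node counts as 0 ms.

Because the search takes the maximum over all successors, a fork contributes only its slowest branch. Routes that never reach the end node (here `release_inventory`) are discarded.

```elixir
# Sketch: slowest route from `start` to `finish` in an acyclic graph.
# Latencies are per node; nodes missing from the map count as 0 ms (assumption).
defmodule CriticalPathSketch do
  # latency: %{node => ms}; edges: [{from, to}]
  def critical_path(latency, edges, start, finish) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))

    case longest(start, finish, latency, succ) do
      nil -> {:error, :unreachable}
      {total, path} -> {:ok, path, total}
    end
  end

  # Reached the end node: its own latency, path of one.
  defp longest(finish, finish, latency, _succ), do: {Map.get(latency, finish, 0), [finish]}

  # Otherwise take the slowest successor route; nil if finish is unreachable.
  defp longest(node, finish, latency, succ) do
    routes =
      succ
      |> Map.get(node, [])
      |> Enum.map(&longest(&1, finish, latency, succ))
      |> Enum.reject(&is_nil/1)

    case routes do
      [] ->
        nil

      _ ->
        {rest, path} = Enum.max_by(routes, &elem(&1, 0))
        {Map.get(latency, node, 0) + rest, [node | path]}
    end
  end
end

latency = %{charge_card: 5000, reserve_inventory: 3000, pack_items: 10_000, ship_order: 5000}

edges = [
  {:order_received, :charge_card},
  {:charge_card, :reserve_inventory},
  {:reserve_inventory, :sufficient_stock},
  {:sufficient_stock, :pack_items},
  {:pack_items, :ship_order},
  {:ship_order, :done},
  {:sufficient_stock, :refund_payment},
  {:sufficient_stock, :release_inventory},
  {:refund_payment, :done}
]

{:ok, path, total} = CriticalPathSketch.critical_path(latency, edges, :order_received, :done)
IO.puts("#{Enum.join(path, " → ")} (#{total}ms)")
```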
Parallelizable Tasks
Which tasks have no dependencies on each other and could run concurrently?
```elixir
Analysis.parallelizable_tasks(order_saga)
|> Enum.with_index()
|> Enum.each(fn {tasks, level} ->
  IO.puts("Level #{level}: #{Enum.join(tasks, ", ")}")
end)
```
In this linear saga, every happy-path task depends on the one before it, so nothing on the main line can be parallelised. Let's fix that in the next example.
Missing Compensations
Tasks with retry configured should have compensation handlers. Let’s check:
```elixir
Analysis.missing_compensations(order_saga)
|> IO.inspect(label: "Tasks with retry but no compensation")
```
`charge_card` has `retry: 3` but no compensation edge leaving it directly. The refund is triggered from the `sufficient_stock` decision node instead. Depending on your design, this may or may not be acceptable. `validate/1` flags the same situation, which is useful if you want every retried task to have a direct compensation route.
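The rule described above can be sketched over plain data: flag every task with `retry > 0` that has no outgoing `:compensation` edge. This is one interpretation of the check, not Choreo's code. The `CompensationCheckSketch` module and its `tasks` / `edges` shapes are hypothetical.

```elixir
# Sketch of the rule described above (an interpretation, not Choreo's code):
# flag tasks with retry > 0 that have no outgoing :compensation edge.
defmodule CompensationCheckSketch do
  # tasks: %{id => keyword opts}; edges: [{from, to, edge_type}]
  def missing_compensations(tasks, edges) do
    # Nodes that are the source of at least one compensation edge.
    compensated = for {from, _to, :compensation} <- edges, into: MapSet.new(), do: from

    for {id, opts} <- tasks,
        Keyword.get(opts, :retry, 0) > 0,
        not MapSet.member?(compensated, id),
        do: id
  end
end

tasks = %{
  charge_card: [timeout_ms: 5000, retry: 3],
  reserve_inventory: [timeout_ms: 3000],
  pack_items: [timeout_ms: 10_000],
  ship_order: [timeout_ms: 5000]
}

edges = [
  {:sufficient_stock, :refund_payment, :compensation},
  {:sufficient_stock, :release_inventory, :compensation}
]

CompensationCheckSketch.missing_compensations(tasks, edges)
# => [:charge_card]
```

Adding `{:charge_card, :refund_payment, :compensation}` to `edges` empties the list in this sketch.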
Example 2: Parallelised Order Processing
Real systems do independent work in parallel. Charging the card and checking fraud can happen simultaneously. So can packing and sending the confirmation email.
```elixir
parallel_order =
  Workflow.new()
  |> Workflow.add_start(:order_received)
  |> Workflow.add_fork(:fork_checks, label: "Parallel Checks")
  |> Workflow.add_task(:charge_card,
    label: "Charge Card\n5s",
    timeout_ms: 5000
  )
  |> Workflow.add_task(:fraud_check,
    label: "Fraud Check\n2s",
    timeout_ms: 2000
  )
  |> Workflow.add_join(:join_checks, label: "Checks Complete")
  |> Workflow.add_decision(:checks_passed, label: "Passed?")
  |> Workflow.add_fork(:fork_fulfillment, label: "Fulfillment")
  |> Workflow.add_task(:pack_items,
    label: "Pack Items\n8s",
    timeout_ms: 8000
  )
  |> Workflow.add_task(:send_confirmation,
    label: "Send Email\n1s",
    timeout_ms: 1000
  )
  |> Workflow.add_join(:join_fulfillment, label: "Fulfillment Done")
  |> Workflow.add_task(:ship_order,
    label: "Ship\n3s",
    timeout_ms: 3000
  )
  |> Workflow.add_end(:done)
  |> Workflow.add_compensation(:refund_payment, label: "Refund", for: :charge_card)
  |> Workflow.add_compensation(:cancel_fraud_flag, label: "Clear Flag", for: :fraud_check)
  # Start → parallel checks
  |> Workflow.connect(:order_received, :fork_checks)
  |> Workflow.connect(:fork_checks, :charge_card)
  |> Workflow.connect(:fork_checks, :fraud_check)
  |> Workflow.connect(:charge_card, :join_checks)
  |> Workflow.connect(:fraud_check, :join_checks)
  |> Workflow.connect(:join_checks, :checks_passed)
  # Decision branches
  |> Workflow.connect(:checks_passed, :fork_fulfillment, condition: "yes")
  |> Workflow.connect(:checks_passed, :refund_payment, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:checks_passed, :cancel_fraud_flag, condition: "no", edge_type: :compensation)
  # Parallel fulfillment
  |> Workflow.connect(:fork_fulfillment, :pack_items)
  |> Workflow.connect(:fork_fulfillment, :send_confirmation)
  |> Workflow.connect(:pack_items, :join_fulfillment)
  |> Workflow.connect(:send_confirmation, :join_fulfillment)
  |> Workflow.connect(:join_fulfillment, :ship_order)
  |> Workflow.connect(:ship_order, :done)

Kino.VizJS.render(Workflow.to_dot(parallel_order))
```
Critical Path (Parallel)
```elixir
{:ok, path, latency_ms} = Analysis.critical_path(parallel_order)
IO.puts("Critical path: #{Enum.join(path, " → ")}")
IO.puts("Total latency: #{latency_ms}ms")
```
Notice the total is less than the sum of all timeouts because fork/join pairs only count the slowest branch.
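Assume the start, end, decision, fork and join nodes add no latency of their own; only tasks carry `timeout_ms` here. Under that assumption, the rule in the previous sentence works out as follows for this workflow:

$$
\begin{aligned}
L_{\text{critical}} &= \max(5000,\ 2000) + \max(8000,\ 1000) + 3000 = 16\,000\ \text{ms} \\
L_{\text{sum}} &= 5000 + 2000 + 8000 + 1000 + 3000 = 19\,000\ \text{ms}
\end{aligned}
$$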
Parallel Levels
```elixir
Analysis.parallelizable_tasks(parallel_order)
|> Enum.with_index()
|> Enum.each(fn {tasks, level} ->
  IO.puts("Level #{level}: #{Enum.join(tasks, ", ")}")
end)
```
Now `charge_card` and `fraud_check` sit at the same level, so they can run simultaneously. The same holds for `pack_items` and `send_confirmation`.
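One common way to compute such levels is Kahn's topological sort, applied one layer at a time. Each round takes every node with no remaining incoming edges. Two nodes in the same round cannot have a path between them, so they could run concurrently. The sketch below applies this to the `parallel_order` edges, leaving out the compensation edges. It lists every node, not just tasks. It is not necessarily how `parallelizable_tasks/1` works, and `LevelsSketch` is a hypothetical name.

```elixir
# Sketch: layered Kahn's algorithm. Each level holds the nodes whose
# predecessors all sit in earlier levels.
defmodule LevelsSketch do
  # edges: [{from, to}]; returns a list of levels, each a sorted list of nodes.
  def levels(edges) do
    nodes = edges |> Enum.flat_map(fn {a, b} -> [a, b] end) |> Enum.uniq()

    indegree =
      Enum.reduce(edges, Map.new(nodes, &{&1, 0}), fn {_from, to}, acc ->
        Map.update!(acc, to, &(&1 + 1))
      end)

    do_levels(indegree, edges, [])
  end

  defp do_levels(indegree, _edges, acc) when map_size(indegree) == 0, do: Enum.reverse(acc)

  defp do_levels(indegree, edges, acc) do
    ready = for {node, 0} <- indegree, do: node
    if ready == [], do: raise(ArgumentError, "graph has a cycle")

    # Remove the ready nodes and decrement the in-degree of their successors.
    remaining =
      Enum.reduce(edges, Map.drop(indegree, ready), fn {from, to}, acc ->
        if from in ready and Map.has_key?(acc, to),
          do: Map.update!(acc, to, &(&1 - 1)),
          else: acc
      end)

    do_levels(remaining, edges, [Enum.sort(ready) | acc])
  end
end

parallel_edges = [
  {:order_received, :fork_checks},
  {:fork_checks, :charge_card},
  {:fork_checks, :fraud_check},
  {:charge_card, :join_checks},
  {:fraud_check, :join_checks},
  {:join_checks, :checks_passed},
  {:checks_passed, :fork_fulfillment},
  {:fork_fulfillment, :pack_items},
  {:fork_fulfillment, :send_confirmation},
  {:pack_items, :join_fulfillment},
  {:send_confirmation, :join_fulfillment},
  {:join_fulfillment, :ship_order},
  {:ship_order, :done}
]

parallel_edges
|> LevelsSketch.levels()
|> Enum.with_index()
|> Enum.each(fn {nodes, level} -> IO.puts("Level #{level}: #{Enum.join(nodes, ", ")}") end)
```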
Example 3: CI/CD Pipeline with Swimlanes
Swimlanes group nodes by team or service. Here we model a deployment pipeline with Dev, QA, and Platform teams.
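In Graphviz, a subgraph whose name starts with `cluster` is drawn as a boxed group, which is a natural fit for swimlanes. The sketch below groups nodes by lane and emits one cluster per lane, reusing the labels and fill colours from the pipeline that follows. It is a sketch under assumptions: `SwimlaneSketch` and its data shapes are hypothetical, and Choreo's own DOT output may be structured differently.

```elixir
# Sketch: one Graphviz "cluster" subgraph per swimlane (hypothetical helper,
# not Choreo's renderer).
defmodule SwimlaneSketch do
  # nodes: [{id, lane}]; lanes: %{lane => {label, fillcolor}}
  def clusters(nodes, lanes) do
    nodes
    |> Enum.group_by(fn {_id, lane} -> lane end, fn {id, _lane} -> id end)
    |> Enum.sort()
    |> Enum.map_join("\n", fn {lane, ids} ->
      {label, fill} = Map.fetch!(lanes, lane)
      members = Enum.map_join(ids, " ", fn id -> ~s("#{id}";) end)
      ~s(  subgraph cluster_#{lane} { label="#{label}"; style=filled; fillcolor="#{fill}"; #{members} })
    end)
  end
end

cicd_nodes = [
  {:git_push, :dev},
  {:run_tests, :dev},
  {:build_image, :dev},
  {:deploy_staging, :qa},
  {:run_integration_tests, :qa},
  {:tests_passed, :qa},
  {:deploy_production, :platform},
  {:run_smoke_tests, :platform}
]

lanes = %{
  dev: {"Dev Team", "#dbeafe"},
  qa: {"QA Team", "#dcfce7"},
  platform: {"Platform Team", "#fef3c7"}
}

IO.puts("digraph cicd {\n" <> SwimlaneSketch.clusters(cicd_nodes, lanes) <> "\n}")
```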
```elixir
cicd =
  Workflow.new()
  # Dev swimlane
  |> Workflow.add_swimlane(:dev, label: "Dev Team", fillcolor: "#dbeafe")
  |> Workflow.add_event(:git_push, label: "Git Push", swimlane: :dev)
  |> Workflow.add_task(:run_tests,
    label: "Unit Tests\n3m",
    timeout_ms: 180_000,
    swimlane: :dev
  )
  |> Workflow.add_task(:build_image,
    label: "Build Docker Image\n5m",
    timeout_ms: 300_000,
    swimlane: :dev
  )
  # QA swimlane
  |> Workflow.add_swimlane(:qa, label: "QA Team", fillcolor: "#dcfce7")
  |> Workflow.add_task(:deploy_staging,
    label: "Deploy to Staging\n2m",
    timeout_ms: 120_000,
    swimlane: :qa
  )
  |> Workflow.add_task(:run_integration_tests,
    label: "Integration Tests\n10m",
    timeout_ms: 600_000,
    retry: 1,
    swimlane: :qa
  )
  |> Workflow.add_decision(:tests_passed, label: "Tests Pass?", swimlane: :qa)
  # Platform swimlane
  |> Workflow.add_swimlane(:platform, label: "Platform Team", fillcolor: "#fef3c7")
  |> Workflow.add_task(:deploy_production,
    label: "Deploy to Prod\n3m",
    timeout_ms: 180_000,
    swimlane: :platform
  )
  |> Workflow.add_task(:run_smoke_tests,
    label: "Smoke Tests\n2m",
    timeout_ms: 120_000,
    swimlane: :platform
  )
  |> Workflow.add_decision(:smoke_passed, label: "Smoke OK?", swimlane: :platform)
  |> Workflow.add_end(:live)
  |> Workflow.add_end(:blocked)
  # Rollback
  |> Workflow.add_compensation(:rollback_production,
    label: "Rollback Prod",
    for: :deploy_production,
    swimlane: :platform
  )
  # Flow
  |> Workflow.connect(:git_push, :run_tests)
  |> Workflow.connect(:run_tests, :build_image)
  |> Workflow.connect(:build_image, :deploy_staging)
  |> Workflow.connect(:deploy_staging, :run_integration_tests)
  |> Workflow.connect(:run_integration_tests, :tests_passed)
  |> Workflow.connect(:tests_passed, :deploy_production, condition: "yes")
  # Failing integration tests stop before production, so there is nothing to roll back yet
  |> Workflow.connect(:tests_passed, :blocked, condition: "no")
  |> Workflow.connect(:deploy_production, :run_smoke_tests)
  |> Workflow.connect(:run_smoke_tests, :smoke_passed)
  |> Workflow.connect(:smoke_passed, :live, condition: "yes")
  # Only a failed check after the production deploy triggers the rollback
  |> Workflow.connect(:smoke_passed, :rollback_production, condition: "no", edge_type: :compensation)

Kino.VizJS.render(Workflow.to_dot(cicd, theme: :dark))
```
Bottleneck Detection
```elixir
Analysis.bottlenecks(cicd, latency_threshold: 300_000)
|> IO.inspect(label: "High-latency bottlenecks (> 5m)")

Analysis.bottlenecks(cicd, retry_threshold: 1)
|> IO.inspect(label: "High-retry bottlenecks")
```
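Threshold-based bottleneck detection can be sketched as a simple filter over the tasks. This sketch assumes a task is a bottleneck when its `timeout_ms` is strictly greater than `latency_threshold`, or its `retry` count is at least `retry_threshold`. Choreo's exact comparisons are not documented here and may differ. `BottleneckSketch` is a hypothetical name, and the task data is copied from the CI/CD pipeline above.

```elixir
# Sketch of threshold filtering (assumed semantics: timeout > latency_threshold,
# retry >= retry_threshold). Not Choreo's implementation.
defmodule BottleneckSketch do
  # tasks: [{id, %{timeout_ms: ms, retry: n}}]; opts: latency_threshold / retry_threshold
  def bottlenecks(tasks, opts) do
    latency_max = Keyword.get(opts, :latency_threshold)
    retry_min = Keyword.get(opts, :retry_threshold)

    for {id, task} <- tasks, slow?(task, latency_max) or flaky?(task, retry_min), do: id
  end

  # A missing threshold never flags anything.
  defp slow?(_task, nil), do: false
  defp slow?(task, max), do: task.timeout_ms > max

  defp flaky?(_task, nil), do: false
  defp flaky?(task, min), do: Map.get(task, :retry, 0) >= min
end

cicd_tasks = [
  run_tests: %{timeout_ms: 180_000},
  build_image: %{timeout_ms: 300_000},
  deploy_staging: %{timeout_ms: 120_000},
  run_integration_tests: %{timeout_ms: 600_000, retry: 1},
  deploy_production: %{timeout_ms: 180_000},
  run_smoke_tests: %{timeout_ms: 120_000}
]

BottleneckSketch.bottlenecks(cicd_tasks, latency_threshold: 300_000)
# => [:run_integration_tests]
```

Under the strict `>` assumption, `build_image` (exactly 300,000 ms) is not flagged. A `>=` comparison would include it.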
Simulation
```elixir
Analysis.simulate(cicd)
|> Enum.sort_by(fn {_id, vals} -> vals.cumulative_latency end, :desc)
|> Enum.take(5)
|> Enum.each(fn {id, vals} ->
  total = div(vals.cumulative_latency, 1000)
  IO.puts("#{id}: #{total}s total (task: #{div(vals.task_latency, 1000)}s, retry: #{div(vals.retry_latency, 1000)}s)")
end)
```
Example 4: Approval Workflow with Timeout Escalation
Human-in-the-loop processes need timeout handling. If a manager doesn’t approve within 24 hours, escalate.
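At runtime, a `:timeout` edge means "stop waiting and take the other branch". Below is a minimal plain-Elixir sketch of that pattern using the `Task.yield/2` plus `Task.shutdown/1` idiom from the standard library. It is not how Choreo executes workflows. `EscalationSketch` and its return values are hypothetical, and the 24-hour window is scaled down to milliseconds so the example runs instantly.

```elixir
# Runtime sketch (plain Elixir, not Choreo): wait a bounded time for a review
# result; if none arrives, stop the review task and follow the timeout path.
defmodule EscalationSketch do
  def review_with_timeout(review_fun, timeout_ms) do
    task = Task.async(review_fun)

    # yield/2 returns nil on timeout; shutdown/1 then stops the task (and
    # returns {:ok, result} if it happened to finish in the meantime).
    case Task.yield(task, timeout_ms) || Task.shutdown(task) do
      {:ok, decision} -> {:reviewed, decision}
      {:exit, reason} -> {:failed, reason}
      nil -> :escalate_to_vp
    end
  end
end

# Manager answers in time.
EscalationSketch.review_with_timeout(fn -> :approved end, 1_000)

# Manager is too slow, so the request escalates.
EscalationSketch.review_with_timeout(
  fn ->
    Process.sleep(1_000)
    :approved
  end,
  20
)
```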
```elixir
approval =
  Workflow.new()
  |> Workflow.add_start(:request_submitted)
  |> Workflow.add_task(:validate_request,
    label: "Auto-Validate\n500ms",
    timeout_ms: 500
  )
  |> Workflow.add_decision(:valid, label: "Valid?")
  |> Workflow.add_task(:manager_review,
    label: "Manager Review\n24h",
    timeout_ms: 86_400_000
  )
  |> Workflow.add_decision(:approved, label: "Approved?")
  |> Workflow.add_task(:process_request,
    label: "Process\n2s",
    timeout_ms: 2000
  )
  |> Workflow.add_end(:completed)
  |> Workflow.add_task(:auto_escalate,
    label: "Escalate to VP\n48h",
    timeout_ms: 172_800_000
  )
  |> Workflow.add_task(:notify_rejection,
    label: "Notify Rejection\n1s",
    timeout_ms: 1000
  )
  |> Workflow.add_end(:rejected)
  |> Workflow.add_end(:escalated)
  # Happy / sad paths
  |> Workflow.connect(:request_submitted, :validate_request)
  |> Workflow.connect(:validate_request, :valid)
  |> Workflow.connect(:valid, :manager_review, condition: "yes")
  |> Workflow.connect(:valid, :notify_rejection, condition: "no")
  |> Workflow.connect(:notify_rejection, :rejected)
  |> Workflow.connect(:manager_review, :approved)
  |> Workflow.connect(:approved, :process_request, condition: "yes")
  |> Workflow.connect(:approved, :notify_rejection, condition: "no")
  |> Workflow.connect(:process_request, :completed)
  # Timeout path
  |> Workflow.connect(:manager_review, :auto_escalate, edge_type: :timeout)
  |> Workflow.connect(:auto_escalate, :escalated)

Kino.VizJS.render(Workflow.to_dot(approval))
```
Failure Scenarios
Which tasks have explicit compensation or failure paths?
```elixir
Analysis.failure_scenarios(approval)
|> IO.inspect(label: "Tasks with failure/compensation edges")
```
Uncompensated Paths
Which error paths don’t lead to a terminal node via compensations?
```elixir
Analysis.uncompensated_paths(approval)
|> IO.inspect(label: "Uncompensated error paths")
```
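One reading of this question: for every error-type edge (`:failure`, `:compensation`, `:timeout`), search forward from its target and report the edge if no end node is reachable. The sketch below implements that reading with a depth-first search. It is not Choreo's algorithm, and `UncompensatedSketch` is a hypothetical name. It is applied to the compensation edges from Example 1, where `release_inventory` has no outgoing edge.

```elixir
# Sketch: report error-type edges whose target cannot reach any end node.
defmodule UncompensatedSketch do
  @error_types [:failure, :compensation, :timeout]

  # edges: [{from, to, type}]; ends: list of end-node ids
  def uncompensated_paths(edges, ends) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    end_set = MapSet.new(ends)

    for {from, to, type} <- edges,
        type in @error_types,
        not reaches_end?([to], succ, end_set, MapSet.new()),
        do: {from, to}
  end

  # Iterative depth-first search with a visited set (handles cycles).
  defp reaches_end?([], _succ, _ends, _seen), do: false

  defp reaches_end?([node | rest], succ, ends, seen) do
    cond do
      MapSet.member?(ends, node) -> true
      MapSet.member?(seen, node) -> reaches_end?(rest, succ, ends, seen)
      true -> reaches_end?(Map.get(succ, node, []) ++ rest, succ, ends, MapSet.put(seen, node))
    end
  end
end

saga_edges = [
  {:sufficient_stock, :pack_items, :sequence},
  {:pack_items, :ship_order, :sequence},
  {:ship_order, :done, :sequence},
  {:sufficient_stock, :refund_payment, :compensation},
  {:sufficient_stock, :release_inventory, :compensation},
  {:refund_payment, :done, :compensation}
]

UncompensatedSketch.uncompensated_paths(saga_edges, [:done])
# => [{:sufficient_stock, :release_inventory}]
```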
Example 5: Fixing a Broken Workflow
Start with a workflow that has structural problems and use validate/1 as a checklist.
```elixir
broken =
  Workflow.new()
  |> Workflow.add_task(:step_a, label: "Step A")
  |> Workflow.add_task(:step_b, label: "Step B")
  |> Workflow.add_task(:orphan_step, label: "Orphan")
  |> Workflow.add_task(:dead_end, label: "Dead End")
  |> Workflow.add_end(:finish)
  |> Workflow.add_compensation(:rollback, label: "Rollback", for: :step_a)
  |> Workflow.connect(:step_a, :step_b)
  |> Workflow.connect(:step_b, :finish)

# There is no start node, so step_a has no incoming edge
# orphan_step and dead_end are not connected at all (no inputs, no outputs)
# rollback has no incoming edge
Kino.VizJS.render(Workflow.to_dot(broken))
```
Diagnosis
```elixir
Analysis.validate(broken)
|> Enum.each(fn {sev, msg} ->
  icon = if sev == :error, do: "❌", else: "⚠️"
  IO.puts("#{icon} #{msg}")
end)
```
The validator catches:
- No start node
- No end node reachable from `dead_end`
- Orphan tasks
- Dead-end tasks
- Unreachable compensation node
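Three of these checks can be sketched over plain data: a missing start node, tasks with no incoming edge (orphans), and tasks with no outgoing edge (dead ends). This is an illustration only. `ValidateSketch`, the `{id, type}` node shape, and the `{severity, message}` results are hypothetical, chosen to mirror the tuples printed above. In this sketch `step_a` also counts as an orphan, because nothing feeds it without a start node.

```elixir
# Sketch of three structural checks (not Choreo's validate/1).
defmodule ValidateSketch do
  # nodes: [{id, type}]; edges: [{from, to}]
  def validate(nodes, edges) do
    has_in = MapSet.new(edges, &elem(&1, 1))
    has_out = MapSet.new(edges, &elem(&1, 0))
    tasks = for {id, :task} <- nodes, do: id

    start_issue =
      if Enum.any?(nodes, &match?({_, :start}, &1)), do: [], else: [{:error, "No start node"}]

    orphans = for id <- tasks, not MapSet.member?(has_in, id), do: {:warning, "Orphan task: #{id}"}
    dead_ends = for id <- tasks, not MapSet.member?(has_out, id), do: {:warning, "Dead-end task: #{id}"}

    start_issue ++ orphans ++ dead_ends
  end
end

broken_nodes = [
  step_a: :task,
  step_b: :task,
  orphan_step: :task,
  dead_end: :task,
  finish: :end,
  rollback: :compensation
]

broken_edges = [{:step_a, :step_b}, {:step_b, :finish}]

ValidateSketch.validate(broken_nodes, broken_edges)
|> Enum.each(fn {sev, msg} -> IO.puts("#{sev}: #{msg}") end)
```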
The Fix
```elixir
fixed =
  Workflow.new()
  |> Workflow.add_start(:start)
  |> Workflow.add_task(:step_a, label: "Step A")
  |> Workflow.add_task(:step_b, label: "Step B")
  |> Workflow.add_end(:finish)
  |> Workflow.add_compensation(:rollback, label: "Rollback", for: :step_a)
  |> Workflow.connect(:start, :step_a)
  |> Workflow.connect(:step_a, :step_b)
  |> Workflow.connect(:step_b, :finish)
  |> Workflow.connect(:step_a, :rollback, edge_type: :compensation)

IO.puts("After fix:")
Analysis.validate(fixed) |> IO.inspect()

Kino.VizJS.render(Workflow.to_dot(fixed))
```
Advanced: Custom Theming
```elixir
brand_theme =
  Choreo.Theme.custom(
    colors: %{
      start: "#10b981",
      end: "#ef4444",
      task: "#3b82f6",
      decision: "#8b5cf6",
      compensation: "#f87171"
    },
    node_fontcolor: "white",
    edge_color: "#94a3b8",
    graph_bgcolor: "#0f172a"
  )

Kino.VizJS.render(Workflow.to_dot(parallel_order, theme: brand_theme))
```
Summary
| Question | Function |
|---|---|
| “How slow is the slowest path?” | `Analysis.critical_path/1` |
| “What can run in parallel?” | `Analysis.parallelizable_tasks/1` |
| “Which tasks lack rollback?” | `Analysis.missing_compensations/1` |
| “Which error paths are dead ends?” | `Analysis.uncompensated_paths/1` |
| “Where are the bottlenecks?” | `Analysis.bottlenecks/2` |
| “What’s the estimated timeline?” | `Analysis.simulate/1` |
| “Is the workflow structurally sound?” | `Analysis.validate/1` |
| “Group by team” | `Workflow.add_swimlane/3` + `:swimlane` option |
| “Render to DOT” | `Workflow.to_dot/2` |
Workflow diagrams as code mean your business processes are version-controlled, reviewable, and auditable. Every time you add a task, change a timeout, or introduce a new decision branch, you can immediately see the impact on critical path latency, parallelisation opportunities, and failure coverage — before you ship to production.