
Choreo Workflow: Comprehensive Walkthrough

livebooks/workflow_walkthrough.livemd


Setup

Mix.install([
  {:choreo, "~> 0.6"},
  {:kino_vizjs, "~> 0.5.0"}
])

> Rendering diagrams: This livebook uses Kino.VizJS to render DOT diagrams inline. You can also paste DOT output into PlantText, or render it locally with Graphviz: dot -Tpng diagram.dot -o diagram.png


What is Choreo.Workflow?

Choreo.Workflow models business process orchestration and Saga transactions as directed graphs. Unlike drawing tools where boxes and arrows are just pictures, Choreo workflows are executable blueprints you can analyse for latency, parallelism, failure coverage, and structural correctness.

Common use cases:

  • E-commerce order processing with rollback handlers
  • CI/CD pipeline orchestration
  • Multi-step approval flows
  • Distributed Saga transactions

Node Types

| Type | Shape | Purpose |
|---|---|---|
| start | ● circle | Entry point that triggers the workflow |
| end | ◎ doublecircle | Terminal node for successful completion |
| task | 📦 box3d | Automated step with timeout and retry |
| decision | 💎 diamond | Conditional branching (yes/no, approved/rejected) |
| fork | ▽ invhouse | Splits execution into parallel branches |
| join | △ house | Waits for all parallel branches to complete |
| compensation | 📝 note | Saga rollback handler (dashed, red) |
| event | ☁️ cloud | External trigger, timer, or signal |

Edge Types

| Type | Style | When to use |
|---|---|---|
| :sequence | solid | Normal flow |
| :compensation | dashed red | Rollback path |
| :retry | dashed orange | Retry loop |
| :failure | dotted red | Error handler |
| :timeout | dotted orange | Timeout handler |

alias Choreo.Workflow
alias Choreo.Workflow.Analysis

legend =
  Workflow.new()
  |> Workflow.add_start(:start)
  |> Workflow.add_end(:end)
  |> Workflow.add_task(:task)
  |> Workflow.add_decision(:decision)
  |> Workflow.add_fork(:fork)
  |> Workflow.add_join(:join)
  |> Workflow.add_compensation(:compensation, for: :task)
  |> Workflow.add_event(:event)

Kino.VizJS.render(Workflow.to_dot(legend))
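The following hand-rolled sketch shows how the shapes and styles from the two tables translate into Graphviz DOT attributes. It is not Choreo's to_dot/2, whose exact output may differ. The DotSketch module and its to_dot/2 function are hypothetical and use only the standard library.

```elixir
defmodule DotSketch do
  @moduledoc "Hypothetical DOT emitter for illustration; not Choreo.Workflow.to_dot/2."

  # Node shapes and edge styles copied from the Node Types and Edge Types tables.
  @shapes %{
    start: "circle", end: "doublecircle", task: "box3d", decision: "diamond",
    fork: "invhouse", join: "house", compensation: "note", event: "cloud"
  }

  @edge_attrs %{
    sequence: ~s(style=solid),
    compensation: ~s(style=dashed, color=red),
    retry: ~s(style=dashed, color=orange),
    failure: ~s(style=dotted, color=red),
    timeout: ~s(style=dotted, color=orange)
  }

  # nodes: [{id, type}]; edges: [{from, to, edge_type}]
  def to_dot(nodes, edges) do
    node_lines = for {id, type} <- nodes, do: ~s(  "#{id}" [shape=#{@shapes[type]}];)

    edge_lines =
      for {from, to, type} <- edges, do: ~s(  "#{from}" -> "#{to}" [#{@edge_attrs[type]}];)

    Enum.join(["digraph workflow {" | node_lines ++ edge_lines] ++ ["}"], "\n")
  end
end

DotSketch.to_dot(
  [charge_card: :task, refund_payment: :compensation],
  [{:charge_card, :refund_payment, :compensation}]
)
|> IO.puts()
```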

Example 1: E-Commerce Saga (The Classic)

A customer places an order. We charge their card, reserve inventory, pack, and ship. If anything fails, we roll back previous steps.

order_saga =
  Workflow.new()
  |> Workflow.add_start(:order_received)
  |> Workflow.add_task(:charge_card,
    label: "Charge Card",
    timeout_ms: 5000,
    retry: 3,
    retry_backoff_ms: 1000
  )
  |> Workflow.add_task(:reserve_inventory,
    label: "Reserve Inventory",
    timeout_ms: 3000
  )
  |> Workflow.add_decision(:sufficient_stock,
    label: "Stock OK?"
  )
  |> Workflow.add_task(:pack_items,
    label: "Pack Items",
    timeout_ms: 10_000
  )
  |> Workflow.add_task(:ship_order,
    label: "Ship Order",
    timeout_ms: 5000
  )
  |> Workflow.add_compensation(:refund_payment,
    label: "Refund Payment",
    for: :charge_card
  )
  |> Workflow.add_compensation(:release_inventory,
    label: "Release Inventory",
    for: :reserve_inventory
  )
  |> Workflow.add_end(:done)
  # Happy path
  |> Workflow.connect(:order_received, :charge_card)
  |> Workflow.connect(:charge_card, :reserve_inventory)
  |> Workflow.connect(:reserve_inventory, :sufficient_stock)
  |> Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
  |> Workflow.connect(:pack_items, :ship_order)
  |> Workflow.connect(:ship_order, :done)
  # Compensation paths
  |> Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:sufficient_stock, :release_inventory, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:refund_payment, :done, edge_type: :compensation)

Kino.VizJS.render(Workflow.to_dot(order_saga))
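The diagram records which compensation belongs to which task. When a saga actually runs, the usual approach is to execute the steps in order. If one fails, the compensations of the steps that already succeeded run newest first. The plain-Elixir sketch below illustrates that idea. SagaSketch and the step names are hypothetical, and nothing here is Choreo's runtime.

```elixir
defmodule SagaSketch do
  @moduledoc "Hypothetical in-process saga runner; not part of Choreo."

  # steps: [{name, action, compensation}], where action returns :ok or
  # {:error, reason} and compensation is a zero-arity function or nil.
  def run(steps) do
    result =
      Enum.reduce_while(steps, {:ok, []}, fn {name, action, compensate}, {:ok, done} ->
        case action.() do
          :ok ->
            {:cont, {:ok, [{name, compensate} | done]}}

          {:error, reason} ->
            # `done` is newest-first, so completed steps are undone in reverse order.
            undone =
              for {step, undo} <- done, undo != nil do
                undo.()
                step
              end

            {:halt, {:error, name, reason, undone}}
        end
      end)

    case result do
      {:ok, done} -> {:ok, done |> Enum.reverse() |> Enum.map(&elem(&1, 0))}
      error -> error
    end
  end
end

steps = [
  {:charge_card, fn -> :ok end, fn -> IO.puts("refund payment") end},
  {:reserve_inventory, fn -> :ok end, fn -> IO.puts("release inventory") end},
  {:check_stock, fn -> {:error, :insufficient_stock} end, nil}
]

SagaSketch.run(steps)
# prints "release inventory", then "refund payment", and returns
# {:error, :check_stock, :insufficient_stock, [:reserve_inventory, :charge_card]}
```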

Critical Path Analysis

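The critical path is the longest-latency chain from start to end. Assuming no failures or retries, it sets the minimum end-to-end time for a successful order. Speeding up or parallelising steps that are not on this path cannot make the workflow finish any sooner.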

{:ok, path, latency_ms} = Analysis.critical_path(order_saga)

IO.puts("Critical path: #{Enum.join(path, " → ")}")
IO.puts("Total latency: #{latency_ms}ms")
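In an acyclic graph, finding the critical path is a longest-path search: each node costs its own latency plus the slowest of its successors. The sketch below is a minimal memoised version of that idea. CriticalPathSketch is hypothetical. It assumes retry loops have been removed so the graph is a DAG, and it takes each task's timeout_ms as its latency, with every other node costing 0. Choreo's own algorithm and result shape may differ.

```elixir
defmodule CriticalPathSketch do
  @moduledoc "Hypothetical longest-path search; not Choreo's implementation."

  # latency: %{node => ms} (missing nodes cost 0); edges: [{from, to}], acyclic.
  # Returns {path, total_ms} for the slowest path from `start` to a sink.
  def longest_path(latency, edges, start) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    {result, _memo} = walk(start, succ, latency, %{})
    result
  end

  defp walk(node, succ, latency, memo) do
    case Map.fetch(memo, node) do
      {:ok, result} ->
        {result, memo}

      :error ->
        {tails, memo} =
          Enum.map_reduce(Map.get(succ, node, []), memo, fn next, memo ->
            walk(next, succ, latency, memo)
          end)

        # Keep the slowest continuation; a sink has none.
        {tail, tail_ms} =
          case tails do
            [] -> {[], 0}
            _ -> Enum.max_by(tails, &elem(&1, 1))
          end

        result = {[node | tail], Map.get(latency, node, 0) + tail_ms}
        {result, Map.put(memo, node, result)}
    end
  end
end

latency = %{charge_card: 5000, reserve_inventory: 3000, pack_items: 10_000, ship_order: 5000}

edges = [
  {:order_received, :charge_card},
  {:charge_card, :reserve_inventory},
  {:reserve_inventory, :sufficient_stock},
  {:sufficient_stock, :pack_items},
  {:pack_items, :ship_order},
  {:ship_order, :done},
  {:sufficient_stock, :refund_payment},
  {:sufficient_stock, :release_inventory},
  {:refund_payment, :done}
]

{path, total_ms} = CriticalPathSketch.longest_path(latency, edges, :order_received)
IO.puts("#{Enum.join(path, " → ")} (#{total_ms}ms)")
```

On this data the sketch picks the happy path (5000 + 3000 + 10000 + 5000 = 23000 ms). The compensation branch through refund_payment is much shorter.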

Parallelizable Tasks

Which tasks have no dependencies on each other and could run concurrently?

Analysis.parallelizable_tasks(order_saga)
|> Enum.with_index()
|> Enum.each(fn {tasks, level} ->
  IO.puts("Level #{level}: #{Enum.join(tasks, ", ")}")
end)

In this linear saga, every level has exactly one task — nothing can be parallelised. Let’s fix that in the next example.

Missing Compensations

Tasks with retry configured should have compensation handlers. Let’s check:

Analysis.missing_compensations(order_saga)
|> IO.inspect(label: "Tasks with retry but no compensation")

charge_card has retry: 3 but no direct compensation edge of its own; the refund is triggered from the decision node instead. Depending on your design, this may or may not be acceptable. validate/1 flags it as well, which helps if your policy is that every retried task needs a direct compensation route.
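The rule described above can be expressed directly in code: flag a task when it has retry > 0 and no outgoing :compensation edge of its own. The sketch below encodes that reading of the rule. CompensationCheckSketch is hypothetical and not part of Choreo. Only compensation edges matter to this check, so sequence edges are omitted from the data.

```elixir
defmodule CompensationCheckSketch do
  @moduledoc "Hypothetical check; not Choreo's implementation."

  # retries: %{task_id => retry_count}; edges: [{from, to, edge_type}]
  def missing_compensations(retries, edges) do
    # Nodes that own at least one outgoing compensation edge.
    compensated = for {from, _to, :compensation} <- edges, into: MapSet.new(), do: from

    retries
    |> Enum.filter(fn {id, retry} -> retry > 0 and not MapSet.member?(compensated, id) end)
    |> Enum.map(fn {id, _retry} -> id end)
    |> Enum.sort()
  end
end

retries = %{charge_card: 3, reserve_inventory: 0, pack_items: 0, ship_order: 0}

edges = [
  {:sufficient_stock, :refund_payment, :compensation},
  {:sufficient_stock, :release_inventory, :compensation},
  {:refund_payment, :done, :compensation}
]

CompensationCheckSketch.missing_compensations(retries, edges)
# => [:charge_card]

# Routing the refund directly from charge_card clears the warning:
CompensationCheckSketch.missing_compensations(retries, [{:charge_card, :refund_payment, :compensation} | edges])
# => []
```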


Example 2: Parallelised Order Processing

Real systems do independent work in parallel. Charging the card and checking fraud can happen simultaneously. So can packing and sending the confirmation email.

parallel_order =
  Workflow.new()
  |> Workflow.add_start(:order_received)
  |> Workflow.add_fork(:fork_checks, label: "Parallel Checks")
  |> Workflow.add_task(:charge_card,
    label: "Charge Card\n5s",
    timeout_ms: 5000
  )
  |> Workflow.add_task(:fraud_check,
    label: "Fraud Check\n2s",
    timeout_ms: 2000
  )
  |> Workflow.add_join(:join_checks, label: "Checks Complete")
  |> Workflow.add_decision(:checks_passed, label: "Passed?")
  |> Workflow.add_fork(:fork_fulfillment, label: "Fulfillment")
  |> Workflow.add_task(:pack_items,
    label: "Pack Items\n8s",
    timeout_ms: 8000
  )
  |> Workflow.add_task(:send_confirmation,
    label: "Send Email\n1s",
    timeout_ms: 1000
  )
  |> Workflow.add_join(:join_fulfillment, label: "Fulfillment Done")
  |> Workflow.add_task(:ship_order,
    label: "Ship\n3s",
    timeout_ms: 3000
  )
  |> Workflow.add_end(:done)
  |> Workflow.add_compensation(:refund_payment, label: "Refund", for: :charge_card)
  |> Workflow.add_compensation(:cancel_fraud_flag, label: "Clear Flag", for: :fraud_check)
  # Start → parallel checks
  |> Workflow.connect(:order_received, :fork_checks)
  |> Workflow.connect(:fork_checks, :charge_card)
  |> Workflow.connect(:fork_checks, :fraud_check)
  |> Workflow.connect(:charge_card, :join_checks)
  |> Workflow.connect(:fraud_check, :join_checks)
  |> Workflow.connect(:join_checks, :checks_passed)
  # Decision branches
  |> Workflow.connect(:checks_passed, :fork_fulfillment, condition: "yes")
  |> Workflow.connect(:checks_passed, :refund_payment, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:checks_passed, :cancel_fraud_flag, condition: "no", edge_type: :compensation)
  # Parallel fulfillment
  |> Workflow.connect(:fork_fulfillment, :pack_items)
  |> Workflow.connect(:fork_fulfillment, :send_confirmation)
  |> Workflow.connect(:pack_items, :join_fulfillment)
  |> Workflow.connect(:send_confirmation, :join_fulfillment)
  |> Workflow.connect(:join_fulfillment, :ship_order)
  |> Workflow.connect(:ship_order, :done)

Kino.VizJS.render(Workflow.to_dot(parallel_order))

Critical Path (Parallel)

{:ok, path, latency_ms} = Analysis.critical_path(parallel_order)
IO.puts("Critical path: #{Enum.join(path, " → ")}")
IO.puts("Total latency: #{latency_ms}ms")

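Notice the total is less than the sum of all task timeouts (19,000 ms). Each fork/join pair contributes only its slowest branch: charge_card (5 s) hides fraud_check (2 s), and pack_items (8 s) hides send_confirmation (1 s).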
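Here is the arithmetic behind that comparison. It assumes each task's timeout_ms stands in for its latency and that fork, join, and decision nodes add nothing. Choreo's exact figure may include other costs.

```elixir
# Task timeouts (ms) from parallel_order, grouped by fork/join pair.
checks = [charge_card: 5000, fraud_check: 2000]
fulfillment = [pack_items: 8000, send_confirmation: 1000]
ship_order = 3000

# Running every task one after another:
sequential_ms = Enum.sum(Keyword.values(checks) ++ Keyword.values(fulfillment)) + ship_order

# A fork/join pair costs only as much as its slowest branch:
parallel_ms = Enum.max(Keyword.values(checks)) + Enum.max(Keyword.values(fulfillment)) + ship_order

IO.puts("Sequential: #{sequential_ms}ms, with fork/join: #{parallel_ms}ms")
# prints "Sequential: 19000ms, with fork/join: 16000ms"
```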

Parallel Levels

Analysis.parallelizable_tasks(parallel_order)
|> Enum.with_index()
|> Enum.each(fn {tasks, level} ->
  IO.puts("Level #{level}: #{Enum.join(tasks, ", ")}")
end)

Now we see charge_card and fraud_check at the same level — they can run simultaneously. Same for pack_items and send_confirmation.
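One way to compute such levels is to assign every node its longest hop count from an entry node. If one node can reach another, the second always sits at a deeper level, so nodes on the same level are independent of each other. The sketch below applies this to the parallel_order graph, with compensation nodes omitted because only tasks are reported. LevelsSketch is hypothetical, and Choreo may group or number levels differently.

```elixir
defmodule LevelsSketch do
  @moduledoc "Hypothetical level grouping; not Choreo's implementation."

  # nodes: %{id => type}; edges: [{from, to}] forming a DAG.
  # Returns task ids grouped by depth, shallowest level first.
  def task_levels(nodes, edges) do
    preds = Enum.group_by(edges, &elem(&1, 1), &elem(&1, 0))

    {depths, _memo} =
      Enum.map_reduce(Map.keys(nodes), %{}, fn id, memo ->
        {d, memo} = depth(id, preds, memo)
        {{id, d}, memo}
      end)

    depths
    |> Enum.filter(fn {id, _d} -> nodes[id] == :task end)
    |> Enum.group_by(fn {_id, d} -> d end, fn {id, _d} -> id end)
    |> Enum.sort_by(fn {d, _ids} -> d end)
    |> Enum.map(fn {_d, ids} -> Enum.sort(ids) end)
  end

  # Longest number of hops from any node that has no predecessors.
  defp depth(id, preds, memo) do
    case Map.fetch(memo, id) do
      {:ok, d} ->
        {d, memo}

      :error ->
        {ds, memo} = Enum.map_reduce(Map.get(preds, id, []), memo, &depth(&1, preds, &2))
        d = if ds == [], do: 0, else: Enum.max(ds) + 1
        {d, Map.put(memo, id, d)}
    end
  end
end

nodes = %{
  order_received: :start, fork_checks: :fork, charge_card: :task, fraud_check: :task,
  join_checks: :join, checks_passed: :decision, fork_fulfillment: :fork,
  pack_items: :task, send_confirmation: :task, join_fulfillment: :join,
  ship_order: :task, done: :end
}

edges = [
  {:order_received, :fork_checks},
  {:fork_checks, :charge_card}, {:fork_checks, :fraud_check},
  {:charge_card, :join_checks}, {:fraud_check, :join_checks},
  {:join_checks, :checks_passed}, {:checks_passed, :fork_fulfillment},
  {:fork_fulfillment, :pack_items}, {:fork_fulfillment, :send_confirmation},
  {:pack_items, :join_fulfillment}, {:send_confirmation, :join_fulfillment},
  {:join_fulfillment, :ship_order}, {:ship_order, :done}
]

LevelsSketch.task_levels(nodes, edges)
# => [[:charge_card, :fraud_check], [:pack_items, :send_confirmation], [:ship_order]]
```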


Example 3: CI/CD Pipeline with Swimlanes

Swimlanes group nodes by team or service. Here we model a deployment pipeline with Dev, QA, and Platform teams.

cicd =
  Workflow.new()
  # Dev swimlane
  |> Workflow.add_swimlane(:dev, label: "Dev Team", fillcolor: "#dbeafe")
  |> Workflow.add_event(:git_push, label: "Git Push", swimlane: :dev)
  |> Workflow.add_task(:run_tests,
    label: "Unit Tests\n3m",
    timeout_ms: 180_000,
    swimlane: :dev
  )
  |> Workflow.add_task(:build_image,
    label: "Build Docker Image\n5m",
    timeout_ms: 300_000,
    swimlane: :dev
  )
  # QA swimlane
  |> Workflow.add_swimlane(:qa, label: "QA Team", fillcolor: "#dcfce7")
  |> Workflow.add_task(:deploy_staging,
    label: "Deploy to Staging\n2m",
    timeout_ms: 120_000,
    swimlane: :qa
  )
  |> Workflow.add_task(:run_integration_tests,
    label: "Integration Tests\n10m",
    timeout_ms: 600_000,
    retry: 1,
    swimlane: :qa
  )
  |> Workflow.add_decision(:tests_passed, label: "Tests Pass?", swimlane: :qa)
  # Platform swimlane
  |> Workflow.add_swimlane(:platform, label: "Platform Team", fillcolor: "#fef3c7")
  |> Workflow.add_task(:deploy_production,
    label: "Deploy to Prod\n3m",
    timeout_ms: 180_000,
    swimlane: :platform
  )
  |> Workflow.add_task(:run_smoke_tests,
    label: "Smoke Tests\n2m",
    timeout_ms: 120_000,
    swimlane: :platform
  )
  |> Workflow.add_end(:live)
  # Rollback
  |> Workflow.add_compensation(:rollback_production,
    label: "Rollback Prod",
    for: :deploy_production,
    swimlane: :platform
  )
  # Flow
  |> Workflow.connect(:git_push, :run_tests)
  |> Workflow.connect(:run_tests, :build_image)
  |> Workflow.connect(:build_image, :deploy_staging)
  |> Workflow.connect(:deploy_staging, :run_integration_tests)
  |> Workflow.connect(:run_integration_tests, :tests_passed)
  |> Workflow.connect(:tests_passed, :deploy_production, condition: "yes")
  |> Workflow.connect(:tests_passed, :rollback_production, condition: "no", edge_type: :compensation)
  |> Workflow.connect(:deploy_production, :run_smoke_tests)
  |> Workflow.connect(:run_smoke_tests, :live)

Kino.VizJS.render(Workflow.to_dot(cicd, theme: :dark))
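Graphviz has a built-in mechanism for lanes like these: any subgraph whose name starts with cluster is laid out inside its own box, which can be labelled and filled. Whether Choreo's to_dot/2 uses clusters is an assumption. The SwimlaneSketch module below is hypothetical and only demonstrates the mechanism for the cicd lanes. It emits cluster blocks intended to go inside a digraph.

```elixir
defmodule SwimlaneSketch do
  @moduledoc "Hypothetical DOT cluster emitter; not Choreo's to_dot/2."

  # members: [{node_id, lane}]; lanes: %{lane => {label, fillcolor}}
  # Returns one `subgraph cluster_<lane>` line per lane, sorted by lane name.
  def clusters(members, lanes) do
    members
    |> Enum.group_by(fn {_id, lane} -> lane end, fn {id, _lane} -> id end)
    |> Enum.sort()
    |> Enum.map_join("\n", fn {lane, ids} ->
      {label, color} = Map.fetch!(lanes, lane)
      nodes = Enum.map_join(ids, " ", fn id -> ~s("#{id}";) end)
      ~s(  subgraph cluster_#{lane} { label="#{label}"; style=filled; fillcolor="#{color}"; #{nodes} })
    end)
  end
end

members = [
  git_push: :dev, run_tests: :dev, build_image: :dev,
  deploy_staging: :qa, run_integration_tests: :qa, tests_passed: :qa,
  deploy_production: :platform, run_smoke_tests: :platform, rollback_production: :platform
]

lanes = %{
  dev: {"Dev Team", "#dbeafe"},
  qa: {"QA Team", "#dcfce7"},
  platform: {"Platform Team", "#fef3c7"}
}

IO.puts(SwimlaneSketch.clusters(members, lanes))
```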

Bottleneck Detection

Analysis.bottlenecks(cicd, latency_threshold: 300_000)
|> IO.inspect(label: "High-latency bottlenecks (> 5m)")

Analysis.bottlenecks(cicd, retry_threshold: 1)
|> IO.inspect(label: "High-retry bottlenecks")
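Conceptually, both calls filter tasks by a threshold on one attribute. The sketch below is a hypothetical stand-in (BottleneckSketch is not Choreo's API) that treats each threshold as inclusive. Whether Choreo compares with > or >= is not stated here, and that choice decides whether build_image, at exactly 5 minutes, counts as a bottleneck.

```elixir
defmodule BottleneckSketch do
  @moduledoc "Hypothetical threshold filters; not Choreo's implementation."

  # tasks: [{id, %{timeout_ms: ms, retry: n}}]; thresholds are inclusive here.
  def by_latency(tasks, threshold_ms) do
    for {id, t} <- tasks, t.timeout_ms >= threshold_ms, do: id
  end

  def by_retry(tasks, threshold) do
    for {id, t} <- tasks, Map.get(t, :retry, 0) >= threshold, do: id
  end
end

tasks = [
  run_tests: %{timeout_ms: 180_000},
  build_image: %{timeout_ms: 300_000},
  deploy_staging: %{timeout_ms: 120_000},
  run_integration_tests: %{timeout_ms: 600_000, retry: 1},
  deploy_production: %{timeout_ms: 180_000},
  run_smoke_tests: %{timeout_ms: 120_000}
]

BottleneckSketch.by_latency(tasks, 300_000)
# => [:build_image, :run_integration_tests]

BottleneckSketch.by_retry(tasks, 1)
# => [:run_integration_tests]
```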

Simulation

Analysis.simulate(cicd)
|> Enum.sort_by(fn {_id, vals} -> vals.cumulative_latency end, :desc)
|> Enum.take(5)
|> Enum.each(fn {id, vals} ->
  total = div(vals.cumulative_latency, 1000)
  IO.puts("#{id}: #{total}s total (task: #{div(vals.task_latency, 1000)}s, retry: #{div(vals.retry_latency, 1000)}s)")
end)
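The printout shows three numbers per node: task latency, retry latency, and a running total. Below is a minimal sketch of that bookkeeping for the straight-line part of the cicd pipeline. The retry formula, where each retry costs another full timeout plus any backoff, is an assumption made for illustration and not Choreo's documented model. A node with several predecessors would start from the largest predecessor total rather than from a single running sum.

```elixir
# Straight-line cicd path; the event and decision nodes are given 0 ms.
pipeline = [
  git_push: %{timeout_ms: 0},
  run_tests: %{timeout_ms: 180_000},
  build_image: %{timeout_ms: 300_000},
  deploy_staging: %{timeout_ms: 120_000},
  run_integration_tests: %{timeout_ms: 600_000, retry: 1},
  tests_passed: %{timeout_ms: 0},
  deploy_production: %{timeout_ms: 180_000},
  run_smoke_tests: %{timeout_ms: 120_000}
]

{timeline, total_ms} =
  Enum.map_reduce(pipeline, 0, fn {id, t}, elapsed ->
    task = t.timeout_ms
    # Assumed worst case: each retry waits for another full timeout plus backoff.
    retry = Map.get(t, :retry, 0) * (task + Map.get(t, :retry_backoff_ms, 0))
    cumulative = elapsed + task + retry
    {{id, %{task_latency: task, retry_latency: retry, cumulative_latency: cumulative}}, cumulative}
  end)

Enum.each(timeline, fn {id, v} -> IO.puts("#{id}: #{div(v.cumulative_latency, 1000)}s") end)
IO.puts("Total: #{div(total_ms, 1000)}s")
```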

Example 4: Approval Workflow with Timeout Escalation

Human-in-the-loop processes need timeout handling. If a manager doesn’t approve within 24 hours, escalate.

approval =
  Workflow.new()
  |> Workflow.add_start(:request_submitted)
  |> Workflow.add_task(:validate_request,
    label: "Auto-Validate\n500ms",
    timeout_ms: 500
  )
  |> Workflow.add_decision(:valid, label: "Valid?")
  |> Workflow.add_task(:manager_review,
    label: "Manager Review\n24h",
    timeout_ms: 86_400_000
  )
  |> Workflow.add_decision(:approved, label: "Approved?")
  |> Workflow.add_task(:process_request,
    label: "Process\n2s",
    timeout_ms: 2000
  )
  |> Workflow.add_end(:completed)
  |> Workflow.add_task(:auto_escalate,
    label: "Escalate to VP\n48h",
    timeout_ms: 172_800_000
  )
  |> Workflow.add_task(:notify_rejection,
    label: "Notify Rejection\n1s",
    timeout_ms: 1000
  )
  |> Workflow.add_end(:rejected)
  |> Workflow.add_end(:escalated)
  # Happy / sad paths
  |> Workflow.connect(:request_submitted, :validate_request)
  |> Workflow.connect(:validate_request, :valid)
  |> Workflow.connect(:valid, :manager_review, condition: "yes")
  |> Workflow.connect(:valid, :notify_rejection, condition: "no")
  |> Workflow.connect(:notify_rejection, :rejected)
  |> Workflow.connect(:manager_review, :approved)
  |> Workflow.connect(:approved, :process_request, condition: "yes")
  |> Workflow.connect(:approved, :notify_rejection, condition: "no")
  |> Workflow.connect(:process_request, :completed)
  # Timeout path
  |> Workflow.connect(:manager_review, :auto_escalate, edge_type: :timeout)
  |> Workflow.connect(:auto_escalate, :escalated)

Kino.VizJS.render(Workflow.to_dot(approval))
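The :timeout edge describes what should happen. At runtime, something still has to enforce the deadline. In plain Elixir, a common pattern is Task.yield/2 followed by Task.shutdown/2. EscalationSketch below is a hypothetical illustration that uses millisecond deadlines instead of 24 hours. It is not how Choreo or any particular workflow engine runs manager_review.

```elixir
defmodule EscalationSketch do
  @moduledoc "Hypothetical deadline enforcement; not Choreo code."

  # Runs `review` in a separate process and escalates if no answer
  # arrives within `timeout_ms`.
  def review_or_escalate(review, timeout_ms) do
    task = Task.async(review)

    # Task.yield/2 returns nil on timeout; Task.shutdown/2 then stops the task
    # (returning {:ok, reply} if a reply slipped in just before shutdown).
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, decision} -> {:reviewed, decision}
      {:exit, _reason} -> :escalated
      nil -> :escalated
    end
  end
end

EscalationSketch.review_or_escalate(fn -> :approved end, 1_000)
# => {:reviewed, :approved}

EscalationSketch.review_or_escalate(fn -> Process.sleep(500); :approved end, 50)
# => :escalated
```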

Failure Scenarios

Which tasks have explicit compensation or failure paths?

Analysis.failure_scenarios(approval)
|> IO.inspect(label: "Tasks with failure/compensation edges")

Uncompensated Paths

Which error paths don’t lead to a terminal node via compensations?

Analysis.uncompensated_paths(approval)
|> IO.inspect(label: "Uncompensated error paths")
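A simple way to approximate this check is a reachability search. Treat every :compensation, :failure, or :timeout edge as an error path, and report it if its target cannot reach a terminal node by any route. That definition is an assumption, and UncompensatedSketch is hypothetical. In approval, the only error edge is the :timeout edge to auto_escalate, which reaches :escalated, so the sketch would report nothing there. The data below comes from order_saga instead, where release_inventory has no way out.

```elixir
defmodule UncompensatedSketch do
  @moduledoc "Hypothetical reachability check; not Choreo's implementation."

  @error_types [:compensation, :failure, :timeout]

  # edges: [{from, to, edge_type}]; end_nodes: ids of terminal nodes.
  # Returns the error edges whose target can never reach a terminal node.
  def uncompensated(edges, end_nodes) do
    succ = Enum.group_by(edges, &elem(&1, 0), &elem(&1, 1))
    ends = MapSet.new(end_nodes)

    for {from, to, type} <- edges,
        type in @error_types,
        not reaches_end?(to, succ, ends, MapSet.new()),
        do: {from, to}
  end

  # Depth-first search; `seen` stops cycles such as retry loops from recursing forever.
  defp reaches_end?(node, succ, ends, seen) do
    cond do
      MapSet.member?(ends, node) ->
        true

      MapSet.member?(seen, node) ->
        false

      true ->
        seen = MapSet.put(seen, node)
        Enum.any?(Map.get(succ, node, []), &reaches_end?(&1, succ, ends, seen))
    end
  end
end

edges = [
  {:order_received, :charge_card, :sequence},
  {:charge_card, :reserve_inventory, :sequence},
  {:reserve_inventory, :sufficient_stock, :sequence},
  {:sufficient_stock, :pack_items, :sequence},
  {:pack_items, :ship_order, :sequence},
  {:ship_order, :done, :sequence},
  {:sufficient_stock, :refund_payment, :compensation},
  {:sufficient_stock, :release_inventory, :compensation},
  {:refund_payment, :done, :compensation}
]

UncompensatedSketch.uncompensated(edges, [:done])
# => [{:sufficient_stock, :release_inventory}]
```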

Example 5: Fixing a Broken Workflow

Start with a workflow that has structural problems and use validate/1 as a checklist.

broken =
  Workflow.new()
  |> Workflow.add_task(:step_a, label: "Step A")
  |> Workflow.add_task(:step_b, label: "Step B")
  |> Workflow.add_task(:orphan_step, label: "Orphan")
  |> Workflow.add_task(:dead_end, label: "Dead End")
  |> Workflow.add_end(:finish)
  |> Workflow.add_compensation(:rollback, label: "Rollback", for: :step_a)
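  |> Workflow.connect(:step_a, :step_b)
  |> Workflow.connect(:step_b, :finish)
  # orphan_step has an output but no inputs
  |> Workflow.connect(:orphan_step, :step_b)
  # dead_end has an input but no outputs
  |> Workflow.connect(:step_a, :dead_end)
  # rollback has no incoming edge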

Kino.VizJS.render(Workflow.to_dot(broken))

Diagnosis

Analysis.validate(broken)
|> Enum.each(fn {sev, msg} ->
  icon = if sev == :error, do: "❌", else: "⚠️"
  IO.puts("#{icon} #{msg}")
end)

The validator catches:

  • No start node
  • No path from dead_end to an end node
  • Orphan tasks (no incoming edges)
  • Dead-end tasks (no outgoing edges)
  • Unreachable compensation node (rollback has no incoming edge)
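Most of these checks reduce to counting edges into and out of each node. The sketch below shows that idea. ValidateSketch and its messages are hypothetical rather than Choreo's validate/1, and it does not cover the end-node reachability check. The data follows the comments in the broken example: orphan_step feeds step_b, and step_a feeds dead_end. Because there is no start node, nothing feeds step_a either, so it is also reported as an orphan.

```elixir
defmodule ValidateSketch do
  @moduledoc "Hypothetical structural checks; not Choreo's validate/1."

  # nodes: %{id => type}; edges: [{from, to}]. Returns [{severity, message}].
  def validate(nodes, edges) do
    has_in = MapSet.new(edges, fn {_from, to} -> to end)
    has_out = MapSet.new(edges, fn {from, _to} -> from end)

    ids_of = fn type ->
      for({id, ^type} <- nodes, do: id) |> Enum.sort()
    end

    missing = for type <- [:start, :end], ids_of.(type) == [], do: {:error, "no #{type} node"}
    orphans = for id <- ids_of.(:task), id not in has_in, do: {:warning, "orphan task: #{id}"}
    dead_ends = for id <- ids_of.(:task), id not in has_out, do: {:warning, "dead-end task: #{id}"}

    unreachable =
      for id <- ids_of.(:compensation), id not in has_in,
          do: {:warning, "unreachable compensation: #{id}"}

    missing ++ orphans ++ dead_ends ++ unreachable
  end
end

broken_nodes = %{step_a: :task, step_b: :task, orphan_step: :task, dead_end: :task, finish: :end, rollback: :compensation}
broken_edges = [{:step_a, :step_b}, {:step_b, :finish}, {:orphan_step, :step_b}, {:step_a, :dead_end}]
ValidateSketch.validate(broken_nodes, broken_edges) |> IO.inspect(label: "broken")

fixed_nodes = %{start: :start, step_a: :task, step_b: :task, finish: :end, rollback: :compensation}
fixed_edges = [{:start, :step_a}, {:step_a, :step_b}, {:step_b, :finish}, {:step_a, :rollback}]
ValidateSketch.validate(fixed_nodes, fixed_edges) |> IO.inspect(label: "fixed")
```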

The Fix

fixed =
  Workflow.new()
  |> Workflow.add_start(:start)
  |> Workflow.add_task(:step_a, label: "Step A")
  |> Workflow.add_task(:step_b, label: "Step B")
  |> Workflow.add_end(:finish)
  |> Workflow.add_compensation(:rollback, label: "Rollback", for: :step_a)
  |> Workflow.connect(:start, :step_a)
  |> Workflow.connect(:step_a, :step_b)
  |> Workflow.connect(:step_b, :finish)
  |> Workflow.connect(:step_a, :rollback, edge_type: :compensation)

IO.puts("After fix:")
Analysis.validate(fixed) |> IO.inspect()
Kino.VizJS.render(Workflow.to_dot(fixed))

Advanced: Custom Theming

brand_theme =
  Choreo.Theme.custom(
    colors: %{
      start: "#10b981",
      end: "#ef4444",
      task: "#3b82f6",
      decision: "#8b5cf6",
      compensation: "#f87171"
    },
    node_fontcolor: "white",
    edge_color: "#94a3b8",
    graph_bgcolor: "#0f172a"
  )

Kino.VizJS.render(Workflow.to_dot(parallel_order, theme: brand_theme))

Summary

| Question | Function |
|---|---|
| “How slow is the slowest path?” | Analysis.critical_path/1 |
| “What can run in parallel?” | Analysis.parallelizable_tasks/1 |
| “Which tasks lack rollback?” | Analysis.missing_compensations/1 |
| “Which error paths are dead ends?” | Analysis.uncompensated_paths/1 |
| “Where are the bottlenecks?” | Analysis.bottlenecks/2 |
| “What’s the estimated timeline?” | Analysis.simulate/1 |
| “Is the workflow structurally sound?” | Analysis.validate/1 |
| “Group by team” | Workflow.add_swimlane/3 + :swimlane option |
| “Render to DOT” | Workflow.to_dot/2 |

Workflow diagrams as code mean your business processes are version-controlled, reviewable, and auditable. Every time you add a task, change a timeout, or introduce a new decision branch, you can immediately see the impact on critical path latency, parallelisation opportunities, and failure coverage — before you ship to production.