Powered by AppSignal & Oban Pro

Composition Patterns

livebooks/10_composition_patterns.livemd

Composition Patterns

Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

Jido Composer builds every composition from five fundamental constructors:

| Constructor | What It Does | Covered In |
| --- | --- | --- |
| Sequence | Do A, then B | Livebook 01 (ETL) |
| Choice | Do A or B based on outcome | Livebook 02 (Branching) |
| Parallel | Do A and B simultaneously | Livebook 02 (FanOut) |
| Traverse | Apply A to each item in a list | Livebook 09 (MapNode) |
| Identity | Pass through unchanged | (trivial — no-op node) |

Plus one escape hatch: Bind — the Orchestrator’s ability to compute which workflow to run next via LLM decisions (Livebook 04).

This guide shows how these constructors compose together, the distinction between Workflow (compile-time graph) and Orchestrator (runtime graph), and what data-driven behavior looks like within a static graph.

Setup: Define Actions

defmodule Demo.ExtractAction do
  use Jido.Action,
    name: "extract",
    description: "Extracts items from a source",
    schema: [source: [type: :string, required: true]]

  # Produces a fixed list of three demo items, each stamped with the requested
  # `source`, plus the number of items produced.
  def run(%{source: origin}, _ctx) do
    # {id, name, price}. "Widget B" has a negative price, which the
    # `price > 0` filter in Demo.ValidateAction rejects.
    rows = [{1, "Widget A", 10}, {2, "Widget B", -5}, {3, "Widget C", 25}]

    items =
      for {id, name, price} <- rows do
        %{id: id, name: name, price: price, source: origin}
      end

    {:ok, %{items: items, count: length(items)}}
  end
end

defmodule Demo.ValidateAction do
  use Jido.Action,
    name: "validate",
    description: "Validates extracted items, returns custom outcome if any are invalid",
    schema: []

  # Splits the items found under [:extract, :items] (or [] when absent) by
  # `price > 0`.
  #
  #   * nothing rejected -> {:ok, %{items: ..., all_valid: true}}
  #   * otherwise        -> {:ok, %{valid_items: ..., invalid_items: ...,
  #                          all_valid: false}, :has_invalid}
  def run(params, _ctx) do
    extracted = get_in(params, [:extract, :items]) || []

    case Enum.split_with(extracted, &(&1.price > 0)) do
      {accepted, []} ->
        {:ok, %{items: accepted, all_valid: true}}

      {accepted, rejected} ->
        {:ok, %{valid_items: accepted, invalid_items: rejected, all_valid: false}, :has_invalid}
    end
  end
end

defmodule Demo.EnrichAction do
  use Jido.Action,
    name: "enrich",
    description: "Enriches an item with computed fields",
    schema: [
      id: [type: :integer, required: true],
      name: [type: :string, required: true],
      price: [type: :integer, required: true]
    ]

  # Returns the item's id, name and price together with a 10% `tax`
  # (rounded to two decimals) and an `enriched: true` marker. Any other keys
  # on the incoming params are dropped.
  def run(%{id: _, name: _, price: amount} = item, _ctx) do
    tax = Float.round(amount * 0.1, 2)

    enriched =
      item
      |> Map.take([:id, :name, :price])
      |> Map.merge(%{tax: tax, enriched: true})

    {:ok, enriched}
  end
end

defmodule Demo.AggregateAction do
  use Jido.Action,
    name: "aggregate",
    description: "Aggregates enriched results into a summary",
    schema: []

  # Sums the `:price` of every entry under [:enrich, :results] (a missing
  # price counts as 0, a missing list as []) and reports how many entries
  # were seen.
  def run(params, _ctx) do
    enriched = get_in(params, [:enrich, :results]) || []
    prices = Enum.map(enriched, fn entry -> entry[:price] || 0 end)

    {:ok, %{total_price: Enum.sum(prices), item_count: length(enriched)}}
  end
end

defmodule Demo.QuarantineAction do
  use Jido.Action,
    name: "quarantine",
    description: "Quarantines invalid items for review",
    schema: []

  # Reports the items found under [:validate, :invalid_items] and their count;
  # an absent entry is treated as an empty list.
  def run(params, _ctx) do
    rejected =
      case get_in(params, [:validate, :invalid_items]) do
        missing when missing in [nil, false] -> []
        found -> found
      end

    {:ok, %{quarantined: length(rejected), items: rejected}}
  end
end

defmodule Demo.DoubleValueAction do
  use Jido.Action,
    name: "double_value",
    description: "Doubles a numeric value",
    schema: [
      value: [type: :integer, required: true, doc: "Value to double"]
    ]

  # Returns the incoming `value` multiplied by two, under the same key.
  def run(%{value: number}, _ctx) do
    {:ok, %{value: number * 2}}
  end
end

defmodule Demo.NoopAction do
  use Jido.Action,
    name: "noop",
    description: "Identity — passes through unchanged",
    schema: []

  # Ignores both arguments and contributes an empty result map.
  def run(_params, _ctx) do
    {:ok, %{}}
  end
end

# Suppress doctests from Jido.Agent
#
# `suppress_agent_doctests/0` is meant to be called inside a module body: the
# workflow modules in this notebook `require` this module and invoke the macro
# after their `use Jido.Composer.Workflow` block. It injects three zero-arity
# functions — `plugins/0`, `capabilities/0` and `signal_types/0` — each tagged
# `@doc false` and each delegating straight to `super()`.
#
# NOTE(review): this presumes the host module already defines these three
# functions as overridable (otherwise `super()` cannot resolve) — confirm
# against the Jido.Agent version in use.
defmodule Demo.Helpers do
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end

# Cell output marker so the reader sees the setup cell finished.
IO.puts("Actions defined.")

Part 1: Composing Constructors

This workflow combines Sequence + Choice + Traverse in one pipeline:

  1. Extract (sequence) — Pull items from a source
  2. Validate (choice) — Branch based on whether all items are valid
  3. Enrich (traverse via MapNode) — Map an action over each valid item
  4. Aggregate (sequence) — Summarize the enriched results
stateDiagram-v2
    [*] --> extract
    extract --> validate : ok
    validate --> enrich : ok (all valid)
    validate --> quarantine : has_invalid
    enrich --> aggregate : ok
    aggregate --> done : ok
    quarantine --> done : ok
    extract --> failed : error
alias Jido.Composer.Node.MapNode

# Build the node that Demo.ComposedPipeline registers as :enrich.
#   * `over:` is the context path [:validate, :items] — the key
#     Demo.ValidateAction fills on its all-valid branch.
#   * `node:` is the action to map over the entries found at that path.
# NOTE(review): how MapNode feeds each entry to the action and where it stores
# the output (later cells read `ctx[:enrich][:results]`) is library behaviour not
# shown here — confirm against the MapNode docs.
# The `{:ok, _}` match raises MatchError if construction fails.
{:ok, enrich_node} = MapNode.new(
  name: :enrich,
  over: [:validate, :items],
  node: Demo.EnrichAction
)

# Workflow wiring extract → validate → (enrich → aggregate | quarantine).
# The options handed to `use Jido.Composer.Workflow` are plain data: a map of
# node names to implementations and a map of {state, outcome} => next state.
defmodule Demo.ComposedPipeline do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "composed_pipeline",
    description: "Sequence + Choice + Traverse in one workflow",
    nodes: %{
      extract: Demo.ExtractAction,
      validate: Demo.ValidateAction,
      # `enrich_node` is the value bound by MapNode.new/1 right before this module.
      enrich: enrich_node,
      aggregate: Demo.AggregateAction,
      quarantine: Demo.QuarantineAction
    },
    transitions: %{
      {:extract, :ok} => :validate,
      # Demo.ValidateAction returns a plain {:ok, _} when every item is valid and
      # {:ok, _, :has_invalid} otherwise; the next two entries route each case.
      {:validate, :ok} => :enrich,
      {:validate, :has_invalid} => :quarantine,
      {:enrich, :ok} => :aggregate,
      {:aggregate, :ok} => :done,
      {:quarantine, :ok} => :done,
      # NOTE(review): `:_` presumably matches any state, sending every :error
      # outcome to :failed — confirm against the Jido.Composer.Workflow docs.
      {:_, :error} => :failed
    },
    initial: :extract,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Re-declare the agent helper functions with `@doc false` (see Demo.Helpers).
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

agent = Demo.ComposedPipeline.new()
{:ok, ctx} = Demo.ComposedPipeline.run_sync(agent, %{source: "inventory_db"})

# Report lines for whichever branch the run took: a :quarantine entry in the
# resulting context means the has_invalid branch ran; otherwise report the
# enrich → aggregate path.
branch_lines =
  case ctx[:quarantine] do
    absent when absent in [nil, false] ->
      [
        "Path taken: extract → validate → enrich → aggregate",
        "Enriched results: #{inspect(ctx[:enrich][:results])}",
        "Aggregate: #{inspect(ctx[:aggregate])}"
      ]

    quarantine ->
      [
        "Path taken: extract → validate → quarantine (has_invalid)",
        "Quarantined: #{quarantine[:quarantined]} items",
        "Invalid items: #{inspect(quarantine[:items])}"
      ]
  end

[
  "=== Composed Pipeline (Sequence + Choice + Traverse) ===\n",
  "Extracted #{ctx[:extract][:count]} items",
  "Validation result: all_valid=#{ctx[:validate][:all_valid]}"
  | branch_lines
]
|> Enum.each(&IO.puts/1)

Part 2: Static Graph, Data-Driven Execution

The Workflow DSL defines the full graph statically at compile time via the use Jido.Composer.Workflow macro. The graph — nodes, transitions, terminal states — is fixed at defmodule time. There is no way to add nodes or transitions at runtime.

Fixed at compile time (the graph):

  • Which nodes exist and their names
  • All possible transitions between states
  • The initial and terminal states

Data-driven at runtime (within the fixed graph):

  • Which outcome a node returns (drives choice — but all branches are declared)
  • How many items are in a collection (drives traverse — but the MapNode position is fixed)
  • The actual data flowing through the context

This is not runtime composition — the structure never changes. It’s data-driven behavior within a static graph. The same graph can take different paths depending on the data, but the set of possible paths is fully known and validated at compile time.

# Example: the same workflow with all-valid data takes the enrich path

defmodule Demo.ExtractValidAction do
  use Jido.Action,
    name: "extract_valid",
    description: "Extracts only valid items",
    schema: [source: [type: :string, required: true]]

  # Produces two fixed demo items, both with positive prices, each stamped
  # with the requested `source`, plus the number of items produced.
  def run(%{source: origin}, _ctx) do
    # {id, name, price}
    rows = [{1, "Alpha", 15}, {2, "Beta", 30}]

    items =
      for {id, name, price} <- rows do
        %{id: id, name: name, price: price, source: origin}
      end

    {:ok, %{items: items, count: length(items)}}
  end
end

alias Jido.Composer.Node.MapNode

# Same MapNode configuration as the Part 1 pipeline, bound to a fresh variable
# for Demo.ValidPathPipeline: map Demo.EnrichAction over the entries found at
# the context path [:validate, :items].
# The `{:ok, _}` match raises MatchError if construction fails.
{:ok, enrich_node_v2} = MapNode.new(
  name: :enrich,
  over: [:validate, :items],
  node: Demo.EnrichAction
)

# Same node names and transition table as Demo.ComposedPipeline; the only
# differences are the :extract implementation (Demo.ExtractValidAction, whose
# items all have positive prices) and the separately built MapNode value.
defmodule Demo.ValidPathPipeline do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "valid_path_pipeline",
    description: "Same graph — different runtime path",
    nodes: %{
      extract: Demo.ExtractValidAction,
      validate: Demo.ValidateAction,
      # `enrich_node_v2` is the value bound by MapNode.new/1 right before this module.
      enrich: enrich_node_v2,
      aggregate: Demo.AggregateAction,
      quarantine: Demo.QuarantineAction
    },
    transitions: %{
      {:extract, :ok} => :validate,
      {:validate, :ok} => :enrich,
      # Still declared even though this module's extract data never yields
      # invalid items.
      {:validate, :has_invalid} => :quarantine,
      {:enrich, :ok} => :aggregate,
      {:aggregate, :ok} => :done,
      {:quarantine, :ok} => :done,
      # NOTE(review): `:_` presumably matches any state, sending every :error
      # outcome to :failed — confirm against the Jido.Composer.Workflow docs.
      {:_, :error} => :failed
    },
    initial: :extract,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Re-declare the agent helper functions with `@doc false` (see Demo.Helpers).
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

agent = Demo.ValidPathPipeline.new()
{:ok, ctx} = Demo.ValidPathPipeline.run_sync(agent, %{source: "clean_db"})

# Number of entries the :enrich node produced.
enriched_count = length(ctx[:enrich][:results])

Enum.each(
  [
    "=== Same Graph, Different Runtime Path ===\n",
    "Validation: all_valid=#{ctx[:validate][:all_valid]}",
    "Path taken: extract → validate → enrich → aggregate",
    "Enriched #{enriched_count} items",
    "Total price: #{ctx[:aggregate][:total_price]}",
    "\nThe GRAPH is identical to Part 1 — only the runtime DATA changed.",
    "Same static structure, different data-driven execution path."
  ],
  &IO.puts/1
)

Part 3: Orchestrator — True Runtime Composition

The Orchestrator is true runtime composition — the LLM decides which tools to invoke, in what order, with what arguments. The execution graph is not defined at compile time; it’s discovered at runtime.

Composer also supports runtime configuration of existing patterns (adjusting parameters without changing structure):

# Orchestrator configure/2 — runtime configuration, not graph construction
#
# In a real app:
#   agent = MyOrchestrator.new()
#   agent = MyOrchestrator.configure(agent, nodes: filtered_tools)
#
# This changes WHICH TOOLS are available, not the orchestrator's
# fundamental nature. The LLM still decides the execution path.
#
# Common patterns:
#   - RBAC: filter tools by user role
#   - Context-dependent: add/remove tools based on conversation state
#   - A/B testing: swap tool implementations

# Nothing is executed here; the cell only prints a summary of the notes above.
[
  "=== Orchestrator: True Runtime Composition ===\n",
  "The Orchestrator is the only pattern where the execution graph",
  "is determined at runtime (by the LLM).\n",
  "configure/2 adjusts parameters (tools, prompt, model) but",
  "doesn't change the fundamental pattern — the LLM still decides.\n",
  "See livebook 04 (LLM Orchestrator) for working examples with real LLM calls."
]
|> Enum.each(&IO.puts/1)

Skill.assemble/2

Skill.assemble/2 builds an orchestrator agent from capability bundles at runtime. This is runtime configuration of an orchestrator, not runtime workflow construction — the resulting agent still uses the LLM to decide tool sequence.

# Skills are pure data structs:
#
#   math_skill = %Jido.Composer.Skill{
#     name: "math",
#     description: "Arithmetic operations",
#     prompt_fragment: "Use add and multiply for calculations.",
#     tools: [AddAction, MultiplyAction]
#   }
#
#   {:ok, agent} = Skill.assemble([math_skill, data_skill],
#     base_prompt: "You are a helpful assistant.",
#     model: "anthropic:claude-sonnet-4-20250514"
#   )
#
# assemble/2 composes prompt fragments, deduplicates tools, and returns
# a configured orchestrator agent ready for query_sync.

# Nothing is executed here; the cell only prints a summary of the notes above.
[
  "=== Skill.assemble/2 — Runtime Orchestrator Assembly ===\n",
  "Skills package capabilities as data (no module definitions).",
  "assemble/2 returns a configured orchestrator — the LLM still drives execution.",
  "\nSee livebook 08 (Dynamic Skill Nodes) for working examples with real LLM calls."
]
|> Enum.each(&IO.puts/1)

Part 4: Workflow vs Orchestrator

The key decision: do you know the execution paths at compile time?

| Question | Yes → Workflow | No → Orchestrator |
| --- | --- | --- |
| Are all possible paths known? | Transitions define them | LLM discovers them |
| Can you draw the full graph? | Yes — static FSM | Only available tools |
| Need compile-time validation? | Yes — dead ends caught | Only tool availability |
| Need deterministic execution? | Yes — same input = same path | LLM may vary |
| Is the collection size variable? | Use MapNode (traverse) | LLM iterates as needed |

Same Problem, Both Ways

Problem: Given a list of numbers, double each one and sum the results.

alias Jido.Composer.Node.MapNode

# --- Workflow approach: static graph with MapNode ---

defmodule Demo.SumAction do
  use Jido.Action,
    name: "sum",
    description: "Sums the values from mapped results",
    schema: []

  # Adds up the `:value` of every entry under [:double, :results]; a missing
  # value counts as 0 and a missing list as [].
  def run(params, _ctx) do
    doubled = get_in(params, [:double, :results]) || []
    values = Enum.map(doubled, fn entry -> entry[:value] || 0 end)

    {:ok, %{total: Enum.sum(values)}}
  end
end

defmodule Demo.GenerateNumbersAction do
  use Jido.Action,
    name: "generate_numbers",
    description: "Generates a list of numbers",
    schema: [numbers: [type: {:list, :integer}, required: true]]

  # Wraps each incoming number in a `%{value: n}` map — the parameter shape
  # Demo.DoubleValueAction declares in its schema.
  def run(%{numbers: inputs}, _ctx) do
    wrapped = for n <- inputs, do: %{value: n}

    {:ok, %{items: wrapped}}
  end
end

# Build the node that Demo.DoubleSumWorkflow registers as :double: map
# Demo.DoubleValueAction over the entries found at the context path
# [:generate, :items] (the key Demo.GenerateNumbersAction returns).
# The `{:ok, _}` match raises MatchError if construction fails.
{:ok, double_node} = MapNode.new(
  name: :double,
  over: [:generate, :items],
  node: Demo.DoubleValueAction
)

# Linear workflow: generate → double (MapNode) → sum → done.
defmodule Demo.DoubleSumWorkflow do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "double_and_sum",
    description: "Double each number, then sum — fully deterministic",
    nodes: %{
      generate: Demo.GenerateNumbersAction,
      # `double_node` is the value bound by MapNode.new/1 right before this module.
      double: double_node,
      sum: Demo.SumAction
    },
    transitions: %{
      {:generate, :ok} => :double,
      {:double, :ok} => :sum,
      {:sum, :ok} => :done,
      # NOTE(review): `:_` presumably matches any state, sending every :error
      # outcome to :failed — confirm against the Jido.Composer.Workflow docs.
      {:_, :error} => :failed
    },
    initial: :generate,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Re-declare the agent helper functions with `@doc false` (see Demo.Helpers).
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

agent = Demo.DoubleSumWorkflow.new()
{:ok, ctx} = Demo.DoubleSumWorkflow.run_sync(agent, %{numbers: [1, 2, 3, 4, 5]})

# Pull the bare numbers out of the :double node's results for display.
doubled = for result <- ctx[:double][:results], do: result[:value]

Enum.each(
  [
    "=== Workflow Approach ===\n",
    "Input:   [1, 2, 3, 4, 5]",
    "Doubled: #{inspect(doubled)}",
    "Sum:     #{ctx[:sum][:total]}",
    "\nFully deterministic. Same input always produces the same result.",
    "The graph is known at compile time: generate → double(MapNode) → sum."
  ],
  &IO.puts/1
)
# --- Orchestrator approach (conceptual) ---
#
# defmodule MathOrchestrator do
#   use Jido.Composer.Orchestrator,
#     name: "math_orchestrator",
#     model: "anthropic:claude-sonnet-4-20250514",
#     nodes: [DoubleAction, SumAction],
#     system_prompt: "Double each number and sum the results."
# end
#
# The LLM decides how to use the tools. It might:
# - Call double once per number, then sum
# - Call double in a different order
# - Make mistakes and retry
#
# More flexible, but non-deterministic and requires an API key.

# Nothing is executed here; the cell only prints the summary and the
# decision framework.
[
  "=== Orchestrator Approach (Conceptual) ===\n",
  "The LLM decides tool invocation order at runtime.",
  "More flexible but non-deterministic — requires an API key.",
  "\nDecision framework:",
  "  Predictable paths  → Workflow (compile-time constructors)",
  "  Discovered paths   → Orchestrator (bind / LLM)",
  "  Mix of both        → Nest one inside the other"
]
|> Enum.each(&IO.puts/1)

Next Steps

You’ve seen the full composition constructor framework:

  • Five constructors (sequence, parallel, choice, traverse, identity) define static graphs at compile time via the Workflow DSL
  • Bind (Orchestrator) is true runtime composition — the LLM discovers the execution path
  • Within a static graph, data drives behavior (outcomes, collection sizes) — but the graph structure is fixed
  • Constructors compose freely — any constructor can contain any other

Next guides:

  • LLM Orchestrator (livebook 04) — Orchestrator in depth with real LLM calls
  • Dynamic Skill Nodes (livebook 08) — Runtime orchestrator assembly with skills