Powered by AppSignal & Oban Pro

Prerequisites

multi-agent-orchestration.livemd

Prerequisites

Complete Parent-child agent hierarchies before starting. This notebook focuses on the orchestration layer itself, so the first successful run stays local and deterministic.

Setup

Mix.install([
  {{mix_dep:jido}},
  {{mix_dep:jido_ai}}
])

Logger.configure(level: :warning)

This tutorial runs entirely locally. Later sections show where AI planning and file-based skills fit, but the main orchestration path does not require provider keys.

Polling helper

defmodule MyApp.OrchestrationHelpers do
  def wait_for(fun, timeout \\ 30_000, interval \\ 200) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    if System.monotonic_time(:millisecond) > deadline do
      raise "Timed out waiting for condition"
    end

    case fun.() do
      {:ok, result} ->
        result

      :retry ->
        Process.sleep(interval)
        do_wait(fun, deadline, interval)
    end
  end
end

Start the runtime

case Jido.start() do
  {:ok, _} -> :ok
  {:error, {:already_started, _}} -> :ok
end

runtime = Jido.default_instance()

Register specialist skills

Jido.AI.Skill.Spec lets you describe what a specialist can do without hard-coding that description into every prompt or module. In this notebook, the registry is local metadata that the coordinator and specialists can inspect at runtime.

alias Jido.AI.Skill.{Registry, Spec}

:ok = Registry.clear()

planner_skill = %Spec{
  name: "release-planner",
  description: "Breaks a goal into concrete milestones and sequencing notes.",
  tags: ["planning", "coordination"],
  vsn: "1.0.0"
}

research_skill = %Spec{
  name: "release-researcher",
  description: "Collects supporting facts, risks, and open questions for a milestone.",
  tags: ["research", "analysis"],
  vsn: "1.0.0"
}

writer_skill = %Spec{
  name: "release-writer",
  description: "Turns specialist input into a concise stakeholder-ready summary.",
  tags: ["writing", "synthesis"],
  vsn: "1.0.0"
}

Enum.each([planner_skill, research_skill, writer_skill], &Registry.register/1)

Registry.list()

Architecture

The coordinator owns the overall goal, spawns three specialists, assigns work, and aggregates their outputs.

Coordinator
    |
    |-- spawns --> PlannerAgent
    |-- spawns --> ResearcherAgent
    |-- spawns --> WriterAgent
    |
    |-- emits --> specialist.work
    |<-- receives -- specialist.result
    |
    |--> assembled final result

Define the specialist action

Each specialist handles the same specialist.work signal. The skill registry tells the action what capability that specialist represents.

defmodule MyApp.HandleWorkAction do
  use Jido.Action,
    name: "handle_specialist_work",
    schema: [
      task: [type: :string, required: true],
      task_id: [type: :string, required: true],
      role: [type: :string, required: true],
      skill_name: [type: :string, required: true]
    ]

  alias Jido.Agent.Directive
  alias Jido.AI.Skill.Registry

  def run(params, context) do
    {:ok, skill} = Registry.lookup(params.skill_name)

    output = """
    #{String.capitalize(params.role)} output
    Skill: #{skill.name}
    Capability: #{skill.description}
    Completed task: #{params.task}
    """

    result_signal =
      Jido.Signal.new!(
        "specialist.result",
        %{
          task_id: params.task_id,
          role: params.role,
          skill_name: skill.name,
          output: output
        },
        source: "/specialist/#{params.role}"
      )

    agent_like = %{state: context.state}
    emit = Directive.emit_to_parent(agent_like, result_signal)

    {:ok,
     %{last_task: params.task_id, last_skill: skill.name, last_output: output},
     List.wrap(emit)}
  end
end

Define the specialist agents

These are plain Jido.Agent modules because the tutorial is about orchestration. Replacing HandleWorkAction with LLM-backed work is an extension, not the first step.

defmodule MyApp.PlannerAgent do
  use Jido.Agent,
    name: "planner_agent",
    description: "Planning specialist",
    schema: [
      last_task: [type: :string, default: nil],
      last_skill: [type: :string, default: nil],
      last_output: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [{"specialist.work", MyApp.HandleWorkAction}]
  end
end

defmodule MyApp.ResearcherAgent do
  use Jido.Agent,
    name: "researcher_agent",
    description: "Research specialist",
    schema: [
      last_task: [type: :string, default: nil],
      last_skill: [type: :string, default: nil],
      last_output: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [{"specialist.work", MyApp.HandleWorkAction}]
  end
end

defmodule MyApp.WriterAgent do
  use Jido.Agent,
    name: "writer_agent",
    description: "Writing specialist",
    schema: [
      last_task: [type: :string, default: nil],
      last_skill: [type: :string, default: nil],
      last_output: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [{"specialist.work", MyApp.HandleWorkAction}]
  end
end

Build the coordinator

The coordinator turns one goal into three specialist tasks, spawns children, dispatches work when those children come online, and aggregates the final outputs.

defmodule MyApp.DecomposeGoalAction do
  use Jido.Action,
    name: "decompose_goal",
    schema: [
      goal: [type: :string, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{goal: goal}, _context) do
    tasks = [
      %{
        id: "plan",
        role: "planner",
        skill_name: "release-planner",
        task: "Outline the release milestones for: #{goal}"
      },
      %{
        id: "research",
        role: "researcher",
        skill_name: "release-researcher",
        task: "List supporting facts and risks for: #{goal}"
      },
      %{
        id: "write",
        role: "writer",
        skill_name: "release-writer",
        task: "Draft the stakeholder-ready summary for: #{goal}"
      }
    ]

    spawns = [
      Directive.spawn_agent(MyApp.PlannerAgent, :planner, meta: Enum.at(tasks, 0)),
      Directive.spawn_agent(MyApp.ResearcherAgent, :researcher, meta: Enum.at(tasks, 1)),
      Directive.spawn_agent(MyApp.WriterAgent, :writer, meta: Enum.at(tasks, 2))
    ]

    {:ok, %{goal: goal, plan: tasks, pending: length(tasks), results: %{}, status: :running}, spawns}
  end
end

defmodule MyApp.CoordinatorChildStartedAction do
  use Jido.Action,
    name: "coordinator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    work_signal =
      Jido.Signal.new!(
        "specialist.work",
        %{
          task_id: Map.get(meta, :task_id) || Map.get(meta, :id),
          role: Map.get(meta, :role),
          task: Map.get(meta, :task),
          skill_name: Map.get(meta, :skill_name)
        },
        source: "/coordinator"
      )

    {:ok, %{}, [Directive.emit_to_pid(work_signal, pid)]}
  end
end

defmodule MyApp.HandleSpecialistResultAction do
  use Jido.Action,
    name: "handle_specialist_result",
    schema: [
      task_id: [type: :string, required: true],
      role: [type: :string, required: true],
      skill_name: [type: :string, required: true],
      output: [type: :string, required: true]
    ]

  def run(params, context) do
    results = Map.get(context.state, :results, %{})
    updated_results = Map.put(results, params.role, Map.take(params, [:task_id, :skill_name, :output]))
    pending = max(Map.get(context.state, :pending, 1) - 1, 0)
    status = if pending == 0, do: :complete, else: :running

    {:ok, %{results: updated_results, pending: pending, status: status}}
  end
end

defmodule MyApp.CoordinatorAgent do
  use Jido.Agent,
    name: "coordinator_agent",
    description: "Coordinates specialized child agents for one goal",
    schema: [
      goal: [type: :string, default: nil],
      plan: [type: {:list, :map}, default: []],
      pending: [type: :integer, default: 0],
      results: [type: :map, default: %{}],
      status: [type: :atom, default: :idle]
    ]

  def signal_routes(_ctx) do
    [
      {"orchestrate", MyApp.DecomposeGoalAction},
      {"jido.agent.child.started", MyApp.CoordinatorChildStartedAction},
      {"specialist.result", MyApp.HandleSpecialistResultAction}
    ]
  end
end

Run the orchestration

Start the coordinator, submit one goal, and wait until all specialists have reported back.

coordinator_id = "coordinator-#{System.unique_integer([:positive])}"

{:ok, coordinator_pid} =
  Jido.start_agent(
    runtime,
    MyApp.CoordinatorAgent,
    id: coordinator_id
  )
signal =
  Jido.Signal.new!(
    "orchestrate",
    %{goal: "Prepare a release readiness brief for the new command palette"},
    source: "/livebook"
  )

{:ok, _agent} = Jido.AgentServer.call(coordinator_pid, signal)
result =
  MyApp.OrchestrationHelpers.wait_for(fn ->
    case Jido.AgentServer.state(coordinator_pid) do
      {:ok, %{agent: %{state: %{status: :complete} = state}}} -> {:ok, state}
      _ -> :retry
    end
  end)
IO.inspect(result.goal, label: "Goal")
IO.inspect(Enum.map(result.plan, &amp;{&amp;1.role, &amp;1.skill_name}), label: "Plan")
IO.inspect(Map.keys(result.results), label: "Completed roles")
IO.inspect(result.pending, label: "Pending")

Enum.each(result.results, fn {role, payload} ->
  IO.puts("\n#{role}")
  IO.puts(payload.output)
end)

You should see three completed roles, one per specialist, and a pending count of 0.

Inspect the hierarchy

{:ok, coordinator_state} = Jido.AgentServer.state(coordinator_pid)

IO.inspect(
  Map.keys(coordinator_state.children),
  label: "Coordinator children"
)

Each child is tracked by tag in the standard children map:

Enum.map(coordinator_state.children, fn {tag, child} ->
  {tag, child.module, child.meta.skill_name}
end)

Where AI planning fits

The runnable path above keeps task decomposition deterministic so the notebook works locally. In a production AI orchestrator, the natural next step is to replace DecomposeGoalAction‘s fixed task list with Planning actions or a mounted Planning plugin.

Use the Planning action directly:

{:ok, result} =
  Jido.Exec.run(Jido.AI.Actions.Planning.Decompose, %{
    goal: "Prepare a release readiness brief for the new command palette",
    max_depth: 2,
    model: "openai:gpt-4o-mini"
  })

Or mount the plugin on the coordinator so the same functionality is available through signals:

defmodule MyApp.AIEnabledCoordinator do
  use Jido.Agent,
    name: "ai_enabled_coordinator",
    plugins: [{Jido.AI.Plugins.Planning, []}]
end

You can also swap the inline Spec structs for file-backed skills with Jido.AI.Skill.Loader.load/1 once you are ready to maintain reusable SKILL.md assets.

Next steps