Prerequisites
Complete Parent-child agent hierarchies before starting. This notebook focuses on the orchestration layer itself, so the first successful run stays local and deterministic.
Setup
Mix.install([
{{mix_dep:jido}},
{{mix_dep:jido_ai}}
])
Logger.configure(level: :warning)
This tutorial runs entirely locally. Later sections show where AI planning and file-based skills fit, but the main orchestration path does not require provider keys.
Polling helper
defmodule MyApp.OrchestrationHelpers do
def wait_for(fun, timeout \\ 30_000, interval \\ 200) do
deadline = System.monotonic_time(:millisecond) + timeout
do_wait(fun, deadline, interval)
end
defp do_wait(fun, deadline, interval) do
if System.monotonic_time(:millisecond) > deadline do
raise "Timed out waiting for condition"
end
case fun.() do
{:ok, result} ->
result
:retry ->
Process.sleep(interval)
do_wait(fun, deadline, interval)
end
end
end
Start the runtime
case Jido.start() do
{:ok, _} -> :ok
{:error, {:already_started, _}} -> :ok
end
runtime = Jido.default_instance()
Register specialist skills
Jido.AI.Skill.Spec lets you describe what a specialist can do without hard-coding that description into every prompt or module. In this notebook, the registry is local metadata that the coordinator and specialists can inspect at runtime.
alias Jido.AI.Skill.{Registry, Spec}
:ok = Registry.clear()
planner_skill = %Spec{
name: "release-planner",
description: "Breaks a goal into concrete milestones and sequencing notes.",
tags: ["planning", "coordination"],
vsn: "1.0.0"
}
research_skill = %Spec{
name: "release-researcher",
description: "Collects supporting facts, risks, and open questions for a milestone.",
tags: ["research", "analysis"],
vsn: "1.0.0"
}
writer_skill = %Spec{
name: "release-writer",
description: "Turns specialist input into a concise stakeholder-ready summary.",
tags: ["writing", "synthesis"],
vsn: "1.0.0"
}
Enum.each([planner_skill, research_skill, writer_skill], &Registry.register/1)
Registry.list()
Architecture
The coordinator owns the overall goal, spawns three specialists, assigns work, and aggregates their outputs.
Coordinator
|
|-- spawns --> PlannerAgent
|-- spawns --> ResearcherAgent
|-- spawns --> WriterAgent
|
|-- emits --> specialist.work
|<-- receives -- specialist.result
|
|--> assembled final result
Define the specialist action
Each specialist handles the same specialist.work signal. The skill registry tells the action what capability that specialist represents.
defmodule MyApp.HandleWorkAction do
use Jido.Action,
name: "handle_specialist_work",
schema: [
task: [type: :string, required: true],
task_id: [type: :string, required: true],
role: [type: :string, required: true],
skill_name: [type: :string, required: true]
]
alias Jido.Agent.Directive
alias Jido.AI.Skill.Registry
def run(params, context) do
{:ok, skill} = Registry.lookup(params.skill_name)
output = """
#{String.capitalize(params.role)} output
Skill: #{skill.name}
Capability: #{skill.description}
Completed task: #{params.task}
"""
result_signal =
Jido.Signal.new!(
"specialist.result",
%{
task_id: params.task_id,
role: params.role,
skill_name: skill.name,
output: output
},
source: "/specialist/#{params.role}"
)
agent_like = %{state: context.state}
emit = Directive.emit_to_parent(agent_like, result_signal)
{:ok,
%{last_task: params.task_id, last_skill: skill.name, last_output: output},
List.wrap(emit)}
end
end
Define the specialist agents
These are plain Jido.Agent modules because the tutorial is about orchestration. Replacing HandleWorkAction with LLM-backed work is an extension, not the first step.
defmodule MyApp.PlannerAgent do
use Jido.Agent,
name: "planner_agent",
description: "Planning specialist",
schema: [
last_task: [type: :string, default: nil],
last_skill: [type: :string, default: nil],
last_output: [type: :string, default: nil]
]
def signal_routes(_ctx) do
[{"specialist.work", MyApp.HandleWorkAction}]
end
end
defmodule MyApp.ResearcherAgent do
use Jido.Agent,
name: "researcher_agent",
description: "Research specialist",
schema: [
last_task: [type: :string, default: nil],
last_skill: [type: :string, default: nil],
last_output: [type: :string, default: nil]
]
def signal_routes(_ctx) do
[{"specialist.work", MyApp.HandleWorkAction}]
end
end
defmodule MyApp.WriterAgent do
use Jido.Agent,
name: "writer_agent",
description: "Writing specialist",
schema: [
last_task: [type: :string, default: nil],
last_skill: [type: :string, default: nil],
last_output: [type: :string, default: nil]
]
def signal_routes(_ctx) do
[{"specialist.work", MyApp.HandleWorkAction}]
end
end
Build the coordinator
The coordinator turns one goal into three specialist tasks, spawns children, dispatches work when those children come online, and aggregates the final outputs.
defmodule MyApp.DecomposeGoalAction do
use Jido.Action,
name: "decompose_goal",
schema: [
goal: [type: :string, required: true]
]
alias Jido.Agent.Directive
def run(%{goal: goal}, _context) do
tasks = [
%{
id: "plan",
role: "planner",
skill_name: "release-planner",
task: "Outline the release milestones for: #{goal}"
},
%{
id: "research",
role: "researcher",
skill_name: "release-researcher",
task: "List supporting facts and risks for: #{goal}"
},
%{
id: "write",
role: "writer",
skill_name: "release-writer",
task: "Draft the stakeholder-ready summary for: #{goal}"
}
]
spawns = [
Directive.spawn_agent(MyApp.PlannerAgent, :planner, meta: Enum.at(tasks, 0)),
Directive.spawn_agent(MyApp.ResearcherAgent, :researcher, meta: Enum.at(tasks, 1)),
Directive.spawn_agent(MyApp.WriterAgent, :writer, meta: Enum.at(tasks, 2))
]
{:ok, %{goal: goal, plan: tasks, pending: length(tasks), results: %{}, status: :running}, spawns}
end
end
defmodule MyApp.CoordinatorChildStartedAction do
use Jido.Action,
name: "coordinator_child_started",
schema: [
parent_id: [type: :string, required: true],
child_id: [type: :string, required: true],
child_module: [type: :any, required: true],
tag: [type: :any, required: true],
pid: [type: :any, required: true],
meta: [type: :map, default: %{}]
]
alias Jido.Agent.Directive
def run(%{pid: pid, meta: meta}, _context) do
work_signal =
Jido.Signal.new!(
"specialist.work",
%{
task_id: Map.get(meta, :task_id) || Map.get(meta, :id),
role: Map.get(meta, :role),
task: Map.get(meta, :task),
skill_name: Map.get(meta, :skill_name)
},
source: "/coordinator"
)
{:ok, %{}, [Directive.emit_to_pid(work_signal, pid)]}
end
end
defmodule MyApp.HandleSpecialistResultAction do
use Jido.Action,
name: "handle_specialist_result",
schema: [
task_id: [type: :string, required: true],
role: [type: :string, required: true],
skill_name: [type: :string, required: true],
output: [type: :string, required: true]
]
def run(params, context) do
results = Map.get(context.state, :results, %{})
updated_results = Map.put(results, params.role, Map.take(params, [:task_id, :skill_name, :output]))
pending = max(Map.get(context.state, :pending, 1) - 1, 0)
status = if pending == 0, do: :complete, else: :running
{:ok, %{results: updated_results, pending: pending, status: status}}
end
end
defmodule MyApp.CoordinatorAgent do
use Jido.Agent,
name: "coordinator_agent",
description: "Coordinates specialized child agents for one goal",
schema: [
goal: [type: :string, default: nil],
plan: [type: {:list, :map}, default: []],
pending: [type: :integer, default: 0],
results: [type: :map, default: %{}],
status: [type: :atom, default: :idle]
]
def signal_routes(_ctx) do
[
{"orchestrate", MyApp.DecomposeGoalAction},
{"jido.agent.child.started", MyApp.CoordinatorChildStartedAction},
{"specialist.result", MyApp.HandleSpecialistResultAction}
]
end
end
Run the orchestration
Start the coordinator, submit one goal, and wait until all specialists have reported back.
coordinator_id = "coordinator-#{System.unique_integer([:positive])}"
{:ok, coordinator_pid} =
Jido.start_agent(
runtime,
MyApp.CoordinatorAgent,
id: coordinator_id
)
signal =
Jido.Signal.new!(
"orchestrate",
%{goal: "Prepare a release readiness brief for the new command palette"},
source: "/livebook"
)
{:ok, _agent} = Jido.AgentServer.call(coordinator_pid, signal)
result =
MyApp.OrchestrationHelpers.wait_for(fn ->
case Jido.AgentServer.state(coordinator_pid) do
{:ok, %{agent: %{state: %{status: :complete} = state}}} -> {:ok, state}
_ -> :retry
end
end)
IO.inspect(result.goal, label: "Goal")
IO.inspect(Enum.map(result.plan, &{&1.role, &1.skill_name}), label: "Plan")
IO.inspect(Map.keys(result.results), label: "Completed roles")
IO.inspect(result.pending, label: "Pending")
Enum.each(result.results, fn {role, payload} ->
IO.puts("\n#{role}")
IO.puts(payload.output)
end)
You should see three completed roles, one per specialist, and a pending count of 0.
Inspect the hierarchy
{:ok, coordinator_state} = Jido.AgentServer.state(coordinator_pid)
IO.inspect(
Map.keys(coordinator_state.children),
label: "Coordinator children"
)
Each child is tracked by tag in the standard children map:
Enum.map(coordinator_state.children, fn {tag, child} ->
{tag, child.module, child.meta.skill_name}
end)
Where AI planning fits
The runnable path above keeps task decomposition deterministic so the notebook works locally. In a production AI orchestrator, the natural next step is to replace DecomposeGoalAction‘s fixed task list with Planning actions or a mounted Planning plugin.
Use the Planning action directly:
{:ok, result} =
Jido.Exec.run(Jido.AI.Actions.Planning.Decompose, %{
goal: "Prepare a release readiness brief for the new command palette",
max_depth: 2,
model: "openai:gpt-4o-mini"
})
Or mount the plugin on the coordinator so the same functionality is available through signals:
defmodule MyApp.AIEnabledCoordinator do
use Jido.Agent,
name: "ai_enabled_coordinator",
plugins: [{Jido.AI.Plugins.Planning, []}]
end
You can also swap the inline Spec structs for file-backed skills with Jido.AI.Skill.Loader.load/1 once you are ready to maintain reusable SKILL.md assets.
Next steps
- Extend the hierarchy with real worker trees in Parent-child agent hierarchies
- Add planning prompts and LLM-backed specialists in Reasoning strategies compared
- Reuse this signal pattern inside larger systems with Build your first workflow