Prerequisites
Complete State machines with FSM before starting this tutorial. You need familiarity with Signal routing, Directives, and running Agents in the Jido runtime.
Setup
Mix.install([
  {:jido, "~> 2.0"}
])
A polling helper replaces the test-only eventually macro. You will use it later to wait for asynchronous results.
defmodule Helpers do
  def wait_for(fun, timeout \\ 10_000, interval \\ 200) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    if System.monotonic_time(:millisecond) > deadline do
      raise "Timed out waiting for condition"
    end

    case fun.() do
      {:ok, result} ->
        result

      :retry ->
        Process.sleep(interval)
        do_wait(fun, deadline, interval)
    end
  end
end
The hierarchy pattern
Some work decomposes naturally into layers. A job splits into tasks, and each task becomes an isolated unit of execution. This tutorial builds a three-layer processing system:
Orchestrator (layer 1)
  |
  |-- spawns --> Coordinator (layer 2)
  |                 |
  |                 |-- spawns --> Worker A (layer 3)
  |                 |-- spawns --> Worker B (layer 3)
  |                 |
  |                 |<-- task.result -- Worker A
  |                 |<-- task.result -- Worker B
  |                 |
  |<-- job.result -- Coordinator (aggregated)
Signals flow downward as work assignments (job.assign, task.execute) and upward as results (task.result, job.result). Each layer only talks to its direct parent or children. The runtime handles process lifecycle through the SpawnAgent Directive and the built-in jido.agent.child.started Signal.
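The same flow can be sketched with plain Elixir processes before any Jido code is involved. This is only an illustration of the message pattern, assuming raw spawn/send/receive in place of Agents, Signals, and Directives; the module name HierarchySketch and its functions are hypothetical.

```elixir
defmodule HierarchySketch do
  # Plain-process stand-in for the three layers. No Jido APIs are used here.

  # Layer 3: compute one result and report it to the direct parent only.
  def worker(parent, task_id, value) do
    send(parent, {:task_result, task_id, value * 2 + 1})
  end

  # Layer 2: spawn one worker per task, collect every task result,
  # then report a single aggregated message to the direct parent.
  def coordinator(parent, job_id, values) do
    me = self()

    values
    |> Enum.with_index(1)
    |> Enum.each(fn {value, idx} ->
      spawn(fn -> worker(me, "#{job_id}-task-#{idx}", value) end)
    end)

    results = collect(length(values), [])
    send(parent, {:job_result, job_id, Enum.sort(results)})
  end

  # Layer 1: spawn a coordinator and wait for the aggregated result.
  def run(job_id, values) do
    me = self()
    spawn(fn -> coordinator(me, job_id, values) end)

    receive do
      {:job_result, ^job_id, results} -> results
    after
      5_000 -> raise "timed out waiting for #{job_id}"
    end
  end

  # Blocks until `remaining` task results have arrived.
  defp collect(0, acc), do: acc

  defp collect(remaining, acc) do
    receive do
      {:task_result, task_id, result} ->
        collect(remaining - 1, [{task_id, result} | acc])
    end
  end
end

HierarchySketch.run("job-1", [5, 10])
```

Each process only ever sends to the pid it was handed by its parent, which mirrors the rule that a layer talks only to its direct parent or children.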
Define the Worker
The Worker Agent processes individual tasks. Its single Action computes a result and emits it to the parent Coordinator using Directive.emit_to_parent/2.
defmodule MyApp.ExecuteTaskAction do
  use Jido.Action,
    name: "execute_task",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      operation: [type: :atom, required: true],
      value: [type: :integer, required: true]
    ]

  alias Jido.Agent.Directive

  def run(params, context) do
    result = execute(params.operation, params.value)

    result_signal =
      Jido.Signal.new!(
        "task.result",
        %{
          task_id: params.task_id,
          job_id: params.job_id,
          result: result,
          operation: params.operation
        },
        source: "/worker"
      )

    agent_like = %{state: context.state}
    emit_directive = Directive.emit_to_parent(agent_like, result_signal)

    {:ok,
     %{
       last_task: %{task_id: params.task_id, result: result},
       tasks_completed: Map.get(context.state, :tasks_completed, 0) + 1
     }, List.wrap(emit_directive)}
  end

  defp execute(:compute, v), do: v * 2 + 1
  defp execute(:validate, v), do: if(v > 0, do: :valid, else: :invalid)
  defp execute(:transform, v), do: "#{v}_processed"
  defp execute(_, v), do: v
end
emit_to_parent/2 reads the __parent__ reference from agent state (set automatically when the runtime spawns a child) and returns an Emit Directive targeting the parent PID. It returns nil when no parent exists, so List.wrap/1 safely handles both cases.
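The nil-or-directive handling can be checked in isolation. The sketch below assumes a hypothetical stand-in, EmitSketch.emit_to_parent/2, that only imitates the return shape described above (a directive-like value or nil); it is not the Jido function, and the tuple it returns is made up for illustration. List.wrap/1 is the real standard-library call.

```elixir
defmodule EmitSketch do
  # Hypothetical stand-in for emit_to_parent/2: returns a directive-like
  # tuple when a parent is known, and nil otherwise.
  def emit_to_parent(%{state: %{__parent__: parent}}, signal) when not is_nil(parent) do
    {:emit, signal, parent}
  end

  def emit_to_parent(_agent_like, _signal), do: nil
end

with_parent = EmitSketch.emit_to_parent(%{state: %{__parent__: :coordinator}}, "task.result")
without_parent = EmitSketch.emit_to_parent(%{state: %{}}, "task.result")

# One-element list holding the directive.
List.wrap(with_parent)

# Empty list, so the Action returns no directives.
List.wrap(without_parent)
```

Because both branches produce a list, the Action can return the wrapped value as its directive list without a conditional.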
The Worker Agent itself is minimal. It maps the task.execute Signal to the Action above.
defmodule MyApp.WorkerAgent do
  use Jido.Agent,
    name: "worker_agent",
    schema: [
      last_task: [type: :map, default: nil],
      tasks_completed: [type: :integer, default: 0]
    ]

  def signal_routes(_ctx) do
    [{"task.execute", MyApp.ExecuteTaskAction}]
  end
end
Define the Coordinator
The Coordinator receives a job assignment, spawns Worker children, dispatches tasks, and aggregates results. This requires three Actions.
Handle job assignment. When the Coordinator receives a job.assign Signal, it records the job metadata and spawns one Worker per task using Directive.spawn_agent/3.
defmodule MyApp.HandleJobAssignAction do
  use Jido.Action,
    name: "handle_job_assign",
    schema: [
      job_id: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_id: job_id, tasks: tasks}, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info = %{
      job_id: job_id,
      total_tasks: length(tasks),
      completed_tasks: 0,
      results: [],
      started_at: DateTime.utc_now()
    }

    updated_pending = Map.put(pending, job_id, job_info)

    spawn_directives =
      Enum.map(tasks, fn task ->
        task_id = "#{job_id}-task-#{task.index}"

        Directive.spawn_agent(
          MyApp.WorkerAgent,
          String.to_atom(task_id),
          meta: %{
            task_id: task_id,
            job_id: job_id,
            operation: task.operation,
            value: task.value
          }
        )
      end)

    {:ok, %{pending_tasks: updated_pending, current_job: job_id}, spawn_directives}
  end
end
The meta map on each SpawnAgent Directive carries task details. When the child starts, the runtime delivers a jido.agent.child.started Signal back to this Coordinator with that metadata attached.
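To see what each spawn carries, the tag and meta construction can be pulled out into a pure function. This is a sketch under the assumption that only the naming scheme and the meta fields matter; SpawnPlanSketch is a hypothetical module, and it returns plain tuples rather than SpawnAgent Directives.

```elixir
defmodule SpawnPlanSketch do
  # Builds one {task_id, meta} pair per task, mirroring the naming scheme
  # used in MyApp.HandleJobAssignAction. The tag stays a string here; the
  # tutorial converts it with String.to_atom/1.
  def plan(job_id, tasks) do
    Enum.map(tasks, fn task ->
      task_id = "#{job_id}-task-#{task.index}"

      meta = %{
        task_id: task_id,
        job_id: job_id,
        operation: task.operation,
        value: task.value
      }

      {task_id, meta}
    end)
  end
end

plan =
  SpawnPlanSketch.plan("job-7", [
    %{index: 1, operation: :compute, value: 5},
    %{index: 2, operation: :validate, value: 42}
  ])
```

Atoms created with String.to_atom/1 are never garbage collected, so a long-running system that derives tags from an unbounded stream of job IDs may want a bounded naming scheme instead.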
React to child startup. When a Worker child starts, the Coordinator sends it a task.execute Signal using Directive.emit_to_pid/2.
defmodule MyApp.CoordinatorChildStartedAction do
  use Jido.Action,
    name: "coordinator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    task_signal =
      Jido.Signal.new!(
        "task.execute",
        %{
          task_id: meta.task_id,
          job_id: meta.job_id,
          operation: meta.operation,
          value: meta.value
        },
        source: "/coordinator"
      )

    {:ok, %{}, [Directive.emit_to_pid(task_signal, pid)]}
  end
end
The jido.agent.child.started Signal payload includes the child’s pid and the meta you passed in the SpawnAgent Directive. This is the hook that connects spawning to dispatching.
Aggregate task results. Each Worker emits a task.result Signal to its parent. The Coordinator collects them and, when the count matches the total, emits job.result up to the Orchestrator.
defmodule MyApp.HandleTaskResultAction do
  use Jido.Action,
    name: "handle_task_result",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      result: [type: :any, required: true],
      operation: [type: :atom, required: true]
    ]

  alias Jido.Agent.Directive
  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info =
      Map.get(pending, params.job_id, %{
        results: [],
        completed_tasks: 0,
        total_tasks: 0
      })

    task_result = %{
      task_id: params.task_id,
      result: params.result,
      operation: params.operation
    }

    updated = %{
      job_info
      | results: [task_result | job_info.results],
        completed_tasks: job_info.completed_tasks + 1
    }

    updated_pending = Map.put(pending, params.job_id, updated)

    if updated.completed_tasks >= updated.total_tasks do
      emit_job_result(params, updated, updated_pending, context)
    else
      {:ok, %{pending_tasks: updated_pending}}
    end
  end

  defp emit_job_result(params, updated, pending, context) do
    signal =
      Jido.Signal.new!(
        "job.result",
        %{
          job_id: params.job_id,
          results: updated.results,
          total_tasks: updated.total_tasks
        },
        source: "/coordinator"
      )

    agent_like = %{state: context.state}
    emit = Directive.emit_to_parent(agent_like, signal)
    completed = Map.get(context.state, :completed_jobs, [])
    set_op = StateOp.set_path([:pending_tasks], Map.delete(pending, params.job_id))

    {:ok, %{completed_jobs: [params.job_id | completed]}, [set_op | List.wrap(emit)]}
  end
end
StateOp.set_path/2 directly overwrites a nested state key. This removes the completed job from pending_tasks without a merge conflict.
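The counting logic in MyApp.HandleTaskResultAction can be exercised on its own. The sketch below is a minimal pure-function version, assuming the same job_info shape as the tutorial; AggregateSketch and its :pending/:done tags are hypothetical and stand in for the two return branches of run/2.

```elixir
defmodule AggregateSketch do
  # Records one task result and reports whether the job is now complete.
  def record(job_info, task_result) do
    updated = %{
      job_info
      | results: [task_result | job_info.results],
        completed_tasks: job_info.completed_tasks + 1
    }

    if updated.completed_tasks >= updated.total_tasks do
      {:done, updated}
    else
      {:pending, updated}
    end
  end
end

job = %{results: [], completed_tasks: 0, total_tasks: 2}

# The first result leaves the job pending; the second completes it.
{:pending, job} = AggregateSketch.record(job, %{task_id: "t1", result: 11})
{:done, job} = AggregateSketch.record(job, %{task_id: "t2", result: 21})

# Results are prepended, so the most recent one comes first.
task_ids = Enum.map(job.results, & &1.task_id)
```

Note that a job with total_tasks of 0, which is the fallback used when a job ID is missing from pending_tasks, counts as done on the first result.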
The Coordinator Agent wires these three Actions to their respective Signals.
defmodule MyApp.CoordinatorAgent do
  use Jido.Agent,
    name: "coordinator_agent",
    schema: [
      pending_tasks: [type: :map, default: %{}],
      current_job: [type: :string, default: nil],
      completed_jobs: [type: {:list, :string}, default: []]
    ]

  def signal_routes(_ctx) do
    [
      {"job.assign", MyApp.HandleJobAssignAction},
      {"jido.agent.child.started", MyApp.CoordinatorChildStartedAction},
      {"task.result", MyApp.HandleTaskResultAction}
    ]
  end
end
Define the Orchestrator
The Orchestrator is the entry point. It accepts job submissions, spawns a Coordinator per job, and collects final results.
Submit a job. Generate a unique job ID, record it as pending, and spawn a Coordinator with the job metadata.
defmodule MyApp.SubmitJobAction do
  use Jido.Action,
    name: "submit_job",
    schema: [
      job_name: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_name: job_name, tasks: tasks}, context) do
    job_id = "job-#{System.unique_integer([:positive])}"
    tag = String.to_atom("coordinator-#{job_id}")
    pending = Map.get(context.state, :pending_jobs, %{})

    job_info = %{
      job_id: job_id,
      job_name: job_name,
      tasks: tasks,
      coordinator_tag: tag,
      submitted_at: DateTime.utc_now()
    }

    spawn =
      Directive.spawn_agent(
        MyApp.CoordinatorAgent,
        tag,
        meta: %{job_id: job_id, job_name: job_name, tasks: tasks}
      )

    {:ok,
     %{
       pending_jobs: Map.put(pending, job_id, job_info),
       last_submitted: job_id
     }, [spawn]}
  end
end
React to Coordinator startup. When the Coordinator child starts, send it the job.assign Signal with the task list. Tasks get indexed here so the Coordinator can track them.
defmodule MyApp.OrchestratorChildStartedAction do
  use Jido.Action,
    name: "orchestrator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    indexed_tasks =
      meta.tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, idx} -> Map.put(task, :index, idx) end)

    signal =
      Jido.Signal.new!(
        "job.assign",
        %{job_id: meta.job_id, tasks: indexed_tasks},
        source: "/orchestrator"
      )

    {:ok, %{}, [Directive.emit_to_pid(signal, pid)]}
  end
end
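The indexing step is ordinary Enum work and can be tried outside the Action. The snippet below is a small sketch that runs the same pipeline on a sample task list; the sample data is made up for illustration.

```elixir
tasks = [
  %{operation: :compute, value: 5},
  %{operation: :validate, value: 42}
]

# Enum.with_index(1) pairs each task with a 1-based position,
# which is then stored on the task under the :index key.
indexed_tasks =
  tasks
  |> Enum.with_index(1)
  |> Enum.map(fn {task, idx} -> Map.put(task, :index, idx) end)

indexes = Enum.map(indexed_tasks, & &1.index)
```

The Coordinator later reads task.index to build IDs of the form "#{job_id}-task-#{task.index}", so every task in the list needs this key before job.assign is sent.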
Aggregate job results. When a Coordinator finishes, it emits job.result to the Orchestrator. This Action moves the job from pending to completed.
defmodule MyApp.HandleJobResultAction do
  use Jido.Action,
    name: "handle_job_result",
    schema: [
      job_id: [type: :string, required: true],
      results: [type: {:list, :map}, required: true],
      total_tasks: [type: :integer, required: true]
    ]

  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_jobs, %{})
    completed = Map.get(context.state, :completed_jobs, [])
    job_info = Map.get(pending, params.job_id, %{})

    record = %{
      job_id: params.job_id,
      job_name: Map.get(job_info, :job_name, "unknown"),
      total_tasks: params.total_tasks,
      results: params.results,
      completed_at: DateTime.utc_now()
    }

    set_op =
      StateOp.set_path(
        [:pending_jobs],
        Map.delete(pending, params.job_id)
      )

    {:ok, %{completed_jobs: [record | completed]}, [set_op]}
  end
end
Wire the Signal routes
The Orchestrator Agent maps three Signals: incoming job submissions, child lifecycle events, and final results from Coordinators.
defmodule MyApp.OrchestratorAgent do
  use Jido.Agent,
    name: "orchestrator_agent",
    schema: [
      pending_jobs: [type: :map, default: %{}],
      completed_jobs: [type: {:list, :map}, default: []],
      last_submitted: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [
      {"submit_job", MyApp.SubmitJobAction},
      {"jido.agent.child.started", MyApp.OrchestratorChildStartedAction},
      {"job.result", MyApp.HandleJobResultAction}
    ]
  end
end
Each layer handles exactly the Signals it needs. Workers never see job.assign. Coordinators never see submit_job. The jido.agent.child.started Signal is the glue between spawning and dispatching at every level.
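The separation between layers can be summarized as a lookup table. The sketch below copies the Signal types from the three signal_routes/1 functions in this tutorial into a plain map; RouteSketch and its helper functions are hypothetical and do not reflect how the Jido router works internally.

```elixir
defmodule RouteSketch do
  # Signal types each layer routes, copied from the three signal_routes/1
  # functions in this tutorial. The lookups are hypothetical helpers.
  @routes %{
    orchestrator: ["submit_job", "jido.agent.child.started", "job.result"],
    coordinator: ["job.assign", "jido.agent.child.started", "task.result"],
    worker: ["task.execute"]
  }

  # True when the given layer has a route for the Signal type.
  def handles?(layer, signal_type) do
    signal_type in Map.fetch!(@routes, layer)
  end

  # Sorted list of layers that route the Signal type.
  def layers_for(signal_type) do
    @routes
    |> Enum.filter(fn {_layer, types} -> signal_type in types end)
    |> Enum.map(fn {layer, _types} -> layer end)
    |> Enum.sort()
  end
end

RouteSketch.handles?(:worker, "job.assign")
RouteSketch.handles?(:coordinator, "submit_job")
RouteSketch.layers_for("jido.agent.child.started")
```

Only the two layers that spawn children route jido.agent.child.started; the Worker, which spawns nothing, has no entry for it.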
Run the hierarchy
Start the Jido runtime and launch the Orchestrator. Then submit a job with multiple tasks.
{:ok, jido} = Jido.start_link(name: :learn_hierarchy)

{:ok, orchestrator_pid} =
  Jido.start_agent(
    jido,
    MyApp.OrchestratorAgent,
    id: "orchestrator-1"
  )
Build a submit_job Signal with two compute tasks and one validation task.
signal =
  Jido.Signal.new!(
    "submit_job",
    %{
      job_name: "mixed_job",
      tasks: [
        %{operation: :compute, value: 5},
        %{operation: :compute, value: 10},
        %{operation: :validate, value: 42}
      ]
    },
    source: "/livebook"
  )

{:ok, _agent} = Jido.AgentServer.call(orchestrator_pid, signal)
The call returns immediately after the Orchestrator processes the submit_job Signal and emits the SpawnAgent Directive. The Coordinator and Workers start asynchronously. Poll for the completed results.
result =
  Helpers.wait_for(fn ->
    case Jido.AgentServer.state(orchestrator_pid) do
      {:ok, %{agent: %{state: %{completed_jobs: [job | _]}}}} ->
        {:ok, job}

      _ ->
        :retry
    end
  end)

IO.inspect(result.job_name, label: "Job")
IO.inspect(result.total_tasks, label: "Tasks completed")

for r <- result.results do
  IO.inspect({r.operation, r.result}, label: " result")
end
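You should see three results: 11 and 21 for the two :compute tasks, and :valid for the :validate task. Workers run concurrently and the Coordinator prepends each result as it arrives, so the order can differ between runs.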
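These values follow directly from the execute/2 clauses in the Worker's Action. As a quick check, the sketch below copies those clauses into a standalone module (ExpectedResults is a hypothetical name, and the functions are public here only so they can be called directly) and applies them to the submitted tasks.

```elixir
defmodule ExpectedResults do
  # Same clauses as the private execute/2 in MyApp.ExecuteTaskAction.
  def execute(:compute, v), do: v * 2 + 1
  def execute(:validate, v), do: if(v > 0, do: :valid, else: :invalid)
  def execute(:transform, v), do: "#{v}_processed"
  def execute(_, v), do: v
end

tasks = [
  %{operation: :compute, value: 5},
  %{operation: :compute, value: 10},
  %{operation: :validate, value: 42}
]

# 5 * 2 + 1 = 11, 10 * 2 + 1 = 21, and 42 > 0 gives :valid.
expected =
  Enum.map(tasks, fn t ->
    {t.operation, ExpectedResults.execute(t.operation, t.value)}
  end)
```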
Inspect state across layers
Use AgentServer.state/1 to examine the hierarchy at each level. The Orchestrator’s children map contains the Coordinator, and the Coordinator’s children map contains the Workers.
{:ok, orch_state} = Jido.AgentServer.state(orchestrator_pid)

IO.inspect(
  Map.keys(orch_state.children),
  label: "Orchestrator children"
)
Each entry in the children map holds the child’s PID, module, and tag. You can walk further down the tree.
[coordinator_info | _] = Map.values(orch_state.children)

{:ok, coord_state} =
  Jido.AgentServer.state(coordinator_info.pid)

IO.inspect(
  Map.keys(coord_state.children),
  label: "Coordinator children (workers)"
)

IO.inspect(
  coord_state.agent.state.completed_jobs,
  label: "Coordinator completed jobs"
)
The Orchestrator tracks Coordinators. Each Coordinator tracks its Workers. No layer knows about agents two levels away.
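Walking the whole tree is therefore a matter of repeating the same one-level lookup at each node. The sketch below shows that recursion over stand-in data; TreeSketch and the tree map are hypothetical. In a live system the children_of function would presumably call Jido.AgentServer.state/1 on each child's pid and read its children map, as in the snippets above.

```elixir
defmodule TreeSketch do
  # Walks a hierarchy depth-first. `children_of` is a caller-supplied
  # function that returns the direct children of a node, so each step
  # only looks one level down.
  def walk(node, children_of, depth \\ 0) do
    [{depth, node} | Enum.flat_map(children_of.(node), &walk(&1, children_of, depth + 1))]
  end
end

# Hypothetical stand-in data: tag => list of child tags.
tree = %{
  orchestrator: [:coordinator],
  coordinator: [:worker_a, :worker_b]
}

nodes = TreeSketch.walk(:orchestrator, fn tag -> Map.get(tree, tag, []) end)
```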
Next steps
- Sensors and real-time events connects external event sources to agent hierarchies
- Directives covers the full Directive API, including stop_child/1 and schedule/2
- Agent runtime explains supervision, process lifecycle, and the children map