Powered by AppSignal & Oban Pro

Prerequisites

parent-child-agent-hierarchies.livemd

Prerequisites

Complete State machines with FSM before starting this tutorial. You need familiarity with Signal routing, Directives, and running Agents in the Jido runtime.

Setup

Mix.install([
  {:jido, "~> 2.0"}
])

A polling helper replaces the test-only eventually macro. You will use it later to wait for asynchronous results.

defmodule Helpers do
  def wait_for(fun, timeout \\ 10_000, interval \\ 200) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    if System.monotonic_time(:millisecond) > deadline do
      raise "Timed out waiting for condition"
    end

    case fun.() do
      {:ok, result} -> result
      :retry -> Process.sleep(interval); do_wait(fun, deadline, interval)
    end
  end
end

The hierarchy pattern

Some work decomposes naturally into layers. A job splits into tasks, and each task becomes an isolated unit of execution. This tutorial builds a three-layer processing system:

Orchestrator (layer 1)
    |
    |-- spawns --> Coordinator (layer 2)
    |                  |
    |                  |-- spawns --> Worker A (layer 3)
    |                  |-- spawns --> Worker B (layer 3)
    |                  |
    |                  |<-- task.result -- Worker A
    |                  |<-- task.result -- Worker B
    |                  |
    |<-- job.result -- Coordinator (aggregated)

Signals flow downward as work assignments (job.assign, task.execute) and upward as results (task.result, job.result). Each layer only talks to its direct parent or children. The runtime handles process lifecycle through the SpawnAgent Directive and the built-in jido.agent.child.started Signal.

Define the Worker

The Worker Agent processes individual tasks. Its single Action computes a result and emits it to the parent Coordinator using Directive.emit_to_parent/2.

defmodule MyApp.ExecuteTaskAction do
  use Jido.Action,
    name: "execute_task",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      operation: [type: :atom, required: true],
      value: [type: :integer, required: true]
    ]

  alias Jido.Agent.Directive

  def run(params, context) do
    result = execute(params.operation, params.value)

    result_signal =
      Jido.Signal.new!(
        "task.result",
        %{
          task_id: params.task_id,
          job_id: params.job_id,
          result: result,
          operation: params.operation
        },
        source: "/worker"
      )

    agent_like = %{state: context.state}
    emit_directive = Directive.emit_to_parent(agent_like, result_signal)

    {:ok,
     %{
       last_task: %{task_id: params.task_id, result: result},
       tasks_completed: Map.get(context.state, :tasks_completed, 0) + 1
     }, List.wrap(emit_directive)}
  end

  defp execute(:compute, v), do: v * 2 + 1
  defp execute(:validate, v), do: if(v > 0, do: :valid, else: :invalid)
  defp execute(:transform, v), do: "#{v}_processed"
  defp execute(_, v), do: v
end

emit_to_parent/2 reads the __parent__ reference from agent state (set automatically when the runtime spawns a child) and returns an Emit Directive targeting the parent PID. It returns nil when no parent exists, so List.wrap/1 safely handles both cases.

The Worker Agent itself is minimal. It maps the task.execute Signal to the Action above.

defmodule MyApp.WorkerAgent do
  use Jido.Agent,
    name: "worker_agent",
    schema: [
      last_task: [type: :map, default: nil],
      tasks_completed: [type: :integer, default: 0]
    ]

  def signal_routes(_ctx) do
    [{"task.execute", MyApp.ExecuteTaskAction}]
  end
end

Define the Coordinator

The Coordinator receives a job assignment, spawns Worker children, dispatches tasks, and aggregates results. This requires three Actions.

Handle job assignment. When the Coordinator receives a job.assign Signal, it records the job metadata and spawns one Worker per task using Directive.spawn_agent/3.

defmodule MyApp.HandleJobAssignAction do
  use Jido.Action,
    name: "handle_job_assign",
    schema: [
      job_id: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_id: job_id, tasks: tasks}, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info = %{
      job_id: job_id,
      total_tasks: length(tasks),
      completed_tasks: 0,
      results: [],
      started_at: DateTime.utc_now()
    }

    updated_pending = Map.put(pending, job_id, job_info)

    spawn_directives =
      Enum.map(tasks, fn task ->
        task_id = "#{job_id}-task-#{task.index}"

        Directive.spawn_agent(
          MyApp.WorkerAgent,
          String.to_atom(task_id),
          meta: %{
            task_id: task_id,
            job_id: job_id,
            operation: task.operation,
            value: task.value
          }
        )
      end)

    {:ok, %{pending_tasks: updated_pending, current_job: job_id},
     spawn_directives}
  end
end

The meta map on each SpawnAgent Directive carries task details. When the child starts, the runtime delivers a jido.agent.child.started Signal back to this Coordinator with that metadata attached.

React to child startup. When a Worker child starts, the Coordinator sends it a task.execute Signal using Directive.emit_to_pid/2.

defmodule MyApp.CoordinatorChildStartedAction do
  use Jido.Action,
    name: "coordinator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    task_signal =
      Jido.Signal.new!(
        "task.execute",
        %{
          task_id: meta.task_id,
          job_id: meta.job_id,
          operation: meta.operation,
          value: meta.value
        },
        source: "/coordinator"
      )

    {:ok, %{}, [Directive.emit_to_pid(task_signal, pid)]}
  end
end

The jido.agent.child.started Signal payload includes the child’s pid and the meta you passed in the SpawnAgent Directive. This is the hook that connects spawning to dispatching.

Aggregate task results. Each Worker emits a task.result Signal to its parent. The Coordinator collects them and, when the count matches the total, emits job.result up to the Orchestrator.

defmodule MyApp.HandleTaskResultAction do
  use Jido.Action,
    name: "handle_task_result",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      result: [type: :any, required: true],
      operation: [type: :atom, required: true]
    ]

  alias Jido.Agent.Directive
  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info =
      Map.get(pending, params.job_id, %{
        results: [],
        completed_tasks: 0,
        total_tasks: 0
      })

    task_result = %{
      task_id: params.task_id,
      result: params.result,
      operation: params.operation
    }

    updated = %{
      job_info
      | results: [task_result | job_info.results],
        completed_tasks: job_info.completed_tasks + 1
    }

    updated_pending = Map.put(pending, params.job_id, updated)

    if updated.completed_tasks >= updated.total_tasks do
      emit_job_result(params, updated, updated_pending, context)
    else
      {:ok, %{pending_tasks: updated_pending}}
    end
  end

  defp emit_job_result(params, updated, pending, context) do
    signal =
      Jido.Signal.new!(
        "job.result",
        %{
          job_id: params.job_id,
          results: updated.results,
          total_tasks: updated.total_tasks
        },
        source: "/coordinator"
      )

    agent_like = %{state: context.state}
    emit = Directive.emit_to_parent(agent_like, signal)
    completed = Map.get(context.state, :completed_jobs, [])
    set_op = StateOp.set_path([:pending_tasks], Map.delete(pending, params.job_id))

    {:ok, %{completed_jobs: [params.job_id | completed]},
     [set_op | List.wrap(emit)]}
  end
end

StateOp.set_path/2 directly overwrites a nested state key. This removes the completed job from pending_tasks without a merge conflict.

The Coordinator Agent wires these three Actions to their respective Signals.

defmodule MyApp.CoordinatorAgent do
  use Jido.Agent,
    name: "coordinator_agent",
    schema: [
      pending_tasks: [type: :map, default: %{}],
      current_job: [type: :string, default: nil],
      completed_jobs: [type: {:list, :string}, default: []]
    ]

  def signal_routes(_ctx) do
    [
      {"job.assign", MyApp.HandleJobAssignAction},
      {"jido.agent.child.started", MyApp.CoordinatorChildStartedAction},
      {"task.result", MyApp.HandleTaskResultAction}
    ]
  end
end

Define the Orchestrator

The Orchestrator is the entry point. It accepts job submissions, spawns a Coordinator per job, and collects final results.

Submit a job. Generate a unique job ID, record it as pending, and spawn a Coordinator with the job metadata.

defmodule MyApp.SubmitJobAction do
  use Jido.Action,
    name: "submit_job",
    schema: [
      job_name: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_name: job_name, tasks: tasks}, context) do
    job_id = "job-#{System.unique_integer([:positive])}"
    tag = String.to_atom("coordinator-#{job_id}")
    pending = Map.get(context.state, :pending_jobs, %{})

    job_info = %{
      job_id: job_id,
      job_name: job_name,
      tasks: tasks,
      coordinator_tag: tag,
      submitted_at: DateTime.utc_now()
    }

    spawn =
      Directive.spawn_agent(
        MyApp.CoordinatorAgent,
        tag,
        meta: %{job_id: job_id, job_name: job_name, tasks: tasks}
      )

    {:ok, %{pending_jobs: Map.put(pending, job_id, job_info),
            last_submitted: job_id}, [spawn]}
  end
end

React to Coordinator startup. When the Coordinator child starts, send it the job.assign Signal with the task list. Tasks get indexed here so the Coordinator can track them.

defmodule MyApp.OrchestratorChildStartedAction do
  use Jido.Action,
    name: "orchestrator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    indexed_tasks =
      meta.tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, idx} -> Map.put(task, :index, idx) end)

    signal =
      Jido.Signal.new!(
        "job.assign",
        %{job_id: meta.job_id, tasks: indexed_tasks},
        source: "/orchestrator"
      )

    {:ok, %{}, [Directive.emit_to_pid(signal, pid)]}
  end
end

Aggregate job results. When a Coordinator finishes, it emits job.result to the Orchestrator. This Action moves the job from pending to completed.

defmodule MyApp.HandleJobResultAction do
  use Jido.Action,
    name: "handle_job_result",
    schema: [
      job_id: [type: :string, required: true],
      results: [type: {:list, :map}, required: true],
      total_tasks: [type: :integer, required: true]
    ]

  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_jobs, %{})
    completed = Map.get(context.state, :completed_jobs, [])
    job_info = Map.get(pending, params.job_id, %{})

    record = %{
      job_id: params.job_id,
      job_name: Map.get(job_info, :job_name, "unknown"),
      total_tasks: params.total_tasks,
      results: params.results,
      completed_at: DateTime.utc_now()
    }

    set_op =
      StateOp.set_path(
        [:pending_jobs],
        Map.delete(pending, params.job_id)
      )

    {:ok, %{completed_jobs: [record | completed]}, [set_op]}
  end
end

Wire the Signal routes

The Orchestrator Agent maps three Signals: incoming job submissions, child lifecycle events, and final results from Coordinators.

defmodule MyApp.OrchestratorAgent do
  use Jido.Agent,
    name: "orchestrator_agent",
    schema: [
      pending_jobs: [type: :map, default: %{}],
      completed_jobs: [type: {:list, :map}, default: []],
      last_submitted: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [
      {"submit_job", MyApp.SubmitJobAction},
      {"jido.agent.child.started", MyApp.OrchestratorChildStartedAction},
      {"job.result", MyApp.HandleJobResultAction}
    ]
  end
end

Each layer handles exactly the Signals it needs. Workers never see job.assign. Coordinators never see submit_job. The jido.agent.child.started Signal is the glue between spawning and dispatching at every level.

Run the hierarchy

Start the Jido runtime and launch the Orchestrator. Then submit a job with multiple tasks.

{:ok, jido} = Jido.start_link(name: :learn_hierarchy)

{:ok, orchestrator_pid} =
  Jido.start_agent(
    jido,
    MyApp.OrchestratorAgent,
    id: "orchestrator-1"
  )

Build a submit_job Signal with two compute tasks and one validation task.

signal =
  Jido.Signal.new!(
    "submit_job",
    %{
      job_name: "mixed_job",
      tasks: [
        %{operation: :compute, value: 5},
        %{operation: :compute, value: 10},
        %{operation: :validate, value: 42}
      ]
    },
    source: "/livebook"
  )

{:ok, _agent} = Jido.AgentServer.call(orchestrator_pid, signal)

The call returns immediately after the Orchestrator processes the submit_job Signal and emits the SpawnAgent Directive. The Coordinator and Workers start asynchronously. Poll for the completed results.

result =
  Helpers.wait_for(fn ->
    case Jido.AgentServer.state(orchestrator_pid) do
      {:ok, %{agent: %{state: %{completed_jobs: [job | _]}}}} ->
        {:ok, job}

      _ ->
        :retry
    end
  end)

IO.inspect(result.job_name, label: "Job")
IO.inspect(result.total_tasks, label: "Tasks completed")

for r <- result.results do
  IO.inspect({r.operation, r.result}, label: "  result")
end

You should see three results: :compute yielding 11 and 21, and :validate yielding :valid.

Inspect state across layers

Use AgentServer.state/1 to examine the hierarchy at each level. The Orchestrator’s children map contains the Coordinator, and the Coordinator’s children map contains the Workers.

{:ok, orch_state} = Jido.AgentServer.state(orchestrator_pid)

IO.inspect(
  Map.keys(orch_state.children),
  label: "Orchestrator children"
)

Each entry in the children map holds the child’s PID, module, and tag. You can walk further down the tree.

[coordinator_info | _] = Map.values(orch_state.children)

{:ok, coord_state} =
  Jido.AgentServer.state(coordinator_info.pid)

IO.inspect(
  Map.keys(coord_state.children),
  label: "Coordinator children (workers)"
)

IO.inspect(
  coord_state.agent.state.completed_jobs,
  label: "Coordinator completed jobs"
)

The Orchestrator tracks Coordinators. Each Coordinator tracks its Workers. No layer knows about agents two levels away.

Next steps