Prerequisites
Complete Build your first workflow before starting. This notebook is about stateful task execution, so the first pass stays local and deterministic.
Setup
Mix.install([
{{mix_dep:jido}}
])
Logger.configure(level: :warning)
This tutorial runs entirely locally. The main path focuses on task state, memory, and resume behavior. A later section shows where AI planning can plug into the same pattern.
Start the runtime
case Jido.start() do
{:ok, _} -> :ok
{:error, {:already_started, _}} -> :ok
end
runtime = Jido.default_instance()
Memory helper
Jido.Memory.Agent stores task data under the reserved :__memory__ key in agent state. This helper gives the notebook one place to read and write the task list.
defmodule MyApp.TaskMemory do
alias Jido.Memory.Agent, as: MemoryAgent
def ensure(agent) do
agent
|> MemoryAgent.ensure()
|> MemoryAgent.ensure_space(:tasks, [])
end
def tasks(agent) do
agent = ensure(agent)
case MemoryAgent.space(agent, :tasks) do
%{data: tasks} when is_list(tasks) -> tasks
_ -> []
end
end
def put_tasks(agent, tasks) do
agent = ensure(agent)
agent
|> MemoryAgent.update_space(:tasks, fn space -> %{space | data: tasks} end)
end
def summary(tasks) do
%{
total: length(tasks),
pending: Enum.count(tasks, &(&1.status == :pending)),
done: Enum.count(tasks, &(&1.status == :done))
}
end
end
Define the actions
The agent only needs two domain actions:
-
goal.plancreates the initial task list -
task.run_nextexecutes exactly one pending task
defmodule MyApp.PlanGoalAction do
use Jido.Action,
name: "plan_goal",
schema: [
goal: [type: :string, required: true]
]
def run(%{goal: goal}, context) do
tasks = build_tasks(goal)
agent = MyApp.TaskAgent.new(state: context.state)
agent = MyApp.TaskMemory.put_tasks(agent, tasks)
{:ok,
%{
goal: goal,
status: :planned,
current_task_id: nil,
last_result: nil,
planned_count: length(tasks),
summary: MyApp.TaskMemory.summary(tasks),
__memory__: agent.state.__memory__
}}
end
defp build_tasks(goal) do
[
%{
id: "scope",
title: "Clarify scope",
description: "Define the deliverable and audience for #{goal}",
status: :pending
},
%{
id: "draft",
title: "Create first draft",
description: "Produce the first working output for #{goal}",
status: :pending
},
%{
id: "review",
title: "Review and finalize",
description: "Check gaps, polish the output, and finalize #{goal}",
status: :pending
}
]
end
end
defmodule MyApp.RunNextTaskAction do
use Jido.Action,
name: "run_next_task",
schema: []
def run(_params, context) do
agent = MyApp.TaskAgent.new(state: context.state)
tasks = MyApp.TaskMemory.tasks(agent)
case Enum.find(tasks, &(&1.status == :pending)) do
nil ->
{:ok,
%{
status: :complete,
current_task_id: nil,
last_result: "All tasks are complete.",
summary: MyApp.TaskMemory.summary(tasks)
}}
task ->
result = execute_task(task, context.state.goal)
updated_tasks =
Enum.map(tasks, fn current ->
if current.id == task.id do
Map.merge(current, %{status: :done, result: result})
else
current
end
end)
agent = MyApp.TaskMemory.put_tasks(agent, updated_tasks)
summary = MyApp.TaskMemory.summary(updated_tasks)
status = if summary.pending == 0, do: :complete, else: :working
{:ok,
%{
status: status,
current_task_id: task.id,
last_result: result,
summary: summary,
__memory__: agent.state.__memory__
}}
end
end
defp execute_task(task, goal) do
case task.id do
"scope" ->
"Scope defined for #{goal}. Audience, deliverable, and success criteria are now explicit."
"draft" ->
"First draft prepared for #{goal}. The core structure exists and is ready for review."
"review" ->
"Review completed for #{goal}. Final polish and delivery notes are recorded."
end
end
end
Define the agent
The task list itself lives in Memory. The visible schema only tracks high-level progress.
defmodule MyApp.TaskAgent do
use Jido.Agent,
name: "task_agent",
description: "Plans a goal into tasks and executes them one step at a time",
schema: [
goal: [type: :string, default: nil],
status: [type: :atom, default: :idle],
current_task_id: [type: :string, default: nil],
last_result: [type: :string, default: nil],
planned_count: [type: :integer, default: 0],
summary: [type: :map, default: %{total: 0, pending: 0, done: 0}]
]
def signal_routes(_ctx) do
[
{"goal.plan", MyApp.PlanGoalAction},
{"task.run_next", MyApp.RunNextTaskAction}
]
end
end
Livebook helpers
defmodule MyApp.TaskHelpers do
def send_signal(pid, type, data \\ %{}) do
signal = Jido.Signal.new!(type, data, source: "/livebook")
Jido.AgentServer.call(pid, signal)
end
def snapshot(pid) do
{:ok, server_state} = Jido.AgentServer.state(pid)
%{
state: server_state.agent.state,
tasks: MyApp.TaskMemory.tasks(server_state.agent)
}
end
def run_until_complete(pid, max_steps \\ 10) do
Enum.reduce_while(1..max_steps, snapshot(pid), fn _, _acc ->
current = snapshot(pid)
if current.state.status == :complete do
{:halt, current}
else
{:ok, _agent} = send_signal(pid, "task.run_next")
{:cont, snapshot(pid)}
end
end)
end
end
Plan one goal
Start the agent and create the task list.
task_agent_id = "task-agent-#{System.unique_integer([:positive])}"
{:ok, pid} =
Jido.start_agent(
runtime,
MyApp.TaskAgent,
id: task_agent_id
)
{:ok, _agent} =
MyApp.TaskHelpers.send_signal(
pid,
"goal.plan",
%{goal: "Write a README for an Elixir HTTP client library called Fetch"}
)
plan_snapshot = MyApp.TaskHelpers.snapshot(pid)
IO.inspect(plan_snapshot.state.summary, label: "Summary")
Enum.each(plan_snapshot.tasks, fn task ->
IO.inspect({task.id, task.title, task.status}, label: "Task")
end)
You should see three pending tasks in memory.
Execute one task
Advance exactly one task so the state transition is easy to inspect.
{:ok, _agent} = MyApp.TaskHelpers.send_signal(pid, "task.run_next")
one_step_snapshot = MyApp.TaskHelpers.snapshot(pid)
IO.inspect(one_step_snapshot.state.status, label: "Agent status")
IO.inspect(one_step_snapshot.state.last_result, label: "Last result")
Enum.each(one_step_snapshot.tasks, fn task ->
IO.inspect({task.id, task.status, Map.get(task, :result)}, label: "Task state")
end)
At this point, one task is :done and the rest remain :pending. That is the key resume property: you can stop here and continue later without rebuilding the plan.
Resume until complete
final_snapshot = MyApp.TaskHelpers.run_until_complete(pid)
IO.inspect(final_snapshot.state.summary, label: "Final summary")
Enum.each(final_snapshot.tasks, fn task ->
IO.inspect({task.id, task.status, task.result}, label: "Completed task")
end)
The agent reuses the task list already stored in Memory, so each task.run_next call only needs the current agent state.
Where AI planning fits
The runnable path above keeps planning deterministic so the task loop is easy to understand. In a production AI agent, the most common upgrade is to replace build_tasks/1 with model-backed decomposition and keep the rest of the state machine intact.
For example, you can generate the initial task list with a planning action:
{:ok, result} =
Jido.Exec.run(Jido.AI.Actions.Planning.Decompose, %{
goal: "Write a README for an Elixir HTTP client library called Fetch",
max_depth: 2,
model: "openai:gpt-4o-mini"
})
Or expose task actions as tools inside a Jido.AI.Agent so the model decides when to advance:
defmodule MyApp.AITaskAgent do
use Jido.AI.Agent,
name: "ai_task_agent",
tools: [MyApp.PlanGoalAction, MyApp.RunNextTaskAction]
end
The important part is the separation of concerns:
- task storage stays in Memory
- execution stays one step at a time
- AI, if you add it, only decides what task list to create or when to advance
Next steps
- Add richer memory layers in Memory and retrieval-augmented agents
- Coordinate multiple task agents in Multi-agent orchestration
- Compare this explicit loop with the chat-style AI interface in Build an AI chat agent