Prerequisites
Complete AI agent with tools before starting. You need an OpenAI API key configured.
Setup
Mix.install([
{:jido, "~> 2.0"},
{:jido_ai, github: "agentjido/jido_ai", branch: "main"},
{:req_llm, "~> 1.6"}
])
Configure credentials
Set your OpenAI API key. In Livebook, add OPENAI_API_KEY as a Livebook Secret prefixed with LB_.
openai_key = System.get_env("LB_OPENAI_API_KEY") || System.get_env("OPENAI_API_KEY")
if openai_key do
ReqLLM.put_key(:openai_api_key, openai_key)
:configured
else
raise "Set OPENAI_API_KEY as a Livebook Secret or environment variable."
end
Beyond one-shot answers
A single LLM call can answer a question, but real work requires planning. “Write a README for a new library” breaks into research, outlining, drafting sections, and review. Each step depends on the last.
This tutorial builds an Agent that decomposes a goal into tasks, stores them in a Memory Space, and works through each task using the ReAct loop. By the end you will run this:
{:ok, pid} = Jido.AgentServer.start_link(agent: MyApp.TaskAgent)
{:ok, result} = MyApp.TaskAgent.execute(
pid,
"Write a README for an Elixir HTTP client library",
timeout: 120_000
)
IO.puts(result)
The Agent creates 5 tasks, starts each one, produces a substantive result, marks it complete, and moves to the next. Tasks persist in Memory across ReAct iterations.
Memory Spaces for task storage
Jido.Memory.Agent provides helpers to manage structured data inside an Agent’s state. Memory is organized into Spaces, each identified by an atom key.
alias Jido.Memory.Agent, as: MemoryAgent
# Initialize Memory on the Agent (idempotent)
agent = MemoryAgent.ensure(agent)
# Read a Space (returns %{data: ...} or nil)
case MemoryAgent.space(agent, :tasks) do
%{data: tasks} when is_list(tasks) -> tasks
_ -> []
end
# Update a Space with a transform function
agent = MemoryAgent.update_space(agent, :tasks, fn space ->
%{space | data: [new_task | space.data]}
end)
Spaces hold arbitrary data. The :tasks Space stores a list of task maps, each with id, title, description, status, priority, and timestamps. The Agent loads tasks from Memory before each ReAct iteration and writes changes back after.
Define the Task Tools
Each tool is a Jido.Action that the LLM calls through the ReAct loop. Tools read the current task list from context.tool_context.tasks, which the Agent injects via lifecycle hooks.
AddTasks creates new tasks from a goal decomposition:
defmodule MyApp.Tools.AddTasks do
use Jido.Action,
name: "tasklist_add_tasks",
description: "Add new tasks to the task list.",
schema: Zoi.object(%{
tasks: Zoi.list(Zoi.map(),
description: "List of task maps with title, description, priority"
)
})
@impl true
def run(%{tasks: tasks}, _context) do
now = DateTime.utc_now() |> DateTime.to_iso8601()
created = Enum.with_index(tasks, 1) |> Enum.map(fn {task, i} ->
%{
"id" => Base.url_encode64(:crypto.strong_rand_bytes(8), padding: false),
"title" => Map.get(task, :title) || Map.get(task, "title", "Task #{i}"),
"description" => Map.get(task, :description) || Map.get(task, "description"),
"status" => "pending",
"priority" => Map.get(task, :priority) || Map.get(task, "priority", 100),
"created_at" => now, "updated_at" => now
}
end)
{:ok, %{action: "tasks_added", created_tasks: created, count: length(created)}}
end
end
GetState returns the full task list with summary counts:
defmodule MyApp.Tools.GetState do
use Jido.Action,
name: "tasklist_get_state",
description: "Get current task list state and summary counts.",
schema: [
status_filter: [
type: :string,
required: false,
doc: "Filter: 'pending', 'in_progress', 'done', 'blocked', or 'all'"
]
]
@impl true
def run(params, context) do
tasks = context.tool_context.tasks
filter = Map.get(params, :status_filter, "all")
filtered = if filter == "all", do: tasks,
else: Enum.filter(tasks, &(&1["status"] == filter))
summary = %{
total: length(tasks),
pending: Enum.count(tasks, &(&1["status"] == "pending")),
in_progress: Enum.count(tasks, &(&1["status"] == "in_progress")),
done: Enum.count(tasks, &(&1["status"] == "done"))
}
{:ok, %{tasks: filtered, summary: summary}}
end
end
NextTask returns the highest-priority pending task:
defmodule MyApp.Tools.NextTask do
use Jido.Action,
name: "tasklist_next_task",
description: "Get the next pending task (highest priority first).",
schema: []
@impl true
def run(_params, context) do
tasks = context.tool_context.tasks
next = tasks
|> Enum.filter(&(&1["status"] == "pending"))
|> Enum.sort_by(&(&1["priority"] || 100))
|> List.first()
case next do
nil -> {:ok, %{status: "all_complete", message: "All tasks are complete."}}
task -> {:ok, %{status: "next_task", task: task}}
end
end
end
StartTask and CompleteTask transition task status:
defmodule MyApp.Tools.StartTask do
use Jido.Action,
name: "tasklist_start_task",
description: "Mark a task as in-progress before working on it.",
schema: [
task_id: [type: :string, required: true, doc: "The task ID to start"]
]
@impl true
def run(%{task_id: id}, context) do
case Enum.find(context.tool_context.tasks, &(&1["id"] == id)) do
nil -> {:ok, %{action: "task_not_found", task_id: id}}
task ->
updated = %{task | "status" => "in_progress",
"updated_at" => DateTime.utc_now() |> DateTime.to_iso8601()}
{:ok, %{action: "task_started", task: updated}}
end
end
end
defmodule MyApp.Tools.CompleteTask do
use Jido.Action,
name: "tasklist_complete_task",
description: "Mark a task as done with a result summary.",
schema: [
task_id: [type: :string, required: true, doc: "The task ID to complete"],
result: [type: :string, required: false, doc: "What was accomplished"]
]
@impl true
def run(%{task_id: id} = params, context) do
case Enum.find(context.tool_context.tasks, &(&1["id"] == id)) do
nil -> {:ok, %{action: "task_not_found", task_id: id}}
task ->
updated = %{task | "status" => "done", "result" => params[:result],
"updated_at" => DateTime.utc_now() |> DateTime.to_iso8601()}
{:ok, %{action: "task_completed", task: updated}}
end
end
end
Build the Agent
Wire the tools into an Agent with a system prompt that enforces a mandatory workflow. The prompt is critical: without explicit instructions, the LLM tends to skip tools and answer directly.
defmodule MyApp.TaskAgent do
alias Jido.Memory.Agent, as: MemoryAgent
use Jido.AI.Agent,
name: "task_agent",
description: "Decomposes goals into tasks and executes them",
tools: [
MyApp.Tools.AddTasks,
MyApp.Tools.GetState,
MyApp.Tools.NextTask,
MyApp.Tools.StartTask,
MyApp.Tools.CompleteTask
],
model: "openai:gpt-4o-mini",
max_iterations: 25,
system_prompt: """
You are a task planning agent. You MUST use tasklist tools
to manage your work. NEVER answer directly without using tools.
MANDATORY WORKFLOW:
1. Call tasklist_get_state to check for existing tasks
2. If no tasks exist, call tasklist_add_tasks with 3-7 tasks
3. Call tasklist_next_task to get the next pending task
4. Call tasklist_start_task with the task_id
5. Do the work and call tasklist_complete_task with a result
6. Repeat steps 3-5 until all tasks are complete
7. Only then provide your final summary
Call exactly ONE tool per message. Wait for each result.
"""
end
Setting max_iterations: 25 gives the Agent enough room for a 5-task plan: each task needs get_next, start, and complete calls, plus the initial get_state and add_tasks. That is roughly 17 iterations for 5 tasks.
Lifecycle hooks
The Agent needs to load tasks from Memory before each iteration and sync changes back after. This happens in two callbacks.
on_before_cmd/2 runs before the ReAct strategy starts. It ensures Memory is initialized, loads the current task list, and injects it into tool_context so every tool can access it:
@impl true
def on_before_cmd(agent, {:ai_react_start, %{query: _} = params} = _action) do
agent = MemoryAgent.ensure(agent)
tasks = load_tasks(agent)
existing_ctx = Map.get(params, :tool_context, %{})
updated_params = Map.put(params, :tool_context, Map.put(existing_ctx, :tasks, tasks))
{:ok, agent, {:ai_react_start, updated_params}}
end
@impl true
def on_before_cmd(agent, action), do: {:ok, agent, action}
defp load_tasks(agent) do
case MemoryAgent.space(agent, :tasks) do
%{data: tasks} when is_list(tasks) -> tasks
_ -> []
end
end
on_after_cmd/3 runs after each ReAct iteration completes. It extracts tool results from the conversation and updates Memory:
@impl true
def on_after_cmd(agent, _action, directives) do
snap = strategy_snapshot(agent)
agent = sync_tasks_from_conversation(agent, snap)
{:ok, agent, directives}
end
The data flow is: Memory -> tool_context -> tools -> LLM -> tool calls -> conversation -> extract -> Memory.
Task state synchronization
After each iteration, the Agent scans the conversation for tool results and applies them to the task list. This is the sync logic:
defp sync_tasks_from_conversation(agent, snap) do
conversation = get_in(snap, [:details, :conversation]) || []
current_tasks = load_tasks(agent)
tool_results = conversation
|> Enum.filter(fn msg ->
Map.get(msg, :role) == :tool || Map.get(msg, "role") == "tool"
end)
|> Enum.flat_map(fn msg ->
content = Map.get(msg, :content) || Map.get(msg, "content", "")
case Jason.decode(content) do
{:ok, %{"action" => _} = result} -> [result]
{:ok, %{"created_tasks" => _} = result} -> [result]
_ -> []
end
end)
updated_tasks = Enum.reduce(tool_results, current_tasks, &apply_action/2)
persist_tasks(agent, updated_tasks)
end
apply_action/2 pattern-matches on the action type to merge changes:
defp apply_action(%{"action" => "tasks_added", "created_tasks" => new}, tasks) do
existing_ids = MapSet.new(tasks, & &1["id"])
to_add = Enum.reject(new, &MapSet.member?(existing_ids, &1["id"]))
tasks ++ to_add
end
defp apply_action(%{"action" => action, "task" => updated}, tasks)
when action in ~w(task_started task_completed) do
Enum.map(tasks, fn t ->
if t["id"] == updated["id"], do: updated, else: t
end)
end
defp apply_action(_, tasks), do: tasks
defp persist_tasks(agent, tasks) do
agent = MemoryAgent.ensure(agent)
MemoryAgent.update_space(agent, :tasks, fn space ->
%{space | data: tasks}
end)
end
Helper methods
Wrap ask_sync/3 in domain-specific functions. Callers use structured methods instead of constructing prompt strings:
@default_timeout 120_000
@spec plan(pid(), String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def plan(pid, goal, opts \\ []) do
query = """
Plan the following goal by creating a task list, but DO NOT execute.
Just create the plan and show the task list.
Goal: #{goal}
"""
ask_sync(pid, query, Keyword.put_new(opts, :timeout, @default_timeout))
end
@spec execute(pid(), String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def execute(pid, goal, opts \\ []) do
query = """
Plan and execute the following goal. Break it into tasks, then work
through each task to completion.
Goal: #{goal}
"""
ask_sync(pid, query, Keyword.put_new(opts, :timeout, @default_timeout))
end
@spec status(pid(), keyword()) :: {:ok, String.t()} | {:error, term()}
def status(pid, opts \\ []) do
ask_sync(pid, "Show the current status of all tasks.", Keyword.put_new(opts, :timeout, @default_timeout))
end
@spec resume(pid(), keyword()) :: {:ok, String.t()} | {:error, term()}
def resume(pid, opts \\ []) do
ask_sync(pid, "Continue working on pending tasks until all are complete.", Keyword.put_new(opts, :timeout, @default_timeout))
end
Run a multi-step goal
Start the Agent and give it a goal. The Agent will decompose it into tasks, work through each one, and deliver combined results:
{:ok, pid} = Jido.AgentServer.start_link(agent: MyApp.TaskAgent)
{:ok, result} = MyApp.TaskAgent.execute(
pid,
"Write a README for an Elixir HTTP client library called Fetch",
timeout: 120_000
)
IO.puts(result)
The Agent typically creates tasks like “Define project overview”, “Document installation steps”, “Write usage examples”, “Add configuration reference”, and “Create contributing guidelines”. It starts each task, produces content, marks it complete, and moves to the next.
Check progress at any point with status/2:
{:ok, status} = MyApp.TaskAgent.status(pid)
IO.puts(status)
If the Agent hits max_iterations before finishing, resume with resume/2:
{:ok, result} = MyApp.TaskAgent.resume(pid)
IO.puts(result)
Use plan/3 to generate a task list without executing. This is useful for reviewing the decomposition before committing to execution:
{:ok, plan} = MyApp.TaskAgent.plan(
pid,
"Set up CI/CD for an Elixir project on GitHub Actions"
)
IO.puts(plan)
> Output varies between runs. The LLM generates different task decompositions and content each time.
Next steps
- Add persistent memory and semantic recall in Memory and retrieval-augmented agents.
- Understand how Memory and Thread work as default Plugins in Plugins concept.
- Revisit foundational tool patterns in AI agent with tools.