Powered by AppSignal & Oban Pro

Prerequisites

task-planning-and-execution.livemd

Prerequisites

Complete the "AI agent with tools" tutorial before starting. You need an OpenAI API key configured.

Setup

Mix.install([
  {:jido, "~> 2.0"},
  {:jido_ai, github: "agentjido/jido_ai", branch: "main"},
  {:req_llm, "~> 1.6"}
])

Configure credentials

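Set your OpenAI API key. In Livebook, add OPENAI_API_KEY as a Livebook Secret; Livebook exposes secrets to the notebook as environment variables prefixed with LB_ (here, LB_OPENAI_API_KEY). Outside Livebook, set the OPENAI_API_KEY environment variable.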

# Look the key up under the Livebook-secret name first, then the plain
# environment variable; the first name that is set wins.
openai_key =
  Enum.find_value(["LB_OPENAI_API_KEY", "OPENAI_API_KEY"], &System.get_env/1)

# Register the key with ReqLLM, or stop the notebook early with a clear
# message when neither variable is set. The cell evaluates to :configured
# on success.
case openai_key do
  nil ->
    raise "Set OPENAI_API_KEY as a Livebook Secret or environment variable."

  key ->
    ReqLLM.put_key(:openai_api_key, key)
    :configured
end

Beyond one-shot answers

A single LLM call can answer a question, but real work requires planning. “Write a README for a new library” breaks into research, outlining, drafting sections, and review. Each step depends on the last.

This tutorial builds an Agent that decomposes a goal into tasks, stores them in a Memory Space, and works through each task using the ReAct loop. By the end you will run this:

# Start an agent server process running MyApp.TaskAgent and keep its pid.
{:ok, pid} = Jido.AgentServer.start_link(agent: MyApp.TaskAgent)

# Ask the agent to plan and execute the goal; the match asserts success and
# binds the agent's final answer to `result`.
{:ok, result} = MyApp.TaskAgent.execute(
  pid,
  "Write a README for an Elixir HTTP client library",
  timeout: 120_000
)

# Print the final answer.
IO.puts(result)

The Agent creates a handful of tasks (typically around 5), starts each one, produces a substantive result, marks it complete, and moves to the next. Tasks persist in Memory across ReAct iterations.

Memory Spaces for task storage

Jido.Memory.Agent provides helpers to manage structured data inside an Agent’s state. Memory is organized into Spaces, each identified by an atom key.

# Illustrative snippet: `agent` and `new_task` are assumed to be bound already.
alias Jido.Memory.Agent, as: MemoryAgent

# Initialize Memory on the Agent (idempotent), rebinding `agent` to the result.
agent = MemoryAgent.ensure(agent)

# Read the :tasks Space (returns %{data: ...} or nil). Fall back to an empty
# list when the Space is missing or its data is not a list.
case MemoryAgent.space(agent, :tasks) do
  %{data: tasks} when is_list(tasks) -> tasks
  _ -> []
end

# Update a Space with a transform function: it receives the current Space and
# returns the new one. Here `new_task` is prepended to the stored list.
# NOTE(review): assumes `space.data` is already a list — confirm the initial value.
agent = MemoryAgent.update_space(agent, :tasks, fn space ->
  %{space | data: [new_task | space.data]}
end)

Spaces hold arbitrary data. The :tasks Space stores a list of task maps, each with id, title, description, status, priority, and timestamps. The Agent loads tasks from Memory before each ReAct iteration and writes changes back after.

Define the Task Tools

Each tool is a Jido.Action that the LLM calls through the ReAct loop. Tools read the current task list from context.tool_context.tasks, which the Agent injects via lifecycle hooks.

AddTasks creates new tasks from a goal decomposition:

defmodule MyApp.Tools.AddTasks do
  @moduledoc """
  Tool that turns a goal decomposition into new task maps.

  Each incoming task may use atom or string keys for `title`, `description`
  and `priority`. The tool does not read or modify the existing task list; it
  returns the freshly built tasks under `:created_tasks` with the action tag
  `"tasks_added"`.
  """

  use Jido.Action,
    name: "tasklist_add_tasks",
    description: "Add new tasks to the task list.",
    schema: Zoi.object(%{
      tasks: Zoi.list(Zoi.map(),
        description: "List of task maps with title, description, priority"
      )
    })

  # Priority assigned when the incoming task does not provide one.
  @default_priority 100

  @doc """
  Builds one task map per entry in `tasks`.

  Every created task gets a random URL-safe id, status `"pending"`, and
  identical `created_at` / `updated_at` ISO 8601 timestamps. A missing or
  `nil` title falls back to `"Task N"` (1-based position) and a missing or
  `nil` priority falls back to #{@default_priority}.

  Returns `{:ok, %{action: "tasks_added", created_tasks: list, count: n}}`.
  """
  @impl true
  def run(%{tasks: tasks}, _context) do
    now = DateTime.utc_now() |> DateTime.to_iso8601()

    created =
      tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, position} -> build_task(task, position, now) end)

    {:ok, %{action: "tasks_added", created_tasks: created, count: length(created)}}
  end

  # Builds the stored (string-keyed) representation of a single task.
  defp build_task(task, position, now) do
    %{
      "id" => Base.url_encode64(:crypto.strong_rand_bytes(8), padding: false),
      "title" => field(task, :title, "Task #{position}"),
      "description" => field(task, :description, nil),
      "status" => "pending",
      "priority" => field(task, :priority, @default_priority),
      "created_at" => now,
      "updated_at" => now
    }
  end

  # Looks a field up under its atom key, then its string key, then falls back
  # to `default`. Chaining with `||` (instead of a `Map.get/3` default) means an
  # explicit nil under the string key also gets the default.
  defp field(task, key, default) do
    Map.get(task, key) || Map.get(task, Atom.to_string(key)) || default
  end
end

GetState returns the full task list with summary counts:

defmodule MyApp.Tools.GetState do
  @moduledoc """
  Tool that reports the current task list together with per-status counts.

  Tasks are read from `context.tool_context.tasks`. The optional
  `:status_filter` narrows the returned `:tasks`; the `:summary` always
  counts the full, unfiltered list.
  """

  use Jido.Action,
    name: "tasklist_get_state",
    description: "Get current task list state and summary counts.",
    schema: [
      status_filter: [
        type: :string,
        required: false,
        doc: "Filter: 'pending', 'in_progress', 'done', 'blocked', or 'all'"
      ]
    ]

  @doc """
  Returns `{:ok, %{tasks: filtered, summary: summary}}`.

  A missing or `nil` `:status_filter` is treated as `"all"`. `summary` holds
  `:total` plus one count for each of the statuses `pending`, `in_progress`,
  `done` and `blocked`.
  """
  @impl true
  def run(params, context) do
    tasks = context.tool_context.tasks

    # `||` rather than a Map.get/3 default, so an explicit nil also means "all".
    filter = Map.get(params, :status_filter) || "all"

    filtered =
      case filter do
        "all" -> tasks
        status -> Enum.filter(tasks, &(&1["status"] == status))
      end

    # One pass over the list instead of one pass per status.
    counts = Enum.frequencies_by(tasks, & &1["status"])

    summary = %{
      total: length(tasks),
      pending: Map.get(counts, "pending", 0),
      in_progress: Map.get(counts, "in_progress", 0),
      done: Map.get(counts, "done", 0),
      blocked: Map.get(counts, "blocked", 0)
    }

    {:ok, %{tasks: filtered, summary: summary}}
  end
end

NextTask returns the highest-priority pending task (the lowest priority number sorts first; a task without a priority is treated as 100):

defmodule MyApp.Tools.NextTask do
  @moduledoc """
  Tool that picks the next task to work on.

  Reads `context.tool_context.tasks`, keeps the tasks whose status is
  `"pending"`, and selects the one with the smallest priority value. A task
  without a priority counts as 100; among equal priorities the task that
  appears first in the list wins.
  """

  use Jido.Action,
    name: "tasklist_next_task",
    description: "Get the next pending task (highest priority first).",
    schema: []

  @doc """
  Returns `{:ok, %{status: "next_task", task: task}}` for the selected task,
  or `{:ok, %{status: "all_complete", message: ...}}` when nothing is pending.
  """
  @impl true
  def run(_params, context) do
    context.tool_context.tasks
    |> Enum.filter(fn task -> task["status"] == "pending" end)
    |> Enum.min_by(fn task -> task["priority"] || 100 end, fn -> nil end)
    |> respond()
  end

  # Shapes the tool reply from the selected task (nil when none is pending).
  defp respond(nil), do: {:ok, %{status: "all_complete", message: "All tasks are complete."}}
  defp respond(task), do: {:ok, %{status: "next_task", task: task}}
end

StartTask and CompleteTask transition task status:

defmodule MyApp.Tools.StartTask do
  @moduledoc """
  Tool that moves a task to the `"in_progress"` status.

  The task is looked up by id in `context.tool_context.tasks`. The tool only
  returns the updated task map; it does not write the task list itself.
  """

  use Jido.Action,
    name: "tasklist_start_task",
    description: "Mark a task as in-progress before working on it.",
    schema: [
      task_id: [type: :string, required: true, doc: "The task ID to start"]
    ]

  @doc """
  Returns `{:ok, %{action: "task_started", task: updated}}` when the id is
  known, otherwise `{:ok, %{action: "task_not_found", task_id: id}}`.
  """
  @impl true
  def run(%{task_id: id}, context) do
    context.tool_context.tasks
    |> Enum.find(fn task -> task["id"] == id end)
    |> mark_started(id)
  end

  # An unknown id is reported as a normal tool result rather than an error tuple.
  defp mark_started(nil, id), do: {:ok, %{action: "task_not_found", task_id: id}}

  defp mark_started(task, _id) do
    timestamp = DateTime.to_iso8601(DateTime.utc_now())
    started = %{task | "status" => "in_progress", "updated_at" => timestamp}

    {:ok, %{action: "task_started", task: started}}
  end
end
defmodule MyApp.Tools.CompleteTask do
  @moduledoc """
  Tool that marks a task as `"done"` and records an optional result summary.

  The task is looked up by id in `context.tool_context.tasks`. The tool only
  returns the updated task map; it does not write the task list itself.
  """

  use Jido.Action,
    name: "tasklist_complete_task",
    description: "Mark a task as done with a result summary.",
    schema: [
      task_id: [type: :string, required: true, doc: "The task ID to complete"],
      result: [type: :string, required: false, doc: "What was accomplished"]
    ]

  @doc """
  Returns `{:ok, %{action: "task_completed", task: updated}}` when the id is
  known, otherwise `{:ok, %{action: "task_not_found", task_id: id}}`.

  `updated` carries status `"done"`, the given `:result` (`nil` when omitted)
  and a fresh `updated_at` timestamp.
  """
  @impl true
  def run(%{task_id: id} = params, context) do
    case Enum.find(context.tool_context.tasks, &(&1["id"] == id)) do
      nil ->
        {:ok, %{action: "task_not_found", task_id: id}}

      task ->
        # Map.merge/2 instead of the `%{task | ...}` update syntax: the update
        # syntax raises KeyError for keys the map does not have yet, and tasks
        # are created without a "result" key.
        updated =
          Map.merge(task, %{
            "status" => "done",
            "result" => params[:result],
            "updated_at" => DateTime.utc_now() |> DateTime.to_iso8601()
          })

        {:ok, %{action: "task_completed", task: updated}}
    end
  end
end

Build the Agent

Wire the tools into an Agent with a system prompt that enforces a mandatory workflow. The prompt is critical: without explicit instructions, the LLM tends to skip tools and answer directly.

# Agent that decomposes a goal into tasks and works through them using the
# five tasklist tools registered below.
defmodule MyApp.TaskAgent do
  # Short name for the Memory helpers used by this agent's lifecycle hooks
  # and task-sync functions.
  alias Jido.Memory.Agent, as: MemoryAgent

  use Jido.AI.Agent,
    name: "task_agent",
    description: "Decomposes goals into tasks and executes them",
    # Tools the LLM may call; the names in the system prompt below are the
    # `name:` values declared by these modules.
    tools: [
      MyApp.Tools.AddTasks,
      MyApp.Tools.GetState,
      MyApp.Tools.NextTask,
      MyApp.Tools.StartTask,
      MyApp.Tools.CompleteTask
    ],
    model: "openai:gpt-4o-mini",
    # Iteration budget for one request; a 5-task plan needs roughly 17 tool calls.
    max_iterations: 25,
    # The prompt spells out a fixed tool workflow so the model does not answer
    # directly without using the tools.
    system_prompt: """
    You are a task planning agent. You MUST use tasklist tools
    to manage your work. NEVER answer directly without using tools.

    MANDATORY WORKFLOW:
    1. Call tasklist_get_state to check for existing tasks
    2. If no tasks exist, call tasklist_add_tasks with 3-7 tasks
    3. Call tasklist_next_task to get the next pending task
    4. Call tasklist_start_task with the task_id
    5. Do the work and call tasklist_complete_task with a result
    6. Repeat steps 3-5 until all tasks are complete
    7. Only then provide your final summary

    Call exactly ONE tool per message. Wait for each result.
    """
end

Setting max_iterations: 25 gives the Agent enough room for a 5-task plan: each task needs next_task, start_task, and complete_task calls, plus the initial get_state and add_tasks. That is roughly 17 iterations for 5 tasks (5 × 3 + 2), which leaves headroom for the final summary.

Lifecycle hooks

The Agent needs to load tasks from Memory before each iteration and sync changes back after. This happens in two callbacks.

on_before_cmd/2 runs before the ReAct strategy starts. It ensures Memory is initialized, loads the current task list, and injects it into tool_context so every tool can access it:

# For a ReAct start action: make sure Memory is initialized, read the stored
# task list, and place it under `:tasks` in the params' `:tool_context`
# (keeping any other tool_context entries). Every other action passes through
# unchanged.
@impl true
def on_before_cmd(agent, {:ai_react_start, %{query: _query} = params}) do
  ready_agent = MemoryAgent.ensure(agent)
  tasks = load_tasks(ready_agent)

  params_with_tasks =
    Map.update(params, :tool_context, %{tasks: tasks}, &Map.put(&1, :tasks, tasks))

  {:ok, ready_agent, {:ai_react_start, params_with_tasks}}
end

def on_before_cmd(agent, action), do: {:ok, agent, action}

# Returns the task list stored in the :tasks Memory Space, or [] when the
# Space is missing or its data is not a list.
defp load_tasks(agent) do
  with %{data: tasks} when is_list(tasks) <- MemoryAgent.space(agent, :tasks) do
    tasks
  else
    _other -> []
  end
end

on_after_cmd/3 runs after each ReAct iteration completes. It extracts tool results from the conversation and updates Memory:

# Takes a strategy snapshot of the agent, folds the tool results found in it
# back into the stored task list, and hands the directives on untouched.
@impl true
def on_after_cmd(agent, _action, directives) do
  synced_agent = sync_tasks_from_conversation(agent, strategy_snapshot(agent))

  {:ok, synced_agent, directives}
end

The data flow is: Memory -> tool_context -> tools -> LLM -> tool calls -> conversation -> extract -> Memory.

Task state synchronization

After each iteration, the Agent scans the conversation for tool results and applies them to the task list. This is the sync logic:

# Replays the tool results recorded in the snapshot's conversation onto the
# task list stored in Memory, then persists the resulting list.
#
# The conversation is read from `snap[:details][:conversation]` (an absent
# value is treated as empty). Messages are applied in conversation order via
# `apply_action/2`. Returns whatever `persist_tasks/2` returns.
defp sync_tasks_from_conversation(agent, snap) do
  conversation = get_in(snap, [:details, :conversation]) || []

  updated_tasks =
    conversation
    |> Enum.flat_map(&decode_tool_result/1)
    |> Enum.reduce(load_tasks(agent), &apply_action/2)

  persist_tasks(agent, updated_tasks)
end

# Returns `[result]` for a tool message whose content decodes to a JSON object
# carrying an "action" or "created_tasks" key, and `[]` for any other message.
defp decode_tool_result(msg) do
  if tool_message?(msg) do
    content = Map.get(msg, :content) || Map.get(msg, "content", "")
    decode_content(content)
  else
    []
  end
end

# Messages may use atom or string keys, so both spellings of the role count.
defp tool_message?(msg) do
  Map.get(msg, :role) == :tool or Map.get(msg, "role") == "tool"
end

defp decode_content(content) when is_binary(content) do
  case Jason.decode(content) do
    {:ok, %{"action" => _} = result} -> [result]
    {:ok, %{"created_tasks" => _} = result} -> [result]
    _ -> []
  end
end

# Jason.decode/1 raises on input that is not iodata (nil, structured content
# parts), so such content is skipped the same way undecodable JSON is.
defp decode_content(_content), do: []

apply_action/2 pattern-matches on the action type to merge changes:

# Folds one decoded tool result into the task list.
#
# "tasks_added": appends the created tasks whose id is not already present.
defp apply_action(%{"action" => "tasks_added", "created_tasks" => created}, tasks) do
  known_ids = MapSet.new(tasks, fn task -> task["id"] end)
  unseen = Enum.filter(created, fn task -> not MapSet.member?(known_ids, task["id"]) end)

  tasks ++ unseen
end

# "task_started" / "task_completed": swaps in the updated task for the stored
# task with the same id; all other tasks are kept as they are.
defp apply_action(%{"action" => kind, "task" => replacement}, tasks)
     when kind in ["task_started", "task_completed"] do
  for task <- tasks do
    if task["id"] == replacement["id"], do: replacement, else: task
  end
end

# Any other result leaves the task list unchanged.
defp apply_action(_result, tasks), do: tasks
# Ensures Memory is initialized, then replaces the data of the :tasks Space
# with `tasks`. Returns the result of MemoryAgent.update_space/3.
defp persist_tasks(agent, tasks) do
  agent
  |> MemoryAgent.ensure()
  |> MemoryAgent.update_space(:tasks, fn space -> %{space | data: tasks} end)
end

Helper methods

Wrap ask_sync/3 in domain-specific functions. Callers use these structured functions instead of constructing prompt strings:

# Value used for the :timeout option when a caller of the helper functions
# does not pass one (presumably milliseconds — confirm against ask_sync/3).
@default_timeout 120_000

@doc """
Asks the agent to break `goal` into a task list without executing it.

Sends a planning-only prompt through `ask_sync/3` and returns its result.
`opts` are passed along; `:timeout` is filled in with the module default
when the caller does not set it.
"""
@spec plan(pid(), String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def plan(pid, goal, opts \\ []) do
  opts_with_timeout = Keyword.put_new(opts, :timeout, @default_timeout)

  prompt = """
  Plan the following goal by creating a task list, but DO NOT execute.
  Just create the plan and show the task list.

  Goal: #{goal}
  """

  ask_sync(pid, prompt, opts_with_timeout)
end

@doc """
Asks the agent to plan `goal` and work through every resulting task.

Sends a plan-and-execute prompt through `ask_sync/3` and returns its result.
`opts` are passed along; `:timeout` is filled in with the module default
when the caller does not set it.
"""
@spec execute(pid(), String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def execute(pid, goal, opts \\ []) do
  opts_with_timeout = Keyword.put_new(opts, :timeout, @default_timeout)

  prompt = """
  Plan and execute the following goal. Break it into tasks, then work
  through each task to completion.

  Goal: #{goal}
  """

  ask_sync(pid, prompt, opts_with_timeout)
end

@doc """
Asks the agent to report the current status of all tasks.

Returns the result of `ask_sync/3`. `:timeout` is filled in with the module
default when the caller does not set it.
"""
@spec status(pid(), keyword()) :: {:ok, String.t()} | {:error, term()}
def status(pid, opts \\ []) do
  opts_with_timeout = Keyword.put_new(opts, :timeout, @default_timeout)

  ask_sync(pid, "Show the current status of all tasks.", opts_with_timeout)
end

@doc """
Asks the agent to keep working until no pending tasks remain.

Returns the result of `ask_sync/3`. `:timeout` is filled in with the module
default when the caller does not set it.
"""
@spec resume(pid(), keyword()) :: {:ok, String.t()} | {:error, term()}
def resume(pid, opts \\ []) do
  opts_with_timeout = Keyword.put_new(opts, :timeout, @default_timeout)

  ask_sync(
    pid,
    "Continue working on pending tasks until all are complete.",
    opts_with_timeout
  )
end

Run a multi-step goal

Start the Agent and give it a goal. The Agent will decompose it into tasks, work through each one, and deliver combined results:

# Start an agent server process running MyApp.TaskAgent; `pid` is reused by
# the cells that follow.
{:ok, pid} = Jido.AgentServer.start_link(agent: MyApp.TaskAgent)

# Plan and execute the goal; the match asserts success and binds the agent's
# final answer to `result`.
{:ok, result} = MyApp.TaskAgent.execute(
  pid,
  "Write a README for an Elixir HTTP client library called Fetch",
  timeout: 120_000
)

# Print the final answer.
IO.puts(result)

The Agent typically creates tasks like “Define project overview”, “Document installation steps”, “Write usage examples”, “Add configuration reference”, and “Create contributing guidelines”. It starts each task, produces content, marks it complete, and moves to the next.

Check progress at any point with status/2:

# Ask the running agent (the `pid` started earlier) for a status report on
# all tasks and print it.
{:ok, status} = MyApp.TaskAgent.status(pid)
IO.puts(status)

If the Agent hits max_iterations before finishing, resume with resume/2:

# Tell the same agent to continue with its pending tasks, then print the
# answer it returns.
{:ok, result} = MyApp.TaskAgent.resume(pid)
IO.puts(result)

Use plan/3 to generate a task list without executing. This is useful for reviewing the decomposition before committing to execution:

# Request a task list for the goal without executing it; no :timeout is
# passed, so plan/3 applies its default.
{:ok, plan} = MyApp.TaskAgent.plan(
  pid,
  "Set up CI/CD for an Elixir project on GitHub Actions"
)

# Print the proposed plan.
IO.puts(plan)

> Output varies between runs. The LLM generates different task decompositions and content each time.

Next steps