Meta Planner: Autonomous Planning

repo_root = Path.expand("..", __DIR__)

deps =
  if File.exists?(Path.join(repo_root, "mix.exs")) do
    [{:ptc_runner, path: repo_root}, {:llm_client, path: Path.join(repo_root, "llm_client")}]
  else
    [{:ptc_runner, "~> 0.6.0"}]
  end

Mix.install(deps ++ [{:req_llm, "~> 1.0"}, {:kino, "~> 0.14"}], consolidate_protocols: false)

Setup

local_path = Path.join(__DIR__, "llm_setup.exs")

if File.exists?(local_path) do
  Code.require_file(local_path)
else
  %{body: code} = Req.get!("https://raw.githubusercontent.com/andreasronge/ptc_runner/main/livebooks/llm_setup.exs")
  Code.eval_string(code)
end

setup = LLMSetup.setup()
setup = LLMSetup.choose_provider(setup)
my_llm = LLMSetup.choose_model(setup)

Overview

The Meta Planner generates execution plans from natural language missions and automatically self-corrects when tasks fail verification.

Component      Purpose
MetaPlanner    Generates plans from missions
PlanExecutor   Executes with automatic replanning
PlanRunner     Low-level single-attempt execution
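
Roughly, MetaPlanner turns a mission into a plan, and PlanExecutor runs that plan, replanning when verification fails. Here is a minimal sketch of that two-step flow, reusing the same calls demonstrated in the cells below (PlanExecutor.run, used later, effectively combines both steps into one call):

alias PtcRunner.{MetaPlanner, PlanExecutor}

# Step 1: generate a plan from a natural-language mission
{:ok, plan} =
  MetaPlanner.plan("Compare AAPL and MSFT prices",
    llm: my_llm,
    available_tools: %{"fetch_price" => "Fetch stock price. Input: {symbol: string}"}
  )

# Step 2: execute the plan against concrete tool implementations
PlanExecutor.execute(plan, "Compare AAPL and MSFT prices",
  llm: my_llm,
  base_tools: %{
    "fetch_price" => fn %{"symbol" => s} ->
      {:ok, %{"symbol" => s, "price" => 100.0, "currency" => "USD"}}
    end
  },
  max_turns: 3
)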

Debug: Inspect LLM Communication

Use a debug wrapper to see exactly what’s being sent and returned:

# Wrap the LLM to inspect requests and responses
debug_llm = fn input ->
  IO.puts("\n--- LLM Request ---")
  IO.puts("Output mode: #{inspect(input[:output])}")
  IO.puts("Schema: #{inspect(input[:schema], pretty: true, limit: 5)}")
  IO.puts("System prompt length: #{String.length(input[:system])} chars")
  IO.puts("User message: #{String.slice(hd(input[:messages]).content, 0, 200)}...")

  result = my_llm.(input)

  IO.puts("\n--- LLM Response ---")
  case result do
    {:ok, %{content: content}} ->
      IO.puts("Content type: #{if is_binary(content), do: "string", else: "other"}")
      IO.puts("Content preview: #{String.slice(to_string(content), 0, 300)}...")
    {:ok, content} when is_binary(content) ->
      IO.puts("Raw string: #{String.slice(content, 0, 300)}...")
    {:error, reason} ->
      IO.puts("Error: #{inspect(reason)}")
  end

  result
end

:ok

Basic Plan Generation

Generate a plan from a mission without executing it:

alias PtcRunner.MetaPlanner
alias PtcRunner.Plan

mission = "Compare the prices of AAPL and MSFT stocks"

# Use debug_llm to see what's happening, or my_llm for normal operation
result = MetaPlanner.plan(mission,
  llm: my_llm,  # or debug_llm to inspect requests and responses
  available_tools: %{
    "fetch_price" => "Fetch stock price. Input: {symbol: string}. Output: {symbol, price, currency}"
  }
)

case result do
  {:ok, plan} ->
    IO.puts("\nGenerated #{length(plan.tasks)} task(s)")
    IO.puts("Agents: #{inspect(Map.keys(plan.agents))}")

    for task <- plan.tasks do
      deps = if task.depends_on == [], do: "", else: " (depends: #{Enum.join(task.depends_on, ", ")})"
      IO.puts("  #{task.id}#{deps}: #{String.slice(to_string(task.input), 0, 60)}...")
    end

    plan

  {:error, reason} ->
    IO.puts("Failed: #{inspect(reason)}")
    reason
end

Execute a Plan with Mock Tools

alias PtcRunner.PlanExecutor

# Mock stock API
mock_tools = %{
  "fetch_price" => fn %{"symbol" => symbol} ->
    prices = %{"AAPL" => 185.50, "MSFT" => 425.00, "GOOGL" => 175.25}
    case Map.get(prices, symbol) do
      nil -> {:error, "Unknown symbol: #{symbol}"}
      price -> {:ok, %{"symbol" => symbol, "price" => price, "currency" => "USD"}}
    end
  end
}

mission = "Fetch stock prices for AAPL and MSFT, then tell me which is higher"

result = PlanExecutor.run(mission,
  llm: my_llm,
  available_tools: %{
    "fetch_price" => "Fetch stock price. Input: {symbol: string}. Output: {symbol, price, currency}"
  },
  base_tools: mock_tools,
  max_turns: 3,
  timeout: 60_000
)

case result do
  {:ok, results, metadata} ->
    IO.puts("Success after #{metadata.replan_count} replans")
    IO.puts("Duration: #{metadata.total_duration_ms}ms")
    IO.puts("\nResults:")
    for {task_id, value} <- results do
      IO.puts("  #{task_id}: #{inspect(value, limit: 100)}")
    end

  {:error, reason, metadata} ->
    IO.puts("Failed: #{inspect(reason)}")
    IO.puts("Attempts: #{metadata.execution_attempts}")
end

result

Visualize Execution with TraceTree

Use the interactive TraceTree widget to visualize agent execution. Collect steps via the on_event callback:

# Step collector using Agent
{:ok, step_collector} = Agent.start_link(fn -> [] end)

on_event = fn
  {:task_step, %{task_id: _id, step: step}} ->
    Agent.update(step_collector, fn steps -> [step | steps] end)
  _other ->
    :ok
end

mission = "Fetch stock prices for AAPL and GOOGL"

result = PlanExecutor.run(mission,
  llm: my_llm,
  available_tools: %{
    "fetch_price" => "Fetch stock price. Input: {symbol: string}. Output: {symbol, price, currency}"
  },
  base_tools: mock_tools,
  max_turns: 3,
  timeout: 60_000,
  on_event: on_event
)

# Get collected steps
steps = Agent.get(step_collector, & &1) |> Enum.reverse()
Agent.stop(step_collector)

IO.puts("Collected #{length(steps)} step(s)")
steps

# Render interactive trace tree (agent hierarchy with expandable details)
if steps != [] do
  PtcRunner.Kino.TraceTree.new(steps)
else
  Kino.Markdown.new("*No steps collected - run the cell above first*")
end

Self-Correction with Verification

Tasks can include verification predicates. When verification fails, the planner can automatically generate a repair plan.
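
The verification field is a small predicate over the task's result. The predicate used in the plan below, (= (get data/result "status") "ok"), conceptually asserts something like the following Elixir check (an illustrative translation only, not PtcRunner's actual evaluator):

# Illustrative only: what the verification predicate asserts about a result map
result = %{"status" => "ok", "data" => %{"value" => 42}}
result["status"] == "ok"
# true => verification passes; false => on_verification_failure ("replan") kicks in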

alias PtcRunner.Plan

# A plan with strict verification that will trigger replanning
raw_plan = %{
  "agents" => %{
    "fetcher" => %{
      "prompt" => "You fetch data. Always return JSON.",
      "tools" => ["fetch_data"]
    }
  },
  "tasks" => [
    %{
      "id" => "get_info",
      "agent" => "fetcher",
      "input" => "Fetch information and return with 'status' and 'data' fields",
      # Verification: result must have status = "ok"
      "verification" => "(= (get data/result \"status\") \"ok\")",
      "on_verification_failure" => "replan"
    }
  ]
}

{:ok, plan} = Plan.parse(raw_plan)

# Flaky tool: fails twice, succeeds on third attempt
attempt_counter = Agent.start_link(fn -> 0 end) |> elem(1)

flaky_tools = %{
  "fetch_data" => fn _args ->
    attempt = Agent.get_and_update(attempt_counter, fn n -> {n + 1, n + 1} end)
    IO.puts("  [fetch_data] Attempt ##{attempt}")

    case attempt do
      1 -> {:ok, %{"status" => "error", "message" => "Service unavailable"}}
      2 -> {:ok, %{"status" => "pending", "data" => nil}}
      _ -> {:ok, %{"status" => "ok", "data" => %{"value" => 42}}}
    end
  end
}

result = PlanExecutor.execute(plan, "Fetch data with status ok",
  llm: my_llm,
  base_tools: flaky_tools,
  max_turns: 3,
  max_total_replans: 3,
  replan_cooldown_ms: 100,
  timeout: 120_000
)

Agent.stop(attempt_counter)

case result do
  {:ok, metadata} ->
    IO.puts("\nSuccess after #{metadata.replan_count} replans")
    IO.puts("Results: #{inspect(metadata.results)}")

    if metadata.replan_history != [] do
      IO.puts("\nReplan history (lessons learned):")
      for entry <- metadata.replan_history do
        IO.puts("  - Task '#{entry.task_id}': #{entry.diagnosis}")
      end
    end

  {:error, reason, metadata} ->
    IO.puts("\nFailed: #{inspect(reason)}")
    IO.puts("Replans attempted: #{metadata.replan_count}")
end

result

Trial History: Learning from Failures

When replanning occurs multiple times, the Meta Planner receives a “Trial & Error History” showing what approaches failed. This helps the LLM avoid repeating mistakes.

# Track what the LLM sees in replan prompts
replan_prompts = Agent.start_link(fn -> [] end) |> elem(1)

# Wrap LLM to capture replan prompts
capturing_llm = fn %{messages: messages} = input ->
  prompt = hd(messages).content

  if String.contains?(prompt, "repair specialist") do
    Agent.update(replan_prompts, fn list -> list ++ [prompt] end)

    has_history = String.contains?(prompt, "Trial & Error History")
    IO.puts("\n  [Replan] Trial history included: #{has_history}")
  end

  my_llm.(input)
end

# Plan with verification that requires specific token
raw_plan = %{
  "tasks" => [
    %{
      "id" => "get_token",
      "input" => "Call the token_api tool",
      "verification" => "(= (get data/result \"token\") \"VALID\")",
      "on_verification_failure" => "replan"
    }
  ]
}

{:ok, plan} = Plan.parse(raw_plan)

# Tool fails twice before returning valid token
token_counter = Agent.start_link(fn -> 0 end) |> elem(1)

token_tools = %{
  "token_api" => fn _args ->
    n = Agent.get_and_update(token_counter, fn x -> {x + 1, x + 1} end)
    IO.puts("  [token_api] Call ##{n}")

    case n do
      1 -> {:ok, %{"token" => "INVALID_1"}}
      2 -> {:ok, %{"token" => "INVALID_2"}}
      _ -> {:ok, %{"token" => "VALID"}}
    end
  end
}

result = PlanExecutor.execute(plan, "Get a valid token",
  llm: capturing_llm,
  base_tools: token_tools,
  max_turns: 3,
  max_total_replans: 4,
  replan_cooldown_ms: 100,
  timeout: 180_000
)

prompts = Agent.get(replan_prompts, & &1)
Agent.stop(replan_prompts)
Agent.stop(token_counter)

IO.puts("\n--- Summary ---")
IO.puts("Replan prompts captured: #{length(prompts)}")

for {prompt, idx} <- Enum.with_index(prompts, 1) do
  has_history = String.contains?(prompt, "Trial & Error History")
  has_reflection = String.contains?(prompt, "Self-Reflection")
  IO.puts("  Replan ##{idx}: history=#{has_history}, self_reflect=#{has_reflection}")
end

result

Interactive Mission

Try your own mission with the Meta Planner:

mission_input = Kino.Input.textarea("Mission",
  default: "What is 15% tip on a $47.50 restaurant bill?"
)
mission = Kino.Input.read(mission_input)

result = PlanExecutor.run(mission,
  llm: my_llm,
  max_turns: 3,
  max_total_replans: 2,
  timeout: 60_000
)

case result do
  {:ok, results, metadata} ->
    IO.puts("Success (#{metadata.total_duration_ms}ms, #{metadata.replan_count} replans)")
    IO.puts("\nResults:")
    for {task_id, value} <- results do
      IO.puts("  #{task_id}: #{inspect(value)}")
    end

  {:error, reason, metadata} ->
    IO.puts("Failed: #{inspect(reason)}")
    IO.puts("Attempts: #{metadata.execution_attempts}")

  {:waiting, pending, _metadata} ->
    IO.puts("Waiting for human review:")
    for p <- pending, do: IO.puts("  - #{p.task_id}")
end

result
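
For reference, the default mission is simple arithmetic: 15% of $47.50 is 0.15 * 47.50 = $7.125, so a correct run should report a tip of roughly $7.13.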

Predefined Plan with Dependencies

Execute a hand-crafted plan with task dependencies. Dependent tasks reference earlier results with {{results.<task_id>.<field>}} placeholders, which are filled in before the task runs:

raw_plan = %{
  "tasks" => [
    %{
      "id" => "step1",
      "input" => "What is the capital of France? Return as {capital: string}"
    },
    %{
      "id" => "step2",
      "input" => "What is the population of {{results.step1.capital}}? Return as {population: int}",
      "depends_on" => ["step1"]
    },
    %{
      "id" => "summary",
      "input" => "Summarize: The capital is {{results.step1.capital}} with population {{results.step2.population}}",
      "depends_on" => ["step1", "step2"],
      "type" => "synthesis_gate"
    }
  ]
}

{:ok, plan} = Plan.parse(raw_plan)

IO.puts("Plan structure:")
for task <- plan.tasks do
  deps = if task.depends_on == [], do: "(root)", else: "(after: #{Enum.join(task.depends_on, ", ")})"
  type = if task.type, do: " [#{task.type}]", else: ""
  IO.puts("  #{task.id} #{deps}#{type}")
end

result = PlanExecutor.execute(plan, "Research France",
  llm: my_llm,
  max_turns: 2,
  timeout: 60_000
)

case result do
  {:ok, metadata} ->
    IO.puts("\nResults:")
    for {id, val} <- metadata.results do
      IO.puts("  #{id}: #{inspect(val, limit: 80)}")
    end

  {:error, reason, _} ->
    IO.puts("Failed: #{inspect(reason)}")
end

result

Learn More