Session 16: Checkpoint Project - OTP Agents

16_checkpoint_otp_agents.livemd

Mix.install([])

Introduction

Congratulations! You’ve learned all the core OTP concepts:

  • GenServer - Process state and message handling
  • Supervisor - Fault tolerance and restarts
  • DynamicSupervisor - Runtime child management
  • Application - Lifecycle and supervision tree startup
  • Distribution - Multi-node communication

Now it’s time to put it all together by refactoring your Phase 2 agent_framework to use OTP properly.

Project Goal

Convert the manually implemented Phase 2 code to OTP:

Phase 2 (Manual) → Phase 3 (OTP):

  • ProcessAgent → AgentServer (GenServer)
  • AgentMonitor → AgentSupervisor (DynamicSupervisor)
  • Manual startup → Application module

The end result will have the same functionality with:

  • ~80% less supervision code
  • Built-in debugging tools
  • Automatic application startup
  • Production-ready patterns

Part 1: Review Phase 2 Structure

Before converting, let’s review what you built in Phase 2:

# Current Phase 2 structure:
# agent_framework/
# ├── lib/
# │   ├── agent_framework/
# │   │   ├── agent.ex          # Phase 1 struct (keep)
# │   │   ├── message.ex        # Phase 1 messages (keep)
# │   │   ├── process_agent.ex  # → Will become AgentServer
# │   │   ├── agent_monitor.ex  # → Will become AgentSupervisor
# │   │   └── agent_registry.ex # Already uses OTP Registry (keep)
# │   └── agent_framework.ex    # API module (update)
# └── test/
#     └── agent_framework/
#         ├── agent_test.exs
#         ├── message_test.exs
#         └── process_test.exs  # → Will add otp_test.exs

🤔 Pre-Conversion Reflection

# Before you start, answer these questions:

pre_conversion = %{
  # 1. Which callbacks will AgentServer need?
  agent_server_callbacks: [
    # List the GenServer callbacks you'll implement
  ],

  # 2. What will AgentSupervisor's init/1 look like?
  supervisor_init: """
  # Sketch the init function
  """,

  # 3. What children will Application.start/2 supervise?
  application_children: [
    # List the children in order
  ]
}

Part 2: Create AgentServer (GenServer)

Create a new file: lib/agent_framework/agent_server.ex

Specification

The AgentServer module should:

  1. Be a GenServer that maintains agent state
  2. Support all the operations from ProcessAgent
  3. Use proper OTP patterns

Required Functions

Client API:

  • start_link(name, opts \\ []) - Start a linked agent process (called by the supervisor)
  • get_state(server) - Get full agent state
  • get_status(server) - Get just the status
  • remember(server, key, value) - Store in memory (async)
  • recall(server, key) - Retrieve from memory (sync)
  • forget_all(server) - Clear memory (async)
  • send_task(server, action, params) - Queue a task (async)
  • process_next(server) - Process next task (sync)
  • inbox_count(server) - Get inbox size (sync)
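For the async/sync split in the list above, here is a minimal sketch on a hypothetical MemorySketch module. It is deliberately simpler than AgentServer (its whole state is the memory map), so treat it as an illustration of the two shapes rather than the exercise answer. remember/3 uses GenServer.cast/2 and returns :ok immediately; recall/2 uses GenServer.call/2 and waits for the reply.

```elixir
defmodule MemorySketch do
  # Hypothetical, simplified server: its whole state is the memory map
  use GenServer

  def start_link(initial \\ %{}), do: GenServer.start_link(__MODULE__, initial)

  # Async: cast returns :ok at once, without waiting for the server
  def remember(server, key, value), do: GenServer.cast(server, {:remember, key, value})

  # Sync: call blocks until the server replies (5 second default timeout)
  def recall(server, key), do: GenServer.call(server, {:recall, key})

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:remember, key, value}, memory), do: {:noreply, Map.put(memory, key, value)}

  @impl true
  def handle_call({:recall, key}, _from, memory), do: {:reply, Map.get(memory, key), memory}
end

{:ok, pid} = MemorySketch.start_link()
:ok = MemorySketch.remember(pid, :context, "researching")
MemorySketch.recall(pid, :context)
# => "researching"
```

Messages from one process to another are received in the order they were sent, so the recall/2 call is handled after the earlier cast without any sleep in between.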

Server Callbacks:

  • init/1 - Initialize state
  • handle_call/3 - Sync operations
  • handle_cast/2 - Async operations
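  • handle_info/2 - All other messages (plain sends, timers, monitor :DOWN notifications)
  • terminate/2 - Cleanup (runs on supervisor shutdown only if the process traps exits)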
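Two of these callbacks are easy to misread: handle_info/2 receives every message that is not a call or cast, and terminate/2 only runs during a supervisor shutdown if the process traps exits. The sketch below uses a hypothetical CleanupSketch module under a throwaway DynamicSupervisor and reports each callback back to the calling process so both behaviours can be observed.

```elixir
defmodule CleanupSketch do
  # Hypothetical module: reports handle_info/2 and terminate/2 calls to a notify pid
  use GenServer

  def start_link(notify_pid), do: GenServer.start_link(__MODULE__, notify_pid)

  @impl true
  def init(notify_pid) do
    # Trapping exits turns the supervisor's :shutdown exit signal into a terminate/2 call.
    # Without this flag the signal kills the process and terminate/2 is skipped.
    Process.flag(:trap_exit, true)
    {:ok, notify_pid}
  end

  @impl true
  def handle_info(msg, notify_pid) do
    # Plain send/2, timer and monitor messages (anything not a call or cast) land here
    send(notify_pid, {:info, msg})
    {:noreply, notify_pid}
  end

  @impl true
  def terminate(reason, notify_pid) do
    send(notify_pid, {:terminated, reason})
    :ok
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, pid} = DynamicSupervisor.start_child(sup, {CleanupSketch, self()})

send(pid, :ping)

info =
  receive do
    {:info, msg} -> msg
  after
    1_000 -> :timeout
  end

# terminate_child/2 sends the :shutdown exit signal and waits for the process to exit
:ok = DynamicSupervisor.terminate_child(sup, pid)

reason =
  receive do
    {:terminated, r} -> r
  after
    1_000 -> :not_called
  end

IO.inspect({info, reason})
# prints {:ping, :shutdown}
```

Removing the Process.flag/2 line makes the second receive time out with :not_called, which is why cleanup that must always happen should not rely on terminate/2 alone.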

Template

defmodule AgentFramework.AgentServer do
  @moduledoc """
  GenServer-based agent that processes tasks and maintains memory.

  This is the OTP version of Phase 2's ProcessAgent.

  ## Example

      {:ok, agent} = AgentServer.start_link("Worker-1")
      AgentServer.remember(agent, :context, "researching")
      AgentServer.send_task(agent, :search, %{query: "OTP"})
      {:ok, task, result} = AgentServer.process_next(agent)

  """
  use GenServer

  alias AgentFramework.Message

  # ============================================
  # Type Definitions
  # ============================================

  @type state :: %{
          name: String.t(),
          status: :idle | :busy | :waiting,
          memory: map(),
          inbox: [Message.t()],
          processed_count: non_neg_integer()
        }

  # ============================================
  # Client API
  # ============================================

  @doc """
  Start a linked AgentServer process.

  ## Options
  - `:memory` - Initial memory map (default: %{})
  - `:name` - Process registration name (optional)

  ## Examples

      {:ok, pid} = AgentServer.start_link("Worker-1")
      {:ok, pid} = AgentServer.start_link("Worker-2", memory: %{key: "value"})

  """
  def start_link(name, opts \\ []) when is_binary(name) do
    # TODO: Implement
    # Hint: Use GenServer.start_link(__MODULE__, init_arg, options)
  end

  @doc "Get the agent's full state."
  def get_state(server) do
    # TODO: Implement using GenServer.call
  end

  @doc "Get the agent's current status."
  def get_status(server) do
    # TODO: Implement
  end

  @doc "Store a value in memory (async)."
  def remember(server, key, value) do
    # TODO: Implement using GenServer.cast
  end

  @doc "Recall a value from memory (sync)."
  def recall(server, key) do
    # TODO: Implement
  end

  @doc "Clear all memory (async)."
  def forget_all(server) do
    # TODO: Implement
  end

  @doc "Send a task to the inbox (async)."
  def send_task(server, action, params \\ %{}) do
    # TODO: Implement
  end

  @doc "Process the next task (sync)."
  def process_next(server) do
    # TODO: Implement
  end

  @doc "Get the number of tasks in inbox (sync)."
  def inbox_count(server) do
    # TODO: Implement
  end

  # ============================================
  # Server Callbacks
  # ============================================

  @impl true
  def init(init_arg) do
    # TODO: Parse init_arg, create initial state
    # Return {:ok, state}
  end

  @impl true
  def handle_call(request, _from, state) do
    # TODO: Handle :get_state, :get_status, {:recall, key},
    #       :process_next, :inbox_count
  end

  @impl true
  def handle_cast(request, state) do
    # TODO: Handle {:remember, key, value}, :forget_all,
    #       {:send_task, action, params}
  end

  @impl true
  def handle_info(msg, state) do
    # Handle unexpected messages
    IO.puts("[AgentServer #{state.name}] Unexpected: #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(reason, state) do
    IO.puts("[AgentServer #{state.name}] Terminating: #{inspect(reason)}")
    :ok
  end

  # ============================================
  # Private Functions
  # ============================================

  # TODO: Copy handle_task functions from ProcessAgent
  # defp handle_task(%Message{...}) do ... end

  defp generate_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
  end
end

🤔 Implementation Questions

As you implement, think about:

questions_during_implementation = %{
  # 1. Should :remember be call or cast? Why?
  remember_choice: "cast because ???",

  # 2. Should :recall be call or cast? Why?
  recall_choice: "call because ???",

  # 3. What should process_next return when inbox is empty?
  empty_inbox: "???",

  # 4. Do you need child_spec/1? When is it called?
  child_spec: "???"
}
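On question 4: use GenServer generates a child_spec/1, which a supervisor calls when it is given {Module, arg}, and that spec forwards its single argument to start_link/1. The sketch below uses a hypothetical SpecSketch module with the same start_link(name, opts) shape as AgentServer. It inspects the generated spec, then builds a spec map by hand so both arguments reach start_link/2. This is one possible approach, not the required answer for start_agent/2.

```elixir
defmodule SpecSketch do
  # Hypothetical stand-in with the same start_link/2 shape as AgentServer
  use GenServer

  def start_link(name, opts \\ []), do: GenServer.start_link(__MODULE__, {name, opts})

  @impl true
  def init({name, opts}), do: {:ok, %{name: name, memory: Keyword.get(opts, :memory, %{})}}
end

# The generated child_spec/1 wraps its one argument in a single-element list
default_spec = SpecSketch.child_spec("Worker-1")
# => %{id: SpecSketch, start: {SpecSketch, :start_link, ["Worker-1"]}}

# To pass both name and opts, build the spec map explicitly
custom_spec = %{
  id: SpecSketch,
  start: {SpecSketch, :start_link, ["Worker-2", [memory: %{key: "val"}]]},
  restart: :permanent
}

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, pid} = DynamicSupervisor.start_child(sup, custom_spec)

# :sys.get_state/1 is one of the built-in debugging tools: it reads a process's state
:sys.get_state(pid)
# => %{memory: %{key: "val"}, name: "Worker-2"}
```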

Part 3: Create AgentSupervisor (DynamicSupervisor)

Create a new file: lib/agent_framework/agent_supervisor.ex

Specification

The AgentSupervisor module should:

  1. Be a DynamicSupervisor for dynamic agent management
  2. Allow starting/stopping agents at runtime
  3. Automatically restart crashed agents

Required Functions

  • start_link(opts) - Start the supervisor
  • start_agent(name, opts \\ []) - Start a supervised agent
  • stop_agent(pid) - Stop an agent
  • list_agents() - List all supervised agents
  • count_agents() - Count supervised agents
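The DynamicSupervisor functions these wrappers will call can be tried on their own first. This sketch uses Elixir's built-in Agent as a stand-in child (any module that defines child_spec/1 behaves the same way) and shows the return shapes of which_children/1 and count_children/1.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Agent is a stand-in child; any module that defines child_spec/1 works the same way
{:ok, a1} = DynamicSupervisor.start_child(sup, {Agent, fn -> %{} end})
{:ok, _a2} = DynamicSupervisor.start_child(sup, {Agent, fn -> %{} end})

# Entries are {id, pid_or_restarting, type, modules}; a DynamicSupervisor reports id as :undefined
pids =
  for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(sup), is_pid(pid), do: pid

counts = DynamicSupervisor.count_children(sup)
# => %{active: 2, specs: 2, supervisors: 0, workers: 2}

# terminate_child/2 stops the child without triggering a restart
:ok = DynamicSupervisor.terminate_child(sup, a1)
DynamicSupervisor.count_children(sup).active
# => 1
```

The is_pid/1 filter matters because which_children/1 reports :restarting instead of a pid for a child that is about to be restarted.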

Template

defmodule AgentFramework.AgentSupervisor do
  @moduledoc """
  DynamicSupervisor for agent processes.

  This is the OTP version of Phase 2's AgentMonitor.

  ## Example

      # Usually started by Application; a second call with the default name
      # returns {:error, {:already_started, pid}}
      {:ok, _} = AgentSupervisor.start_link([])

      # Start agents dynamically
      {:ok, agent1} = AgentSupervisor.start_agent("Worker-1")
      {:ok, agent2} = AgentSupervisor.start_agent("Worker-2")

      # List them
      AgentSupervisor.list_agents()

  """
  use DynamicSupervisor

  @default_name __MODULE__

  # ============================================
  # Public API
  # ============================================

  @doc """
  Start the agent supervisor.

  ## Options
  - `:name` - Process name (default: #{inspect(@default_name)})

  """
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, @default_name)
    DynamicSupervisor.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Start a new supervised agent.

  Returns `{:ok, pid}` on success.

  ## Examples

      {:ok, pid} = AgentSupervisor.start_agent("Worker-1")
      {:ok, pid} = AgentSupervisor.start_agent("Worker-2", memory: %{key: "val"})

  """
  def start_agent(name, opts \\ []) do
    # TODO: Create child spec for AgentServer
    # TODO: Use DynamicSupervisor.start_child
  end

  @doc """
  Stop a supervised agent.

  ## Examples

      AgentSupervisor.stop_agent(pid)

  """
  def stop_agent(pid) when is_pid(pid) do
    # TODO: Use DynamicSupervisor.terminate_child
  end

  @doc """
  List all supervised agent PIDs.

  ## Examples

      [pid1, pid2] = AgentSupervisor.list_agents()

  """
  def list_agents do
    # TODO: Use DynamicSupervisor.which_children
    # Extract just the pids
  end

  @doc """
  Count supervised agents.

  ## Examples

      %{active: 3} = AgentSupervisor.count_agents()

  """
  def count_agents do
    # TODO: Use DynamicSupervisor.count_children
  end

  # ============================================
  # Callbacks
  # ============================================

  @impl true
  def init(_opts) do
    DynamicSupervisor.init(
      strategy: :one_for_one,
      max_restarts: 5,
      max_seconds: 60
    )
  end
end

🤔 Supervisor Questions

supervisor_questions = %{
  # 1. Why DynamicSupervisor instead of regular Supervisor?
  why_dynamic: "???",

  # 2. What restart strategy should agents use?
  restart_strategy: "???",

  # 3. What happens if an agent crashes?
  on_crash: "???",

  # 4. How is this simpler than AgentMonitor?
  simplification: "???"
}
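On the restart question: the :restart value in a child spec decides whether a crash is followed by a restart. :permanent (the default from use GenServer) always restarts, :transient restarts only after an abnormal exit, and :temporary never restarts. The sketch below starts one permanent and one temporary Agent (a stand-in for AgentServer), kills both, and lists what the supervisor kept. The 100 ms sleep mirrors the tests in Part 6 and is a timing assumption, not a guarantee.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Supervisor.child_spec/2 takes an existing spec (here Agent's) and overrides fields
permanent = Supervisor.child_spec({Agent, fn -> :permanent end}, restart: :permanent)
temporary = Supervisor.child_spec({Agent, fn -> :temporary end}, restart: :temporary)

{:ok, perm_pid} = DynamicSupervisor.start_child(sup, permanent)
{:ok, temp_pid} = DynamicSupervisor.start_child(sup, temporary)

# Simulate crashes; :kill cannot be trapped, so both exit abnormally
Process.exit(perm_pid, :kill)
Process.exit(temp_pid, :kill)
Process.sleep(100)

# Only the permanent child comes back, with a new pid and its start state
survivors =
  for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(sup), is_pid(pid) do
    {pid, Agent.get(pid, & &1)}
  end

IO.inspect(length(survivors))
# prints 1
```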

Part 4: Create Application Module

Create a new file: lib/agent_framework/application.ex

Specification

The Application module should:

  1. Define the supervision tree root
  2. Start the Registry first, then AgentSupervisor (children start in list order)
  3. Be referenced in mix.exs

Template

defmodule AgentFramework.Application do
  @moduledoc """
  OTP Application for AgentFramework.

  Starts the supervision tree:

      AgentFramework.Supervisor
           │
      ┌────┴────┐
      ▼         ▼
   Registry  AgentSupervisor

  """
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # TODO: Add Registry
      # TODO: Add AgentSupervisor
    ]

    opts = [
      strategy: :one_for_one,
      name: AgentFramework.Supervisor
    ]

    Supervisor.start_link(children, opts)
  end
end
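Outside a Mix project, the same two-child tree can be started directly with Supervisor.start_link/2. This sketch assumes the hypothetical names DemoApp.Registry, DemoApp.AgentSupervisor and DemoApp.Supervisor (the project's own Registry name may differ) and registers a stand-in Agent child through a {:via, Registry, ...} name to show why the Registry is listed first.

```elixir
children = [
  # Registry first: children listed later may register names in it when they start
  {Registry, keys: :unique, name: DemoApp.Registry},
  {DynamicSupervisor, name: DemoApp.AgentSupervisor, strategy: :one_for_one}
]

# Children start in list order; :one_for_one restarts each one independently
{:ok, _root} =
  Supervisor.start_link(children, strategy: :one_for_one, name: DemoApp.Supervisor)

# Start a stand-in worker that registers itself under the string key "Worker-1"
via = {:via, Registry, {DemoApp.Registry, "Worker-1"}}
spec = %{id: Agent, start: {Agent, :start_link, [fn -> %{} end, [name: via]]}}
{:ok, pid} = DynamicSupervisor.start_child(DemoApp.AgentSupervisor, spec)

# Look the worker up by name instead of holding on to its pid
Registry.lookup(DemoApp.Registry, "Worker-1")
# => [{pid, nil}], where pid is the one returned by start_child/2
```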

Update mix.exs

# In mix.exs, update the application function:
def application do
  [
    mod: {AgentFramework.Application, []},
    # :crypto is needed because generate_id/0 calls :crypto.strong_rand_bytes/1
    extra_applications: [:logger, :crypto]
  ]
end

Part 5: Update AgentFramework API

Update lib/agent_framework.ex to add OTP-based functions.

New Functions to Add

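# Add to AgentFramework module:

# Near the top of the module, so the short names below resolve:
alias AgentFramework.{AgentServer, AgentSupervisor}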

# ============================================
# Phase 3: OTP-based Agent API
# ============================================

@doc """
Start a supervised agent using OTP.

## Examples

    {:ok, pid} = AgentFramework.start_otp_agent("Worker-1")

"""
def start_otp_agent(name, opts \\ []) do
  AgentSupervisor.start_agent(name, opts)
end

@doc """
Stop a supervised agent.

## Examples

    AgentFramework.stop_otp_agent(pid)

"""
def stop_otp_agent(pid) do
  AgentSupervisor.stop_agent(pid)
end

@doc """
List all supervised agents.
"""
def list_otp_agents do
  AgentSupervisor.list_agents()
end

# Delegate to AgentServer for operations
defdelegate agent_get_state(server), to: AgentServer, as: :get_state
defdelegate agent_remember(server, key, value), to: AgentServer, as: :remember
defdelegate agent_recall(server, key), to: AgentServer, as: :recall
defdelegate agent_send_task(server, action, params), to: AgentServer, as: :send_task
defdelegate agent_process_next(server), to: AgentServer, as: :process_next
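defdelegate with as: generates a public function that forwards its arguments to a differently named function in another module. The to: module is resolved through aliases like any other module reference, so the alias must be in place. A minimal sketch with hypothetical Demo.Worker and Demo.Facade modules:

```elixir
defmodule Demo.Worker do
  # Hypothetical target module
  def get_state(server), do: {:state_of, server}
end

defmodule Demo.Facade do
  # Without this alias, Worker would refer to a top-level Elixir.Worker module
  alias Demo.Worker

  # Generates Demo.Facade.worker_state/1, which calls Demo.Worker.get_state/1
  defdelegate worker_state(server), to: Worker, as: :get_state
end

Demo.Facade.worker_state(:w1)
# => {:state_of, :w1}
```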

Part 6: Create OTP Tests

Create a new test file: test/agent_framework/otp_test.exs

Test Template

defmodule AgentFramework.OTPTest do
  use ExUnit.Case, async: false

  alias AgentFramework.{AgentServer, AgentSupervisor}

  # ============================================
  # AgentServer Tests
  # ============================================

  describe "AgentServer.start_link/2" do
    test "starts a GenServer process" do
      {:ok, pid} = AgentServer.start_link("Test-Agent")
      assert Process.alive?(pid)
      GenServer.stop(pid)
    end

    test "initializes with correct state" do
      {:ok, pid} = AgentServer.start_link("Test-Agent")
      state = AgentServer.get_state(pid)

      assert state.name == "Test-Agent"
      assert state.status == :idle
      assert state.memory == %{}
      assert state.inbox == []
      assert state.processed_count == 0

      GenServer.stop(pid)
    end

    test "accepts initial memory" do
      {:ok, pid} = AgentServer.start_link("Test-Agent", memory: %{key: "value"})
      state = AgentServer.get_state(pid)

      assert state.memory == %{key: "value"}

      GenServer.stop(pid)
    end
  end

  describe "AgentServer memory operations" do
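    setup do
      # start_supervised! stops the agent after each test; an on_exit GenServer.stop
      # can fail with :noproc because the linked agent dies with the test process
      pid = start_supervised!({AgentServer, "Memory-Agent"})
      {:ok, server: pid}
    end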

    test "remember and recall", %{server: pid} do
      :ok = AgentServer.remember(pid, :context, "researching")
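      # Not strictly required: messages between two processes arrive in send order,
      # so the cast is handled before the recall call below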
      Process.sleep(10)
      assert AgentServer.recall(pid, :context) == "researching"
    end

    test "recall missing key returns nil", %{server: pid} do
      assert AgentServer.recall(pid, :missing) == nil
    end

    test "forget_all clears memory", %{server: pid} do
      AgentServer.remember(pid, :a, 1)
      AgentServer.remember(pid, :b, 2)
      AgentServer.forget_all(pid)
      Process.sleep(10)

      state = AgentServer.get_state(pid)
      assert state.memory == %{}
    end
  end

  describe "AgentServer task operations" do
    setup do
      # ExUnit starts the agent under its test supervisor and stops it after each test
      pid = start_supervised!({AgentServer, "Task-Agent"})
      {:ok, server: pid}
    end

    test "send_task adds to inbox", %{server: pid} do
      AgentServer.send_task(pid, :search, %{query: "test"})
      Process.sleep(10)
      assert AgentServer.inbox_count(pid) == 1
    end

    test "process_next processes a task", %{server: pid} do
      AgentServer.send_task(pid, :search, %{query: "OTP"})
      Process.sleep(10)

      {:ok, task, result} = AgentServer.process_next(pid)
      assert task.payload.action == :search
      assert result == {:ok, "Search results for: OTP"}
    end

    test "process_next on empty inbox returns {:empty, nil}", %{server: pid} do
      assert AgentServer.process_next(pid) == {:empty, nil}
    end
  end

  # ============================================
  # AgentSupervisor Tests
  # ============================================

  describe "AgentSupervisor" do
    setup do
      # Unique name so this supervisor does not clash with the one the Application starts
      name = :"test_sup_#{System.unique_integer([:positive])}"
      # start_supervised! stops the supervisor (and its agents) when the test ends
      start_supervised!({AgentSupervisor, name: name})
      {:ok, supervisor: name}
    end

    test "starts agents", %{supervisor: sup} do
      {:ok, pid} = DynamicSupervisor.start_child(sup, {AgentServer, "Worker-1"})
      assert Process.alive?(pid)
    end

    test "restarts crashed agents", %{supervisor: sup} do
      {:ok, pid} = DynamicSupervisor.start_child(sup, {AgentServer, "Crashy"})
      original_pid = pid

      # Crash it
      Process.exit(pid, :kill)
      Process.sleep(100)

      # Check it restarted
      [{_, new_pid, _, _}] = DynamicSupervisor.which_children(sup)
      assert is_pid(new_pid)
      assert new_pid != original_pid
    end
  end

  # ============================================
  # Integration Tests
  # ============================================

  describe "OTP Integration" do
    test "full workflow with supervisor" do
      # Start supervisor
      {:ok, sup} = AgentSupervisor.start_link(name: :integration_test_sup)

      # Start agents
      {:ok, w1} = DynamicSupervisor.start_child(
        :integration_test_sup,
        {AgentServer, "Worker-1"}
      )
      {:ok, w2} = DynamicSupervisor.start_child(
        :integration_test_sup,
        {AgentServer, "Worker-2"}
      )

      # Use agents
      AgentServer.remember(w1, :task, "research")
      AgentServer.remember(w2, :task, "write")

      Process.sleep(10)

      assert AgentServer.recall(w1, :task) == "research"
      assert AgentServer.recall(w2, :task) == "write"

      # Send and process tasks
      AgentServer.send_task(w1, :search, %{query: "Elixir OTP"})
      Process.sleep(10)

      {:ok, task, _result} = AgentServer.process_next(w1)
      assert task.payload.action == :search

      # Cleanup
      Supervisor.stop(sup)
    end
  end
end
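ExUnit's start_supervised!/1 is an alternative to start_link plus on_exit cleanup: it starts the process under the test's own supervisor and stops it when the test ends. This standalone sketch runs one test against a hypothetical CounterServer and also shows why a cast followed by a call from the same process needs no Process.sleep/1.

```elixir
ExUnit.start(autorun: false)

defmodule CounterServer do
  # Hypothetical server: an integer counter with one cast and one call
  use GenServer

  def start_link(initial), do: GenServer.start_link(__MODULE__, initial)
  def bump(pid), do: GenServer.cast(pid, :bump)
  def value(pid), do: GenServer.call(pid, :value)

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:bump, n), do: {:noreply, n + 1}

  @impl true
  def handle_call(:value, _from, n), do: {:reply, n, n}
end

defmodule CounterServerTest do
  use ExUnit.Case, async: true

  setup do
    # Started under ExUnit's per-test supervisor and stopped when the test ends
    pid = start_supervised!({CounterServer, 0})
    %{server: pid}
  end

  test "a cast is handled before a later call from the same process", %{server: pid} do
    CounterServer.bump(pid)
    # No sleep: messages between one pair of processes are received in send order
    assert CounterServer.value(pid) == 1
  end
end

result = ExUnit.run()
```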

Part 7: Verification Checklist

Run these checks to verify your implementation:

1. Compile Check

cd agent_framework
mix compile

Expected: No warnings about undefined functions.

2. Run Tests

mix test

Expected: All tests pass.

3. Interactive Test

iex -S mix
# Should automatically start supervision tree
AgentFramework.AgentSupervisor.count_agents()
# => %{active: 0, specs: 0, supervisors: 0, workers: 0}

# Start an agent
{:ok, agent} = AgentFramework.AgentSupervisor.start_agent("Worker-1")

# Use it
AgentFramework.AgentServer.remember(agent, :hello, "world")
AgentFramework.AgentServer.recall(agent, :hello)
# => "world"

# Send tasks
AgentFramework.AgentServer.send_task(agent, :search, %{query: "OTP"})
AgentFramework.AgentServer.inbox_count(agent)
# => 1

# Process tasks
AgentFramework.AgentServer.process_next(agent)
# => {:ok, %Message{...}, {:ok, "Search results for: OTP"}}

# Crash and verify restart
Process.exit(agent, :kill)
Process.sleep(100)
AgentFramework.AgentSupervisor.list_agents()
# Should show a new PID: the agent was restarted from its start arguments,
# so the earlier remember/send_task changes are gone
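The restart at the end of this session can be reproduced in isolation. In this sketch a plain Agent stands in for an agent process: after the kill, the supervisor starts a replacement from the original start function, so the earlier update is lost. As in the tests, the 100 ms sleep is a timing assumption.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, agent} = DynamicSupervisor.start_child(sup, {Agent, fn -> %{} end})

Agent.update(agent, &Map.put(&1, :hello, "world"))
Agent.get(agent, & &1)
# => %{hello: "world"}

Process.exit(agent, :kill)
Process.sleep(100)

# The old pid is dead; find the replacement through the supervisor
[new_agent] =
  for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(sup), is_pid(pid), do: pid

# The start function ran again, so the state is back to its initial value
Agent.get(new_agent, & &1)
# => %{}
```

Keeping state across crashes needs an external store (for example ETS or a database) that the restarted process reads in init/1.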

4. Observer Test (Optional)

:observer.start()

Navigate to Applications → agent_framework to see your supervision tree.


Completion Checklist

Before moving to Phase 4, verify:

  • [ ] AgentServer is a working GenServer with all callbacks
  • [ ] AgentSupervisor is a DynamicSupervisor that manages agents
  • [ ] Application module starts the supervision tree
  • [ ] mix.exs references the Application module
  • [ ] All tests pass with mix test
  • [ ] iex -S mix shows supervision tree running
  • [ ] Crashing an agent triggers automatic restart
  • [ ] list_agents() and count_agents() work correctly

Key Learnings from Phase 3

You’ve now experienced the power of OTP:

  1. Less Code, Same Features - Went from ~400 lines to ~100 lines
  2. Battle-Tested Patterns - Using decades of production refinement
  3. Built-in Tools - Observer, tracing, debugging
  4. Automatic Recovery - Supervisors handle restarts
  5. Clean Architecture - Separation of concerns (client/server)
  6. Production Ready - Same patterns used by WhatsApp, Discord, etc.

What’s Next: Phase 4 Preview

Phase 4 will add Phoenix for HTTP/API support:

  • A2A protocol endpoints
  • JSON-RPC handling
  • WebSocket channels for real-time updates
  • Phoenix.PubSub for distributed agents

Your OTP foundation is ready for the web layer!


Navigation

Previous: Session 15 - OTP Distribution

Next: Phase 4 - Phoenix Essentials