Session 16: Checkpoint Project - OTP Agents
Mix.install([])
Introduction
Congratulations! You’ve learned all the core OTP concepts:
- GenServer - Process state and message handling
- Supervisor - Fault tolerance and restarts
- DynamicSupervisor - Runtime child management
- Application - Lifecycle and supervision tree startup
- Distribution - Multi-node communication
Now it’s time to put it all together by refactoring your Phase 2
agent_framework to use OTP properly.
Project Goal
Convert the manually-implemented Phase 2 code to OTP:
| Phase 2 (Manual) | Phase 3 (OTP) |
|---|---|
| ProcessAgent | AgentServer (GenServer) |
| AgentMonitor | AgentSupervisor (DynamicSupervisor) |
| Manual startup | Application module |
The end result will have the same functionality with:
- ~80% less supervision code
- Built-in debugging tools
- Automatic application startup
- Production-ready patterns
Part 1: Review Phase 2 Structure
Before converting, let’s review what you built in Phase 2:
# Current Phase 2 structure:
# agent_framework/
# ├── lib/
# │   ├── agent_framework/
# │   │   ├── agent.ex           # Phase 1 struct (keep)
# │   │   ├── message.ex         # Phase 1 messages (keep)
# │   │   ├── process_agent.ex   # → Will become AgentServer
# │   │   ├── agent_monitor.ex   # → Will become AgentSupervisor
# │   │   └── agent_registry.ex  # Already uses OTP Registry (keep)
# │   └── agent_framework.ex     # API module (update)
# └── test/
#     └── agent_framework/
#         ├── agent_test.exs
#         ├── message_test.exs
#         └── process_test.exs   # → Will add otp_test.exs
🤔 Pre-Conversion Reflection
# Before you start, answer these questions:
pre_conversion = %{
# 1. Which callbacks will AgentServer need?
agent_server_callbacks: [
# List the GenServer callbacks you'll implement
],
# 2. What will AgentSupervisor's init/1 look like?
supervisor_init: """
# Sketch the init function
""",
# 3. What children will Application.start/2 supervise?
application_children: [
# List the children in order
]
}
Part 2: Create AgentServer (GenServer)
Create a new file: lib/agent_framework/agent_server.ex
Specification
The AgentServer module should:
- Be a GenServer that maintains agent state
- Support all the operations from ProcessAgent
- Use proper OTP patterns
Required Functions
Client API:
- start_link(name, opts \\ []) - Start a supervised agent
- get_state(server) - Get full agent state
- get_status(server) - Get just the status
- remember(server, key, value) - Store in memory (async)
- recall(server, key) - Retrieve from memory (sync)
- forget_all(server) - Clear memory (async)
- send_task(server, action, params) - Queue a task (async)
- process_next(server) - Process next task (sync)
- inbox_count(server) - Get inbox size (sync)
Server Callbacks:
- init/1 - Initialize state
- handle_call/3 - Sync operations
- handle_cast/2 - Async operations
- handle_info/2 - System messages
- terminate/2 - Cleanup
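To make the sync/async split above concrete, here is a minimal sketch of one cast-based and one call-based operation. MemorySketch is a hypothetical stand-in for AgentServer, not the finished module, and it covers only the memory functions.

```elixir
defmodule MemorySketch do
  # Hypothetical stand-in for AgentServer; only memory operations are shown.
  use GenServer

  # --- Client API ---
  def start_link(name), do: GenServer.start_link(__MODULE__, name)

  # Cast: fire-and-forget, returns :ok immediately.
  def remember(server, key, value), do: GenServer.cast(server, {:remember, key, value})

  # Call: blocks until the server replies.
  def recall(server, key), do: GenServer.call(server, {:recall, key})

  # --- Server callbacks ---
  @impl true
  def init(name), do: {:ok, %{name: name, memory: %{}}}

  @impl true
  def handle_cast({:remember, key, value}, state) do
    {:noreply, %{state | memory: Map.put(state.memory, key, value)}}
  end

  @impl true
  def handle_call({:recall, key}, _from, state) do
    {:reply, Map.get(state.memory, key), state}
  end
end

{:ok, pid} = MemorySketch.start_link("Worker-1")
:ok = MemorySketch.remember(pid, :context, "researching")

# Messages from one process to another are delivered in order, so this call
# is handled after the cast above.
context = MemorySketch.recall(pid, :context)
missing = MemorySketch.recall(pid, :missing)
IO.inspect({context, missing})
```

The client functions hide the message tuples, so callers never need to know whether an operation is a call or a cast.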
Template
defmodule AgentFramework.AgentServer do
@moduledoc """
GenServer-based agent that processes tasks and maintains memory.
This is the OTP version of Phase 2's ProcessAgent.
## Example
{:ok, agent} = AgentServer.start_link("Worker-1")
AgentServer.remember(agent, :context, "researching")
AgentServer.send_task(agent, :search, %{query: "OTP"})
{:ok, task, result} = AgentServer.process_next(agent)
"""
use GenServer
alias AgentFramework.Message
# ============================================
# Type Definitions
# ============================================
@type state :: %{
name: String.t(),
status: :idle | :busy | :waiting,
memory: map(),
inbox: [Message.t()],
processed_count: non_neg_integer()
}
# ============================================
# Client API
# ============================================
@doc """
Start a linked AgentServer process.
## Options
- `:memory` - Initial memory map (default: %{})
- `:name` - Process registration name (optional)
## Examples
{:ok, pid} = AgentServer.start_link("Worker-1")
{:ok, pid} = AgentServer.start_link("Worker-2", memory: %{key: "value"})
"""
def start_link(name, opts \\ []) when is_binary(name) do
# TODO: Implement
# Hint: Use GenServer.start_link(__MODULE__, init_arg, options)
end
@doc "Get the agent's full state."
def get_state(server) do
# TODO: Implement using GenServer.call
end
@doc "Get the agent's current status."
def get_status(server) do
# TODO: Implement
end
@doc "Store a value in memory (async)."
def remember(server, key, value) do
# TODO: Implement using GenServer.cast
end
@doc "Recall a value from memory (sync)."
def recall(server, key) do
# TODO: Implement
end
@doc "Clear all memory (async)."
def forget_all(server) do
# TODO: Implement
end
@doc "Send a task to the inbox (async)."
def send_task(server, action, params \\ %{}) do
# TODO: Implement
end
@doc "Process the next task (sync)."
def process_next(server) do
# TODO: Implement
end
@doc "Get the number of tasks in inbox (sync)."
def inbox_count(server) do
# TODO: Implement
end
# ============================================
# Server Callbacks
# ============================================
@impl true
def init(init_arg) do
# TODO: Parse init_arg, create initial state
# Return {:ok, state}
end
@impl true
def handle_call(request, _from, state) do
# TODO: Handle :get_state, :get_status, {:recall, key},
# :process_next, :inbox_count
end
@impl true
def handle_cast(request, state) do
# TODO: Handle {:remember, key, value}, :forget_all,
# {:send_task, action, params}
end
@impl true
def handle_info(msg, state) do
# Handle unexpected messages
IO.puts("[AgentServer #{state.name}] Unexpected: #{inspect(msg)}")
{:noreply, state}
end
@impl true
def terminate(reason, state) do
IO.puts("[AgentServer #{state.name}] Terminating: #{inspect(reason)}")
:ok
end
# ============================================
# Private Functions
# ============================================
# TODO: Copy handle_task functions from ProcessAgent
# defp handle_task(%Message{...}) do ... end
defp generate_id do
:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
end
end
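One way to approach the task-handling TODOs is to work out the state transition as plain functions first, then move it into handle_call/3. The sketch below assumes a task is a plain map standing in for %Message{}. The {:error, {:unknown_action, action}} fallback is an assumption as well, and your ProcessAgent may handle unknown actions differently.

```elixir
defmodule InboxSketch do
  # Hypothetical task shape: a plain map standing in for %Message{}.
  def new_state, do: %{status: :idle, inbox: [], processed_count: 0}

  # Append so tasks are processed in arrival order (FIFO).
  def enqueue(state, action, params) do
    task = %{payload: %{action: action, params: params}}
    %{state | inbox: state.inbox ++ [task]}
  end

  # Returns {reply, new_state}, the two values a handle_call(:process_next, ...)
  # clause would put into {:reply, reply, new_state}.
  def process_next(%{inbox: []} = state), do: {{:empty, nil}, state}

  def process_next(%{inbox: [task | rest]} = state) do
    result = handle_task(task)
    new_state = %{state | inbox: rest, processed_count: state.processed_count + 1}
    {{:ok, task, result}, new_state}
  end

  defp handle_task(%{payload: %{action: :search, params: %{query: query}}}) do
    {:ok, "Search results for: #{query}"}
  end

  # Assumed fallback for actions this sketch does not know about.
  defp handle_task(%{payload: %{action: action}}) do
    {:error, {:unknown_action, action}}
  end
end

state = InboxSketch.new_state() |> InboxSketch.enqueue(:search, %{query: "OTP"})
{first_reply, state} = InboxSketch.process_next(state)
{second_reply, _state} = InboxSketch.process_next(state)
IO.inspect(first_reply)
IO.inspect(second_reply)
```

Appending with ++ is fine for small inboxes. Erlang's :queue module is an option if the inbox can grow large.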
🤔 Implementation Questions
As you implement, think about:
questions_during_implementation = %{
# 1. Should :remember be call or cast? Why?
remember_choice: "cast because ???",
# 2. Should :recall be call or cast? Why?
recall_choice: "call because ???",
# 3. What should process_next return when inbox is empty?
empty_inbox: "???",
# 4. Do you need child_spec/1? When is it called?
child_spec: "???"
}
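For the child_spec/1 question, it can help to look at what `use GenServer` generates and when a supervisor consults it. SpecSketch below is a hypothetical worker with the same start_link/2 shape as the template. The explicit child spec map is one possible way to pass both a name and options, not the only one.

```elixir
defmodule SpecSketch do
  # Hypothetical worker with the same start_link/2 shape as the template.
  use GenServer

  def start_link(name, opts \\ []) when is_binary(name) do
    GenServer.start_link(__MODULE__, {name, opts})
  end

  @impl true
  def init({name, opts}) do
    {:ok, %{name: name, memory: Keyword.get(opts, :memory, %{})}}
  end
end

# `use GenServer` defines child_spec/1. A supervisor calls it when handed a
# {Module, arg} tuple, then starts the child with Module.start_link(arg).
default_spec = SpecSketch.child_spec("Worker-1")
IO.inspect(default_spec)

# To pass both name and opts, an explicit child spec map can be used instead.
explicit_spec = %{
  id: SpecSketch,
  start: {SpecSketch, :start_link, ["Worker-2", [memory: %{key: "value"}]]}
}

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, _w1} = DynamicSupervisor.start_child(sup, {SpecSketch, "Worker-1"})
{:ok, w2} = DynamicSupervisor.start_child(sup, explicit_spec)

# :sys.get_state/1 is a debugging helper; it is used here only to inspect the sketch.
w2_state = :sys.get_state(w2)
IO.inspect(w2_state)
```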
Part 3: Create AgentSupervisor (DynamicSupervisor)
Create a new file: lib/agent_framework/agent_supervisor.ex
Specification
The AgentSupervisor module should:
- Be a DynamicSupervisor for dynamic agent management
- Allow starting/stopping agents at runtime
- Automatically restart crashed agents
Required Functions
- start_link(opts) - Start the supervisor
- start_agent(name, opts \\ []) - Start a supervised agent
- stop_agent(pid) - Stop an agent
- list_agents() - List all supervised agents
- count_agents() - Count supervised agents
Template
defmodule AgentFramework.AgentSupervisor do
@moduledoc """
DynamicSupervisor for agent processes.
This is the OTP version of Phase 2's AgentMonitor.
## Example
# Usually started by Application
{:ok, _} = AgentSupervisor.start_link([])
# Start agents dynamically
{:ok, agent1} = AgentSupervisor.start_agent("Worker-1")
{:ok, agent2} = AgentSupervisor.start_agent("Worker-2")
# List them
AgentSupervisor.list_agents()
"""
use DynamicSupervisor
@default_name __MODULE__
# ============================================
# Public API
# ============================================
@doc """
Start the agent supervisor.
## Options
- `:name` - Process name (default: #{inspect(@default_name)})
"""
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, @default_name)
DynamicSupervisor.start_link(__MODULE__, opts, name: name)
end
@doc """
Start a new supervised agent.
Returns `{:ok, pid}` on success.
## Examples
{:ok, pid} = AgentSupervisor.start_agent("Worker-1")
{:ok, pid} = AgentSupervisor.start_agent("Worker-2", memory: %{key: "val"})
"""
def start_agent(name, opts \\ []) do
# TODO: Create child spec for AgentServer
# TODO: Use DynamicSupervisor.start_child
end
@doc """
Stop a supervised agent.
## Examples
AgentSupervisor.stop_agent(pid)
"""
def stop_agent(pid) when is_pid(pid) do
# TODO: Use DynamicSupervisor.terminate_child
end
@doc """
List all supervised agent PIDs.
## Examples
[pid1, pid2] = AgentSupervisor.list_agents()
"""
def list_agents do
# TODO: Use DynamicSupervisor.which_children
# Extract just the pids
end
@doc """
Count supervised agents.
## Examples
%{active: 3, ...} = AgentSupervisor.count_agents()
"""
def count_agents do
# TODO: Use DynamicSupervisor.count_children
end
# ============================================
# Callbacks
# ============================================
@impl true
def init(_opts) do
DynamicSupervisor.init(
strategy: :one_for_one,
max_restarts: 5,
max_seconds: 60
)
end
end
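The DynamicSupervisor calls named in the TODOs can be tried in isolation before you wire them into the module. The sketch below uses an unnamed supervisor, with Agent standing in for AgentServer so that it stays self-contained. Both are assumptions for illustration only.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Agent stands in for AgentServer here.
{:ok, a} = DynamicSupervisor.start_child(sup, {Agent, fn -> %{name: "Worker-1"} end})
{:ok, b} = DynamicSupervisor.start_child(sup, {Agent, fn -> %{name: "Worker-2"} end})

# which_children/1 returns {id, pid | :restarting, type, modules} tuples;
# keep only the entries that currently hold a pid.
pids = for {_, pid, _, _} <- DynamicSupervisor.which_children(sup), is_pid(pid), do: pid

# count_children/1 returns a map with :active, :specs, :supervisors, :workers.
counts = DynamicSupervisor.count_children(sup)

# terminate_child/2 takes the child's pid and returns :ok, or
# {:error, :not_found} when the pid is not a child of this supervisor.
:ok = DynamicSupervisor.terminate_child(sup, a)
after_stop = DynamicSupervisor.count_children(sup)
not_found = DynamicSupervisor.terminate_child(sup, a)

IO.inspect({length(pids), counts, after_stop, not_found})
```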
🤔 Supervisor Questions
supervisor_questions = %{
# 1. Why DynamicSupervisor instead of regular Supervisor?
why_dynamic: "???",
# 2. What restart strategy should agents use?
restart_strategy: "???",
# 3. What happens if an agent crashes?
on_crash: "???",
# 4. How is this simpler than AgentMonitor?
simplification: "???"
}
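To explore the restart questions, you can crash a child and watch what the supervisor does. This is a rough sketch that again uses Agent as a stand-in worker. The wait_for helper is hypothetical and simply polls, since the restart happens asynchronously.

```elixir
# Hypothetical polling helper: retries `fun` until it returns a non-nil value.
wait_for = fn fun ->
  Enum.find_value(1..100, fn _attempt ->
    case fun.() do
      nil ->
        Process.sleep(10)
        nil

      value ->
        value
    end
  end)
end

{:ok, sup} =
  DynamicSupervisor.start_link(strategy: :one_for_one, max_restarts: 5, max_seconds: 60)

# A child spec without a :restart key defaults to :permanent.
{:ok, old_pid} = DynamicSupervisor.start_child(sup, {Agent, fn -> :fresh end})
Process.exit(old_pid, :kill)

# The crashed child comes back under a new pid, with fresh state.
new_pid =
  wait_for.(fn ->
    case DynamicSupervisor.which_children(sup) do
      [{_, pid, _, _}] when is_pid(pid) and pid != old_pid -> pid
      _ -> nil
    end
  end)

# A :temporary child is never restarted.
temp_spec = Supervisor.child_spec({Agent, fn -> :fresh end}, restart: :temporary)
{:ok, temp_pid} = DynamicSupervisor.start_child(sup, temp_spec)
Process.exit(temp_pid, :kill)

active_after =
  wait_for.(fn ->
    case DynamicSupervisor.count_children(sup) do
      %{active: 1} -> 1
      _ -> nil
    end
  end)

IO.inspect({old_pid, new_pid, active_after})
```

Note that a restarted agent starts from init/1 again, so anything held only in process state (memory, inbox) is lost across a crash.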
Part 4: Create Application Module
Create a new file: lib/agent_framework/application.ex
Specification
The Application module should:
- Define the supervision tree root
- Start Registry and AgentSupervisor
- Be referenced in mix.exs
Template
defmodule AgentFramework.Application do
@moduledoc """
OTP Application for AgentFramework.
Starts the supervision tree:
AgentFramework.Supervisor
│
┌────┴────┐
▼ ▼
Registry AgentSupervisor
"""
use Application
@impl true
def start(_type, _args) do
children = [
# TODO: Add Registry
# TODO: Add AgentSupervisor
]
opts = [
strategy: :one_for_one,
name: AgentFramework.Supervisor
]
Supervisor.start_link(children, opts)
end
end
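The children list takes the same kind of child specs you would pass to any supervisor. The sketch below starts a comparable tree outside of an Application module so that it can run on its own. The names SketchRegistry, SketchAgentSupervisor, and SketchRootSupervisor are made up for illustration. The registry's keys: :unique setting is also an assumption, so match it to whatever your agent_registry.ex expects.

```elixir
children = [
  # Children start in list order, so the registry is up before the supervisor
  # that will start agents.
  {Registry, keys: :unique, name: SketchRegistry},
  # A plain DynamicSupervisor stands in for AgentFramework.AgentSupervisor.
  {DynamicSupervisor, name: SketchAgentSupervisor, strategy: :one_for_one}
]

{:ok, root} =
  Supervisor.start_link(children, strategy: :one_for_one, name: SketchRootSupervisor)

# Both children are reachable by name once start_link returns.
registry_pid = Process.whereis(SketchRegistry)
agent_sup_pid = Process.whereis(SketchAgentSupervisor)

# The registry is usable right away.
{:ok, _owner} = Registry.register(SketchRegistry, "Worker-1", :metadata)
lookup = Registry.lookup(SketchRegistry, "Worker-1")

child_count = length(Supervisor.which_children(root))
IO.inspect({registry_pid, agent_sup_pid, lookup, child_count})
```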
Update mix.exs
# In mix.exs, update the application function:
def application do
[
mod: {AgentFramework.Application, []},
extra_applications: [:logger]
]
end
Part 5: Update AgentFramework API
Update lib/agent_framework.ex to add OTP-based functions.
New Functions to Add
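# Add to AgentFramework module:
# The alias lets the short module names below resolve
alias AgentFramework.{AgentServer, AgentSupervisor}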
# ============================================
# Phase 3: OTP-based Agent API
# ============================================
@doc """
Start a supervised agent using OTP.
## Examples
{:ok, pid} = AgentFramework.start_otp_agent("Worker-1")
"""
def start_otp_agent(name, opts \\ []) do
AgentSupervisor.start_agent(name, opts)
end
@doc """
Stop a supervised agent.
## Examples
AgentFramework.stop_otp_agent(pid)
"""
def stop_otp_agent(pid) do
AgentSupervisor.stop_agent(pid)
end
@doc """
List all supervised agents.
"""
def list_otp_agents do
AgentSupervisor.list_agents()
end
# Delegate to AgentServer for operations
defdelegate agent_get_state(server), to: AgentServer, as: :get_state
defdelegate agent_remember(server, key, value), to: AgentServer, as: :remember
defdelegate agent_recall(server, key), to: AgentServer, as: :recall
defdelegate agent_send_task(server, action, params), to: AgentServer, as: :send_task
defdelegate agent_process_next(server), to: AgentServer, as: :process_next
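If defdelegate with the as: option is new to you, this small sketch shows what it generates. FacadeSketch and FacadeSketch.Store are hypothetical modules that play the roles of AgentFramework and AgentServer.

```elixir
defmodule FacadeSketch.Store do
  def put(store, key, value), do: Map.put(store, key, value)
  def fetch(store, key), do: Map.get(store, key)
end

defmodule FacadeSketch do
  # Generates store_put/3, which forwards its arguments to Store.put/3.
  defdelegate store_put(store, key, value), to: FacadeSketch.Store, as: :put
  # Generates store_fetch/2, which forwards to Store.fetch/2.
  defdelegate store_fetch(store, key), to: FacadeSketch.Store, as: :fetch
end

value =
  %{}
  |> FacadeSketch.store_put(:hello, "world")
  |> FacadeSketch.store_fetch(:hello)

IO.inspect(value)
```

A delegate is defined only for the arity you write. For example, agent_send_task/3 does not inherit the params \\ %{} default from AgentServer.send_task, so callers must pass all three arguments.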
Part 6: Create OTP Tests
Create a new test file: test/agent_framework/otp_test.exs
Test Template
defmodule AgentFramework.OTPTest do
use ExUnit.Case, async: false
alias AgentFramework.{AgentServer, AgentSupervisor}
# ============================================
# AgentServer Tests
# ============================================
describe "AgentServer.start_link/2" do
test "starts a GenServer process" do
{:ok, pid} = AgentServer.start_link("Test-Agent")
assert Process.alive?(pid)
GenServer.stop(pid)
end
test "initializes with correct state" do
{:ok, pid} = AgentServer.start_link("Test-Agent")
state = AgentServer.get_state(pid)
assert state.name == "Test-Agent"
assert state.status == :idle
assert state.memory == %{}
assert state.inbox == []
assert state.processed_count == 0
GenServer.stop(pid)
end
test "accepts initial memory" do
{:ok, pid} = AgentServer.start_link("Test-Agent", memory: %{key: "value"})
state = AgentServer.get_state(pid)
assert state.memory == %{key: "value"}
GenServer.stop(pid)
end
end
describe "AgentServer memory operations" do
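setup do
# start_supervised!/1 stops the agent when the test ends. Calling
# GenServer.stop/1 from on_exit can fail, because a process linked to the
# test process is usually dead by the time on_exit runs.
pid = start_supervised!({AgentServer, "Memory-Agent"})
{:ok, server: pid}
end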
test "remember and recall", %{server: pid} do
:ok = AgentServer.remember(pid, :context, "researching")
# No sleep needed: messages from one process to another arrive in order,
# so the call below is handled after the cast above
assert AgentServer.recall(pid, :context) == "researching"
end
test "recall missing key returns nil", %{server: pid} do
assert AgentServer.recall(pid, :missing) == nil
end
test "forget_all clears memory", %{server: pid} do
AgentServer.remember(pid, :a, 1)
AgentServer.remember(pid, :b, 2)
AgentServer.forget_all(pid)
Process.sleep(10)
state = AgentServer.get_state(pid)
assert state.memory == %{}
end
end
describe "AgentServer task operations" do
setup do
# start_supervised!/1 stops the agent when the test ends (see the note in
# the memory tests' setup)
pid = start_supervised!({AgentServer, "Task-Agent"})
{:ok, server: pid}
end
test "send_task adds to inbox", %{server: pid} do
AgentServer.send_task(pid, :search, %{query: "test"})
Process.sleep(10)
assert AgentServer.inbox_count(pid) == 1
end
test "process_next processes a task", %{server: pid} do
AgentServer.send_task(pid, :search, %{query: "OTP"})
Process.sleep(10)
{:ok, task, result} = AgentServer.process_next(pid)
assert task.payload.action == :search
assert result == {:ok, "Search results for: OTP"}
end
test "process_next on empty inbox returns {:empty, nil}", %{server: pid} do
assert AgentServer.process_next(pid) == {:empty, nil}
end
end
# ============================================
# AgentSupervisor Tests
# ============================================
describe "AgentSupervisor" do
setup do
# Start supervisor with unique name for test isolation
name = :"test_sup_#{System.unique_integer([:positive])}"
# start_supervised!/1 shuts the supervisor (and its agents) down after the test
start_supervised!({AgentSupervisor, name: name})
{:ok, supervisor: name}
end
test "starts agents", %{supervisor: sup} do
{:ok, pid} = DynamicSupervisor.start_child(sup, {AgentServer, "Worker-1"})
assert Process.alive?(pid)
end
test "restarts crashed agents", %{supervisor: sup} do
{:ok, pid} = DynamicSupervisor.start_child(sup, {AgentServer, "Crashy"})
original_pid = pid
# Crash it
Process.exit(pid, :kill)
Process.sleep(100)
# Check it restarted
[{_, new_pid, _, _}] = DynamicSupervisor.which_children(sup)
assert is_pid(new_pid)
assert new_pid != original_pid
end
end
# ============================================
# Integration Tests
# ============================================
describe "OTP Integration" do
test "full workflow with supervisor" do
# Start supervisor
{:ok, sup} = AgentSupervisor.start_link(name: :integration_test_sup)
# Start agents
{:ok, w1} = DynamicSupervisor.start_child(
:integration_test_sup,
{AgentServer, "Worker-1"}
)
{:ok, w2} = DynamicSupervisor.start_child(
:integration_test_sup,
{AgentServer, "Worker-2"}
)
# Use agents
AgentServer.remember(w1, :task, "research")
AgentServer.remember(w2, :task, "write")
Process.sleep(10)
assert AgentServer.recall(w1, :task) == "research"
assert AgentServer.recall(w2, :task) == "write"
# Send and process tasks
AgentServer.send_task(w1, :search, %{query: "Elixir OTP"})
Process.sleep(10)
{:ok, task, _result} = AgentServer.process_next(w1)
assert task.payload.action == :search
# Cleanup
Supervisor.stop(sup)
end
end
end
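If you want to try tests like these inside a notebook or a standalone script instead of through mix test, the pattern below is one option. It is a sketch in which Agent stands in for AgentServer, and it shows start_supervised!/1 handling cleanup so that no on_exit callback is needed.

```elixir
# autorun: false lets us trigger the run explicitly with ExUnit.run/0.
ExUnit.start(autorun: false)

defmodule SupervisedSketchTest do
  use ExUnit.Case, async: false

  setup do
    # The process is started under the test supervisor and stopped
    # automatically when the test finishes.
    pid = start_supervised!({Agent, fn -> %{} end})
    {:ok, server: pid}
  end

  test "write then read", %{server: pid} do
    Agent.update(pid, &Map.put(&1, :context, "researching"))
    assert Agent.get(pid, &Map.get(&1, :context)) == "researching"
  end

  test "each test gets a fresh process", %{server: pid} do
    assert Agent.get(pid, & &1) == %{}
  end
end

result = ExUnit.run()
IO.inspect(result.failures)
```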
Part 7: Verification Checklist
Run these checks to verify your implementation:
1. Compile Check
cd agent_framework
mix compile
Expected: No warnings about undefined functions.
2. Run Tests
mix test
Expected: All tests pass.
3. Interactive Test
iex -S mix
# Should automatically start supervision tree
AgentFramework.AgentSupervisor.count_agents()
# => %{active: 0, ...}
# Start an agent
{:ok, agent} = AgentFramework.AgentSupervisor.start_agent("Worker-1")
# Use it
AgentFramework.AgentServer.remember(agent, :hello, "world")
AgentFramework.AgentServer.recall(agent, :hello)
# => "world"
# Send tasks
AgentFramework.AgentServer.send_task(agent, :search, %{query: "OTP"})
AgentFramework.AgentServer.inbox_count(agent)
# => 1
# Process tasks
AgentFramework.AgentServer.process_next(agent)
# => {:ok, %Message{...}, {:ok, "Search results for: OTP"}}
# Crash and verify restart
Process.exit(agent, :kill)
Process.sleep(100)
AgentFramework.AgentSupervisor.list_agents()
# Should show a new PID (agent was restarted)
4. Observer Test (Optional)
:observer.start()
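Navigate to Applications → agent_framework to see your supervision tree. On Elixir 1.15 and later, :observer may not be on the code path under iex -S mix; if the call fails, try adding :observer, :wx, and :runtime_tools to extra_applications in mix.exs.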
Completion Checklist
Before moving to Phase 4, verify:
- [ ] AgentServer is a working GenServer with all callbacks
- [ ] AgentSupervisor is a DynamicSupervisor that manages agents
- [ ] Application module starts the supervision tree
- [ ] mix.exs references the Application module
- [ ] All tests pass with mix test
- [ ] iex -S mix shows supervision tree running
- [ ] Crashing an agent triggers automatic restart
- [ ] list_agents() and count_agents() work correctly
Key Learnings from Phase 3
You’ve now experienced the power of OTP:
- Less Code, Same Features - Went from ~400 lines to ~100 lines
- Battle-Tested Patterns - Using 30+ years of refinement
- Built-in Tools - Observer, tracing, debugging
- Automatic Recovery - Supervisors handle restarts
- Clean Architecture - Separation of concerns (client/server)
- Production Ready - Same patterns used by WhatsApp, Discord, etc.
What’s Next: Phase 4 Preview
Phase 4 will add Phoenix for HTTP/API support:
- A2A protocol endpoints
- JSON-RPC handling
- WebSocket channels for real-time
- Phoenix.PubSub for distributed agents
Your OTP foundation is ready for the web layer!