Session 11: Checkpoint - Process-Based Agents
# Make the local agent_framework project (a sibling directory of this
# notebook) available as a path dependency for every cell below.
agent_framework_path = Path.expand("../agent_framework", __DIR__)

Mix.install([
  {:agent_framework, path: agent_framework_path}
])
Phase 2 Checkpoint
This checkpoint verifies that you’ve successfully learned:
- Spawning processes and message passing
- Maintaining state with recursive loops
- Handling errors with links and monitors
- Designing multi-process architectures
We’ve extended the agent_framework project with real process-based agents!
Project Structure
The Phase 2 additions to agent_framework:
agent_framework/
├── lib/
│   ├── agent_framework/
│   │   ├── agent.ex            # Phase 1: Agent struct
│   │   ├── message.ex          # Phase 1: Message struct
│   │   ├── process_agent.ex    # NEW: Agent as process
│   │   ├── agent_registry.ex   # NEW: Named agent lookup
│   │   └── agent_monitor.ex    # NEW: Fault tolerance
│   └── agent_framework.ex      # Updated: Process-based API
└── test/
    └── agent_framework/
        ├── agent_test.exs      # Phase 1 tests
        ├── message_test.exs    # Phase 1 tests
        └── process_test.exs    # NEW: Phase 2 tests
1. ProcessAgent - Agents as Processes
The ProcessAgent module spawns agents as real Elixir processes.
alias AgentFramework.{ProcessAgent, Message}

# Launch the agent as its own process; the returned PID is the handle that
# every other ProcessAgent call takes.
{:ok, worker} = ProcessAgent.start("Research-Worker")
IO.puts("Started worker: " <> inspect(worker))
IO.puts("Worker alive? " <> to_string(Process.alive?(worker)))

# Fetch a snapshot of the agent's state before anything has been stored.
{:ok, state} = ProcessAgent.get_state(worker)
IO.inspect(state, label: "Initial state")
Memory Operations
# Seed the agent's memory with a few key/value facts, in this order.
for {key, value} <- [
      context: "Researching Elixir concurrency",
      priority: :high,
      deadline: "end of week"
    ] do
  ProcessAgent.remember(worker, key, value)
end

# Read two of the facts back out of the agent process.
{:ok, context} = ProcessAgent.recall(worker, :context)
IO.puts("Context: " <> to_string(context))

{:ok, priority} = ProcessAgent.recall(worker, :priority)
IO.puts("Priority: " <> to_string(priority))
Task Processing
# Queue three tasks in the agent's inbox; they wait there until
# process_next/1 pulls them off one at a time.
ProcessAgent.send_task(worker, :search, %{query: "GenServer tutorial"})
ProcessAgent.send_task(worker, :analyze, %{data: "search results"})
ProcessAgent.send_task(worker, :summarize, %{text: "This is a long article about Elixir concurrency patterns..."})

# Check inbox count
{:ok, count} = ProcessAgent.inbox_count(worker)
IO.puts("Tasks in inbox: #{count}")

# Process tasks one by one; each call returns the task it handled and its result.
{:ok, task1, result1} = ProcessAgent.process_next(worker)
IO.puts("Task 1: #{task1.payload.action}")
IO.puts("Result 1: #{inspect(result1)}")

{:ok, task2, result2} = ProcessAgent.process_next(worker)
IO.puts("\nTask 2: #{task2.payload.action}")
IO.puts("Result 2: #{inspect(result2)}")

{:ok, task3, result3} = ProcessAgent.process_next(worker)
IO.puts("\nTask 3: #{task3.payload.action}")
IO.puts("Result 3: #{inspect(result3)}")

# Verify processed count
{:ok, state} = ProcessAgent.get_state(worker)
IO.puts("Processed count: #{state.processed_count}")
IO.puts("Tasks remaining: #{length(state.inbox)}")

# Clean up.
# NOTE(review): ProcessAgent.stop/1 may only *request* termination (confirm
# whether it waits for the exit). If so, calling Process.alive?/1 immediately
# races with the shutdown and can print `false`.
# Monitor the worker first and wait for its :DOWN message. If the worker has
# already exited, the monitor delivers :DOWN immediately (reason :noproc).
stop_ref = Process.monitor(worker)
ProcessAgent.stop(worker)

receive do
  {:DOWN, ^stop_ref, :process, ^worker, _reason} -> :ok
after
  1000 -> :ok
end

IO.puts("Worker stopped: #{not Process.alive?(worker)}")
2. AgentRegistry - Named Agent Lookup
The AgentRegistry lets you look up agents by name instead of by PID.
alias AgentFramework.AgentRegistry

# Start the registry. register/1, lookup/1, list/0 and broadcast/1 are all
# called without a registry PID, so it is presumably a named process.
{:ok, _} = AgentRegistry.start_link()
IO.puts("Registry started")

# Spawns a demo agent that registers itself as `name` and then answers every
# {:ping, from} with {:pong, label} until it receives :stop.
# It loops instead of handling a single message and exiting. That keeps the
# agent alive for the broadcast below and gives the :stop cleanup a live
# process to stop.
spawn_ping_agent = fn name, label ->
  spawn(fn ->
    AgentRegistry.register(name)
    IO.puts("#{label} registered")

    serve = fn serve ->
      receive do
        {:ping, from} ->
          IO.puts("#{label} received ping")
          send(from, {:pong, label})
          serve.(serve)

        :stop ->
          :ok
      end
    end

    serve.(serve)
  end)
end

# Create agents that register themselves
agent1 = spawn_ping_agent.("Agent-Alpha", "Alpha")
agent2 = spawn_ping_agent.("Agent-Beta", "Beta")

# Give both agents time to register before querying the registry.
Process.sleep(100)

# List registered agents
agents = AgentRegistry.list()
IO.puts("Registered agents: #{inspect(agents)}")

# Check existence
IO.puts("Alpha exists? #{AgentRegistry.exists?("Agent-Alpha")}")
IO.puts("Gamma exists? #{AgentRegistry.exists?("Agent-Gamma")}")

# Lookup and send message by name
{:ok, alpha_pid} = AgentRegistry.lookup("Agent-Alpha")
send(alpha_pid, {:ping, self()})

receive do
  {:pong, name} -> IO.puts("Got pong from #{name}")
after
  1000 -> IO.puts("Timeout")
end

# Broadcast to all agents, then collect every pong that comes back.
# Stop once no reply has arrived for 1 second.
AgentRegistry.broadcast({:ping, self()})

collect_pongs = fn collect, received ->
  receive do
    {:pong, name} ->
      IO.puts("Pong from #{name}")
      collect.(collect, received + 1)
  after
    1000 -> received
  end
end

if collect_pongs.(collect_pongs, 0) == 0, do: IO.puts("Timeout")

# Cleanup: both agents are still looping, so :stop actually ends them.
send(agent1, :stop)
send(agent2, :stop)
3. AgentMonitor - Fault Tolerance
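The AgentMonitor provides supervisor-like fault tolerance. It watches the agents it starts and restarts them when they crash, according to its restart policy and up to `max_restarts` times.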
alias AgentFramework.AgentMonitor

# Monitor that restarts crashed agents under the :always policy, capped by
# max_restarts: 3.
{:ok, monitor} = AgentMonitor.start_link(restart_policy: :always, max_restarts: 3)
IO.puts("Monitor started with :always restart policy")

# Launch two agents under the monitor's watch, then report their PIDs.
{:ok, worker1} = AgentMonitor.start_agent(monitor, "Worker-1")
{:ok, worker2} = AgentMonitor.start_agent(monitor, "Worker-2")

for {label, pid} <- [{"Worker-1", worker1}, {"Worker-2", worker2}] do
  IO.puts("Started #{label}: #{inspect(pid)}")
end

# Inspect what the monitor is tracking (IO.inspect/2 returns its argument).
status = monitor |> AgentMonitor.status() |> IO.inspect(label: "Monitor status")

agents = AgentMonitor.list_agents(monitor)
IO.puts("Monitored agents: #{inspect(agents)}")
Crash Recovery Demo
# Get Worker-1's current PID by name. list_agents/1 returns {name, pid}
# tuples. List.keyfind/3 makes no assumption about their order or about how
# many agents are monitored; a positional match would break on either.
{"Worker-1", original_pid} = List.keyfind(AgentMonitor.list_agents(monitor), "Worker-1", 0)
IO.puts("Original Worker-1 PID: #{inspect(original_pid)}")

# Crash Worker-1
IO.puts("\nCrashing Worker-1...")
send(original_pid, {:crash, :demo_crash})

# Give the monitor time to notice the exit and start a replacement.
Process.sleep(500)

# Check that it was restarted: same name, fresh PID.
agents_after = AgentMonitor.list_agents(monitor)
{"Worker-1", new_pid} = List.keyfind(agents_after, "Worker-1", 0)
IO.puts("\nNew Worker-1 PID: #{inspect(new_pid)}")
IO.puts("PIDs different? #{original_pid != new_pid}")
IO.puts("New process alive? #{Process.alive?(new_pid)}")

# Check restart count in status
status = AgentMonitor.status(monitor)
IO.puts("Worker-1 restart count: #{status.agents["Worker-1"].restart_count}")
Max Restarts Demo
# A second, stricter monitor. The loop below crashes the agent one more time
# than max_restarts allows, so the last crash is not followed by a restart.
max_restarts = 2
{:ok, limited_monitor} = AgentMonitor.start_link(restart_policy: :always, max_restarts: max_restarts)
{:ok, fragile} = AgentMonitor.start_agent(limited_monitor, "Fragile-Agent")
IO.puts("Started Fragile-Agent with max_restarts: #{max_restarts}")
IO.puts("Initial PID: " <> inspect(fragile))

Enum.each(1..(max_restarts + 1), fn attempt ->
  # Re-read the PID each round: every restart replaces the process.
  [{_name, pid}] = AgentMonitor.list_agents(limited_monitor)
  IO.puts("\nCrash ##{attempt}, PID: #{inspect(pid)}")
  send(pid, {:crash, :intentional})
  Process.sleep(300)

  remaining = AgentMonitor.list_agents(limited_monitor)
  IO.puts("Agents remaining: #{length(remaining)}")
end)

IO.puts("\nAfter exceeding max_restarts, agent is gone.")

# Shut down both monitors started in this section.
AgentMonitor.stop(monitor)
AgentMonitor.stop(limited_monitor)
4. Integration: Complete Multi-Agent Workflow
Let’s demonstrate a complete workflow using all Phase 2 components.
# Setup.
# The AgentRegistry started in section 2 is probably still running, and
# starting a named process twice fails. Accept "already started" as success,
# so this section also works after section 2 has run.
# NOTE(review): assumes AgentRegistry.start_link/0 reports a second start as
# {:error, {:already_started, pid}} (the GenServer convention) — confirm.
case AgentRegistry.start_link() do
  {:ok, _} -> :ok
  {:error, {:already_started, _}} -> :ok
end

{:ok, supervisor} = AgentMonitor.start_link()
IO.puts("=== Multi-Agent System Started ===\n")

# Returns the monitor's current {name, pid} entry for `name`, or nil.
# Always re-query: a restarted agent comes back with a new PID.
find_agent = fn name -> List.keyfind(AgentMonitor.list_agents(supervisor), name, 0) end

# Start specialized agents
{:ok, researcher} = AgentMonitor.start_agent(supervisor, "Researcher")
{:ok, analyzer} = AgentMonitor.start_agent(supervisor, "Analyzer")
{:ok, summarizer} = AgentMonitor.start_agent(supervisor, "Summarizer")
IO.puts("Started agents:")

for {name, pid} <- AgentMonitor.list_agents(supervisor) do
  IO.puts(" - #{name}: #{inspect(pid)}")
end

# Configure agents with memory
ProcessAgent.remember(researcher, :specialty, "information gathering")
ProcessAgent.remember(analyzer, :specialty, "data analysis")
ProcessAgent.remember(summarizer, :specialty, "content summarization")
IO.puts("\nAgent specialties configured:")

for {name, pid} <- AgentMonitor.list_agents(supervisor) do
  {:ok, specialty} = ProcessAgent.recall(pid, :specialty)
  IO.puts(" - #{name}: #{specialty}")
end

# Simulate a multi-stage task pipeline
IO.puts("\n=== Task Pipeline ===\n")

# Stage 1: Research
IO.puts("Stage 1: Research")
ProcessAgent.send_task(researcher, :search, %{query: "Elixir GenServer patterns"})
{:ok, _, research_result} = ProcessAgent.process_next(researcher)
IO.puts(" Research result: #{inspect(research_result)}")

# Stage 2: Analyze (using research results)
# NOTE(review): assumes each stage's result is a tuple whose second element is
# the payload to hand to the next stage — confirm against ProcessAgent.
IO.puts("\nStage 2: Analysis")
ProcessAgent.send_task(analyzer, :analyze, %{data: elem(research_result, 1)})
{:ok, _, analysis_result} = ProcessAgent.process_next(analyzer)
IO.puts(" Analysis result: #{inspect(analysis_result)}")

# Stage 3: Summarize
IO.puts("\nStage 3: Summarize")
ProcessAgent.send_task(summarizer, :summarize, %{text: elem(analysis_result, 1)})
{:ok, _, summary_result} = ProcessAgent.process_next(summarizer)
IO.puts(" Summary result: #{inspect(summary_result)}")

# Verify all agents processed their tasks
IO.puts("\n=== Processing Statistics ===\n")

for {name, pid} <- AgentMonitor.list_agents(supervisor) do
  {:ok, state} = ProcessAgent.get_state(pid)
  IO.puts("#{name}: processed #{state.processed_count} task(s)")
end

# Demonstrate fault tolerance
IO.puts("\n=== Fault Tolerance Demo ===\n")
{"Analyzer", analyzer_pid} = find_agent.("Analyzer")
IO.puts("Crashing Analyzer (#{inspect(analyzer_pid)})...")
send(analyzer_pid, {:crash, :simulated_failure})

# Give the monitor time to restart the crashed agent.
Process.sleep(300)

# Verify restart
{"Analyzer", new_analyzer_pid} = find_agent.("Analyzer")
IO.puts("Analyzer restarted with new PID: #{inspect(new_analyzer_pid)}")
IO.puts("All agents still running: #{length(AgentMonitor.list_agents(supervisor))}")

# Cleanup
IO.puts("\n=== Shutdown ===\n")
AgentMonitor.stop(supervisor)
IO.puts("System shutdown complete")
5. Verification Checklist
Let’s verify all the Phase 2 learning goals:
IO.puts("=== Phase 2 Verification ===\n")

# Every entry below runs a live check that exercises the behavior it names,
# instead of reporting a hard-coded `true`, so [PASS] means the check ran.

# Runs one zero-arity check. Any raise, throw, or exit inside the check counts
# as a failure, so one broken check is reported as [FAIL] instead of aborting
# the rest of the checklist.
run_check = fn check ->
  try do
    check.() == true
  catch
    _kind, _reason -> false
  end
end

# A spawned process runs to completion and exits normally.
spawn_check = fn ->
  {pid, ref} = spawn_monitor(fn -> :ok end)

  receive do
    {:DOWN, ^ref, :process, ^pid, :normal} -> true
  after
    1000 -> false
  end
end

# A message makes the round trip from a spawned process back to this one.
message_check = fn ->
  parent = self()
  tag = make_ref()
  spawn(fn -> send(parent, {tag, :pong}) end)

  receive do
    {^tag, :pong} -> true
  after
    1000 -> false
  end
end

# A recursive receive loop carries a counter across messages. Messages from a
# single sender arrive in order, so :report sees both increments.
state_check = fn ->
  parent = self()
  tag = make_ref()

  counter =
    spawn(fn ->
      loop = fn loop, count ->
        receive do
          :increment -> loop.(loop, count + 1)
          :report -> send(parent, {tag, count})
        end
      end

      loop.(loop, 0)
    end)

  send(counter, :increment)
  send(counter, :increment)
  send(counter, :report)

  receive do
    {^tag, count} -> count == 2
  after
    1000 -> false
  end
end

# A value stored in a ProcessAgent can be recalled from it.
agent_messaging_check = fn ->
  {:ok, agent} = AgentFramework.ProcessAgent.start("Checklist-Agent")

  try do
    AgentFramework.ProcessAgent.remember(agent, :probe, :ok)
    AgentFramework.ProcessAgent.recall(agent, :probe) == {:ok, :ok}
  after
    AgentFramework.ProcessAgent.stop(agent)
  end
end

# A monitor reports a crashed process together with its exit reason.
crash_detection_check = fn ->
  {pid, ref} = spawn_monitor(fn -> exit(:simulated_crash) end)

  receive do
    {:DOWN, ^ref, :process, ^pid, :simulated_crash} -> true
  after
    1000 -> false
  end
end

# AgentMonitor replaces a crashed agent with a new, live process.
auto_restart_check = fn ->
  {:ok, mon} = AgentFramework.AgentMonitor.start_link(restart_policy: :always, max_restarts: 1)

  try do
    {:ok, pid} = AgentFramework.AgentMonitor.start_agent(mon, "Checklist-Worker")
    send(pid, {:crash, :checklist})
    Process.sleep(300)

    case List.keyfind(AgentFramework.AgentMonitor.list_agents(mon), "Checklist-Worker", 0) do
      {_name, new_pid} -> new_pid != pid and Process.alive?(new_pid)
      nil -> false
    end
  after
    AgentFramework.AgentMonitor.stop(mon)
  end
end

checks = [
  {"Can spawn processes", spawn_check},
  {"Can send/receive messages", message_check},
  {"Can maintain state in processes", state_check},
  {"ProcessAgent module works", fn -> Code.ensure_loaded?(AgentFramework.ProcessAgent) end},
  {"AgentRegistry module works", fn -> Code.ensure_loaded?(AgentFramework.AgentRegistry) end},
  {"AgentMonitor module works", fn -> Code.ensure_loaded?(AgentFramework.AgentMonitor) end},
  {"Agents communicate via messages", agent_messaging_check},
  {"Crash detection works", crash_detection_check},
  {"Auto-restart works", auto_restart_check}
]

# Same {name, passed?} shape as before, but each result is now computed.
tests = for {name, check} <- checks, do: {name, run_check.(check)}

for {test, passed} <- tests do
  status = if passed, do: "[PASS]", else: "[FAIL]"
  IO.puts("#{status} #{test}")
end

IO.puts("\n" <> String.duplicate("=", 40))
IO.puts("Phase 2 Complete!")
IO.puts(String.duplicate("=", 40))
Summary
What We Built
| Module | Purpose |
|---|---|
| `ProcessAgent` | Agent as a real Elixir process |
| `AgentRegistry` | Named process lookup |
| `AgentMonitor` | Fault tolerance / supervision |
Key Concepts Applied
- spawn/send/receive - Process creation and communication
- Recursive loops - State maintenance
- Links and monitors - Crash propagation and notification
- trap_exit - Supervisor pattern
- Registry - Named process discovery
- Message protocols - Structured communication
What’s Next
In Phase 3, we’ll learn about OTP's built-in behaviours and building blocks:
- GenServer - Generic server abstraction
- Supervisor - Proper supervision trees
- Application - Application lifecycle
- Registry (OTP) - Built-in process registry
These OTP abstractions formalize the patterns we built manually in Phase 2!
Running the Tests
To verify everything works, run the tests:
# Run the project's test suite from inside the agent_framework project directory.
cd agent_framework
mix test
Expected output (the exact test count and timing will vary):
......................
Finished in X.X seconds
XX tests, 0 failures