Session 10: Designing a Concurrent Application
Mix.install([])
Learning Goals
By the end of this session, you will:
- Design process architectures for concurrent applications
- Use the Registry module for named process lookup
- Design message protocols for inter-process communication
- Implement coordinator patterns for orchestrating workers
- Build a complete multi-agent system
1. Designing Process Architecture
When building concurrent applications, the first step is identifying what processes you need.
Questions to Ask
- What entities have independent lifecycles?
  - Each agent can work independently → process per agent
  - Each task could be processed independently → process per task?
- What needs to maintain state?
  - Agents have memory → stateful processes
  - Coordinator tracks all agents → stateful process
- What can fail independently?
  - One agent crashing shouldn’t stop others → isolated processes
- What needs to communicate?
  - Agents send messages to each other → message passing
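The sketch below makes the "isolated processes" answer concrete, using one process per agent. IsolatedAgent and its :remember, :recall and :crash messages are illustrative assumptions, not part of the system built later in this session. Each process holds its own state. The processes are started with spawn/1 rather than spawn_link/1, so one of them crashing leaves the others running.

```elixir
# Sketch only: IsolatedAgent is a hypothetical module, not the session's agent.
defmodule IsolatedAgent do
  def start(name), do: spawn(fn -> loop(%{name: name, memory: %{}}) end)

  defp loop(state) do
    receive do
      :crash ->
        exit(:boom)

      {:remember, key, value} ->
        loop(%{state | memory: Map.put(state.memory, key, value)})

      {:recall, key, from} ->
        send(from, {:recalled, state.name, Map.get(state.memory, key)})
        loop(state)
    end
  end
end

# One process per agent; spawn/1 creates them without links.
agents = Map.new(["A", "B", "C"], fn name -> {name, IsolatedAgent.start(name)} end)

send(agents["B"], {:remember, :topic, "OTP"})

# Crash agent A and wait for confirmation via a monitor.
ref = Process.monitor(agents["A"])
send(agents["A"], :crash)

receive do
  {:DOWN, ^ref, :process, _pid, reason} -> IO.puts("A exited with #{inspect(reason)}")
end

# B still has its own state, and C is still running.
send(agents["B"], {:recall, :topic, self()})

recalled =
  receive do
    {:recalled, "B", value} -> value
  after
    1000 -> :timeout
  end

IO.inspect({recalled, Process.alive?(agents["C"])}, label: "B recalled / C alive")
```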
Multi-Agent System Architecture
┌────────────────────────────────────────────────────────────────┐
│                          Application                           │
├────────────────────────────────────────────────────────────────┤
│ ┌────────────────────────────────────────────────────────────┐ │
│ │                      AgentSupervisor                       │ │
│ │          (monitors all agents, restarts on crash)          │ │
│ └─────────────────────────────┬──────────────────────────────┘ │
│                               │                                │
│          ┌────────────────────┼────────────────────┐           │
│          │                    │                    │           │
│       ┌──▼───┐             ┌──▼───┐             ┌──▼───┐       │
│       │Agent │             │Agent │             │Agent │       │
│       │"A"   │◄───────────►│"B"   │◄───────────►│"C"   │       │
│       └──────┘  messages   └──────┘  messages   └──────┘       │
│                                                                │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │                       AgentRegistry                        │ │
│ │                  (lookup agents by name)                   │ │
│ └────────────────────────────────────────────────────────────┘ │
│                                                                │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │                       MessageRouter                        │ │
│ │              (routes messages between agents)              │ │
│ └────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
2. Process Registry
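The Registry module, part of Elixir's standard library, lets a process register itself under a key (with optional metadata) so that other processes can look it up by that key later. Registry.register/3 always registers the calling process. This is why each worker below registers itself from inside its own spawned function.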
Starting a Registry
# Start a registry for unique keys (one process per name)
{:ok, _} = Registry.start_link(keys: :unique, name: AgentRegistry)
IO.puts("Registry started!")
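The keys: option decides what a registry allows. The sketch below uses two throwaway registries, DemoUnique and DemoDuplicate (hypothetical names). A :unique registry rejects a second process that tries to claim the same key. A :duplicate registry accepts many entries per key, which is the usual basis for pub/sub-style fan-out.

```elixir
# Sketch: DemoUnique and DemoDuplicate are hypothetical registry names.
{:ok, _} = Registry.start_link(keys: :unique, name: DemoUnique)
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoDuplicate)

# In a :unique registry, one process claims the key...
{:ok, _owner} = Registry.register(DemoUnique, "agent-a", :first)

parent = self()

# ...and a second process trying the same key gets an error tuple back.
spawn(fn ->
  send(parent, {:second_attempt, Registry.register(DemoUnique, "agent-a", :second)})
end)

second_attempt =
  receive do
    {:second_attempt, result} -> result
  end

IO.inspect(second_attempt, label: "unique, second register")

# In a :duplicate registry, the same key can be registered many times.
{:ok, _} = Registry.register(DemoDuplicate, "topic:news", :sub1)
{:ok, _} = Registry.register(DemoDuplicate, "topic:news", :sub2)

dupes = Registry.lookup(DemoDuplicate, "topic:news")
IO.inspect(length(dupes), label: "duplicate entries")
```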
Registering Processes
# Spawn a process and register it
worker1 = spawn(fn ->
# Register ourselves
Registry.register(AgentRegistry, "worker-1", %{started_at: DateTime.utc_now()})
receive do
msg -> IO.puts("Worker-1 received: #{inspect(msg)}")
end
end)
worker2 = spawn(fn ->
Registry.register(AgentRegistry, "worker-2", %{started_at: DateTime.utc_now()})
receive do
msg -> IO.puts("Worker-2 received: #{inspect(msg)}")
end
end)
# Give them time to register
Process.sleep(100)
IO.puts("Registered workers")
Looking Up Processes
# Lookup by name
case Registry.lookup(AgentRegistry, "worker-1") do
[{pid, metadata}] ->
IO.puts("Found worker-1: #{inspect(pid)}")
IO.puts("Metadata: #{inspect(metadata)}")
[] ->
IO.puts("worker-1 not found")
end
# Send a message to a named process
case Registry.lookup(AgentRegistry, "worker-2") do
[{pid, _}] ->
send(pid, {:hello, "from registry lookup"})
[] ->
IO.puts("worker-2 not found")
end
Process.sleep(100)
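The two steps of looking up a pid and then sending to it can also be folded into a name. The sketch below assumes a separate registry called ViaDemoRegistry (hypothetical). Processes started through GenServer or Agent accept a {:via, Registry, {registry, key}} name. Registry.send/2 delivers a plain message to whichever process owns a key.

```elixir
# Sketch: ViaDemoRegistry and its keys are hypothetical names for illustration.
{:ok, _} = Registry.start_link(keys: :unique, name: ViaDemoRegistry)

via = {:via, Registry, {ViaDemoRegistry, "counter"}}

# The Agent registers itself under "counter" as it starts.
{:ok, _pid} = Agent.start_link(fn -> 0 end, name: via)

# Calls go through the via tuple; no pid handling in the caller.
:ok = Agent.update(via, &(&1 + 1))
count = Agent.get(via, & &1)
IO.puts("counter = #{count}")

# Raw messages: register a listener, wait until it is ready, then send by key.
parent = self()

spawn(fn ->
  Registry.register(ViaDemoRegistry, "listener", nil)
  send(parent, :listener_ready)

  receive do
    {:ping, from} -> send(from, :pong)
  end
end)

receive do
  :listener_ready -> :ok
end

Registry.send({ViaDemoRegistry, "listener"}, {:ping, self()})

reply =
  receive do
    :pong -> :pong
  after
    1000 -> :timeout
  end
```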
Automatic Cleanup
When a registered process dies, it’s automatically removed from the Registry:
# Create and register a process
temp = spawn(fn ->
Registry.register(AgentRegistry, "temporary", %{})
Process.sleep(500)
# Process exits after sleep
end)
Process.sleep(100)
# It's registered
case Registry.lookup(AgentRegistry, "temporary") do
[{pid, _}] -> IO.puts("Found temporary: #{inspect(pid)}")
[] -> IO.puts("Not found")
end
# Wait for it to exit
Process.sleep(600)
# Automatically unregistered
case Registry.lookup(AgentRegistry, "temporary") do
[{pid, _}] -> IO.puts("Still found: #{inspect(pid)}")
[] -> IO.puts("Temporary was automatically unregistered!")
end
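The example above waits a fixed 600 ms and assumes the process is gone by then. A sturdier approach uses a monitor to learn exactly when the owner exits. This sketch uses a separate CleanupDemoRegistry (hypothetical name). It relies on one assumption: that Registry.lookup/2 does not return :unique entries whose owner process is dead, as in recent Elixir releases.

```elixir
# Sketch: CleanupDemoRegistry and the "short-lived" key are hypothetical names.
{:ok, _} = Registry.start_link(keys: :unique, name: CleanupDemoRegistry)
parent = self()

pid =
  spawn(fn ->
    Registry.register(CleanupDemoRegistry, "short-lived", %{})
    send(parent, :registered)

    # Block until told to finish, then exit normally.
    receive do
      :finish -> :ok
    end
  end)

receive do
  :registered -> :ok
end

before = Registry.lookup(CleanupDemoRegistry, "short-lived")

# The monitor tells us exactly when the owner has exited; no guessed sleep.
ref = Process.monitor(pid)
send(pid, :finish)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

after_exit = Registry.lookup(CleanupDemoRegistry, "short-lived")
IO.inspect({length(before), after_exit}, label: "entries before / after exit")
```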
3. Message Protocol Design
Good message protocols make systems easier to understand and maintain.
Protocol Design Principles
- Consistent structure: use tagged tuples such as {:action, data}
- Include the sender so the receiver can reply: {:request, data, from_pid}
- Use references to correlate requests with responses: {:request, ref, data, from_pid}
- Document expected responses: what does each request return?
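The reference principle can be sketched with make_ref/0, which returns a practically unique reference. EchoServer is a hypothetical module; its tuple shapes follow the list above. Pinning ^ref in the receive means a reply that belongs to a different request stays in the mailbox instead of being mistaken for this one. AgentProtocol below uses random hex IDs for the same purpose.

```elixir
# Sketch: EchoServer is hypothetical, illustrating
# {:request, ref, data, from_pid} -> {:response, ref, result}.
defmodule EchoServer do
  def start, do: spawn(fn -> loop() end)

  defp loop do
    receive do
      {:request, ref, data, from} ->
        # Echo the caller's ref so it can match this exact reply.
        send(from, {:response, ref, {:echo, data}})
        loop()
    end
  end

  def call(server, data, timeout \\ 1000) do
    ref = make_ref()
    send(server, {:request, ref, data, self()})

    receive do
      # ^ref ignores replies that belong to other requests.
      {:response, ^ref, result} -> {:ok, result}
    after
      timeout -> {:error, :timeout}
    end
  end
end

server = EchoServer.start()

# A stale reply with a different ref sits in the mailbox but is not matched.
send(self(), {:response, make_ref(), :stale})

{:ok, result} = EchoServer.call(server, "hello")
IO.inspect(result)

# Flush the stale message so it doesn't linger in this process's mailbox.
receive do
  {:response, _ref, :stale} -> :ok
after
  0 -> :ok
end
```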
Defining Our Agent Protocol
defmodule AgentProtocol do
@moduledoc """
Message protocol for inter-agent communication.
Builds on the Phase 1 Message struct concepts.
"""
# Generate unique message IDs
def new_id do
:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
end
# --- Task Messages ---
@doc "Create a task request"
def task(action, params, from_pid) do
{:task, %{
id: new_id(),
action: action,
params: params,
from: from_pid,
timestamp: DateTime.utc_now()
}}
end
@doc "Create a task response"
def task_response(task_id, result) do
{:task_response, %{
task_id: task_id,
result: result,
timestamp: DateTime.utc_now()
}}
end
# --- Query Messages ---
def query(query_type, from_pid) do
{:query, %{
id: new_id(),
type: query_type,
from: from_pid
}}
end
def query_response(query_id, data) do
{:query_response, %{
query_id: query_id,
data: data
}}
end
# --- System Messages ---
def status_request(from_pid) do
{:status_request, from_pid}
end
def status_response(status) do
{:status_response, status}
end
def shutdown do
:shutdown
end
# --- Agent-to-Agent Messages ---
def delegate(target_agent, task, from_agent) do
{:delegate, %{
id: new_id(),
target: target_agent,
task: task,
from: from_agent
}}
end
def delegation_result(delegation_id, result, from_agent) do
{:delegation_result, %{
delegation_id: delegation_id,
result: result,
from: from_agent
}}
end
end
# Test the protocol
task = AgentProtocol.task(:search, %{query: "OTP"}, self())
IO.puts("Task message: #{inspect(task)}")
response = AgentProtocol.task_response("abc123", {:ok, ["result1", "result2"]})
IO.puts("Response message: #{inspect(response)}")
4. Coordinator Pattern
A coordinator manages a group of workers, distributing tasks and collecting results.
defmodule Coordinator do
@moduledoc """
Coordinates multiple worker processes.
- Distributes tasks to available workers
- Collects results
- Handles worker failures
"""
def start(worker_module, num_workers) do
spawn(fn ->
# Start workers
workers = for i <- 1..num_workers do
spawn_link(fn -> worker_module.run("Worker-#{i}") end)
end
IO.puts("[Coordinator] Started #{num_workers} workers")
loop(%{
workers: workers,
available: workers,
pending_tasks: [],
results: []
})
end)
end
defp loop(state) do
# Try to dispatch pending tasks to available workers
state = dispatch_tasks(state)
receive do
{:submit, task, from} ->
IO.puts("[Coordinator] Received task: #{inspect(task)}")
new_pending = state.pending_tasks ++ [{task, from}]
loop(%{state | pending_tasks: new_pending})
{:task_complete, worker, task, result} ->
IO.puts("[Coordinator] Task complete from #{inspect(worker)}")
# Worker becomes available again
loop(%{state |
available: [worker | state.available],
results: [{task, result} | state.results]
})
{:get_results, from} ->
send(from, {:results, Enum.reverse(state.results)})
loop(state)
{:status, from} ->
status = %{
total_workers: length(state.workers),
available: length(state.available),
pending_tasks: length(state.pending_tasks),
completed: length(state.results)
}
send(from, {:status, status})
loop(state)
:shutdown ->
IO.puts("[Coordinator] Shutting down workers...")
Enum.each(state.workers, &send(&1, :stop))
:ok
after
5000 ->
IO.puts("[Coordinator] Idle...")
loop(state)
end
end
defp dispatch_tasks(%{available: [], pending_tasks: pending} = state) when pending != [] do
IO.puts("[Coordinator] No available workers, #{length(pending)} tasks waiting")
state
end
defp dispatch_tasks(%{pending_tasks: []} = state) do
state
end
defp dispatch_tasks(%{available: [worker | rest], pending_tasks: [{task, _from} | remaining]} = state) do
IO.puts("[Coordinator] Dispatching task to #{inspect(worker)}")
send(worker, {:work, task, self()})
# Keep dispatching until we run out of pending tasks or idle workers
dispatch_tasks(%{state | available: rest, pending_tasks: remaining})
end
end
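The moduledoc lists "Handles worker failures", but as written the workers are started with spawn_link/1 and the coordinator does not trap exits. A crashing worker therefore takes the coordinator down with it. The sketch below shows one way to close that gap with a hypothetical ResilientPool module. It traps exits and replaces crashed workers; task dispatch is left out to keep it short.

```elixir
# Sketch only: ResilientPool is hypothetical, not part of the session's code.
defmodule ResilientPool do
  def start(num_workers) do
    spawn(fn ->
      # With trap_exit, a linked worker's crash arrives as an {:EXIT, pid, reason}
      # message instead of killing this process.
      Process.flag(:trap_exit, true)
      workers = for _ <- 1..num_workers, do: spawn_worker()
      loop(workers)
    end)
  end

  defp spawn_worker, do: spawn_link(fn -> worker_loop() end)

  defp worker_loop do
    receive do
      :crash -> exit(:boom)
    end
  end

  defp loop(workers) do
    receive do
      {:EXIT, pid, reason} when reason in [:normal, :shutdown] ->
        # Clean exit: drop the worker without replacing it.
        loop(List.delete(workers, pid))

      {:EXIT, pid, _reason} ->
        # Crash: swap in a fresh worker at the same position.
        replacement = spawn_worker()
        loop(Enum.map(workers, fn w -> if w == pid, do: replacement, else: w end))

      {:workers, from} ->
        send(from, {:workers, workers})
        loop(workers)
    end
  end
end

get_workers = fn pool ->
  send(pool, {:workers, self()})

  receive do
    {:workers, workers} -> workers
  end
end

pool = ResilientPool.start(3)
[victim | _] = get_workers.(pool)

ref = Process.monitor(victim)
send(victim, :crash)

receive do
  {:DOWN, ^ref, :process, ^victim, :boom} -> :ok
end

# Give the pool a moment to handle the {:EXIT, ...} message before asking again.
Process.sleep(50)
after_crash = get_workers.(pool)
IO.inspect(length(after_crash), label: "workers after crash")
```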
Simple Worker
defmodule SimpleWorker do
def run(name) do
IO.puts("[#{name}] Started")
loop(name)
end
defp loop(name) do
receive do
{:work, task, coordinator} ->
IO.puts("[#{name}] Processing: #{inspect(task)}")
# Simulate work
Process.sleep(:rand.uniform(500) + 200)
result = "Processed #{inspect(task)} by #{name}"
send(coordinator, {:task_complete, self(), task, result})
loop(name)
:stop ->
IO.puts("[#{name}] Stopping")
:ok
end
end
end
Test the Coordinator
# Start coordinator with 3 workers
coordinator = Coordinator.start(SimpleWorker, 3)
Process.sleep(200)
# Submit tasks
for i <- 1..5 do
send(coordinator, {:submit, "Task-#{i}", self()})
end
# Check status
send(coordinator, {:status, self()})
receive do
{:status, status} -> IO.puts("Status: #{inspect(status)}")
end
# Wait for processing
Process.sleep(3000)
# Get results
send(coordinator, {:get_results, self()})
receive do
{:results, results} ->
IO.puts("\nCompleted tasks:")
for {task, result} <- results do
IO.puts(" #{task}: #{result}")
end
end
send(coordinator, :shutdown)
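For comparison, stateless batch jobs like these can also be spread over a bounded number of processes with Task.async_stream/3 from the standard library. In this sketch, max_concurrency: 3 plays the role of the three workers, and results come back in input order by default. The trade-off is that it starts a fresh process per item and keeps no long-lived worker state, which is what the hand-written coordinator provides.

```elixir
# Sketch: bounded concurrency over a batch, comparable to 3 coordinator workers.
tasks = for i <- 1..5, do: "Task-#{i}"

results =
  tasks
  |> Task.async_stream(
    fn task ->
      # Simulated work, similar in spirit to SimpleWorker's random sleep.
      Process.sleep(:rand.uniform(100))
      "Processed #{task}"
    end,
    max_concurrency: 3,
    timeout: 5_000
  )
  # Each successful item is wrapped as {:ok, result}.
  |> Enum.map(fn {:ok, result} -> result end)

Enum.each(results, &IO.puts/1)
```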
5. Building a Multi-Agent System
Now let’s put it all together into a complete multi-agent system.
defmodule MultiAgentSystem do
@moduledoc """
A complete multi-agent system with:
- Registry for agent discovery
- Supervisor for fault tolerance
- Message router for communication
- Protocol-based messaging
"""
# ========================================
# System Startup
# ========================================
def start do
# Start registry
{:ok, _} = Registry.start_link(keys: :unique, name: MultiAgentRegistry)
# Registry.register/3 registers the calling process, so each component
# registers itself from inside its own process
# Start supervisor
supervisor = spawn(fn ->
Process.flag(:trap_exit, true)
Registry.register(MultiAgentRegistry, :supervisor, %{})
supervisor_loop(%{agents: %{}})
end)
# Start message router
router = spawn(fn ->
Registry.register(MultiAgentRegistry, :router, %{})
router_loop()
end)
%{supervisor: supervisor, router: router}
end
# ========================================
# Supervisor
# ========================================
defp supervisor_loop(state) do
receive do
{:start_agent, name, config, from} ->
case Map.get(state.agents, name) do
nil ->
pid = start_agent_process(name, config)
send(from, {:ok, pid})
# Keep the config next to the pid so a restart can reuse it
supervisor_loop(%{state | agents: Map.put(state.agents, name, {pid, config})})
_existing ->
send(from, {:error, :already_exists})
supervisor_loop(state)
end
{:EXIT, pid, reason} ->
# Find which agent exited
case find_agent_by_pid(state.agents, pid) do
{name, {_pid, config}} ->
IO.puts("[Supervisor] Agent #{name} exited: #{inspect(reason)}")
if reason != :normal and reason != :shutdown do
IO.puts("[Supervisor] Restarting #{name}...")
new_pid = start_agent_process(name, config)
supervisor_loop(%{state | agents: Map.put(state.agents, name, {new_pid, config})})
else
supervisor_loop(%{state | agents: Map.delete(state.agents, name)})
end
nil ->
supervisor_loop(state)
end
{:stop_agent, name, from} ->
case Map.get(state.agents, name) do
nil ->
send(from, {:error, :not_found})
supervisor_loop(state)
{pid, _config} ->
send(pid, :shutdown)
send(from, :ok)
supervisor_loop(%{state | agents: Map.delete(state.agents, name)})
end
{:list_agents, from} ->
send(from, {:agents, Map.keys(state.agents)})
supervisor_loop(state)
:shutdown ->
IO.puts("[Supervisor] Shutting down all agents...")
Enum.each(state.agents, fn {_name, {pid, _config}} ->
send(pid, :shutdown)
end)
:ok
end
end
defp find_agent_by_pid(agents, pid) do
Enum.find(agents, fn {_name, {p, _config}} -> p == pid end)
end
defp start_agent_process(name, config) do
pid = spawn_link(fn ->
# Register with the registry
Registry.register(MultiAgentRegistry, {:agent, name}, %{config: config})
agent_loop(%{name: name, memory: %{}, inbox: [], config: config})
end)
IO.puts("[Supervisor] Started agent #{name}: #{inspect(pid)}")
pid
end
# ========================================
# Agent Process
# ========================================
defp agent_loop(state) do
receive do
# Memory operations
{:remember, key, value} ->
IO.puts("[#{state.name}] Remembering #{key}")
agent_loop(%{state | memory: Map.put(state.memory, key, value)})
{:recall, key, from} ->
value = Map.get(state.memory, key)
send(from, {:recalled, key, value})
agent_loop(state)
# Task operations
{:task, task_msg} ->
IO.puts("[#{state.name}] Received task: #{inspect(task_msg.action)}")
result = handle_task(state, task_msg)
# Send response back
send(task_msg.from, AgentProtocol.task_response(task_msg.id, result))
agent_loop(state)
# Inter-agent delegation
{:delegate, delegation} ->
IO.puts("[#{state.name}] Delegation from #{delegation.from}: #{inspect(delegation.task)}")
result = handle_task(state, %{action: delegation.task.action, params: delegation.task.params})
# Send result back to delegating agent
case Registry.lookup(MultiAgentRegistry, {:agent, delegation.from}) do
[{from_pid, _}] ->
send(from_pid, AgentProtocol.delegation_result(delegation.id, result, state.name))
[] ->
IO.puts("[#{state.name}] Warning: Delegating agent #{delegation.from} not found")
end
agent_loop(state)
{:delegation_result, result_msg} ->
IO.puts("[#{state.name}] Delegation result: #{inspect(result_msg.result)}")
agent_loop(state)
# Status
{:status, from} ->
status = %{
name: state.name,
memory_keys: Map.keys(state.memory),
inbox_count: length(state.inbox)
}
send(from, {:status, status})
agent_loop(state)
# Control
:crash ->
IO.puts("[#{state.name}] Forced crash!")
exit(:forced_crash)
:shutdown ->
IO.puts("[#{state.name}] Shutting down")
:ok
other ->
IO.puts("[#{state.name}] Unknown message: #{inspect(other)}")
agent_loop(state)
after
30000 ->
IO.puts("[#{state.name}] Idle...")
agent_loop(state)
end
end
defp handle_task(state, %{action: :search, params: params}) do
query = Map.get(params, :query, "")
IO.puts("[#{state.name}] Searching: #{query}")
Process.sleep(100)
{:ok, ["Result 1 for #{query}", "Result 2 for #{query}"]}
end
defp handle_task(state, %{action: :analyze, params: params}) do
data = Map.get(params, :data, "")
IO.puts("[#{state.name}] Analyzing: #{data}")
Process.sleep(150)
{:ok, "Analysis of #{data}: positive"}
end
defp handle_task(state, %{action: :summarize, params: params}) do
text = Map.get(params, :text, "")
IO.puts("[#{state.name}] Summarizing text (#{String.length(text)} chars)")
Process.sleep(200)
{:ok, "Summary: #{String.slice(text, 0, 50)}..."}
end
defp handle_task(_state, task) do
{:error, {:unknown_action, task.action}}
end
# ========================================
# Message Router
# ========================================
defp router_loop do
receive do
{:route, to_agent, message} ->
case Registry.lookup(MultiAgentRegistry, {:agent, to_agent}) do
[{pid, _}] ->
send(pid, message)
IO.puts("[Router] Routed message to #{to_agent}")
[] ->
IO.puts("[Router] Agent #{to_agent} not found")
end
router_loop()
{:broadcast, message} ->
Registry.select(MultiAgentRegistry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
|> Enum.filter(fn {{:agent, _name}, _pid} -> true; _ -> false end)
|> Enum.each(fn {{:agent, _name}, pid} ->
send(pid, message)
end)
router_loop()
:stop ->
:ok
end
end
# ========================================
# Client API
# ========================================
def start_agent(supervisor, name, config \\ %{}) do
send(supervisor, {:start_agent, name, config, self()})
receive do
{:ok, pid} -> {:ok, pid}
{:error, reason} -> {:error, reason}
after
5000 -> {:error, :timeout}
end
end
def stop_agent(supervisor, name) do
send(supervisor, {:stop_agent, name, self()})
receive do
:ok -> :ok
{:error, reason} -> {:error, reason}
after
5000 -> {:error, :timeout}
end
end
def list_agents(supervisor) do
send(supervisor, {:list_agents, self()})
receive do
{:agents, agents} -> agents
after
5000 -> []
end
end
def send_task(agent_name, action, params) do
case Registry.lookup(MultiAgentRegistry, {:agent, agent_name}) do
[{pid, _}] ->
{:task, %{id: task_id}} = task = AgentProtocol.task(action, params, self())
send(pid, task)
# Match on the task id so a late reply to an earlier task is not taken for this one
receive do
{:task_response, %{task_id: ^task_id} = response} ->
{:ok, response.result}
after
10000 -> {:error, :timeout}
end
[] ->
{:error, :agent_not_found}
end
end
def delegate_task(from_agent, to_agent, action, params) do
case Registry.lookup(MultiAgentRegistry, {:agent, to_agent}) do
[{pid, _}] ->
delegation = AgentProtocol.delegate(to_agent, %{action: action, params: params}, from_agent)
send(pid, delegation)
:ok
[] ->
{:error, :agent_not_found}
end
end
end
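The router's :broadcast clause selects every registry entry and then filters for {:agent, _} keys in Elixir code. The sketch below pushes that filter into the match spec given to Registry.select/2. It uses a throwaway SelectDemoRegistry and placeholder processes, both hypothetical.

```elixir
# Sketch: SelectDemoRegistry and its keys are hypothetical, for illustration only.
{:ok, _} = Registry.start_link(keys: :unique, name: SelectDemoRegistry)
parent = self()

# Spawn a placeholder process that registers under `key`, then waits for :stop.
register = fn key ->
  spawn(fn ->
    Registry.register(SelectDemoRegistry, key, %{})
    send(parent, {:registered, key})

    receive do
      :stop -> :ok
    end
  end)

  # Wait until the registration is visible before moving on.
  receive do
    {:registered, ^key} -> :ok
  end
end

Enum.each([{:agent, "A"}, {:agent, "B"}, :router], register)

# Match spec entries have the shape {{key, pid, value}, guards, body}.
# The key pattern {:agent, :"$1"} matches only agent keys and binds the name.
agents =
  SelectDemoRegistry
  |> Registry.select([{{{:agent, :"$1"}, :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
  |> Enum.sort()

IO.inspect(Enum.map(agents, &elem(&1, 0)), label: "agent names")

# "Broadcast" to the agents only, then stop the :router placeholder as well.
Enum.each(agents, fn {_name, pid} -> send(pid, :stop) end)
[{router_pid, _}] = Registry.lookup(SelectDemoRegistry, :router)
send(router_pid, :stop)
```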
Test the Multi-Agent System
# Start the system
system = MultiAgentSystem.start()
Process.sleep(200)
# Start some agents
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Research-Agent")
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Analysis-Agent")
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Summary-Agent")
Process.sleep(200)
# List agents
agents = MultiAgentSystem.list_agents(system.supervisor)
IO.puts("Running agents: #{inspect(agents)}")
# Send tasks to agents
IO.puts("\n=== Sending Tasks ===")
{:ok, search_result} = MultiAgentSystem.send_task("Research-Agent", :search, %{query: "Elixir OTP"})
IO.puts("Search result: #{inspect(search_result)}")
{:ok, analyze_result} = MultiAgentSystem.send_task("Analysis-Agent", :analyze, %{data: "some data"})
IO.puts("Analysis result: #{inspect(analyze_result)}")
{:ok, summary_result} = MultiAgentSystem.send_task("Summary-Agent", :summarize, %{text: "This is a long text that needs to be summarized for easier reading."})
IO.puts("Summary result: #{inspect(summary_result)}")
# Delegate between agents
IO.puts("\n=== Agent Delegation ===")
# Research agent delegates analysis to Analysis agent
MultiAgentSystem.delegate_task("Research-Agent", "Analysis-Agent", :analyze, %{data: "research findings"})
Process.sleep(500)
# Test fault tolerance
IO.puts("\n=== Fault Tolerance Test ===")
# Crash an agent
case Registry.lookup(MultiAgentRegistry, {:agent, "Research-Agent"}) do
[{pid, _}] ->
IO.puts("Crashing Research-Agent (#{inspect(pid)})")
send(pid, :crash)
[] ->
IO.puts("Agent not found")
end
Process.sleep(500)
# Agent should be restarted
agents_after = MultiAgentSystem.list_agents(system.supervisor)
IO.puts("Agents after crash: #{inspect(agents_after)}")
# Send another task - should work
{:ok, result} = MultiAgentSystem.send_task("Research-Agent", :search, %{query: "after crash"})
IO.puts("Task after crash: #{inspect(result)}")
# Cleanup
IO.puts("\n=== Shutdown ===")
send(system.supervisor, :shutdown)
send(system.router, :stop)
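The test script coordinates with fixed Process.sleep/1 calls. These can be too short on a loaded machine and waste time otherwise. The sketch below is a polling helper, WaitFor.until/3 (hypothetical, not part of the session's code), that waits for a restarted process to appear under the same registry key. The crash and delayed restart are simulated with stand-in processes in a PollDemoRegistry (also hypothetical).

```elixir
# Sketch: WaitFor and PollDemoRegistry are hypothetical names for illustration.
defmodule WaitFor do
  # Calls `fun` every `interval` ms until it returns something other than
  # nil or false; gives up with {:error, :timeout} after `timeout` ms.
  def until(fun, timeout \\ 1_000, interval \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(fun, deadline, interval)
  end

  defp poll(fun, deadline, interval) do
    case fun.() do
      falsy when falsy in [nil, false] ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(interval)
          poll(fun, deadline, interval)
        end

      result ->
        {:ok, result}
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: PollDemoRegistry)
key = {:agent, "Research-Agent"}

# Stand-in for an agent process: registers under `key` after `delay_ms`, then waits.
start_stand_in = fn delay_ms ->
  spawn(fn ->
    Process.sleep(delay_ms)
    Registry.register(PollDemoRegistry, key, %{})

    receive do
      :stop -> :ok
    end
  end)
end

current_pid = fn ->
  case Registry.lookup(PollDemoRegistry, key) do
    [{pid, _}] -> pid
    [] -> nil
  end
end

old_pid = start_stand_in.(0)
{:ok, ^old_pid} = WaitFor.until(current_pid)

# Simulate a crash followed by a delayed restart, as the supervisor would do.
Process.exit(old_pid, :kill)
new_pid = start_stand_in.(50)

restarted = fn ->
  case current_pid.() do
    pid when is_pid(pid) and pid != old_pid -> pid
    _ -> nil
  end
end

{:ok, found} = WaitFor.until(restarted)
IO.puts("restarted: #{found == new_pid}")
```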
6. Summary
Architecture Patterns
| Pattern | Use Case |
|---|---|
| Process per entity | Independent lifecycle (agents, sessions) |
| Registry | Named process lookup |
| Coordinator | Task distribution to workers |
| Supervisor | Fault tolerance and restart |
| Router | Message dispatch based on rules |
Design Checklist
- [ ] Identify entities that need processes
- [ ] Define clear message protocols
- [ ] Use Registry for discovery
- [ ] Implement supervision for fault tolerance
- [ ] Abstract process interaction behind clean APIs
- [ ] Handle cleanup on process death
Key Takeaways
- Registry provides dynamic name-based process lookup
- Protocols define structured inter-process communication
- Coordinators orchestrate work across multiple workers
- Supervision enables automatic recovery from failures
- Clean APIs hide message-passing complexity from users
7. Practice Exercises
Exercise 1: Load Balancer
Implement a load balancer that distributes tasks across workers based on their current load.
Exercise 2: Pub/Sub
Implement a publish-subscribe system where agents can subscribe to topics and receive broadcasts.
Exercise 3: Task Pipeline
Create a pipeline where tasks flow through multiple agents:
- Research Agent → finds data
- Analysis Agent → processes data
- Summary Agent → summarizes results
Next Session Preview
In Session 11: Checkpoint - Process Agents, we’ll:
- Integrate these concepts into the agent_framework project
- Create a ProcessAgent module
- Add an AgentRegistry for named agents
- Implement an AgentMonitor for fault tolerance
- Write comprehensive tests