Session 10: Designing a Concurrent Application

Mix.install([])

Learning Goals

By the end of this session, you will:

  1. Design process architectures for concurrent applications
  2. Use the Registry module for named process lookup
  3. Design message protocols for inter-process communication
  4. Implement coordinator patterns for orchestrating workers
  5. Build a complete multi-agent system

1. Designing Process Architecture

When building concurrent applications, the first step is identifying what processes you need.

Questions to Ask

  1. What entities have independent lifecycles?

    • Each agent can work independently → process per agent
    • Each task could be processed independently → process per task?
  2. What needs to maintain state?

    • Agents have memory → stateful processes
    • Coordinator tracks all agents → stateful process
  3. What can fail independently?

    • One agent crashing shouldn’t stop others → isolated processes
  4. What needs to communicate?

    • Agents send messages to each other → message passing
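The answers above map fairly directly onto code. As a minimal sketch (the module name IsolationSketch and its message shapes are assumptions made for illustration, not part of the session's framework), two stateful agent processes are started and one is crashed to show that the other keeps running with its state intact:

```elixir
defmodule IsolationSketch do
  # Hypothetical agent: the map passed to loop/1 is the agent's private state.
  def start_agent(name) do
    spawn(fn -> loop(%{name: name, memory: %{}}) end)
  end

  defp loop(state) do
    receive do
      {:remember, key, value} ->
        loop(%{state | memory: Map.put(state.memory, key, value)})

      {:recall, key, from} ->
        send(from, {:recalled, state.name, Map.get(state.memory, key)})
        loop(state)

      :crash ->
        exit(:boom)
    end
  end
end

a = IsolationSketch.start_agent("A")
b = IsolationSketch.start_agent("B")

send(b, {:remember, :topic, "OTP"})

# Crash A and wait for confirmation via a monitor instead of sleeping.
ref = Process.monitor(a)
send(a, :crash)

receive do
  {:DOWN, ^ref, :process, ^a, reason} -> IO.puts("A exited: #{inspect(reason)}")
end

# B is not linked to A, so it is still alive and still holds its memory.
send(b, {:recall, :topic, self()})

recalled =
  receive do
    {:recalled, name, value} -> {name, value}
  after
    1_000 -> :timeout
  end

IO.inspect(recalled)
```

Because the processes are started with spawn rather than spawn_link, a crash in one is invisible to the other unless something explicitly monitors it.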

Multi-Agent System Architecture

┌──────────────────────────────────────────────────────────────────┐
│                        Application                                │
├──────────────────────────────────────────────────────────────────┤
│  ┌────────────────────────────────────────────────────────────┐  │
│  │                     AgentSupervisor                         │  │
│  │  (monitors all agents, restarts on crash)                   │  │
│  └─────────────────────────┬──────────────────────────────────┘  │
│                            │                                      │
│     ┌──────────────────────┼───────────────────────┐              │
│     │                      │                       │              │
│  ┌──▼───┐              ┌───▼──┐              ┌────▼──┐           │
│  │Agent │              │Agent │              │Agent  │           │
│  │"A"   │◄────────────►│"B"   │◄────────────►│"C"    │           │
│  └──────┘   messages   └──────┘   messages   └───────┘           │
│                                                                   │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │                     AgentRegistry                           │  │
│  │  (lookup agents by name)                                    │  │
│  └────────────────────────────────────────────────────────────┘  │
│                                                                   │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │                     MessageRouter                           │  │
│  │  (routes messages between agents)                           │  │
│  └────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘

2. Process Registry

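The Registry module lets processes register themselves under a name (key) so that other processes can look them up later. Registry.register/3 always registers the calling process, which is why each worker below registers itself from inside its own function.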

Starting a Registry

# Start a registry for unique keys (one process per name)
{:ok, _} = Registry.start_link(keys: :unique, name: AgentRegistry)

IO.puts("Registry started!")

Registering Processes

# Spawn a process and register it
worker1 = spawn(fn ->
  # Register ourselves
  Registry.register(AgentRegistry, "worker-1", %{started_at: DateTime.utc_now()})

  receive do
    msg -> IO.puts("Worker-1 received: #{inspect(msg)}")
  end
end)

worker2 = spawn(fn ->
  Registry.register(AgentRegistry, "worker-2", %{started_at: DateTime.utc_now()})

  receive do
    msg -> IO.puts("Worker-2 received: #{inspect(msg)}")
  end
end)

# Give them time to register
Process.sleep(100)

IO.puts("Registered workers")

Looking Up Processes

# Lookup by name
case Registry.lookup(AgentRegistry, "worker-1") do
  [{pid, metadata}] ->
    IO.puts("Found worker-1: #{inspect(pid)}")
    IO.puts("Metadata: #{inspect(metadata)}")

  [] ->
    IO.puts("worker-1 not found")
end

# Send a message to a named process
case Registry.lookup(AgentRegistry, "worker-2") do
  [{pid, _}] ->
    send(pid, {:hello, "from registry lookup"})

  [] ->
    IO.puts("worker-2 not found")
end

Process.sleep(100)
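The lookup-then-send pattern above repeats often enough that it can be wrapped in a small helper. The following is a sketch only: the module RegistrySend, its send_to/3 function, and the LookupSketchRegistry name are hypothetical. It also replaces the fixed sleep with an explicit :registered handshake so the caller knows the worker is ready.

```elixir
defmodule RegistrySend do
  # Hypothetical helper: resolve a name through the given registry and send.
  def send_to(registry, name, message) do
    case Registry.lookup(registry, name) do
      [{pid, _metadata}] ->
        send(pid, message)
        :ok

      [] ->
        {:error, :not_found}
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: LookupSketchRegistry)

parent = self()

spawn(fn ->
  {:ok, _} = Registry.register(LookupSketchRegistry, "echo", %{})
  # Tell the parent that registration is done (no sleep needed).
  send(parent, :registered)

  receive do
    {:echo, text, from} -> send(from, {:echoed, text})
  end
end)

receive do
  :registered -> :ok
end

sent = RegistrySend.send_to(LookupSketchRegistry, "echo", {:echo, "hi", self()})

reply =
  receive do
    {:echoed, text} -> text
  after
    1_000 -> :timeout
  end

missing = RegistrySend.send_to(LookupSketchRegistry, "nobody", :ping)
IO.inspect({sent, reply, missing})
```

Returning {:error, :not_found} instead of printing leaves the decision about a missing process to the caller.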

Automatic Cleanup

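When a registered process exits, the Registry removes its entries automatically. Cleanup is asynchronous, so a lookup issued immediately after the exit may briefly still return the old entry: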

# Create and register a process
temp = spawn(fn ->
  Registry.register(AgentRegistry, "temporary", %{})
  Process.sleep(500)
  # Process exits after sleep
end)

Process.sleep(100)

# It's registered
case Registry.lookup(AgentRegistry, "temporary") do
  [{pid, _}] -> IO.puts("Found temporary: #{inspect(pid)}")
  [] -> IO.puts("Not found")
end

# Wait for it to exit
Process.sleep(600)

# Automatically unregistered
case Registry.lookup(AgentRegistry, "temporary") do
  [{pid, _}] -> IO.puts("Still found: #{inspect(pid)}")
  [] -> IO.puts("Temporary was automatically unregistered!")
end
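The sleeps above work for a demo, but the timing can be made explicit. This sketch (the CleanupSketch module and CleanupSketchRegistry name are assumptions for illustration) monitors the process to learn when it has exited, then polls the Registry briefly, since the entry may not disappear at the exact moment the :DOWN message arrives:

```elixir
defmodule CleanupSketch do
  # Hypothetical helper: poll until the key is gone or attempts run out.
  def await_unregistered(registry, key, attempts \\ 50)

  def await_unregistered(_registry, _key, 0), do: {:error, :still_registered}

  def await_unregistered(registry, key, attempts) do
    case Registry.lookup(registry, key) do
      [] ->
        :ok

      _still_there ->
        Process.sleep(10)
        await_unregistered(registry, key, attempts - 1)
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: CleanupSketchRegistry)

parent = self()

pid =
  spawn(fn ->
    {:ok, _} = Registry.register(CleanupSketchRegistry, "temporary", %{})
    send(parent, :registered)

    receive do
      :stop -> :ok
    end
  end)

receive do
  :registered -> :ok
end

# The process is registered while it is alive.
[{^pid, _}] = Registry.lookup(CleanupSketchRegistry, "temporary")

# Ask it to stop and wait for the :DOWN message from the monitor.
ref = Process.monitor(pid)
send(pid, :stop)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

cleanup = CleanupSketch.await_unregistered(CleanupSketchRegistry, "temporary")
IO.inspect(cleanup)
```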

3. Message Protocol Design

Good message protocols make systems easier to understand and maintain.

Protocol Design Principles

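  1. Consistent structure: Use tagged tuples such as {:action, data} so receivers can pattern-match on the tag
  2. Include the sender: Pass the caller's pid so the receiver knows where to reply, e.g. {:request, data, from_pid}
  3. Use references: Add a unique ref to correlate each response with its request, e.g. {:request, ref, data, from_pid}
  4. Document expected responses: State what each request returns, including error cases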
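The principles above can be illustrated with a tiny request/response exchange. This is a sketch under assumptions: RefProtocolSketch, its {:double, n} request, and the {:response, ref, result} reply shape are invented for the example. The point is that call/3 pins the reference in its receive, so an unrelated or stale reply in the mailbox is not mistaken for the answer.

```elixir
defmodule RefProtocolSketch do
  # Hypothetical server: doubles a number and echoes the caller's ref back.
  def start_server do
    spawn(fn -> server_loop() end)
  end

  defp server_loop do
    receive do
      {:request, ref, {:double, n}, from} ->
        send(from, {:response, ref, {:ok, n * 2}})
        server_loop()

      :stop ->
        :ok
    end
  end

  # Client side: create a fresh ref, send it along, and only accept a
  # response carrying that same ref.
  def call(server, data, timeout \\ 1_000) do
    ref = make_ref()
    send(server, {:request, ref, data, self()})

    receive do
      {:response, ^ref, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end
end

server = RefProtocolSketch.start_server()

# Simulate a stale reply from some earlier request: it has a different ref,
# so call/3 leaves it in the mailbox and waits for the matching response.
send(self(), {:response, make_ref(), {:ok, :stale}})

result = RefProtocolSketch.call(server, {:double, 21})
IO.inspect(result)

send(server, :stop)
```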

Defining Our Agent Protocol

defmodule AgentProtocol do
  @moduledoc """
  Message protocol for inter-agent communication.
  Builds on the Phase 1 Message struct concepts.
  """

  # Generate unique message IDs
  def new_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
  end

  # --- Task Messages ---

  @doc "Create a task request"
  def task(action, params, from_pid) do
    {:task, %{
      id: new_id(),
      action: action,
      params: params,
      from: from_pid,
      timestamp: DateTime.utc_now()
    }}
  end

  @doc "Create a task response"
  def task_response(task_id, result) do
    {:task_response, %{
      task_id: task_id,
      result: result,
      timestamp: DateTime.utc_now()
    }}
  end

  # --- Query Messages ---

  def query(query_type, from_pid) do
    {:query, %{
      id: new_id(),
      type: query_type,
      from: from_pid
    }}
  end

  def query_response(query_id, data) do
    {:query_response, %{
      query_id: query_id,
      data: data
    }}
  end

  # --- System Messages ---

  def status_request(from_pid) do
    {:status_request, from_pid}
  end

  def status_response(status) do
    {:status_response, status}
  end

  def shutdown do
    :shutdown
  end

  # --- Agent-to-Agent Messages ---

  def delegate(target_agent, task, from_agent) do
    {:delegate, %{
      id: new_id(),
      target: target_agent,
      task: task,
      from: from_agent
    }}
  end

  def delegation_result(delegation_id, result, from_agent) do
    {:delegation_result, %{
      delegation_id: delegation_id,
      result: result,
      from: from_agent
    }}
  end
end

# Test the protocol
task = AgentProtocol.task(:search, %{query: "OTP"}, self())
IO.puts("Task message: #{inspect(task)}")

response = AgentProtocol.task_response("abc123", {:ok, ["result1", "result2"]})
IO.puts("Response message: #{inspect(response)}")

4. Coordinator Pattern

A coordinator manages a group of workers, distributing tasks and collecting results.

defmodule Coordinator do
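  @moduledoc """
  Coordinates multiple worker processes.
  - Distributes tasks to available workers
  - Collects results
  - Links to its workers but does not trap exits, so a crashing
    worker also takes the coordinator down (no recovery is implemented)
  """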

  def start(worker_module, num_workers) do
    spawn(fn ->
      # Start workers
      workers = for i <- 1..num_workers do
        spawn_link(fn -> worker_module.run("Worker-#{i}") end)
      end

      IO.puts("[Coordinator] Started #{num_workers} workers")
      loop(%{
        workers: workers,
        available: workers,
        pending_tasks: [],
        results: []
      })
    end)
  end

  defp loop(state) do
    # Try to dispatch pending tasks to available workers
    state = dispatch_tasks(state)

    receive do
      {:submit, task, from} ->
        IO.puts("[Coordinator] Received task: #{inspect(task)}")
        new_pending = state.pending_tasks ++ [{task, from}]
        loop(%{state | pending_tasks: new_pending})

      {:task_complete, worker, task, result} ->
        IO.puts("[Coordinator] Task complete from #{inspect(worker)}")
        # Worker becomes available again
        loop(%{state |
          available: [worker | state.available],
          results: [{task, result} | state.results]
        })

      {:get_results, from} ->
        send(from, {:results, Enum.reverse(state.results)})
        loop(state)

      {:status, from} ->
        status = %{
          total_workers: length(state.workers),
          available: length(state.available),
          pending_tasks: length(state.pending_tasks),
          completed: length(state.results)
        }
        send(from, {:status, status})
        loop(state)

      :shutdown ->
        IO.puts("[Coordinator] Shutting down workers...")
        Enum.each(state.workers, &send(&1, :stop))
        :ok
    after
      5000 ->
        IO.puts("[Coordinator] Idle...")
        loop(state)
    end
  end

  defp dispatch_tasks(%{available: [], pending_tasks: pending} = state) when pending != [] do
    IO.puts("[Coordinator] No available workers, #{length(pending)} tasks waiting")
    state
  end

  defp dispatch_tasks(%{pending_tasks: []} = state) do
    state
  end

  # The submitter's pid is not needed for dispatch, hence the underscore.
  defp dispatch_tasks(%{available: [worker | rest], pending_tasks: [{task, _from} | remaining]} = state) do
    IO.puts("[Coordinator] Dispatching task to #{inspect(worker)}")
    send(worker, {:work, task, self()})
    %{state | available: rest, pending_tasks: remaining}
  end
end

Simple Worker

defmodule SimpleWorker do
  def run(name) do
    IO.puts("[#{name}] Started")
    loop(name)
  end

  defp loop(name) do
    receive do
      {:work, task, coordinator} ->
        IO.puts("[#{name}] Processing: #{inspect(task)}")
        # Simulate work
        Process.sleep(:rand.uniform(500) + 200)
        result = "Processed #{inspect(task)} by #{name}"
        send(coordinator, {:task_complete, self(), task, result})
        loop(name)

      :stop ->
        IO.puts("[#{name}] Stopping")
        :ok
    end
  end
end

Test the Coordinator

# Start coordinator with 3 workers
coordinator = Coordinator.start(SimpleWorker, 3)
Process.sleep(200)

# Submit tasks
for i <- 1..5 do
  send(coordinator, {:submit, "Task-#{i}", self()})
end

# Check status
send(coordinator, {:status, self()})
receive do
  {:status, status} -> IO.puts("Status: #{inspect(status)}")
end

# Wait for processing
Process.sleep(3000)

# Get results
send(coordinator, {:get_results, self()})
receive do
  {:results, results} ->
    IO.puts("\nCompleted tasks:")
    for {task, result} <- results do
      IO.puts("  #{task}: #{result}")
    end
end

send(coordinator, :shutdown)
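The coordinator above links to its workers but does not react when one of them dies. One way failures could be observed is with monitors. The following is a minimal sketch, separate from the Coordinator module: MonitorSketch and run_all/2 are hypothetical names, and each task runs in its own short-lived process rather than in a worker pool.

```elixir
defmodule MonitorSketch do
  # Hypothetical helper: runs fun.(task) for each task in its own monitored
  # process and returns {task, {:ok, result} | {:error, reason}} tuples.
  def run_all(tasks, fun) do
    parent = self()

    pending =
      for task <- tasks, into: %{} do
        {pid, ref} =
          spawn_monitor(fn ->
            send(parent, {:task_complete, self(), fun.(task)})
          end)

        {ref, {pid, task}}
      end

    collect(pending, [])
  end

  defp collect(pending, acc) when map_size(pending) == 0, do: acc

  defp collect(pending, acc) do
    receive do
      # Every monitored process produces exactly one :DOWN message.
      {:DOWN, ref, :process, pid, reason} when is_map_key(pending, ref) ->
        {{^pid, task}, rest} = Map.pop(pending, ref)
        collect(rest, [{task, outcome(pid, reason)} | acc])
    end
  end

  # A normal exit means the result message was sent before the process ended.
  defp outcome(pid, :normal) do
    receive do
      {:task_complete, ^pid, result} -> {:ok, result}
    after
      0 -> {:error, :no_result}
    end
  end

  # Any other exit reason is reported as a failure for that task.
  defp outcome(_pid, reason), do: {:error, reason}
end

# Task 3 fails on purpose; the others return their square.
work = fn
  3 -> exit(:boom)
  n -> n * n
end

results = MonitorSketch.run_all([1, 2, 3], work) |> Enum.sort()
IO.inspect(results)
```

A pool-based coordinator could apply the same idea by monitoring each worker and, on :DOWN, re-queuing the task that worker was holding.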

5. Building a Multi-Agent System

Now let’s put it all together into a complete multi-agent system.

defmodule MultiAgentSystem do
  @moduledoc """
  A complete multi-agent system with:
  - Registry for agent discovery
  - Supervisor for fault tolerance
  - Message router for communication
  - Protocol-based messaging
  """

  # ========================================
  # System Startup
  # ========================================

  def start do
    # Start registry
    {:ok, _} = Registry.start_link(keys: :unique, name: MultiAgentRegistry)

    # Start supervisor; it registers itself, because Registry.register/3
    # always registers the calling process
    supervisor = spawn(fn ->
      Process.flag(:trap_exit, true)
      Registry.register(MultiAgentRegistry, :supervisor, %{})
      supervisor_loop(%{agents: %{}})
    end)

    # Start message router; it registers itself under :router
    router = spawn(fn ->
      Registry.register(MultiAgentRegistry, :router, %{})
      router_loop()
    end)

    %{supervisor: supervisor, router: router}
  end

  # ========================================
  # Supervisor
  # ========================================

  defp supervisor_loop(state) do
    receive do
      {:start_agent, name, config, from} ->
        case Map.get(state.agents, name) do
          nil ->
            pid = start_agent_process(name, config)
            send(from, {:ok, pid})
            supervisor_loop(%{state | agents: Map.put(state.agents, name, pid)})

          _existing ->
            send(from, {:error, :already_exists})
            supervisor_loop(state)
        end

      {:EXIT, pid, reason} ->
        # Find which agent crashed
        case find_agent_by_pid(state.agents, pid) do
          {name, _pid} ->
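            IO.puts("[Supervisor] Agent #{name} exited: #{inspect(reason)}")

            if reason != :normal and reason != :shutdown do
              IO.puts("[Supervisor] Restarting #{name}...")
              # Note: the restarted agent gets an empty config and empty memory;
              # the original config is not kept in the supervisor state.
              new_pid = start_agent_process(name, %{})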
              supervisor_loop(%{state | agents: Map.put(state.agents, name, new_pid)})
            else
              supervisor_loop(%{state | agents: Map.delete(state.agents, name)})
            end

          nil ->
            supervisor_loop(state)
        end

      {:stop_agent, name, from} ->
        case Map.get(state.agents, name) do
          nil ->
            send(from, {:error, :not_found})
            supervisor_loop(state)

          pid ->
            send(pid, :shutdown)
            send(from, :ok)
            supervisor_loop(%{state | agents: Map.delete(state.agents, name)})
        end

      {:list_agents, from} ->
        send(from, {:agents, Map.keys(state.agents)})
        supervisor_loop(state)

      :shutdown ->
        IO.puts("[Supervisor] Shutting down all agents...")
        Enum.each(state.agents, fn {_name, pid} ->
          send(pid, :shutdown)
        end)
        :ok
    end
  end

  defp find_agent_by_pid(agents, pid) do
    Enum.find(agents, fn {_name, p} -> p == pid end)
  end

  defp start_agent_process(name, config) do
    pid = spawn_link(fn ->
      # Register with the registry
      Registry.register(MultiAgentRegistry, {:agent, name}, %{config: config})
      agent_loop(%{name: name, memory: %{}, inbox: [], config: config})
    end)

    IO.puts("[Supervisor] Started agent #{name}: #{inspect(pid)}")
    pid
  end

  # ========================================
  # Agent Process
  # ========================================

  defp agent_loop(state) do
    receive do
      # Memory operations
      {:remember, key, value} ->
        IO.puts("[#{state.name}] Remembering #{key}")
        agent_loop(%{state | memory: Map.put(state.memory, key, value)})

      {:recall, key, from} ->
        value = Map.get(state.memory, key)
        send(from, {:recalled, key, value})
        agent_loop(state)

      # Task operations
      {:task, task_msg} ->
        IO.puts("[#{state.name}] Received task: #{inspect(task_msg.action)}")
        result = handle_task(state, task_msg)

        # Send response back
        send(task_msg.from, AgentProtocol.task_response(task_msg.id, result))
        agent_loop(state)

      # Inter-agent delegation
      {:delegate, delegation} ->
        IO.puts("[#{state.name}] Delegation from #{delegation.from}: #{inspect(delegation.task)}")
        result = handle_task(state, %{action: delegation.task.action, params: delegation.task.params})

        # Send result back to delegating agent
        case Registry.lookup(MultiAgentRegistry, {:agent, delegation.from}) do
          [{from_pid, _}] ->
            send(from_pid, AgentProtocol.delegation_result(delegation.id, result, state.name))

          [] ->
            IO.puts("[#{state.name}] Warning: Delegating agent #{delegation.from} not found")
        end

        agent_loop(state)

      {:delegation_result, result_msg} ->
        IO.puts("[#{state.name}] Delegation result: #{inspect(result_msg.result)}")
        agent_loop(state)

      # Status
      {:status, from} ->
        status = %{
          name: state.name,
          memory_keys: Map.keys(state.memory),
          inbox_count: length(state.inbox)
        }
        send(from, {:status, status})
        agent_loop(state)

      # Control
      :crash ->
        IO.puts("[#{state.name}] Forced crash!")
        exit(:forced_crash)

      :shutdown ->
        IO.puts("[#{state.name}] Shutting down")
        :ok

      other ->
        IO.puts("[#{state.name}] Unknown message: #{inspect(other)}")
        agent_loop(state)
    after
      30000 ->
        IO.puts("[#{state.name}] Idle...")
        agent_loop(state)
    end
  end

  defp handle_task(state, %{action: :search, params: params}) do
    query = Map.get(params, :query, "")
    IO.puts("[#{state.name}] Searching: #{query}")
    Process.sleep(100)
    {:ok, ["Result 1 for #{query}", "Result 2 for #{query}"]}
  end

  defp handle_task(state, %{action: :analyze, params: params}) do
    data = Map.get(params, :data, "")
    IO.puts("[#{state.name}] Analyzing: #{data}")
    Process.sleep(150)
    {:ok, "Analysis of #{data}: positive"}
  end

  defp handle_task(state, %{action: :summarize, params: params}) do
    text = Map.get(params, :text, "")
    IO.puts("[#{state.name}] Summarizing text (#{String.length(text)} chars)")
    Process.sleep(200)
    {:ok, "Summary: #{String.slice(text, 0, 50)}..."}
  end

  defp handle_task(_state, task) do
    {:error, {:unknown_action, task.action}}
  end

  # ========================================
  # Message Router
  # ========================================

  defp router_loop do
    receive do
      {:route, to_agent, message} ->
        case Registry.lookup(MultiAgentRegistry, {:agent, to_agent}) do
          [{pid, _}] ->
            send(pid, message)
            IO.puts("[Router] Routed message to #{to_agent}")

          [] ->
            IO.puts("[Router] Agent #{to_agent} not found")
        end
        router_loop()

      {:broadcast, message} ->
        Registry.select(MultiAgentRegistry, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
        |> Enum.filter(fn {{:agent, _name}, _pid} -> true; _ -> false end)
        |> Enum.each(fn {{:agent, _name}, pid} ->
          send(pid, message)
        end)
        router_loop()

      :stop ->
        :ok
    end
  end

  # ========================================
  # Client API
  # ========================================

  def start_agent(supervisor, name, config \\ %{}) do
    send(supervisor, {:start_agent, name, config, self()})
    receive do
      {:ok, pid} -> {:ok, pid}
      {:error, reason} -> {:error, reason}
    after
      5000 -> {:error, :timeout}
    end
  end

  def stop_agent(supervisor, name) do
    send(supervisor, {:stop_agent, name, self()})
    receive do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
    after
      5000 -> {:error, :timeout}
    end
  end

  def list_agents(supervisor) do
    send(supervisor, {:list_agents, self()})
    receive do
      {:agents, agents} -> agents
    after
      5000 -> []
    end
  end

  def send_task(agent_name, action, params) do
    case Registry.lookup(MultiAgentRegistry, {:agent, agent_name}) do
      [{pid, _}] ->
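        {:task, %{id: task_id}} = task = AgentProtocol.task(action, params, self())
        send(pid, task)
        receive do
          # Match on the task id so a late reply to an earlier, timed-out
          # task is not mistaken for the response to this one
          {:task_response, %{task_id: ^task_id} = response} ->
            {:ok, response.result}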
        after
          10000 -> {:error, :timeout}
        end

      [] ->
        {:error, :agent_not_found}
    end
  end

  def delegate_task(from_agent, to_agent, action, params) do
    case Registry.lookup(MultiAgentRegistry, {:agent, to_agent}) do
      [{pid, _}] ->
        delegation = AgentProtocol.delegate(to_agent, %{action: action, params: params}, from_agent)
        send(pid, delegation)
        :ok

      [] ->
        {:error, :agent_not_found}
    end
  end
end
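The :broadcast clause of the router selects every registry entry and then filters for agent keys in Elixir. As an alternative sketch, the match specification itself can restrict the selection to keys shaped like {:agent, name}. The registry name SelectSketchRegistry and the :announcement message are assumptions for illustration, and for brevity the current process registers itself under several keys instead of spawning real agents.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: SelectSketchRegistry)

# One process may hold several distinct keys in a unique registry.
{:ok, _} = Registry.register(SelectSketchRegistry, {:agent, "A"}, %{})
{:ok, _} = Registry.register(SelectSketchRegistry, {:agent, "B"}, %{})
{:ok, _} = Registry.register(SelectSketchRegistry, :router, %{})

# Match only entries whose key is {:agent, name}; return {name, pid} pairs.
# The :router entry does not match the key pattern and is skipped.
agent_spec = [{{{:agent, :"$1"}, :"$2", :_}, [], [{{:"$1", :"$2"}}]}]

agents = Registry.select(SelectSketchRegistry, agent_spec)

# Broadcast to every selected agent.
for {name, pid} <- agents do
  send(pid, {:announcement, name, "hello"})
end

names = agents |> Enum.map(fn {name, _pid} -> name end) |> Enum.sort()
IO.inspect(names)
```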

Test the Multi-Agent System

# Start the system
system = MultiAgentSystem.start()
Process.sleep(200)

# Start some agents
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Research-Agent")
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Analysis-Agent")
{:ok, _} = MultiAgentSystem.start_agent(system.supervisor, "Summary-Agent")

Process.sleep(200)

# List agents
agents = MultiAgentSystem.list_agents(system.supervisor)
IO.puts("Running agents: #{inspect(agents)}")

# Send tasks to agents
IO.puts("\n=== Sending Tasks ===")

{:ok, search_result} = MultiAgentSystem.send_task("Research-Agent", :search, %{query: "Elixir OTP"})
IO.puts("Search result: #{inspect(search_result)}")

{:ok, analyze_result} = MultiAgentSystem.send_task("Analysis-Agent", :analyze, %{data: "some data"})
IO.puts("Analysis result: #{inspect(analyze_result)}")

{:ok, summary_result} = MultiAgentSystem.send_task("Summary-Agent", :summarize, %{text: "This is a long text that needs to be summarized for easier reading."})
IO.puts("Summary result: #{inspect(summary_result)}")

# Delegate between agents
IO.puts("\n=== Agent Delegation ===")

# Research agent delegates analysis to Analysis agent
MultiAgentSystem.delegate_task("Research-Agent", "Analysis-Agent", :analyze, %{data: "research findings"})

Process.sleep(500)

# Test fault tolerance
IO.puts("\n=== Fault Tolerance Test ===")

# Crash an agent
case Registry.lookup(MultiAgentRegistry, {:agent, "Research-Agent"}) do
  [{pid, _}] ->
    IO.puts("Crashing Research-Agent (#{inspect(pid)})")
    send(pid, :crash)
  [] ->
    IO.puts("Agent not found")
end

Process.sleep(500)

# Agent should be restarted
agents_after = MultiAgentSystem.list_agents(system.supervisor)
IO.puts("Agents after crash: #{inspect(agents_after)}")

# Send another task - should work
{:ok, result} = MultiAgentSystem.send_task("Research-Agent", :search, %{query: "after crash"})
IO.puts("Task after crash: #{inspect(result)}")

# Cleanup
IO.puts("\n=== Shutdown ===")
send(system.supervisor, :shutdown)

6. Summary

Architecture Patterns

Pattern              Use Case
-------------------  ----------------------------------------
Process per entity   Independent lifecycle (agents, sessions)
Registry             Named process lookup
Coordinator          Task distribution to workers
Supervisor           Fault tolerance and restart
Router               Message dispatch based on rules

Design Checklist

  • [ ] Identify entities that need processes
  • [ ] Define clear message protocols
  • [ ] Use Registry for discovery
  • [ ] Implement supervision for fault tolerance
  • [ ] Abstract process interaction behind clean APIs
  • [ ] Handle cleanup on process death

Key Takeaways

  1. Registry provides dynamic name-based process lookup
  2. Protocols define structured inter-process communication
  3. Coordinators orchestrate work across multiple workers
  4. Supervision enables automatic recovery from failures
  5. Clean APIs hide message-passing complexity from users

7. Practice Exercises

Exercise 1: Load Balancer

Implement a load balancer that distributes tasks across workers based on their current load.

Exercise 2: Pub/Sub

Implement a publish-subscribe system where agents can subscribe to topics and receive broadcasts.
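One possible starting point, offered as a hint rather than a solution: a Registry started with keys: :duplicate lets many processes register under the same key, and Registry.dispatch/3 hands all entries for a key to a callback. The registry name PubSubSketch, the "news" topic, and the :broadcast message shape are assumptions for illustration; here the current process acts as the only subscriber.

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: PubSubSketch)

# Subscribe: the calling process registers itself under the topic key.
{:ok, _} = Registry.register(PubSubSketch, "news", [])

# Publish: dispatch/3 passes every {pid, value} entry for the topic
# to the callback, which sends the message to each subscriber.
Registry.dispatch(PubSubSketch, "news", fn entries ->
  for {pid, _value} <- entries do
    send(pid, {:broadcast, "news", "hello subscribers"})
  end
end)

received =
  receive do
    {:broadcast, topic, payload} -> {topic, payload}
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

Wrapping subscribe and publish in agent-level messages, and handling unsubscribe, is left to the exercise.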

Exercise 3: Task Pipeline

Create a pipeline where tasks flow through multiple agents:

  1. Research Agent → finds data
  2. Analysis Agent → processes data
  3. Summary Agent → summarizes results

Next Session Preview

In Session 11: Checkpoint - Process Agents, we’ll:

  • Integrate these concepts into the agent_framework project
  • Create a ProcessAgent module
  • Add an AgentRegistry for named agents
  • Implement an AgentMonitor for fault tolerance
  • Write comprehensive tests