Session 8: More On Multiprocessing
Mix.install([])
Learning Goals
By the end of this session, you will:
- Understand how to maintain state across messages using recursive loops
- Master selective receive and pattern matching priority
- Learn about mailbox behavior and performance considerations
-
Use timeouts effectively with the
afterclause - Build a fully stateful agent process
1. Process State with Recursion
In Session 7, we saw processes that handle a single message and exit. For useful processes, we need them to:
- Stay alive - Handle multiple messages
- Maintain state - Remember things between messages
The key insight: State is passed as function arguments in a recursive loop.
Why Recursion Works
# This would stack overflow in most languages!
defmodule InfiniteLoop do
def count_forever(n) do
IO.puts("Count: #{n}")
Process.sleep(500)
count_forever(n + 1) # Recursive call
end
end
# Don't run this directly - it would print forever!
# But it won't crash due to Tail Call Optimization
Tail Call Optimization (TCO): When the last thing a function does is call itself (or another function), Elixir reuses the current stack frame instead of creating a new one. This allows infinite recursion without stack overflow.
Basic Stateful Process Pattern
defmodule Counter do
@moduledoc """
A process that maintains a count.
State is carried through the recursive loop.
"""
def start(initial \\ 0) do
spawn(fn -> loop(initial) end)
end
# The loop function is the heart of the process
# `count` is the STATE - it persists across messages
defp loop(count) do
receive do
:increment ->
# Process message, update state, continue looping
loop(count + 1)
{:increment_by, amount} ->
loop(count + amount)
:decrement ->
loop(count - 1)
{:get, from} ->
# Send current state to requester
send(from, {:count, count})
# Continue with unchanged state
loop(count)
:reset ->
loop(0)
:stop ->
IO.puts("Counter stopping with final count: #{count}")
# Not calling loop() means process ends
:ok
end
end
end
# Let's test it
counter = Counter.start(10)
send(counter, :increment)
send(counter, :increment)
send(counter, {:increment_by, 5})
send(counter, :decrement)
send(counter, {:get, self()})
receive do
{:count, n} -> IO.puts("Current count: #{n}")
end
send(counter, :stop)
Visualizing State Flow
loop(10)
├─ receive :increment
│ └─ loop(11)
│ ├─ receive :increment
│ │ └─ loop(12)
│ │ ├─ receive {:increment_by, 5}
│ │ │ └─ loop(17)
│ │ │ ├─ receive :decrement
│ │ │ │ └─ loop(16)
│ │ │ │ └─ ... continues
2. Complex State: Maps and Structs
Real processes need complex state. Let’s build a “Fridge” process that stores and retrieves items.
defmodule Fridge do
@moduledoc """
A process that stores items like a fridge.
Demonstrates complex state (list of items).
"""
def start do
spawn(fn -> loop([]) end)
end
defp loop(items) do
receive do
{:store, item} ->
IO.puts("Storing #{item}")
loop([item | items]) # Add to front of list
{:take, item, from} ->
case Enum.member?(items, item) do
true ->
send(from, {:ok, item})
loop(List.delete(items, item)) # Remove item
false ->
send(from, {:error, :not_found})
loop(items) # State unchanged
end
{:contents, from} ->
send(from, {:contents, items})
loop(items)
:stop ->
IO.puts("Fridge contents at shutdown: #{inspect(items)}")
end
end
end
fridge = Fridge.start()
send(fridge, {:store, :milk})
send(fridge, {:store, :eggs})
send(fridge, {:store, :cheese})
send(fridge, {:take, :milk, self()})
receive do
{:ok, item} -> IO.puts("Took: #{item}")
{:error, reason} -> IO.puts("Error: #{reason}")
end
send(fridge, {:contents, self()})
receive do
{:contents, items} -> IO.puts("In fridge: #{inspect(items)}")
end
send(fridge, :stop)
State as a Map
For key-value state, maps are more practical:
defmodule KeyValueStore do
def start do
spawn(fn -> loop(%{}) end)
end
defp loop(store) do
receive do
{:put, key, value} ->
loop(Map.put(store, key, value))
{:get, key, from} ->
send(from, {:value, Map.get(store, key)})
loop(store)
{:delete, key} ->
loop(Map.delete(store, key))
{:all, from} ->
send(from, {:all, store})
loop(store)
{:update, key, fun} when is_function(fun, 1) ->
new_store = Map.update(store, key, nil, fun)
loop(new_store)
:stop ->
:ok
end
end
end
kv = KeyValueStore.start()
send(kv, {:put, :name, "Alice"})
send(kv, {:put, :score, 100})
send(kv, {:update, :score, fn s -> s + 10 end})
send(kv, {:get, :score, self()})
receive do
{:value, v} -> IO.puts("Score: #{v}")
end
send(kv, {:all, self()})
receive do
{:all, data} -> IO.puts("All data: #{inspect(data)}")
end
send(kv, :stop)
3. Selective Receive
Elixir’s receive uses pattern matching to selectively process messages. This enables priority handling - important messages can be processed first.
How Selective Receive Works
# Let's see selective receive in action
defmodule PriorityDemo do
def start do
pid = spawn(fn -> loop() end)
Process.sleep(100) # Give it time to start
pid
end
defp loop do
# This receive prioritizes :urgent messages
receive do
{:urgent, msg} ->
IO.puts("URGENT: #{msg}")
loop()
after
0 ->
# If no urgent messages, check for normal ones
receive do
{:normal, msg} ->
IO.puts("Normal: #{msg}")
loop()
{:urgent, msg} ->
IO.puts("URGENT: #{msg}")
loop()
:stop ->
IO.puts("Stopping")
after
5000 ->
IO.puts("Idle...")
loop()
end
end
end
end
Simpler Selective Receive Example
# Send messages to ourselves in a specific order
send(self(), {:normal, "Message 1"})
send(self(), {:normal, "Message 2"})
send(self(), {:urgent, "IMPORTANT!"})
send(self(), {:normal, "Message 3"})
# Now receive - pattern matching controls order
# Let's grab the urgent one first
receive do
{:urgent, msg} -> IO.puts("Got urgent: #{msg}")
end
# Check our mailbox - the others are still there
Process.info(self(), :messages) |> elem(1)
# Clean up remaining messages with explicit pattern
receive do
{:normal, msg} -> IO.puts("Normal 1: #{msg}")
end
receive do
{:normal, msg} -> IO.puts("Normal 2: #{msg}")
end
receive do
{:normal, msg} -> IO.puts("Normal 3: #{msg}")
end
The Save Queue
When receive doesn’t match a message, it’s moved to a temporary save queue. After the receive completes (either matching or timing out), save queue messages return to the mailbox.
Mailbox: [A, B, C, D]
receive do
C -> handle(C)
end
Process:
1. Check A - no match, move to save queue
2. Check B - no match, move to save queue
3. Check C - MATCH! Process it
4. Return A, B to mailbox, D stays
Mailbox: [A, B, D]
Performance Warning
Selective receive scans the entire mailbox. If your mailbox grows large with many unmatched messages, performance degrades significantly.
Best practice: Use catch-all clauses to prevent mailbox pollution:
defmodule SafeReceiver do
defp loop(state) do
receive do
{:important, data} ->
handle_important(data)
loop(state)
{:normal, data} ->
handle_normal(data)
loop(state)
# IMPORTANT: Catch-all prevents mailbox growth
unexpected ->
IO.puts("Warning: unexpected message: #{inspect(unexpected)}")
loop(state)
end
end
defp handle_important(data), do: IO.puts("Important: #{data}")
defp handle_normal(data), do: IO.puts("Normal: #{data}")
end
4. Timeouts
The after clause prevents processes from blocking forever.
Basic Timeout
defmodule TimeoutDemo do
def wait_for_message do
receive do
msg ->
IO.puts("Received: #{inspect(msg)}")
:ok
after
3000 ->
IO.puts("No message after 3 seconds")
:timeout
end
end
end
# This will timeout after 3 seconds
TimeoutDemo.wait_for_message()
Timeout Value: 0
A timeout of 0 means “check immediately and continue if nothing matches”:
# Flush all messages from mailbox
defmodule Mailbox do
def flush do
receive do
msg ->
IO.puts("Flushed: #{inspect(msg)}")
flush() # Keep flushing
after
0 -> :ok # No more messages
end
end
def peek do
receive do
msg ->
IO.puts("Peeked: #{inspect(msg)}")
# Put it back!
send(self(), msg)
after
0 -> IO.puts("Mailbox empty")
end
end
end
# Test flush
send(self(), :a)
send(self(), :b)
send(self(), :c)
Mailbox.flush()
Heartbeat Pattern
Timeouts enable health-check patterns:
defmodule Heartbeat do
def start(interval \\ 1000) do
spawn(fn -> loop(interval, 0) end)
end
defp loop(interval, beat_count) do
receive do
{:status, from} ->
send(from, {:alive, beat_count})
loop(interval, beat_count)
:stop ->
IO.puts("Heartbeat stopped after #{beat_count} beats")
after
interval ->
IO.puts("Heartbeat ##{beat_count + 1}")
loop(interval, beat_count + 1)
end
end
end
heartbeat = Heartbeat.start(500)
# Let it beat a few times
Process.sleep(2000)
# Check status
send(heartbeat, {:status, self()})
receive do
{:alive, n} -> IO.puts("Heartbeat is alive, #{n} beats so far")
end
send(heartbeat, :stop)
5. Building a Stateful Agent Process
Now let’s build a complete agent process that mirrors our AgentFramework.Agent struct from Phase 1, but as a living process.
defmodule StatefulAgent do
@moduledoc """
A complete agent implemented as a stateful process.
Mirrors the Phase 1 Agent struct functionality.
"""
# State structure
defstruct [:name, :state, :memory, :inbox, :processed_count]
# --- Public API ---
def start(name) do
initial_state = %__MODULE__{
name: name,
state: :idle,
memory: %{},
inbox: [],
processed_count: 0
}
spawn(fn -> loop(initial_state) end)
end
# --- Process Loop ---
defp loop(agent) do
receive do
# State queries
{:get_state, from} ->
send(from, {:agent_state, agent})
loop(agent)
{:get_status, from} ->
send(from, {:status, agent.state})
loop(agent)
# Memory operations
{:remember, key, value} ->
new_memory = Map.put(agent.memory, key, value)
loop(%{agent | memory: new_memory})
{:recall, key, from} ->
value = Map.get(agent.memory, key)
send(from, {:recalled, key, value})
loop(agent)
{:forget, key} ->
new_memory = Map.delete(agent.memory, key)
loop(%{agent | memory: new_memory})
:forget_all ->
loop(%{agent | memory: %{}})
# Inbox operations
{:receive_task, task} ->
new_inbox = agent.inbox ++ [task]
loop(%{agent | inbox: new_inbox})
{:inbox_count, from} ->
send(from, {:count, length(agent.inbox)})
loop(agent)
{:peek_task, from} ->
task = List.first(agent.inbox)
send(from, {:next_task, task})
loop(agent)
# Process next task
{:process_next, from} ->
case agent.inbox do
[] ->
send(from, {:empty, nil})
loop(agent)
[task | rest] ->
# Set to busy, process, then back to idle
agent = %{agent | state: :busy}
IO.puts("[#{agent.name}] Processing: #{inspect(task)}")
# Simulate processing based on task type
result = handle_task(agent, task)
agent = %{agent |
state: :idle,
inbox: rest,
processed_count: agent.processed_count + 1
}
send(from, {:processed, task, result})
loop(agent)
end
# Process all tasks
{:process_all, from} ->
{agent, results} = process_all_tasks(agent, [])
send(from, {:all_processed, results})
loop(agent)
# Shutdown
:stop ->
IO.puts("[#{agent.name}] Shutting down")
IO.puts(" - Processed: #{agent.processed_count} tasks")
IO.puts(" - Remaining in inbox: #{length(agent.inbox)}")
:ok
# Catch-all
other ->
IO.puts("[#{agent.name}] Unknown message: #{inspect(other)}")
loop(agent)
after
30000 ->
# Idle timeout - log and continue
IO.puts("[#{agent.name}] Idle for 30 seconds...")
loop(agent)
end
end
# --- Task Handling ---
defp handle_task(agent, %{action: :search, params: params}) do
query = Map.get(params, :query, "unknown")
IO.puts("[#{agent.name}] Searching for: #{query}")
{:ok, "Results for: #{query}"}
end
defp handle_task(agent, %{action: :summarize, params: params}) do
content = Map.get(params, :content, "")
IO.puts("[#{agent.name}] Summarizing content...")
{:ok, "Summary of #{String.length(content)} chars"}
end
defp handle_task(agent, %{action: :store, params: params}) do
key = Map.get(params, :key)
value = Map.get(params, :value)
IO.puts("[#{agent.name}] Storing #{key}")
{:ok, {key, value}}
end
defp handle_task(agent, task) do
IO.puts("[#{agent.name}] Unknown task type: #{inspect(task)}")
{:error, :unknown_task}
end
defp process_all_tasks(agent, results) do
case agent.inbox do
[] ->
{agent, Enum.reverse(results)}
[task | rest] ->
agent = %{agent | state: :busy}
result = handle_task(agent, task)
agent = %{agent |
state: :idle,
inbox: rest,
processed_count: agent.processed_count + 1
}
process_all_tasks(agent, [{task, result} | results])
end
end
end
Testing Our Stateful Agent
# Create an agent
agent = StatefulAgent.start("Research-Agent")
# Store some context in memory
send(agent, {:remember, :topic, "Elixir Concurrency"})
send(agent, {:remember, :deadline, "end of week"})
# Check memory
send(agent, {:recall, :topic, self()})
receive do
{:recalled, :topic, value} -> IO.puts("Topic: #{value}")
end
# Queue up some tasks
send(agent, {:receive_task, %{action: :search, params: %{query: "OTP GenServer"}}})
send(agent, {:receive_task, %{action: :summarize, params: %{content: "Long article..."}}})
send(agent, {:receive_task, %{action: :store, params: %{key: :result, value: "data"}}})
# Check inbox count
send(agent, {:inbox_count, self()})
receive do
{:count, n} -> IO.puts("Tasks in inbox: #{n}")
end
# Process one task
send(agent, {:process_next, self()})
receive do
{:processed, task, result} ->
IO.puts("Processed task: #{inspect(task.action)}")
IO.puts("Result: #{inspect(result)}")
end
# Process all remaining
send(agent, {:process_all, self()})
receive do
{:all_processed, results} ->
IO.puts("Processed #{length(results)} tasks")
for {task, result} <- results do
IO.puts(" - #{task.action}: #{inspect(result)}")
end
end
# Check final state
send(agent, {:get_state, self()})
receive do
{:agent_state, state} ->
IO.puts("\nFinal agent state:")
IO.puts(" Name: #{state.name}")
IO.puts(" Status: #{state.state}")
IO.puts(" Memory: #{inspect(state.memory)}")
IO.puts(" Processed: #{state.processed_count}")
end
send(agent, :stop)
6. Helper Functions (API Abstraction)
Sending messages directly is error-prone. Let’s create a clean API:
defmodule AgentClient do
@moduledoc """
Client API for interacting with StatefulAgent processes.
Hides the message-passing details.
"""
def start(name), do: StatefulAgent.start(name)
def remember(pid, key, value) do
send(pid, {:remember, key, value})
:ok
end
def recall(pid, key, timeout \\ 5000) do
send(pid, {:recall, key, self()})
receive do
{:recalled, ^key, value} -> {:ok, value}
after
timeout -> {:error, :timeout}
end
end
def send_task(pid, action, params \\ %{}) do
task = %{action: action, params: params}
send(pid, {:receive_task, task})
:ok
end
def process_next(pid, timeout \\ 5000) do
send(pid, {:process_next, self()})
receive do
{:processed, task, result} -> {:ok, task, result}
{:empty, _} -> {:error, :empty_inbox}
after
timeout -> {:error, :timeout}
end
end
def status(pid, timeout \\ 5000) do
send(pid, {:get_status, self()})
receive do
{:status, status} -> {:ok, status}
after
timeout -> {:error, :timeout}
end
end
def stop(pid) do
send(pid, :stop)
:ok
end
end
# Much cleaner usage!
agent = AgentClient.start("Worker-1")
AgentClient.remember(agent, :context, "Learning Elixir")
{:ok, context} = AgentClient.recall(agent, :context)
IO.puts("Context: #{context}")
AgentClient.send_task(agent, :search, %{query: "Elixir processes"})
{:ok, task, result} = AgentClient.process_next(agent)
IO.puts("Completed: #{task.action} -> #{inspect(result)}")
AgentClient.stop(agent)
7. Summary
Key Concepts
- Recursive loops maintain state - pass state as function argument
- Tail call optimization prevents stack overflow in infinite loops
- Selective receive uses pattern matching to prioritize messages
- Save queue holds unmatched messages during receive scanning
-
Timeouts (
after) prevent indefinite blocking - Zero timeout enables non-blocking mailbox checks
- API abstraction hides message-passing complexity
State Management Pattern
defmodule StatefulProcess do
def start(initial_state) do
spawn(fn -> loop(initial_state) end)
end
defp loop(state) do
receive do
{:query, from} ->
send(from, {:result, derive_from(state)})
loop(state) # State unchanged
{:command, data} ->
new_state = transform(state, data)
loop(new_state) # State updated
:stop ->
cleanup(state)
:ok # Process ends
unexpected ->
log_warning(unexpected)
loop(state) # Ignore and continue
after
timeout ->
idle_action(state)
loop(state)
end
end
end
Connection to Agent Framework
Our StatefulAgent module now provides:
-
Same fields as
AgentFramework.Agentstruct - But as a living process with its own lifecycle
- Real message processing with state transitions
- Memory operations that persist across interactions
8. Practice Exercises
Exercise 1: Rate Limiter
Create a process that limits operations to N per second:
defmodule RateLimiter do
# Implement a process that:
# - Allows max N operations per second
# - Tracks operation count
# - Resets count every second
# - Returns :ok or :rate_limited
end
Exercise 2: Message Buffer
Create a process that buffers messages and flushes periodically:
defmodule MessageBuffer do
# Implement a process that:
# - Collects incoming messages
# - Flushes to a callback after N messages OR timeout
# - Supports {:add, msg}, {:flush, from}, :auto_flush
end
Exercise 3: Job Queue
Create a process that manages a priority job queue:
defmodule JobQueue do
# Implement a process that:
# - Accepts jobs with :high, :normal, :low priority
# - Processes high priority first
# - Returns next job based on priority
end
Next Session Preview
In Session 9: Errors and Processes, we’ll explore:
- Process isolation and crash containment
- Links for bidirectional crash propagation
- Monitors for unidirectional crash notification
- Trapping exits to handle crashes gracefully
- The “Let it crash” philosophy