Session 8: More On Multiprocessing

Mix.install([])

Learning Goals

By the end of this session, you will:

  1. Understand how to maintain state across messages using recursive loops
  2. Master selective receive and pattern matching priority
  3. Learn about mailbox behavior and performance considerations
  4. Use timeouts effectively with the after clause
  5. Build a fully stateful agent process

1. Process State with Recursion

In Session 7, we saw processes that handle a single message and exit. For useful processes, we need them to:

  1. Stay alive - Handle multiple messages
  2. Maintain state - Remember things between messages

The key insight: State is passed as function arguments in a recursive loop.

Why Recursion Works

# In a language without tail call optimization, this would eventually overflow the stack
defmodule InfiniteLoop do
  def count_forever(n) do
    IO.puts("Count: #{n}")
    Process.sleep(500)
    count_forever(n + 1)  # Recursive call
  end
end

# Don't run this directly - it would print forever!
# But it won't crash due to Tail Call Optimization

Tail Call Optimization (TCO): When the last thing a function does is call a function (itself or another one), the BEAM reuses the current stack frame instead of pushing a new one. A loop written this way can recurse indefinitely in constant stack space.
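A small sketch can make the distinction concrete. `TailDemo` is a hypothetical module, not part of the session's code. In `sum_tail/2` the recursive call is the last operation, so the frame can be reused. In `sum_body/1` the addition still has to run after the call returns, so every pending call keeps a frame alive and memory grows with depth.

```elixir
defmodule TailDemo do
  # Tail-recursive: the recursive call is the final operation,
  # so the current stack frame can be reused.
  def sum_tail(n, acc \\ 0)
  def sum_tail(0, acc), do: acc
  def sum_tail(n, acc) when n > 0, do: sum_tail(n - 1, acc + n)

  # Body-recursive: `n + ...` runs AFTER the recursive call returns,
  # so each pending addition keeps a stack frame alive.
  def sum_body(0), do: 0
  def sum_body(n) when n > 0, do: n + sum_body(n - 1)
end

IO.puts(TailDemo.sum_tail(1_000_000))
IO.puts(TailDemo.sum_body(1_000))
```

A process loop has the tail-recursive shape as long as the call to `loop/1` is the last expression in each `receive` clause.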

Basic Stateful Process Pattern

defmodule Counter do
  @moduledoc """
  A process that maintains a count.
  State is carried through the recursive loop.
  """

  def start(initial \\ 0) do
    spawn(fn -> loop(initial) end)
  end

  # The loop function is the heart of the process
  # `count` is the STATE - it persists across messages
  defp loop(count) do
    receive do
      :increment ->
        # Process message, update state, continue looping
        loop(count + 1)

      {:increment_by, amount} ->
        loop(count + amount)

      :decrement ->
        loop(count - 1)

      {:get, from} ->
        # Send current state to requester
        send(from, {:count, count})
        # Continue with unchanged state
        loop(count)

      :reset ->
        loop(0)

      :stop ->
        IO.puts("Counter stopping with final count: #{count}")
        # Not calling loop() means process ends
        :ok
    end
  end
end

# Let's test it
counter = Counter.start(10)

send(counter, :increment)
send(counter, :increment)
send(counter, {:increment_by, 5})
send(counter, :decrement)

send(counter, {:get, self()})
receive do
  {:count, n} -> IO.puts("Current count: #{n}")
end

send(counter, :stop)

Visualizing State Flow

loop(10)
  ├─ receive :increment
  │     └─ loop(11)
  │          ├─ receive :increment
  │          │     └─ loop(12)
  │          │          ├─ receive {:increment_by, 5}
  │          │          │     └─ loop(17)
  │          │          │          ├─ receive :decrement
  │          │          │          │     └─ loop(16)
  │          │          │          │          └─ ... continues

2. Complex State: Maps and Structs

Real processes usually need richer state than a single integer. Let’s build a “Fridge” process whose state is a list of stored items.

defmodule Fridge do
  @moduledoc """
  A process that stores items like a fridge.
  Demonstrates complex state (list of items).
  """

  def start do
    spawn(fn -> loop([]) end)
  end

  defp loop(items) do
    receive do
      {:store, item} ->
        IO.puts("Storing #{item}")
        loop([item | items])  # Add to front of list

      {:take, item, from} ->
        case Enum.member?(items, item) do
          true ->
            send(from, {:ok, item})
            loop(List.delete(items, item))  # Remove item

          false ->
            send(from, {:error, :not_found})
            loop(items)  # State unchanged
        end

      {:contents, from} ->
        send(from, {:contents, items})
        loop(items)

      :stop ->
        IO.puts("Fridge contents at shutdown: #{inspect(items)}")
    end
  end
end

fridge = Fridge.start()

send(fridge, {:store, :milk})
send(fridge, {:store, :eggs})
send(fridge, {:store, :cheese})

send(fridge, {:take, :milk, self()})
receive do
  {:ok, item} -> IO.puts("Took: #{item}")
  {:error, reason} -> IO.puts("Error: #{reason}")
end

send(fridge, {:contents, self()})
receive do
  {:contents, items} -> IO.puts("In fridge: #{inspect(items)}")
end

send(fridge, :stop)

State as a Map

For key-value state, maps are more practical:

defmodule KeyValueStore do
  def start do
    spawn(fn -> loop(%{}) end)
  end

  defp loop(store) do
    receive do
      {:put, key, value} ->
        loop(Map.put(store, key, value))

      {:get, key, from} ->
        send(from, {:value, Map.get(store, key)})
        loop(store)

      {:delete, key} ->
        loop(Map.delete(store, key))

      {:all, from} ->
        send(from, {:all, store})
        loop(store)

      {:update, key, fun} when is_function(fun, 1) ->
        # If `key` is missing, Map.update/4 stores the default (nil) without calling `fun`
        new_store = Map.update(store, key, nil, fun)
        loop(new_store)

      :stop ->
        :ok
    end
  end
end

kv = KeyValueStore.start()

send(kv, {:put, :name, "Alice"})
send(kv, {:put, :score, 100})
send(kv, {:update, :score, fn s -> s + 10 end})

send(kv, {:get, :score, self()})
receive do
  {:value, v} -> IO.puts("Score: #{v}")
end

send(kv, {:all, self()})
receive do
  {:all, data} -> IO.puts("All data: #{inspect(data)}")
end

send(kv, :stop)

3. Selective Receive

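Elixir’s receive uses pattern matching to selectively process messages: it takes the first message in the mailbox that matches one of its patterns, which is not necessarily the oldest message. This enables priority handling, where important messages are processed before earlier, less important ones.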

How Selective Receive Works

# Let's see selective receive in action
defmodule PriorityDemo do
  def start do
    # spawn/1 returns the pid immediately; messages sent before the loop
    # starts are simply queued in the new process's mailbox
    spawn(fn -> loop() end)
  end

  defp loop do
    # This receive prioritizes :urgent messages
    receive do
      {:urgent, msg} ->
        IO.puts("URGENT: #{msg}")
        loop()
    after
      0 ->
        # If no urgent messages, check for normal ones
        receive do
          {:normal, msg} ->
            IO.puts("Normal: #{msg}")
            loop()

          {:urgent, msg} ->
            IO.puts("URGENT: #{msg}")
            loop()

          :stop ->
            IO.puts("Stopping")
        after
          5000 ->
            IO.puts("Idle...")
            loop()
        end
    end
  end
end
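To watch the nested `receive ... after 0` technique produce a priority order, here is a minimal sketch using a hypothetical `PriorityDrain` module. It is not the module above: it drains the caller's own mailbox and returns the messages in the order they were handled, which makes the result easy to inspect.

```elixir
defmodule PriorityDrain do
  # Hypothetical helper: removes {:urgent, _} and {:normal, _} messages
  # from the caller's mailbox, always preferring urgent ones.
  def drain(acc \\ []) do
    receive do
      {:urgent, msg} -> drain([{:urgent, msg} | acc])
    after
      0 ->
        # No urgent message is waiting, so look for a normal one
        receive do
          {:normal, msg} -> drain([{:normal, msg} | acc])
        after
          0 -> Enum.reverse(acc)
        end
    end
  end
end

send(self(), {:normal, "Message 1"})
send(self(), {:urgent, "First alert"})
send(self(), {:normal, "Message 2"})
send(self(), {:urgent, "Second alert"})

order = PriorityDrain.drain()
IO.inspect(order)
```

Both urgent messages come out first, followed by the normal ones in their arrival order.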

Simpler Selective Receive Example

# Send messages to ourselves in a specific order
send(self(), {:normal, "Message 1"})
send(self(), {:normal, "Message 2"})
send(self(), {:urgent, "IMPORTANT!"})
send(self(), {:normal, "Message 3"})

# Now receive - pattern matching controls order
# Let's grab the urgent one first
receive do
  {:urgent, msg} -> IO.puts("Got urgent: #{msg}")
end

# Check our mailbox - the others are still there
Process.info(self(), :messages) |> elem(1) |> IO.inspect(label: "Mailbox")

# Clean up remaining messages with explicit pattern
receive do
  {:normal, msg} -> IO.puts("Normal 1: #{msg}")
end
receive do
  {:normal, msg} -> IO.puts("Normal 2: #{msg}")
end
receive do
  {:normal, msg} -> IO.puts("Normal 3: #{msg}")
end

The Save Queue

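Conceptually, when receive examines a message that matches none of its patterns, the message is set aside in a temporary save queue and scanning moves on to the next one. Once the receive completes (by matching or timing out), the saved messages return to the mailbox in their original order.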

Mailbox: [A, B, C, D]

receive do
  C -> handle(C)
end

Process:
1. Check A - no match, move to save queue
2. Check B - no match, move to save queue
3. Check C - MATCH! Process it
4. Return A, B to mailbox, D stays

Mailbox: [A, B, D]
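The walkthrough above can be checked with a short experiment. This is a sketch that uses the atoms `:a` to `:d` as stand-ins for messages A to D, and it runs in a freshly spawned process so that the mailbox starts empty.

```elixir
parent = self()

spawn(fn ->
  # Fill this process's own mailbox in a known order
  for msg <- [:a, :b, :c, :d], do: send(self(), msg)

  # Selectively take :c, skipping over :a and :b
  receive do
    :c -> :ok
  end

  # Report what is still queued, in order
  {:messages, msgs} = Process.info(self(), :messages)
  send(parent, {:remaining, msgs})
end)

remaining =
  receive do
    {:remaining, msgs} -> msgs
  after
    1_000 -> :timeout
  end

IO.inspect(remaining)
# prints [:a, :b, :d]
```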

Performance Warning

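Selective receive scans the mailbox from the oldest message forward until one matches, so every unmatched message ahead of the match is examined again on each receive. If the mailbox fills with messages that never match, each receive gets slower and memory use keeps growing.

Best practice: Unless you are deliberately deferring messages for a later receive, add a catch-all clause so unexpected messages are consumed instead of piling up: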

defmodule SafeReceiver do
  def start(state \\ nil) do
    spawn(fn -> loop(state) end)
  end

  defp loop(state) do
    receive do
      {:important, data} ->
        handle_important(data)
        loop(state)

      {:normal, data} ->
        handle_normal(data)
        loop(state)

      # IMPORTANT: Catch-all prevents mailbox growth
      unexpected ->
        IO.puts("Warning: unexpected message: #{inspect(unexpected)}")
        loop(state)
    end
  end

  defp handle_important(data), do: IO.puts("Important: #{data}")
  defp handle_normal(data), do: IO.puts("Normal: #{data}")
end
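To see what the catch-all protects against, the following sketch measures a mailbox that has no such clause. `LeakyReceiver` is a hypothetical module written for this illustration. It understands only `{:len, from}` and `:stop`, so every other message stays queued.

```elixir
defmodule LeakyReceiver do
  # Hypothetical process WITHOUT a catch-all clause.
  def start, do: spawn(fn -> loop() end)

  defp loop do
    receive do
      {:len, from} ->
        # Report how many messages are still waiting in our mailbox
        {:message_queue_len, len} = Process.info(self(), :message_queue_len)
        send(from, {:len, len})
        loop()

      :stop ->
        :ok
    end
  end
end

pid = LeakyReceiver.start()
for i <- 1..1_000, do: send(pid, {:unhandled, i})

send(pid, {:len, self()})

queue_len =
  receive do
    {:len, len} -> len
  after
    1_000 -> :timeout
  end

IO.puts("Unmatched messages still queued: #{queue_len}")
send(pid, :stop)
```

All 1,000 unmatched messages are still in the mailbox, and the process has to step over them on every later receive.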

4. Timeouts

The after clause prevents processes from blocking forever.

Basic Timeout

defmodule TimeoutDemo do
  def wait_for_message do
    receive do
      msg ->
        IO.puts("Received: #{inspect(msg)}")
        :ok
    after
      3000 ->
        IO.puts("No message after 3 seconds")
        :timeout
    end
  end
end

# This will timeout after 3 seconds
TimeoutDemo.wait_for_message()

Timeout Value: 0

A timeout of 0 means “check immediately and continue if nothing matches”:

# Flush all messages from mailbox
defmodule Mailbox do
  def flush do
    receive do
      msg ->
        IO.puts("Flushed: #{inspect(msg)}")
        flush()  # Keep flushing
    after
      0 -> :ok  # No more messages
    end
  end

  def peek do
    receive do
      msg ->
        IO.puts("Peeked: #{inspect(msg)}")
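        # Re-send it to ourselves. Note: this appends the message to the END
        # of the mailbox, so its position relative to other messages changes.
        send(self(), msg)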
    after
      0 -> IO.puts("Mailbox empty")
    end
  end
end

# Test flush
send(self(), :a)
send(self(), :b)
send(self(), :c)
Mailbox.flush()

Heartbeat Pattern

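Timeouts enable health-check patterns. Here the after clause fires whenever the process has received no message for `interval` milliseconds: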

defmodule Heartbeat do
  def start(interval \\ 1000) do
    spawn(fn -> loop(interval, 0) end)
  end

  defp loop(interval, beat_count) do
    receive do
      {:status, from} ->
        send(from, {:alive, beat_count})
        loop(interval, beat_count)

      :stop ->
        IO.puts("Heartbeat stopped after #{beat_count} beats")
    after
      interval ->
        IO.puts("Heartbeat ##{beat_count + 1}")
        loop(interval, beat_count + 1)
    end
  end
end

heartbeat = Heartbeat.start(500)

# Let it beat a few times
Process.sleep(2000)

# Check status
send(heartbeat, {:status, self()})
receive do
  {:alive, n} -> IO.puts("Heartbeat is alive, #{n} beats so far")
end

send(heartbeat, :stop)
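One caveat with the `after`-based heartbeat: the timeout restarts each time a message arrives, so a process that is queried more often than `interval` never beats. A possible alternative, sketched below with a hypothetical `SteadyHeartbeat` module, schedules each beat as a message to itself with `Process.send_after/3`, so other traffic does not reset the timer.

```elixir
defmodule SteadyHeartbeat do
  # Hypothetical variant of Heartbeat: beats arrive as :beat messages
  # scheduled with Process.send_after/3.
  def start(interval \\ 1000) do
    spawn(fn ->
      schedule(interval)
      loop(interval, 0)
    end)
  end

  defp schedule(interval), do: Process.send_after(self(), :beat, interval)

  defp loop(interval, beat_count) do
    receive do
      :beat ->
        schedule(interval)
        loop(interval, beat_count + 1)

      {:status, from} ->
        send(from, {:alive, beat_count})
        loop(interval, beat_count)

      :stop ->
        :ok
    end
  end
end

hb = SteadyHeartbeat.start(100)

# Query the process every 30 ms, more often than the 100 ms beat interval
for _ <- 1..20 do
  send(hb, {:status, self()})

  receive do
    {:alive, _} -> :ok
  after
    1_000 -> :timeout
  end

  Process.sleep(30)
end

send(hb, {:status, self()})

beats =
  receive do
    {:alive, n} -> n
  after
    1_000 -> :timeout
  end

IO.puts("Beats despite constant queries: #{beats}")
send(hb, :stop)
```

The exact count depends on scheduling, but it keeps growing even though the process is never idle for a full interval.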

5. Building a Stateful Agent Process

Now let’s build a complete agent process that mirrors our AgentFramework.Agent struct from Phase 1, but as a living process.

defmodule StatefulAgent do
  @moduledoc """
  A complete agent implemented as a stateful process.
  Mirrors the Phase 1 Agent struct functionality.
  """

  # State structure
  defstruct [:name, :state, :memory, :inbox, :processed_count]

  # --- Public API ---

  def start(name) do
    initial_state = %__MODULE__{
      name: name,
      state: :idle,
      memory: %{},
      inbox: [],
      processed_count: 0
    }
    spawn(fn -> loop(initial_state) end)
  end

  # --- Process Loop ---

  defp loop(agent) do
    receive do
      # State queries
      {:get_state, from} ->
        send(from, {:agent_state, agent})
        loop(agent)

      {:get_status, from} ->
        send(from, {:status, agent.state})
        loop(agent)

      # Memory operations
      {:remember, key, value} ->
        new_memory = Map.put(agent.memory, key, value)
        loop(%{agent | memory: new_memory})

      {:recall, key, from} ->
        value = Map.get(agent.memory, key)
        send(from, {:recalled, key, value})
        loop(agent)

      {:forget, key} ->
        new_memory = Map.delete(agent.memory, key)
        loop(%{agent | memory: new_memory})

      :forget_all ->
        loop(%{agent | memory: %{}})

      # Inbox operations
      {:receive_task, task} ->
        new_inbox = agent.inbox ++ [task]
        loop(%{agent | inbox: new_inbox})

      {:inbox_count, from} ->
        send(from, {:count, length(agent.inbox)})
        loop(agent)

      {:peek_task, from} ->
        task = List.first(agent.inbox)
        send(from, {:next_task, task})
        loop(agent)

      # Process next task
      {:process_next, from} ->
        case agent.inbox do
          [] ->
            send(from, {:empty, nil})
            loop(agent)

          [task | rest] ->
            # Set to busy, process, then back to idle
            agent = %{agent | state: :busy}
            IO.puts("[#{agent.name}] Processing: #{inspect(task)}")

            # Simulate processing based on task type
            result = handle_task(agent, task)

            agent = %{agent |
              state: :idle,
              inbox: rest,
              processed_count: agent.processed_count + 1
            }

            send(from, {:processed, task, result})
            loop(agent)
        end

      # Process all tasks
      {:process_all, from} ->
        {agent, results} = process_all_tasks(agent, [])
        send(from, {:all_processed, results})
        loop(agent)

      # Shutdown
      :stop ->
        IO.puts("[#{agent.name}] Shutting down")
        IO.puts("  - Processed: #{agent.processed_count} tasks")
        IO.puts("  - Remaining in inbox: #{length(agent.inbox)}")
        :ok

      # Catch-all
      other ->
        IO.puts("[#{agent.name}] Unknown message: #{inspect(other)}")
        loop(agent)

    after
      30000 ->
        # Idle timeout - log and continue
        IO.puts("[#{agent.name}] Idle for 30 seconds...")
        loop(agent)
    end
  end

  # --- Task Handling ---

  defp handle_task(agent, %{action: :search, params: params}) do
    query = Map.get(params, :query, "unknown")
    IO.puts("[#{agent.name}] Searching for: #{query}")
    {:ok, "Results for: #{query}"}
  end

  defp handle_task(agent, %{action: :summarize, params: params}) do
    content = Map.get(params, :content, "")
    IO.puts("[#{agent.name}] Summarizing content...")
    {:ok, "Summary of #{String.length(content)} chars"}
  end

  defp handle_task(agent, %{action: :store, params: params}) do
    key = Map.get(params, :key)
    value = Map.get(params, :value)
    IO.puts("[#{agent.name}] Storing #{key}")
    {:ok, {key, value}}
  end

  defp handle_task(agent, task) do
    IO.puts("[#{agent.name}] Unknown task type: #{inspect(task)}")
    {:error, :unknown_task}
  end

  defp process_all_tasks(agent, results) do
    case agent.inbox do
      [] ->
        {agent, Enum.reverse(results)}

      [task | rest] ->
        agent = %{agent | state: :busy}
        result = handle_task(agent, task)
        agent = %{agent |
          state: :idle,
          inbox: rest,
          processed_count: agent.processed_count + 1
        }
        process_all_tasks(agent, [{task, result} | results])
    end
  end
end

Testing Our Stateful Agent

# Create an agent
agent = StatefulAgent.start("Research-Agent")

# Store some context in memory
send(agent, {:remember, :topic, "Elixir Concurrency"})
send(agent, {:remember, :deadline, "end of week"})

# Check memory
send(agent, {:recall, :topic, self()})
receive do
  {:recalled, :topic, value} -> IO.puts("Topic: #{value}")
end

# Queue up some tasks
send(agent, {:receive_task, %{action: :search, params: %{query: "OTP GenServer"}}})
send(agent, {:receive_task, %{action: :summarize, params: %{content: "Long article..."}}})
send(agent, {:receive_task, %{action: :store, params: %{key: :result, value: "data"}}})

# Check inbox count
send(agent, {:inbox_count, self()})
receive do
  {:count, n} -> IO.puts("Tasks in inbox: #{n}")
end

# Process one task
send(agent, {:process_next, self()})
receive do
  {:processed, task, result} ->
    IO.puts("Processed task: #{inspect(task.action)}")
    IO.puts("Result: #{inspect(result)}")
end

# Process all remaining
send(agent, {:process_all, self()})
receive do
  {:all_processed, results} ->
    IO.puts("Processed #{length(results)} tasks")
    for {task, result} <- results do
      IO.puts("  - #{task.action}: #{inspect(result)}")
    end
end

# Check final state
send(agent, {:get_state, self()})
receive do
  {:agent_state, state} ->
    IO.puts("\nFinal agent state:")
    IO.puts("  Name: #{state.name}")
    IO.puts("  Status: #{state.state}")
    IO.puts("  Memory: #{inspect(state.memory)}")
    IO.puts("  Processed: #{state.processed_count}")
end

send(agent, :stop)

6. Helper Functions (API Abstraction)

Sending messages directly is error-prone. Let’s create a clean API:

defmodule AgentClient do
  @moduledoc """
  Client API for interacting with StatefulAgent processes.
  Hides the message-passing details.
  """

  def start(name), do: StatefulAgent.start(name)

  def remember(pid, key, value) do
    send(pid, {:remember, key, value})
    :ok
  end

  def recall(pid, key, timeout \\ 5000) do
    send(pid, {:recall, key, self()})
    receive do
      {:recalled, ^key, value} -> {:ok, value}
    after
      timeout -> {:error, :timeout}
    end
  end

  def send_task(pid, action, params \\ %{}) do
    task = %{action: action, params: params}
    send(pid, {:receive_task, task})
    :ok
  end

  def process_next(pid, timeout \\ 5000) do
    send(pid, {:process_next, self()})
    receive do
      {:processed, task, result} -> {:ok, task, result}
      {:empty, _} -> {:error, :empty_inbox}
    after
      timeout -> {:error, :timeout}
    end
  end

  def status(pid, timeout \\ 5000) do
    send(pid, {:get_status, self()})
    receive do
      {:status, status} -> {:ok, status}
    after
      timeout -> {:error, :timeout}
    end
  end

  def stop(pid) do
    send(pid, :stop)
    :ok
  end
end

# Much cleaner usage!
agent = AgentClient.start("Worker-1")

AgentClient.remember(agent, :context, "Learning Elixir")
{:ok, context} = AgentClient.recall(agent, :context)
IO.puts("Context: #{context}")

AgentClient.send_task(agent, :search, %{query: "Elixir processes"})
{:ok, task, result} = AgentClient.process_next(agent)
IO.puts("Completed: #{task.action} -> #{inspect(result)}")

AgentClient.stop(agent)
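One limitation of the client functions above: a reply is matched only by its shape, so a late reply to an earlier call that timed out could be mistaken for the answer to the current one. A common remedy is to tag each request with a unique reference from `make_ref/0` and pin it in the receive. The sketch below shows the idea with a hypothetical `EchoServer`. StatefulAgent would need its message shapes changed to adopt it.

```elixir
defmodule EchoServer do
  # Hypothetical server: answers {:call, from, ref, request}
  # with {:reply, ref, response}, echoing the ref back unchanged.
  def start, do: spawn(fn -> loop() end)

  def call(pid, request, timeout \\ 5000) do
    ref = make_ref()
    send(pid, {:call, self(), ref, request})

    receive do
      # ^ref restricts the match to the reply for THIS request
      {:reply, ^ref, response} -> {:ok, response}
    after
      timeout -> {:error, :timeout}
    end
  end

  defp loop do
    receive do
      {:call, from, ref, request} ->
        send(from, {:reply, ref, {:echo, request}})
        loop()

      :stop ->
        :ok
    end
  end
end

server = EchoServer.start()

# A stale reply carrying some other ref is already in our mailbox...
send(self(), {:reply, make_ref(), :stale})

# ...but call/3 skips it and returns only the matching reply.
result = EchoServer.call(server, :ping)
IO.inspect(result)
# prints {:ok, {:echo, :ping}}

send(server, :stop)
```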

7. Summary

Key Concepts

  1. Recursive loops maintain state - pass the state as a function argument
  2. Tail call optimization prevents stack overflow in infinite loops
  3. Selective receive uses pattern matching to prioritize messages
  4. Save queue holds unmatched messages during receive scanning
  5. Timeouts (after) prevent indefinite blocking
  6. Zero timeout enables non-blocking mailbox checks
  7. API abstraction hides message-passing complexity

State Management Pattern

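defmodule StatefulProcess do
  # Idle timeout in milliseconds (illustrative value)
  @timeout 30_000

  def start(initial_state) do
    spawn(fn -> loop(initial_state) end)
  end

  defp loop(state) do
    receive do
      {:query, from} ->
        send(from, {:result, derive_from(state)})
        loop(state)  # State unchanged

      {:command, data} ->
        new_state = transform(state, data)
        loop(new_state)  # State updated

      :stop ->
        cleanup(state)
        :ok  # Process ends

      unexpected ->
        log_warning(unexpected)
        loop(state)  # Ignore and continue

    after
      @timeout ->
        idle_action(state)
        loop(state)
    end
  end

  # Placeholder callbacks - replace these with your own logic
  defp derive_from(state), do: state
  defp transform(_state, data), do: data
  defp cleanup(_state), do: :ok
  defp log_warning(msg), do: IO.puts("Unexpected message: #{inspect(msg)}")
  defp idle_action(_state), do: :ok
end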

Connection to Agent Framework

Our StatefulAgent module now provides:

  • Same fields as AgentFramework.Agent struct
  • But as a living process with its own lifecycle
  • Real message processing with state transitions
  • Memory operations that persist across interactions

8. Practice Exercises

Exercise 1: Rate Limiter

Create a process that limits operations to N per second:

defmodule RateLimiter do
  # Implement a process that:
  # - Allows max N operations per second
  # - Tracks operation count
  # - Resets count every second
  # - Returns :ok or :rate_limited
end

Exercise 2: Message Buffer

Create a process that buffers messages and flushes periodically:

defmodule MessageBuffer do
  # Implement a process that:
  # - Collects incoming messages
  # - Flushes to a callback after N messages OR timeout
  # - Supports {:add, msg}, {:flush, from}, :auto_flush
end

Exercise 3: Job Queue

Create a process that manages a priority job queue:

defmodule JobQueue do
  # Implement a process that:
  # - Accepts jobs with :high, :normal, :low priority
  # - Processes high priority first
  # - Returns next job based on priority
end

Next Session Preview

In Session 9: Errors and Processes, we’ll explore:

  • Process isolation and crash containment
  • Links for bidirectional crash propagation
  • Monitors for unidirectional crash notification
  • Trapping exits to handle crashes gracefully
  • The “Let it crash” philosophy