Session 7: The Hitchhiker's Guide to Concurrency

07_hitchhikers_guide_concurrency.livemd

Mix.install([])

Learning Goals

By the end of this session, you will:

  1. Understand Erlang/Elixir’s process model and how it differs from OS threads
  2. Master the three concurrency primitives: spawn, send, and receive
  3. Grasp the actor model fundamentals
  4. Build your first communicating processes

1. What is a Process?

The Actor Model

Elixir’s concurrency is based on the Actor Model. In this model:

  • Actors (processes) are isolated units of computation
  • They run independently and share nothing
  • They communicate only through message passing
  • Each has its own mailbox for incoming messages

This is fundamentally different from traditional threading where threads share memory.
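To make the share-nothing point concrete, here is a minimal sketch (the variable names and the `:child_result` tag are illustrative, not part of any API). A child process builds a modified copy of a list it captured from the parent. The parent's binding is unaffected, and the only way the parent learns the child's result is through a message.

```elixir
parent = self()
original = [1, 2, 3]

# The closure captures `original`; the data is copied into the child's own heap
spawn(fn ->
  updated = [0 | original]          # builds a new list inside the child
  send(parent, {:child_result, updated})
end)

# Block until the child reports back
child_view =
  receive do
    {:child_result, list} -> list
  end

IO.inspect(original, label: "parent still sees")
IO.inspect(child_view, label: "child sent back")
```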

Processes vs OS Threads

Elixir Processes               | OS Threads
-------------------------------|------------------------
~2-3 KB initial memory         | ~1 MB stack size
Microseconds to create         | Milliseconds to create
Millions possible              | Thousands max
Isolated memory                | Shared memory
No locks needed                | Locks required
Preemptive scheduling by BEAM  | OS scheduled
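You can check the memory figure on your own machine. The sketch below spawns an idle process and asks the VM how much memory it uses. The exact number depends on your OTP version and architecture, so treat the table's figure as a rough guide.

```elixir
# Spawn a process that just waits, so it is still alive when we inspect it
pid = spawn(fn ->
  receive do
    :stop -> :ok
  end
end)

# Process.info/2 returns {:memory, bytes} for a live process
{:memory, bytes} = Process.info(pid, :memory)
IO.puts("A freshly spawned, idle process uses #{bytes} bytes")

# Upper bound on simultaneous processes (configurable with the +P emulator flag)
IO.puts("Process limit: #{:erlang.system_info(:process_limit)}")

send(pid, :stop)
```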

Let’s see how many processes already exist in our runtime:

# How many processes are currently running?
process_count = length(Process.list())
IO.puts("Current running processes: #{process_count}")

# Our current process
IO.puts("My PID: #{inspect(self())}")
IO.puts("Am I alive? #{Process.alive?(self())}")

The BEAM Scheduler

By default, the BEAM VM starts one scheduler per logical CPU core. Each scheduler:

  • Runs in its own OS thread
  • Manages thousands of Elixir processes
  • Uses preemptive scheduling (each process runs for a bounded budget of work, counted in reductions, before the next one gets a turn)
  • Can steal work from other schedulers for load balancing

# How many schedulers do we have?
:erlang.system_info(:schedulers)
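Preemption is easy to observe. In this sketch (the `Spinner` module and the `:still_responsive` message are made up for the demonstration), we start twice as many CPU-bound processes as there are schedulers, then check that a newly spawned process still gets a turn.

```elixir
defmodule Spinner do
  # Infinite tail-recursive loop: it never blocks or yields on its own
  def spin, do: spin()
end

online = System.schedulers_online()
IO.puts("Schedulers online: #{online}")

# Start twice as many CPU-bound processes as there are schedulers
spinners = for _ <- 1..(online * 2), do: spawn(Spinner, :spin, [])

# A newly spawned process still gets scheduled despite the busy loops
parent = self()
spawn(fn -> send(parent, :still_responsive) end)

reply =
  receive do
    :still_responsive -> :still_responsive
  end

IO.puts("Got #{inspect(reply)} while #{length(spinners)} processes were spinning")

# Clean up: :kill terminates the spinners unconditionally
Enum.each(spinners, &Process.exit(&1, :kill))
```

With cooperative scheduling, the busy loops would starve everything else. Here the reply arrives because the schedulers interrupt each spinner after its time slice.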

2. Spawning Processes

The spawn function creates a new process. It returns immediately with the new process’s PID (Process Identifier).

spawn/1 - Anonymous Function

# Spawn a process that runs an anonymous function
pid = spawn(fn ->
  IO.puts("Hello from process #{inspect(self())}!")
end)

IO.puts("Spawned process: #{inspect(pid)}")

# Give it a moment to execute
Process.sleep(100)

# Is it still alive?
IO.puts("Is #{inspect(pid)} alive? #{Process.alive?(pid)}")

The spawned process executes the function and then terminates. This is a fundamental concept: processes have a lifecycle.
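Sleeping for 100 ms works for a demo, but it only guesses at when the process has finished. As a preview of a later session, here is a sketch that uses Process.monitor/1 to wait for the exit itself. The `:finish` message is just a name chosen for this example.

```elixir
# Start a process that blocks until told to finish
pid = spawn(fn ->
  receive do
    :finish -> :ok
  end
end)

alive_while_waiting = Process.alive?(pid)

# Process.monitor/1 asks the VM to send us a :DOWN message when pid exits
ref = Process.monitor(pid)
send(pid, :finish)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process exited with reason #{inspect(reason)}")
end

alive_after_exit = Process.alive?(pid)
IO.puts("Alive while waiting? #{alive_while_waiting}, after exit? #{alive_after_exit}")
```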

spawn/3 - Module, Function, Args

You can also spawn using a module, function name, and arguments list (MFA):

defmodule Greeter do
  def say_hello(name) do
    IO.puts("Hello, #{name}! I am #{inspect(self())}")
  end
end

# Spawn using MFA (Module, Function, Args)
pid = spawn(Greeter, :say_hello, ["World"])
Process.sleep(100)

Exercise: Spawn 1000 Processes

Let’s prove that Elixir processes are truly lightweight:

# Measure time to spawn 1000 processes
spawn_count = 1000

{time_microseconds, _pids} = :timer.tc(fn ->
  Enum.map(1..spawn_count, fn i ->
    spawn(fn ->
      # Each process does a tiny bit of work
      _result = i * i
    end)
  end)
end)

# Total time: microseconds -> milliseconds
IO.puts("Time to spawn #{spawn_count} processes: #{time_microseconds / 1000} ms")
# Average: total microseconds divided by the number of processes
IO.puts("Average per process: #{time_microseconds / spawn_count} microseconds")

Now let’s see the process count spike:

before = length(Process.list())

# Spawn processes that stay alive longer
_pids = for _ <- 1..1000 do
  spawn(fn ->
    Process.sleep(5000)  # Stay alive for 5 seconds
  end)
end

after_spawn = length(Process.list())

IO.puts("Processes before: #{before}")
IO.puts("Processes after spawning 1000: #{after_spawn}")
IO.puts("Difference: #{after_spawn - before}")

3. Message Passing

Processes communicate through messages. Each process has a mailbox: a queue where incoming messages wait until the process reads them.

send/2 - Sending Messages

The send/2 function sends a message to a PID. (Erlang writes this with the ! operator; Elixir has no equivalent operator and always uses send/2.)

# Send a message to ourselves
send(self(), {:greeting, "Hello!"})
send(self(), {:number, 42})
send(self(), :done)

# Check our mailbox (without consuming messages)
Process.info(self(), :messages)
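Two details of send/2 are worth seeing in action: it returns the message it was given, and it does not fail when the recipient no longer exists. The sketch below uses spawn_monitor/1 only to be sure the target process has already exited before we send to it.

```elixir
# spawn_monitor/1 spawns and monitors in one step, so we can wait for the exit
{dead_pid, ref} = spawn_monitor(fn -> :ok end)

receive do
  {:DOWN, ^ref, :process, ^dead_pid, _reason} -> :ok
end

# send/2 returns the message itself and does not raise for a dead PID
returned = send(dead_pid, {:hello, "anyone there?"})
IO.inspect(returned, label: "send/2 returned")
IO.puts("Recipient alive? #{Process.alive?(dead_pid)}")
```

The message is silently dropped. If the sender needs to know whether anyone was listening, it has to ask for a reply.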

receive/1 - Receiving Messages

The receive block pattern matches against messages in the mailbox:

# Now consume the messages we sent
receive do
  {:greeting, msg} -> IO.puts("Got greeting: #{msg}")
end

receive do
  {:number, n} -> IO.puts("Got number: #{n}")
end

receive do
  :done -> IO.puts("Got done signal")
end
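A single receive can also carry several clauses, in which case it handles whichever matching message arrived first. Here is a small sketch (the `MailboxReader` module is a hypothetical helper, and it assumes the mailbox holds no other messages that match these patterns):

```elixir
defmodule MailboxReader do
  # Read `count` messages and return a description of each, in arrival order
  def read(count), do: read(count, [])

  defp read(0, acc), do: Enum.reverse(acc)

  defp read(count, acc) do
    description =
      receive do
        {:greeting, msg} -> "greeting: #{msg}"
        {:number, n} -> "number: #{n}"
        :done -> "done signal"
      end

    read(count - 1, [description | acc])
  end
end

# Send in a different order than the clauses are written
send(self(), {:number, 42})
send(self(), :done)
send(self(), {:greeting, "Hello!"})

descriptions = MailboxReader.read(3)
IO.inspect(descriptions)
```

The descriptions come back in the order the messages were sent, not the order the clauses are listed.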

The Mailbox is a Queue (FIFO)

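receive scans the mailbox from oldest to newest and takes the first message that matches one of its clauses. Messages that don’t match stay in the mailbox, in their original order: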

# Send multiple messages
send(self(), {:a, 1})
send(self(), {:b, 2})
send(self(), {:a, 3})

# This will match the FIRST {:a, _} message
receive do
  {:a, n} -> IO.puts("First match for :a -> #{n}")
end

# The {:b, 2} is still in queue, along with {:a, 3}
Process.info(self(), :messages)

# Clean up remaining messages
receive do
  {:b, n} -> IO.puts("Got :b -> #{n}")
end

receive do
  {:a, n} -> IO.puts("Got remaining :a -> #{n}")
end
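The same ordering holds between processes: messages sent from one process to another arrive in the order they were sent. A minimal sketch, using a reference from make_ref/0 as a tag so that we only match the messages from this demo:

```elixir
parent = self()
tag = make_ref()  # unique tag so we only match messages from this demo

# The child sends five numbered messages, one after another
spawn(fn ->
  for i <- 1..5, do: send(parent, {tag, i})
end)

# Receive five tagged messages and record the order they arrive in
received =
  for _ <- 1..5 do
    receive do
      {^tag, i} -> i
    end
  end

IO.inspect(received, label: "arrival order")
```

This guarantee applies to a single sender and receiver pair. Messages from different senders may interleave in any order.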

4. Two Processes Communicating

Now let’s make two processes talk to each other - the classic ping-pong example.

Basic Ping-Pong

# The parent process (us)
parent = self()

# Spawn a child that waits for a message
child = spawn(fn ->
  IO.puts("Child #{inspect(self())} waiting for message...")

  receive do
    {:hello, from_pid} ->
      IO.puts("Child received :hello from #{inspect(from_pid)}")
      # Send a reply back
      send(from_pid, {:world, self()})
  end

  IO.puts("Child done")
end)

IO.puts("Parent #{inspect(parent)} sending :hello to child #{inspect(child)}")

# Send message to child
send(child, {:hello, parent})

# Wait for reply
receive do
  {:world, from_pid} ->
    IO.puts("Parent received :world from #{inspect(from_pid)}")
end

IO.puts("Conversation complete!")

Diagram: Message Flow

Parent Process                    Child Process
     |                                 |
     |----spawn----------------------->| (created)
     |                                 |
     |----{:hello, parent}------------>|
     |                                 | (receives, processes)
     |<---{:world, child}--------------|
     |                                 | (exits)
   (done)                              X
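The exchange above stops after one round trip. As a sketch of how the same flow extends to several rounds, the hypothetical `PingPong` module below keeps the child alive in a loop and numbers each ping so the parent can match the corresponding pong.

```elixir
defmodule PingPong do
  # Reply with :pong to every :ping until told to stop
  def pong_loop do
    receive do
      {:ping, n, from} ->
        send(from, {:pong, n})
        pong_loop()

      :stop ->
        :ok
    end
  end

  # Send `rounds` pings, waiting for each pong before sending the next
  def play(rounds) do
    ponger = spawn(__MODULE__, :pong_loop, [])

    replies =
      for n <- 1..rounds do
        send(ponger, {:ping, n, self()})

        receive do
          {:pong, ^n} -> n
        end
      end

    send(ponger, :stop)
    replies
  end
end

IO.inspect(PingPong.play(3), label: "pongs received")
```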

5. Exercise: Request-Response Pattern

Build a “calculator” process that receives requests and sends back responses.

defmodule Calculator do
  def start do
    spawn(fn -> loop() end)
  end

  defp loop do
    receive do
      {:add, a, b, from} ->
        send(from, {:result, a + b})
        loop()  # Keep running!

      {:multiply, a, b, from} ->
        send(from, {:result, a * b})
        loop()

      :stop ->
        IO.puts("Calculator stopping")
        # Don't call loop() - process ends
    end
  end
end

# Start the calculator
calc = Calculator.start()

# Send some calculations
send(calc, {:add, 5, 3, self()})
receive do
  {:result, n} -> IO.puts("5 + 3 = #{n}")
end

send(calc, {:multiply, 4, 7, self()})
receive do
  {:result, n} -> IO.puts("4 * 7 = #{n}")
end

# Stop the calculator
send(calc, :stop)
Process.sleep(100)
IO.puts("Calculator alive? #{Process.alive?(calc)}")

Key Insight: Recursive Loop

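Notice how loop/0 calls itself after handling each message. This is how a process stays alive to handle multiple messages: without the recursive call, the function would return and the process would exit after the first message. Because the call is the last thing each clause does, it is a tail call, so the stack does not grow no matter how many messages are handled.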
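One limitation of the calculator is that every reply looks like {:result, n}, so a caller with several requests in flight cannot tell the replies apart. A common remedy, sketched here with a hypothetical `TaggedCalculator` module, is to tag each request with a unique reference from make_ref/0 and match on it in the reply. Wrapping the send and receive in a `call/2` helper also keeps the message format out of the caller's code.

```elixir
defmodule TaggedCalculator do
  def start, do: spawn(fn -> loop() end)

  # Client helper: send a request and wait for the reply carrying the same ref
  def call(calc, request) do
    ref = make_ref()
    send(calc, {request, ref, self()})

    receive do
      {:result, ^ref, value} -> value
    end
  end

  defp loop do
    receive do
      {{:add, a, b}, ref, from} ->
        send(from, {:result, ref, a + b})
        loop()

      {{:multiply, a, b}, ref, from} ->
        send(from, {:result, ref, a * b})
        loop()

      :stop ->
        :ok
    end
  end
end

calc = TaggedCalculator.start()
sum = TaggedCalculator.call(calc, {:add, 5, 3})
product = TaggedCalculator.call(calc, {:multiply, 4, 7})
IO.puts("5 + 3 = #{sum}, 4 * 7 = #{product}")
send(calc, :stop)
```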


6. Building a Simple Agent Process

Let’s connect this to our Phase 1 work by building a process that acts like our AgentFramework.Agent struct, but as a real running process.

defmodule SimpleAgentProcess do
  @moduledoc """
  A simple agent implemented as a process.
  This previews what we'll build in the checkpoint project.
  """

  def start(name) do
    spawn(fn -> loop(%{name: name, memory: %{}, inbox: []}) end)
  end

  defp loop(state) do
    receive do
      # Get current state
      {:get_state, from} ->
        send(from, {:state, state})
        loop(state)

      # Store something in memory
      {:remember, key, value} ->
        new_memory = Map.put(state.memory, key, value)
        loop(%{state | memory: new_memory})

      # Recall from memory
      {:recall, key, from} ->
        value = Map.get(state.memory, key)
        send(from, {:recalled, key, value})
        loop(state)

      # Receive a task (add to inbox)
      {:task, task} ->
        IO.puts("[#{state.name}] Received task: #{inspect(task)}")
        new_inbox = state.inbox ++ [task]
        loop(%{state | inbox: new_inbox})

      # Process next task
      {:process_next, from} ->
        case state.inbox do
          [] ->
            send(from, {:empty, "No tasks in inbox"})
            loop(state)
          [task | rest] ->
            IO.puts("[#{state.name}] Processing: #{inspect(task)}")
            send(from, {:processed, task})
            loop(%{state | inbox: rest})
        end

      :stop ->
        IO.puts("[#{state.name}] Shutting down")
        :ok

      other ->
        IO.puts("[#{state.name}] Unknown message: #{inspect(other)}")
        loop(state)
    end
  end
end

# Create an agent
agent = SimpleAgentProcess.start("Worker-1")

# Store some memory
send(agent, {:remember, :context, "Researching Elixir"})
send(agent, {:remember, :priority, :high})

# Recall memory
send(agent, {:recall, :context, self()})
receive do
  {:recalled, :context, value} -> IO.puts("Context: #{value}")
end

# Send tasks
send(agent, {:task, %{action: :search, query: "OTP"}})
send(agent, {:task, %{action: :summarize, doc: "results"}})

# Check state
send(agent, {:get_state, self()})
receive do
  {:state, state} -> IO.puts("Agent state: #{inspect(state)}")
end

# Process tasks
send(agent, {:process_next, self()})
receive do
  {:processed, task} -> IO.puts("Processed: #{inspect(task)}")
end

# Clean up
send(agent, :stop)

7. Summary

What We Learned

  1. Processes are lightweight - You can create millions of them
  2. Processes are isolated - They share nothing, which rules out data races on shared memory
  3. spawn/1 creates a new process running a function
  4. self/0 returns the current process’s PID
  5. send/2 sends a message to a PID (non-blocking)
  6. receive/1 pattern matches incoming messages (blocking)
  7. Recursive loops keep processes alive to handle multiple messages

Key Patterns

# Pattern 1: Fire and forget
spawn(fn -> IO.puts("Doing work in #{inspect(self())}") end)

# Pattern 2: Request-Response
pid = spawn(fn ->
  receive do
    # Placeholder work: upcase the request data
    {:request, data, from} -> send(from, {:response, String.upcase(data)})
  end
end)
send(pid, {:request, "hello", self()})
receive do
  {:response, result} -> IO.puts("Response: #{result}")
end

# Pattern 3: Long-running server
defmodule Server do
  def start(initial_state \\ []), do: spawn(fn -> loop(initial_state) end)

  defp loop(state) do
    receive do
      msg ->
        new_state = handle(msg, state)
        loop(new_state)  # Keep running
    end
  end

  # Placeholder handler: collect every message into the state
  defp handle(msg, state), do: [msg | state]
end

Connection to Agent Framework

Our AgentFramework.Agent struct from Phase 1 was a data structure representing an agent. Now we’ve seen how to make it a living process that:

  • Maintains its own state
  • Receives and processes messages
  • Runs independently

In the next sessions, we’ll explore:

  • Maintaining complex state across messages
  • Selective receive and timeouts
  • Handling process failures (links and monitors)
  • Designing multi-process architectures

8. Practice Exercises

Exercise 1: Echo Server

Create a process that echoes back any message it receives:

# Your code here
defmodule EchoServer do
  def start do
    # Implement me!
    spawn(fn -> loop() end)
  end

  defp loop do
    receive do
      {msg, from} ->
        send(from, {:echo, msg})
        loop()
      :stop ->
        :ok
    end
  end
end

# Test it
echo = EchoServer.start()
send(echo, {"Hello!", self()})
receive do
  {:echo, msg} -> IO.puts("Echo: #{msg}")
end
send(echo, :stop)

Exercise 2: Counter Process

Create a process that counts up each time it receives :increment:

defmodule Counter do
  def start(initial \\ 0) do
    spawn(fn -> loop(initial) end)
  end

  defp loop(count) do
    receive do
      :increment ->
        loop(count + 1)
      {:get, from} ->
        send(from, {:count, count})
        loop(count)
      :stop ->
        IO.puts("Final count: #{count}")
    end
  end
end

# Test it
counter = Counter.start(0)
send(counter, :increment)
send(counter, :increment)
send(counter, :increment)
send(counter, {:get, self()})
receive do
  {:count, n} -> IO.puts("Current count: #{n}")
end
send(counter, :stop)

Exercise 3: Two Agents Communicating

Create two agent processes that can send tasks to each other:

# Challenge: Implement two agents that collaborate
# Agent A receives a task, does partial work, sends to Agent B
# Agent B completes the work and reports back

# Your code here!
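If you get stuck, here is one possible sketch, not the only reasonable design. The `Collaborators` module, the message shapes, and the choice of "work" (upcasing words, then joining them) are all invented for illustration. Agent A passes the original requester's PID along so that Agent B can report back directly.

```elixir
defmodule Collaborators do
  # Agent B: finishes the work and reports to whoever asked originally
  def agent_b do
    receive do
      {:finish, words, reply_to} ->
        send(reply_to, {:done, Enum.join(words, " ")})
        agent_b()

      :stop ->
        :ok
    end
  end

  # Agent A: does the first half (splitting and upcasing), then hands off to B
  def agent_a(b_pid) do
    receive do
      {:task, text, reply_to} ->
        words = text |> String.split() |> Enum.map(&String.upcase/1)
        send(b_pid, {:finish, words, reply_to})
        agent_a(b_pid)

      :stop ->
        :ok
    end
  end
end

b = spawn(Collaborators, :agent_b, [])
a = spawn(Collaborators, :agent_a, [b])

send(a, {:task, "hello   concurrent world", self()})

result =
  receive do
    {:done, text} -> text
  end

IO.puts("Result: #{result}")
Enum.each([a, b], &send(&1, :stop))
```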

Next Session Preview

In Session 8: More On Multiprocessing, we’ll dive deeper into:

  • Maintaining complex state with recursive loops
  • Selective receive (pattern matching priority)
  • Mailbox management
  • Timeouts with the after clause
  • Building a stateful agent with full memory operations