Processes Basics
Introduction
All Elixir code runs inside processes. In this livebook, we’ll take a look at the basics of processes, how to spawn them, and how to have them communicate with each other.
Spawning
The most basic operation to create a new process is to spawn it. spawn/1
takes a function, creates a new process, and executes the function in the new process.
The process exits when the function returns or when there’s an explicit exit (such as an uncaught raise
).
Each process is identified by a PID (Process IDentifier).
pid =
spawn(fn ->
Process.sleep(2000)
IO.puts("Finished the expensive computation.")
end)
IO.puts("Just spawned a process with PID #{inspect(pid)}. Let's wait a bit.")
Sending and Receiving Messages
Processes communicate via messages. A process can send or receive messages.
Sending
To send messages, you use send(pid, message)
. The caller process sends message
to the process identified by pid
. Sending is asynchronous, that is, send/2
returns as soon as the message is sent. There’s no guarantee that when send/2
returns the destination process has received the message.
Receiving
To receive messages, a process calls receive
. receive
is blocking: it will halt execution until a message that matches one of the listed patterns arrives to the caller process.
pid =
spawn(fn ->
receive do
message ->
IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
end
end)
Process.alive?(pid)
send(pid, :hello_world)
As you can see, the process receives the message and prints it to standard output. Since the function that we passed to spawn/1
returns after printing the message, the process itself finishes its execution. You can see this by checking whether pid
represents a process that is alive.
Process.alive?(pid)
Pattern Matching on Receive
receive
supports multiple ->
clauses. When a message arrives, the first clause that matches it gets executed. This is analogous to case
.
pid =
spawn(fn ->
receive do
:ping ->
IO.puts("pong!")
message ->
IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
end
end)
send(pid, :ping)
The Process Mailbox
Each process has a mailbox where all messages it receives end up. It’s conceptually similar to a queue.
When a process receives a message, this message ends up in the process mailbox. The next time there is a receive
call in that process, this is the algorithm that gets executed to determine what to do with that message.
sequenceDiagram;
participant other_pid as Other process
participant pid as Process
participant mailbox as Mailbox
other_pid->>pid: Send message
pid->>mailbox: Put message at the end
pid->>pid: Wait for the next call to receive
loop Every message in mailbox
alt Matches one clause
pid->>pid: Execute clause
else
pid->>mailbox: Store in the same position
end
end
Two important things:
-
If the loop reaches the end of the mailbox and no messages match any
receive
clauses,receive
blocks until a new message comes and the algorithm gets executed again. -
If
receive
is called when there are already messages in the mailbox, the algorithms executes right away (it doesn’t wait for a new message).
Receive Timeout
receive
blocks indefinitely. However, it supports an after
clause. This clause lets you specify a timeout after which the corresponding code executes and receive
returns. Be careful using receive
without after
, since it could cause the process to halt indefinitely in case there’s a bug in your list of patterns.
spawn(fn ->
receive do
message ->
IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
after
5_000 ->
IO.puts("Timeout, no messages")
end
end)
Parallel Map
Let’s use what we know so far to implement parallel mapping.
defmodule Parallel do
def map(enum, fun) do
# Let's take note of the "parent" PID, since if we call self()
# in the function we pass to spawn/1 then we get the PID of the
# spawned process.
parent = self()
pids =
Enum.map(enum, fn elem ->
spawn(fn ->
# Compute the mapped element.
mapped_elem = fun.(elem)
# Send the result back to the "parent".
send(parent, {self(), mapped_elem})
end)
end)
Enum.map(pids, fn pid ->
receive do
{^pid, mapped_elem} -> mapped_elem
end
end)
end
end
This code is full of bugs. 🙈 It doesn’t use after
for timeouts, it doesn’t do any error handling, and more. However, it illustrates the idea! Let’s give it a spin.
To see the parallelism in action, let’s map over a list of integers representing timeouts in milliseconds. We’ll map the Process.sleep/1
function over those. First, let’s use Enum.map/2
to see what happens when we map sequentially, one item at a time:
{elapsed, _result} = :timer.tc(fn -> Enum.map([1000, 1000, 1000], &Process.sleep/1) end)
IO.puts("Elapsed time: #{elapsed / 1_000_000} s")
It takes roughly 3s to execute the code, which makes perfect sense. If we use our Parallel.map/2
function, it should hopefully take around 1s!
{elapsed, _result} = :timer.tc(fn -> Parallel.map([1000, 1000, 1000], &Process.sleep/1) end)
IO.puts("Elapsed time: #{elapsed / 1_000_000} s")