Parallelism in Elixir
Spawned Processes
# This is some setup for future cells.
require ExUnit.Assertions
import ExUnit.Assertions
"Welcome to Part 2!"
Processes are the primitive for concurrency and parallelism in Elixir. You will often hear about GenServers and Supervisors, but there’s an even more basic option available.
We’ll start with an example that shows how to use spawn to create a new process.
IO.inspect(self())
spawn(fn ->
IO.inspect(self())
end)
"Notice that the PIDs are different"
Spawned processes are not like closures in JavaScript. We can read variables from the surrounding context inside of them, but reassigning a variable inside the process does not change it outside.
var = "I'm from the surrounding context"
spawn(fn ->
IO.puts(var)
var = "I'm from the process"
end)
var
Notice the warning in the above example. Elixir knows that the code block doesn’t reference var after it’s set, and it even tells you exactly what went wrong.
If you want to exchange data between processes, you need to pass messages between them. This is done with send/2 and receive:
parent = self()
spawn(fn ->
send(parent, :message)
send(parent, :message)
end)
receive do
:message -> IO.puts("I got the message!")
end
The above example only prints once even though two messages were sent: a single message is handled per receive block, and the second message is left sitting in the mailbox. If you want to receive multiple messages, you need to write a loop.
The next example sets you up with the basis for a looping receive call. Right now, however, it fails. Can you add a line of code to loop_receive/0 to make it work?
parent = self()
spawn(fn ->
send(parent, {:msg, "a"})
send(parent, {:msg, "b"})
send(parent, {:msg, "c"})
send(parent, :done)
end)
defmodule ReceiveLoop do
def loop_receive do
receive do
{:msg, msg} ->
IO.puts(msg)
# What goes here to make it loop?
:done ->
IO.puts("Done!")
:done
end
end
end
assert ReceiveLoop.loop_receive() == :done
You don’t have to worry about running out of stack space when doing recursion like this. Elixir uses tail-call optimization to prevent the dreaded “too much recursion!” type bug.
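To see why, here’s a minimal sketch (the TailLoop module is just for illustration): the recursive call is the last expression in the function, so the BEAM reuses the current stack frame instead of growing the stack.

defmodule TailLoop do
  # countdown/1 is tail-recursive: nothing happens after the recursive
  # call, so the stack does not grow.
  def countdown(0), do: :done
  def countdown(n) when n > 0, do: countdown(n - 1)
end

# Millions of iterations complete without a stack overflow.
assert TailLoop.countdown(10_000_000) == :done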
GenServer
spawn/receive are super important to know about, but they’re not the most common type of process you’ll work with. Generally, we use GenServer to create processes that stay alive for as long as we want. They’ll keep receiving messages without us writing a loop.
The next example is almost complete, but is missing a critical element that causes the tests to fail. Give it a read and make this test pass.
defmodule ExampleServer do
use GenServer
def inc(pid) do
GenServer.call(pid, :inc)
end
def dec(pid) do
GenServer.call(pid, :dec)
end
def start_link(opts) do
IO.inspect({:start_link, self()})
GenServer.start_link(__MODULE__, opts)
end
def init(opts) do
IO.inspect({:init, self(), opts})
{:ok, %{count: 0}}
end
def handle_call(_msg, _from, state = %{count: count}) do
IO.inspect({:inc, self()})
next_state = Map.put(state, :count, count + 1)
{:reply, next_state.count, next_state}
end
end
IO.inspect({:outside, self()})
assert {:ok, pid} = ExampleServer.start_link([])
assert ExampleServer.inc(pid) == 1
assert ExampleServer.inc(pid) == 2
assert ExampleServer.dec(pid) == 1
Check out the print statements in the previous example. start_link prints with a different pid than the :init and :inc statements. This is because the start_link function executes in the calling process, while init and the other callbacks execute in the newly created process.
The call function is special. GenServer wraps the message into a particular format, so you can’t just send a message and have it processed by the handle_call block. Luckily, GenServer exposes a handy way to respond to send messages:
defmodule InfoServer do
  use GenServer

  def init(_opts) do
    {:ok, nil}
  end

  # Plain messages sent with send/2 arrive here.
  def handle_info(msg, state) do
    IO.inspect({:handle_info, msg})
    {:noreply, state}
  end
end
assert {:ok, pid} = GenServer.start_link(InfoServer, [])
send(pid, %{map: true})
handle_info is convenient when you want to send a message to your own process without fussing with GenServer. If you look back at the Datastore lab, you’ll see that server.ex uses it to loop the :gen_tcp.accept/1 call.
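A common shape for that pattern is a server that schedules messages to itself. Here’s a minimal sketch (the Ticker module is hypothetical, not the lab’s server.ex):

defmodule Ticker do
  use GenServer

  def init(_opts) do
    # Schedule the first message to ourselves; it arrives via handle_info.
    Process.send_after(self(), :tick, 1_000)
    {:ok, 0}
  end

  def handle_info(:tick, count) do
    IO.puts("tick #{count}")
    # Re-schedule to keep the loop going.
    Process.send_after(self(), :tick, 1_000)
    {:noreply, count + 1}
  end
end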
Here’s a breakdown of the various GenServer callbacks:

- handle_call - calls block the caller synchronously and can return a reply
- handle_cast - casts do not block the caller and cannot return a reply
- handle_info - handles regular messages sent to the process
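Here’s a small sketch that exercises all three callbacks in one server (the CallbackDemo module is illustrative). Because the messages below are all sent from the same process, they are handled in order, so the final call sees both bumps:

defmodule CallbackDemo do
  use GenServer

  def init(_opts), do: {:ok, 0}

  # Synchronous: the caller blocks until we reply.
  def handle_call(:get, _from, count), do: {:reply, count, count}

  # Asynchronous: the caller does not wait and gets no reply.
  def handle_cast(:bump, count), do: {:noreply, count + 1}

  # Plain send/2 messages land here.
  def handle_info(:bump, count), do: {:noreply, count + 1}
end

{:ok, pid} = GenServer.start_link(CallbackDemo, [])
GenServer.cast(pid, :bump)
send(pid, :bump)
assert GenServer.call(pid, :get) == 2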
Next, let’s give our GenServer a name so we can call it more conveniently.
defmodule NameServer do
use GenServer
def inc do
GenServer.call(__MODULE__, :inc)
end
def start(opts) do
# start is used instead of start_link because we'll be killing the process
GenServer.start(__MODULE__, opts, name: __MODULE__)
end
def init(count: start_count) do
{:ok, %{count: start_count}}
end
def handle_call(:inc, _from, state = %{count: count}) do
next_state = Map.put(state, :count, count + 1)
{:reply, next_state.count, next_state}
end
end
# This is here to make the example runnable multiple times in a row
case Process.whereis(NameServer) do
nil -> :ok
pid when is_pid(pid) -> Process.exit(pid, :exit) && Process.sleep(100)
end
assert {:ok, pid} = NameServer.start(count: 1337)
assert Process.whereis(NameServer) == pid
assert NameServer.inc() == 1338
You’ll notice that there’s a bit of code in the previous example that shuts the server down if it already exists. Remove that block of code and observe what happens when you run the example multiple times.
This shows an important consideration with named processes: they are globally unique. If we had two processes with the NameServer name, then it wouldn’t be clear where to route the message.
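You can see that uniqueness directly: with NameServer from the previous example still running, starting a second server under the same name is refused.

# The name is already registered, so this start returns an error tuple.
assert {:error, {:already_started, _pid}} = NameServer.start(count: 0)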
Named processes are typically used for singleton processes. However, you should be careful when using a single process that is expected to be high-throughput. Let’s see why.
Single Process Concurrency
The next example demonstrates how a process can only handle a single message at a time.
defmodule SingleThroughput do
  use GenServer

  def init(_opts) do
    {:ok, 1}
  end

  # Every call is handled one at a time, in mailbox order.
  def handle_call(:tick, _from, current) do
    IO.inspect({:current, current})
    {:reply, current + 1, current + 1}
  end
end
{:ok, pid} = GenServer.start_link(SingleThroughput, [])
[
Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end),
Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end),
Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end)
]
|> Enum.map(&Task.await/1)
Did this surprise you at all? It catches many people off-guard because Elixir is supposed to be super parallel.
Think back to the presentation about how the scheduler runs. The scheduler will put a process that’s waiting to process a message onto a CPU. Once the process is scheduled on that CPU, it won’t be scheduled simultaneously on a different CPU. Effectively, the process goes through its messages sequentially (and in the order it received them).
We’re not going to do it here, but a common pattern that you’ll see in the wild is to shard a single process into many processes. Theoretically, the best performance will be with as many processes as you have CPU threads available.
For example, if you have a key-value store, you could shard the data into 8 processes based on the key. That store will have 8x the throughput of a single process that isn’t sharded.
There is a lot of nuance to the performance of processes in this way. The most important thing is to avoid single global process bottlenecks. Look for ways to split up your processes to increase performance.
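Here’s a minimal sketch of what that key-based sharding could look like (the ShardedStore module and the shard count of 8 are illustrative):

defmodule ShardedStore do
  use GenServer

  @shards 8

  # Start one named GenServer per shard.
  def start_shards do
    for i <- 0..(@shards - 1) do
      {:ok, _pid} = GenServer.start_link(__MODULE__, [], name: shard_name(i))
    end

    :ok
  end

  # Route each key to a stable shard by hashing it.
  def put(key, value), do: GenServer.call(shard_for(key), {:put, key, value})
  def get(key), do: GenServer.call(shard_for(key), {:get, key})

  defp shard_for(key), do: shard_name(:erlang.phash2(key, @shards))
  defp shard_name(i), do: :"#{__MODULE__}.Shard#{i}"

  def init(_opts), do: {:ok, %{}}

  def handle_call({:put, key, value}, _from, state), do: {:reply, :ok, Map.put(state, key, value)}
  def handle_call({:get, key}, _from, state), do: {:reply, Map.get(state, key), state}
end

A given key always hashes to the same shard, so operations on one key stay ordered, while unrelated keys can be served by up to 8 processes in parallel.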
Supervisors
We’re not going to cover it in this lab, but supervision trees are a powerful tool for modeling Elixir applications. You have actually already seen a supervision tree in the application.ex file of lab 2.
defmodule Datastore.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
{Datastore.Store.Data, []},
{Datastore.Store.Server, []}
]
opts = [strategy: :one_for_one, name: Datastore.Supervisor]
Supervisor.start_link(children, opts)
end
end
Application is a special type of supervision tree that runs at the root level of your app. When you pull in a library that exposes processes, its application will be started automatically on boot.
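For reference, this is how an application module gets wired up: the project’s mix.exs points OTP at the module to call on boot (a sketch of the relevant part):

# In mix.exs: tells OTP to call Datastore.Application.start/2 on boot.
def application do
  [
    mod: {Datastore.Application, []},
    extra_applications: [:logger]
  ]
end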
Sometimes, you’ll need to create your own supervision trees. This provides several benefits:
- Controls startup and shutdown of a set of related processes
- Specifies the exact load order of your application
- Dynamically starts and stops processes at runtime
- Creates a clean application structure
One nice aspect of supervisors is that children are started in the order they are defined (top first) and stopped in reverse order. This means that a process that depends on another process can be set up so that the other process is guaranteed to already be alive.
So, if you are starting your database pool before your application HTTP server, you are guaranteed that the database pool will be alive before you start fulfilling HTTP requests. It would be bad if you tried to fulfill a request before the database pool was ready!
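Here’s a sketch of a supervisor that encodes exactly that guarantee (the child module names are hypothetical):

defmodule MyApp.RootSupervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    # Children start top to bottom and stop bottom to top, so the
    # database pool is alive before the HTTP server takes requests.
    children = [
      {MyApp.DatabasePool, []},
      {MyApp.HttpServer, []}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end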