Parallelism in Elixir

Spawned Processes

# This is some setup for future cells.
require ExUnit.Assertions
import ExUnit.Assertions

"Welcome to Part 2!"

Processes are the primitive for concurrency and parallelism in Elixir. You will often hear about GenServers and Supervisors, but there’s an even more basic option available.

We’ll start with an example that shows how to use spawn to create a process. Each process has its own PID, which self() returns.

IO.inspect(self())

spawn(fn ->
  IO.inspect(self())
end)

"Notice that the PIDs are different"

Spawned processes are not like closures in JavaScript. The function passed to spawn can read variables from the surrounding context, but assigning to them inside the process does not change them outside of it.

var = "I'm from the surrounding context"

spawn(fn ->
  IO.puts(var)
  var = "I'm from the process"
end)

var

Notice the warning in the above example. Elixir knows that the code block doesn’t reference var after it’s set and even tells you exactly what went wrong.

To exchange data between processes, you need to pass messages between them. This is done with send/2 and receive:

parent = self()

spawn(fn ->
  send(parent, :message)
  send(parent, :message)
end)

receive do
  :message -> IO.puts("I got the message!")
end

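The above example intentionally leaves a message unhandled: two messages are sent, but only one is printed. Each receive block handles a single message, and the second one stays in the process mailbox. If you want to handle multiple messages, you need to write a loop.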

The next example sets you up with the basis for a looping receive call. Right now, however, it fails. Can you add a line of code to loop_receive/0 to make it work?

parent = self()

spawn(fn ->
  send(parent, {:msg, "a"})
  send(parent, {:msg, "b"})
  send(parent, {:msg, "c"})
  send(parent, :done)
end)

defmodule ReceiveLoop do
  def loop_receive do
    receive do
      {:msg, msg} ->
        IO.puts(msg)

      # What goes here to make it loop?

      :done ->
        IO.puts("Done!")
        :done
    end
  end
end

assert ReceiveLoop.loop_receive() == :done

You don’t have to worry about running out of stack space when doing recursion like this, as long as the recursive call is the last thing the function does. Elixir uses tail-call optimization to prevent the dreaded “too much recursion!” type of bug.
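As a minimal sketch of why deep recursion is safe here, consider a countdown whose recursive call is in tail position. Countdown is a hypothetical module used only for illustration; it is not part of the lab.

```elixir
defmodule Countdown do
  # Hypothetical module, for illustration only.
  # Base case: nothing left to count.
  def run(0), do: :done

  # The recursive call is the last expression in the clause, so it is a
  # tail call and the stack does not grow with n.
  def run(n) when n > 0, do: run(n - 1)
end

# One million iterations complete without exhausting the stack.
result = Countdown.run(1_000_000)
```

The same reasoning applies to a receive loop: if the function calls itself as the final expression of a receive clause, it can keep looping indefinitely.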

GenServer

spawn/receive are super important to know about, but they’re not the most common type of process you’ll work with. Generally, we use GenServer to create processes that stay alive for as long as we want. They’ll keep receiving messages without us writing a loop.

The next example is almost complete, but is missing a critical element that causes the tests to fail. Give it a read and make this test pass.

defmodule ExampleServer do
  use GenServer

  def inc(pid) do
    GenServer.call(pid, :inc)
  end

  def dec(pid) do
    GenServer.call(pid, :dec)
  end

  def start_link(opts) do
    IO.inspect({:start_link, self()})
    GenServer.start_link(__MODULE__, opts)
  end

  def init(opts) do
    IO.inspect({:init, self(), opts})
    {:ok, %{count: 0}}
  end

  def handle_call(_msg, _from, state = %{count: count}) do
    IO.inspect({:inc, self()})
    next_state = Map.put(state, :count, count + 1)
    {:reply, next_state.count, next_state}
  end
end

IO.inspect({:outside, self()})
assert {:ok, pid} = ExampleServer.start_link([])
assert ExampleServer.inc(pid) == 1
assert ExampleServer.inc(pid) == 2
assert ExampleServer.dec(pid) == 1

Check out the print statements in the previous example. start_link prints with a different pid than the :init and :inc statements.

This is because the start_link function executes in the calling process, while init and the other callbacks execute in the newly created process.

The call function is special. The GenServer library wraps the message in a particular format, so a message sent with plain send/2 will not be processed by handle_call. Luckily, GenServer exposes a handy way to respond to messages sent with send/2: the handle_info callback.

defmodule InfoServer do
  use GenServer

  def init(_opts) do
    {:ok, nil}
  end

  def handle_info(msg, state) do
    IO.inspect({:handle_info, msg})
    {:noreply, state}
  end
end

assert {:ok, pid} = GenServer.start_link(InfoServer, [])
send(pid, %{map: true})

handle_info is convenient when you want to send a message to your own process without fussing with GenServer. If you look back at the Datastore lab, you’ll see that server.ex uses it to loop the :gen_tcp.accept/1 call.

Here’s a breakdown of the various GenServer callbacks:

  • handle_call - Calls will block the caller synchronously and can return a reply
  • handle_cast - Casts will not block the caller and cannot return a reply
  • handle_info - Regular messages sent to the process
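The three callbacks can be compared side by side in a small sketch. CallbackDemo and its message shapes (:events, {:record, event}) are hypothetical names chosen for illustration.

```elixir
defmodule CallbackDemo do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, []}

  # Synchronous: the caller blocks until it receives the reply.
  @impl true
  def handle_call(:events, _from, events) do
    {:reply, Enum.reverse(events), events}
  end

  # Asynchronous: GenServer.cast/2 returns :ok right away and no reply is sent.
  @impl true
  def handle_cast({:record, event}, events) do
    {:noreply, [{:cast, event} | events]}
  end

  # Plain messages, such as those sent with send/2, arrive here.
  @impl true
  def handle_info(msg, events) do
    {:noreply, [{:info, msg} | events]}
  end
end

{:ok, demo} = GenServer.start_link(CallbackDemo, [])
:ok = GenServer.cast(demo, {:record, "a"})
send(demo, "b")

# Messages from one sender to one receiver are handled in the order sent,
# so the call is processed after the cast and the plain message.
events = GenServer.call(demo, :events)
```

Each entry in events is tagged with the callback that handled it, which makes it easy to see where a given message ended up.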

Next, let’s give our GenServer a name so that we can call it more conveniently.

defmodule NameServer do
  use GenServer

  def inc do
    GenServer.call(__MODULE__, :inc)
  end

  def start(opts) do
    # start is used instead of start_link because we'll be killing the process
    GenServer.start(__MODULE__, opts, name: __MODULE__)
  end

  def init(count: start_count) do
    {:ok, %{count: start_count}}
  end

  def handle_call(:inc, _from, state = %{count: count}) do
    next_state = Map.put(state, :count, count + 1)
    {:reply, next_state.count, next_state}
  end
end

# This is here to make the example runnable multiple times in a row
case Process.whereis(NameServer) do
  nil -> :ok
  pid when is_pid(pid) -> Process.exit(pid, :exit) && Process.sleep(100)
end

assert {:ok, pid} = NameServer.start(count: 1337)
assert Process.whereis(NameServer) == pid
assert NameServer.inc() == 1338

You’ll notice that there’s a bit of code in the previous example that stops the server if it is already running. Remove that block of code and observe what happens when you run the example multiple times.

This shows an important consideration with named processes: a name can be registered by only one process at a time on a node. If we had two processes with the NameServer name, then it wouldn’t be clear where to route the message.
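The uniqueness rule can be seen directly in the return value of a second start attempt. This is a minimal sketch; UniqueName is a hypothetical module that exists only for this illustration.

```elixir
defmodule UniqueName do
  use GenServer

  @impl true
  def init(opts), do: {:ok, opts}
end

# The first start registers the name.
{:ok, first} = GenServer.start(UniqueName, [], name: UniqueName)

# A second start under the same name is refused and reports the existing pid.
second = GenServer.start(UniqueName, [], name: UniqueName)

# Clean up so the sketch can be run again.
GenServer.stop(first)
```

Here second is an error tuple of the form {:error, {:already_started, pid}}, where pid is the process that already holds the name.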

Named processes are typically used for singleton processes. However, you should be careful when using a single process that is expected to be high-throughput. Let’s see why.

Single Process Concurrency

The next example demonstrates how a process can only handle a single message at a time.

defmodule SingleThroughput do
  use GenServer

  def init(_opts) do
    {:ok, 1}
  end

  def handle_call(:tick, _from, current) do
    IO.inspect({:current, current})
    {:reply, current + 1, current + 1}
  end
end

{:ok, pid} = GenServer.start_link(SingleThroughput, [])

[
  Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end),
  Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end),
  Task.async(fn -> Enum.each(1..10, fn _ -> GenServer.call(pid, :tick) end) end)
]
|> Enum.map(&Task.await/1)

Did this surprise you at all? It catches many people off-guard because Elixir is supposed to be super parallel.

Think back to the presentation about how the scheduler runs. The scheduler will put a process that’s waiting to process a message onto a CPU. Once the process is scheduled on that CPU, it’s not going to be scheduled simultaneously on a different CPU. Effectively, the process goes through its messages sequentially, in the order it received them.

We’re not going to do it here, but a common pattern that you’ll see in the wild is to shard a single process into many processes. Theoretically, the best performance will be with as many processes as you have CPU threads available.

For example, if you have a key-value store, you could shard the data into 8 processes based on the key. In the best case, that store will have up to 8x the throughput of a single process that isn’t sharded.
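One way to sketch key-based sharding is to hash each key to pick a process. ShardedStore is a hypothetical module, and using Agents as shards is an assumption made to keep the example short; a real store might use GenServers or ETS instead.

```elixir
defmodule ShardedStore do
  # Hypothetical module for illustration: each shard is an Agent holding a map.
  @shards 8

  # Start one Agent per shard and return the pids in a tuple for fast lookup.
  def start do
    1..@shards
    |> Enum.map(fn _ ->
      {:ok, pid} = Agent.start_link(fn -> %{} end)
      pid
    end)
    |> List.to_tuple()
  end

  def put(shards, key, value) do
    Agent.update(shard_for(shards, key), &Map.put(&1, key, value))
  end

  def get(shards, key) do
    Agent.get(shard_for(shards, key), &Map.get(&1, key))
  end

  # :erlang.phash2/2 maps any term to an integer in 0..(@shards - 1),
  # so the same key always lands on the same shard.
  defp shard_for(shards, key) do
    elem(shards, :erlang.phash2(key, @shards))
  end
end

shards = ShardedStore.start()
:ok = ShardedStore.put(shards, "user:1", "Ada")
value = ShardedStore.get(shards, "user:1")
```

Requests for keys on different shards can be handled in parallel, while requests for the same key still go through one process and stay ordered.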

There is a lot of nuance to the performance of processes in this way. The most important thing is to avoid single global process bottlenecks. Look for ways to split up your processes to increase performance.

Supervisors

We’re not going to cover it in this lab, but supervision trees are a powerful tool for modeling Elixir applications. You have actually already seen a supervision tree in the application.ex file of lab 2.

defmodule Datastore.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Datastore.Store.Data, []},
      {Datastore.Store.Server, []}
    ]

    opts = [strategy: :one_for_one, name: Datastore.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Application is a special type of supervision tree that runs at the root level of your app. When you pull in a library that exposes processes, it will have its application automatically started on boot.

Sometimes, you’ll need to create your own supervision trees. One nice aspect of supervisors is that processes are started in the order they are defined (top is first) and stopped in the reverse order. This means that a process that depends on another process can be set up in such a way that the other process is guaranteed to be alive.

So, if you are starting your database pool before your application HTTP server, you are guaranteed that the database pool will be alive before you start fulfilling HTTP requests. It would be bad if you tried to fulfill a request before the database pool was ready!
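The ordering guarantee can be checked with two stand-in processes. FakePool and FakeHttp are hypothetical modules that take the place of a real database pool and HTTP server; the second one records at startup whether the first is already running.

```elixir
defmodule FakePool do
  use Agent

  # Hypothetical stand-in for a database pool, registered under its module name.
  def start_link(_opts) do
    Agent.start_link(fn -> :ready end, name: __MODULE__)
  end
end

defmodule FakeHttp do
  use Agent

  # Hypothetical stand-in for an HTTP server. When it starts, it records
  # whether the pool is already registered.
  def start_link(_opts) do
    Agent.start_link(
      fn -> %{pool_alive?: is_pid(Process.whereis(FakePool))} end,
      name: __MODULE__
    )
  end
end

# The pool is listed first, so the supervisor starts it first.
children = [FakePool, FakeHttp]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

pool_alive? = Agent.get(FakeHttp, & &1.pool_alive?)

# Stopping the supervisor shuts the children down in reverse order.
Supervisor.stop(sup)
```

Swapping the two entries in children would make FakeHttp start before the pool is registered, which is the situation the ordering guarantee lets you avoid.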