Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Processes Basics

01-processes.livemd

Processes Basics

Introduction

All Elixir code runs inside processes. In this livebook, we’ll take a look at the basics of processes, how to spawn them, and how to have them communicate with each other.

Spawning

The most basic operation to create a new process is to spawn it. spawn/1 takes a function, creates a new process, and executes the function in the new process.

The process exits when the function returns or when there’s an explicit exit (such as an uncaught raise).

Each process is identified by a PID (Process IDentifier).

pid =
  spawn(fn ->
    Process.sleep(2000)
    IO.puts("Finished the expensive computation.")
  end)

IO.puts("Just spawned a process with PID #{inspect(pid)}. Let's wait a bit.")

Sending and Receiving Messages

Processes communicate via messages. A process can send or receive messages.

Sending

To send messages, you use send(pid, message). The caller process sends message to the process identified by pid. Sending is asynchronous, that is, send/2 returns as soon as the message is sent. There’s no guarantee that when send/2 returns the destination process has received the message.

Receiving

To receive messages, a process calls receive. receive is blocking: it will halt execution until a message that matches one of the listed patterns arrives to the caller process.

pid =
  spawn(fn ->
    receive do
      message ->
        IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
    end
  end)

Process.alive?(pid)
send(pid, :hello_world)

As you can see, the process receives the message and prints it to standard output. Since the function that we passed to spawn/1 returns after printing the message, the process itself finishes its execution. You can see this by checking whether pid represents a process that is alive.

Process.alive?(pid)

Pattern Matching on Receive

receive supports multiple -> clauses. When a message arrives, the first clause that matches it gets executed. This is analogous to case.

pid =
  spawn(fn ->
    receive do
      :ping ->
        IO.puts("pong!")

      message ->
        IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
    end
  end)

send(pid, :ping)

The Process Mailbox

Each process has a mailbox where all messages it receives end up. It’s conceptually similar to a queue.

When a process receives a message, this message ends up in the process mailbox. The next time there is a receive call in that process, this is the algorithm that gets executed to determine what to do with that message.

sequenceDiagram;
  participant other_pid as Other process
  participant pid as Process
  participant mailbox as Mailbox

  other_pid->>pid: Send message
  pid->>mailbox: Put message at the end

  pid->>pid: Wait for the next call to receive

  loop Every message in mailbox
    alt Matches one clause
      pid->>pid: Execute clause
    else
      pid->>mailbox: Store in the same position
    end
  end

Two important things:

  • If the loop reaches the end of the mailbox and no messages match any receive clauses, receive blocks until a new message comes and the algorithm gets executed again.
  • If receive is called when there are already messages in the mailbox, the algorithms executes right away (it doesn’t wait for a new message).

Receive Timeout

receive blocks indefinitely. However, it supports an after clause. This clause lets you specify a timeout after which the corresponding code executes and receive returns. Be careful using receive without after, since it could cause the process to halt indefinitely in case there’s a bug in your list of patterns.

spawn(fn ->
  receive do
    message ->
      IO.puts("#{inspect(self())} received a message: #{inspect(message)}")
  after
    5_000 ->
      IO.puts("Timeout, no messages")
  end
end)

Parallel Map

Let’s use what we know so far to implement parallel mapping.

defmodule Parallel do
  def map(enum, fun) do
    # Let's take note of the "parent" PID, since if we call self()
    # in the function we pass to spawn/1 then we get the PID of the
    # spawned process.
    parent = self()

    pids =
      Enum.map(enum, fn elem ->
        spawn(fn ->
          # Compute the mapped element.
          mapped_elem = fun.(elem)

          # Send the result back to the "parent".
          send(parent, {self(), mapped_elem})
        end)
      end)

    Enum.map(pids, fn pid ->
      receive do
        {^pid, mapped_elem} -> mapped_elem
      end
    end)
  end
end

This code is full of bugs. 🙈 It doesn’t use after for timeouts, it doesn’t do any error handling, and more. However, it illustrates the idea! Let’s give it a spin.

To see the parallelism in action, let’s map over a list of integers representing timeouts in milliseconds. We’ll map the Process.sleep/1 function over those. First, let’s use Enum.map/2 to see what happens when we map sequentially, one item at a time:

{elapsed, _result} = :timer.tc(fn -> Enum.map([1000, 1000, 1000], &Process.sleep/1) end)
IO.puts("Elapsed time: #{elapsed / 1_000_000} s")

It takes roughly 3s to execute the code, which makes perfect sense. If we use our Parallel.map/2 function, it should hopefully take around 1s!

{elapsed, _result} = :timer.tc(fn -> Parallel.map([1000, 1000, 1000], &Process.sleep/1) end)
IO.puts("Elapsed time: #{elapsed / 1_000_000} s")