Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us
Notesclub

Worker Pools

reading/worker_pools.livemd

Worker Pools

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"},
  {:poolboy, "~> 1.5"}
])

Navigation

Home Report An Issue QueuesRubix Cube

Overview

Process Bottlenecks

Process bottlenecks occur when we send too many messages to a process. For example, if we have a single GenServer in our application, each message is handled synchronously. Synchronous message handling makes it easier to reason about the behavior of a GenServer but limits the concurrency in our system.

For example, if we had an expensive operation that takes 500ms, and three clients made simultaneous requests, the last client would have to wait 1.5 seconds.

sequenceDiagram
    participant Client 3
    participant Client 2
    participant Client 1
    participant GenServer
    Client 1->>GenServer: request
    Client 2->>GenServer: request
    Client 3->>GenServer: request
    GenServer->>Client 1: response (500ms)
    GenServer->>Client 2: response (1s)
    GenServer->>Client 3: response (1.5s)

Worker Pools

A worker pool is a group of processes that handle incoming tasks. When a new task arrives, one of the idle worker processes is assigned to handle it. This allows the workload to be distributed evenly among the worker processes, increasing the efficiency and scalability of the system.

flowchart LR

Caller --> P
subgraph Worker Pool
  P[Pool Manager]
  W1[Worker 1]
  W2[Worker 2]
  W3[Worker 3]

  P --> W1
  P --> W2
  P --> W3
end

Concurrency Constraints

Worker pools typically have a finite number of workers. A limited worker pool helps prevent unbounded concurrency, which can overload our system with too many concurrent tasks. In addition, once we have enough workers to ensure our CPU cores are busy with work, additional concurrency has few benefits.

Named Processes

We use named processes in this reading material. If a named process is already started and you re-evaluate a code example you may see the following error.

> ** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.265.0>}} > reading/worker_pools.livemd#cell:oc2mtzukw3gd7z2s5oyguox4qdpegr7a:3: PoolManager.start_link/1 > /home/brook/dockyard/curriculum/reading/worker_pools. > livemd#cell:ynvusqt7eiocyatfupox6d2pqqpiwwf4:2: (file) > /home/brook/dockyard/curriculum/reading/worker_pools. > livemd#cell:ynvusqt7eiocyatfupox6d2pqqpiwwf4:1: (file)

To resolve this issue, reconnect the current runtime.

Module Names Are Atoms

Throughout this reading, you will see several examples of module names without a module definition.

Registry.start_link(name: ModuleName, keys: :duplicate)

Keep in mind that module names are just atoms. For these examples, we can use atoms and module names interchangeably. We often see this when providing the name for a process.

Registry.start_link(name: :my_module_name, keys: :duplicate)

Module names are just syntax sugar for atoms prefaced with :Elixir..

MyModule == :"Elixir.MyModule"

Modules even work with Atom functions.

Atom.to_string(MyModule)

Kino.Process

We use Kino.Process in this material to provide sequence diagrams of processes. We’ll often use the Kino.Process.render_sequence_trace/2 function to display process messages in a diagram.

Kino.Process.render_seq_trace(fn ->
  parent = self()
  child = spawn(fn -> send(parent, :message) end)

  receive do
    :message -> "parent #{inspect(parent)} received a message from child #{inspect(child)}"
  end
end)

Registry

Registry is a built-in module that provides a way to register and look up named processes in a distributed system. Think of it as a directory service that allows you to associate a process with a name, making it easier to locate and communicate with that process throughout your application.

flowchart
Registry
p1[Process]
p2[Process]
p3[Process]

Registry --> p1
Registry --> p2
Registry --> p3

Unique Registry

Unique Registries register only a single process for each unique key.

flowchart
R[Registry]
K1[Key1]
K2[Key2]
K3[Key3]
P1[Process1]
P2[Process2]
P3[Process3]

R --> K1 --> P1
R --> K2 --> P2
R --> K3 --> P3
{:ok, _} = Registry.start_link(keys: :unique, name: :my_unique_registry)

We can start a process under the Registry by calling Registry.register/3. We’ll start a simple Agent process.

{:ok, agent_pid} =
  Agent.start_link(fn ->
    Registry.register(:my_unique_registry, :agent1, nil)
    0
  end)

Then we can find the :agent1 process in the Registry using Registry.lookup/2.

Registry.lookup(:my_unique_registry, :agent1)

Via

For a unique Registry, we can also start a registered process without calling Registry.register/3 by using a :via tuple in the format {:via, Registry, {registry, key}}.

name = {:via, Registry, {:my_unique_registry, :agent2}}
{:ok, agent_pid} = Agent.start_link(fn -> 0 end, name: name)

Our Registry contains both :agent1, and :agent2 now.

[{agent1, _}] = Registry.lookup(:my_unique_registry, :agent1)
[{agent2, _}] = Registry.lookup(:my_unique_registry, :agent2)

{agent1, agent2}

Duplicate Registry

Instead of using a unique key for each process, we can group many processes under a single key.

flowchart
R[Registry]
K1[Key1]
K2[Key2]
P1[Process]
P2[Process]
P3[Process]
P4[Process]
P5[Process]
P6[Process]

R --> K1
K1 --> P1
K1 --> P2
K1 --> P3

R --> K2
K2 --> P4
K2 --> P5
K2 --> P6
{:ok, _} = Registry.start_link(keys: :duplicate, name: :my_duplicate_registry)

# Register Multiple Processes Under The Same Key In The Registry

Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)

Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)

Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)

# Lookup All Three Processes
Registry.lookup(:my_duplicate_registry, :my_key)

Building Our Own Worker Pool

Now that we understand Registry, we can use it to build our own worker pool.

Our Worker will be a simple GenServer that performs a job that waits for a second and returns a response.

When this Worker process starts, it will register itself under a PoolManager registry. While not strictly necessary, we pass in the Registry through the opts so we can use the same Worker process with multiple registry examples.

defmodule Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, opts)
  end

  def init(opts) do
    # Register the Worker under the PoolManager
    # We've made the : Registry configurable.
    # While not necessary, this makes our worker re-usable throughout our registry examples.
    registry = opts[:registry] || PoolManager
    Registry.register(registry, :workers, nil)

    {:ok, 0}
  end

  def perform_job(pid) do
    # Print the name of the worker or its pid
    IO.inspect(Process.info(pid)[:registered_name] || pid, label: "starting job")
    GenServer.call(pid, :perform_job)
  end

  def handle_call(:perform_job, _from, state) do
    Process.sleep(1000)
    {:reply, "response", state}
  end
end

We’ll also create a PoolManager, which starts a Registry and three Worker processes.

defmodule PoolManager do
  def start_link(_opts) do
    {:ok, _} = Registry.start_link(name: __MODULE__, keys: :duplicate)

    # Start three workers. We've given them names for the diagrams below.
    for n <- 1..3 do
      {:ok, pid} = Worker.start_link(name: :"worker#{n}")
    end
  end

  def schedule_job do
    workers = Registry.lookup(__MODULE__, :workers)
    # We grab a random worker to perform the job.
    # While not ideal, this is a very simple scheduling implementation.
    {pid, _value} = Enum.random(workers)

    Worker.perform_job(pid)
  end
end

We’ll use Kino.Process.render_seq_trace/2 to demonstrate how the PoolManager starts the three registered Worker processes.

The PoolManager.PIDPartition0 process is used under the hood by Registry. See Registry.start_link/1 for more.

Kino.Process.render_seq_trace(fn ->
  PoolManager.start_link([])
end)

When a process calls PoolManager.schedule_job/0, our PoolManager will randomly select a worker.

Kino.Process.render_seq_trace(fn ->
  PoolManager.schedule_job()
end)

The code is synchronous for the Caller process, but if we have multiple Caller processes (such as when you have many clients in a Phoenix server), we’ll see the full Benefits of the worker pool.

flowchart LR
  C1[Client]
  C2[Client]
  C1 --> P
  C2 --> P
  subgraph Worker Pool
    P[Pool Manager]
    W1[Worker 1]
    W2[Worker 2]
    W3[Worker 3]

    P --Client 1 Routed to--> W1
    P --Client 2 Routed to --> W2
    P --inactive--> W3
  end

To simulate multiple callers, we’ll use two tasks. This will take only a second to run most of the time. However, it will take two seconds if we get unlucky and select the same worker for both tasks.

task1 = Task.async(fn -> PoolManager.schedule_job() end)
task2 = Task.async(fn -> PoolManager.schedule_job() end)

Task.await_many([task1, task2])

While randomly selecting a worker is usually fine since large workloads will be evenly distributed across workers, there are more complicated scheduling techniques such as round-robin, where we schedule workers in order.

Supervising Our Registry

To supervise our Registry, we simply need to put our Registry process and Worker pool processes under a supervisor.

Here is a minimal example to demonstrate the concept.

Next, we start a Registry called SupervisedPoolManager under a Supervisor. We also manually start three SupervisedWorker processes for our worker pool.

children = [
  {Registry, name: SupervisedPoolManager, keys: :duplicate},
  %{
    id: :worker1,
    start: {Worker, :start_link, [[name: :super_worker1, registry: SupervisedPoolManager]]}
  },
  %{
    id: :worker2,
    start: {Worker, :start_link, [[name: :super_worker2, registry: SupervisedPoolManager]]}
  },
  %{
    id: :worker3,
    start: {Worker, :start_link, [[name: :super_worker3, registry: SupervisedPoolManager]]}
  }
]

opts = [strategy: :one_for_one, name: :registry_supervisor]

{:ok, supervisor_pid} = Supervisor.start_link(children, opts)

We can look up our three SupervisedWorker processes registered by the SupervisedPoolManager to ensure they have started.

Registry.lookup(SupervisedPoolManager, :workers)

We’ll use Kino.Process.sup_tree/2 to visualize the supervisor tree above for demonstration purposes.

Kino.Process.sup_tree(supervisor_pid)

Supervised Worker Pool Module

Now that we’ve seen a minimal example, we’ll make a more complete implementation using a Module Based Supervisors to encapsulate our supervised Registry.

defmodule SupervisedPool do
  use Supervisor

  def start_link(_opts) do
    # we've made our name configurable for demonstration purposes.
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # System.schedulers_online() returns the number of 
    # available schedulers on the current machine.
    child_specs =
      Enum.map(1..System.schedulers_online(), fn n ->
        %{
          id: :"supervised_worker_#{n}",
          start: {Worker, :start_link, [[registry: SupervisedPool.Registry]]}
        }
      end)

    children =
      [
        {Registry, name: SupervisedPool.Registry, keys: :duplicate}
      ] ++ child_specs

    Supervisor.init(children, strategy: :one_for_one)
  end
end

This module starts a number of workers equal to the number of available schedulers in the current machine.

System.schedulers_online()

We’ll start the SupervisedPool and visualize it in a Kino supervision tree diagram. You’ll see the same number of processes under the SupervisedPool as you have schedulers online.

{:ok, supervisor_pid} = SupervisedPool.start_link([])

Kino.Process.sup_tree(supervisor_pid)

Worker pools help applications run better by managing worker processes, making it easier to handle many tasks and scale our system without running into unbounded concurrency issues.

It’s important to note that there exist numerous approaches to designing and implementing worker pools. While the code we’ve developed to construct our custom worker pool from scratch is one viable solution, there are certainly better options.

Your Turn

Create a mix application with a supervised worker pool that allows you to configure the number of workers started in the worker pool.

For example:

MyApp.SupervisedPool.start_link(name: :example_pool, size: 4)

Start the supervised worker pool as a child of your application supervisor.

Poolboy

Poolboy is a reliable alternative to implementing a custom process pool that saves us time and effort.

We can specify the following configuration for the process pool.

  • :name: The name of the pool. The scope can be either :local, :global, or :via.
  • :worker_module: The module used to create workers for the pool.
  • :size: The maximum number of workers allowed in the pool.
  • :max_overflow (optional): The maximum number of temporary workers that can be created when the pool is empty.
  • :strategy (optional): Determines whether workers that return to the pool should be placed first or last in the line of available workers, with two possible values: :lifo or :fifo. The default is :lifo.
poolboy_config = [
  name: {:local, :worker},
  worker_module: Worker,
  size: 4
]

Poolboy provides a :poolboy.child_spec/2 function we can use to start a Poolboy manager in the supervision tree of our application.

children = [
  :poolboy.child_spec(:worker, poolboy_config)
]

opts = [strategy: :one_for_one, name: :my_pool_supervisor]

{:ok, supervisor_pid} = Supervisor.start_link(children, opts)

We can use the :poolboy.transaction/3 function to access one of the workers in our worker pool and perform a job.

:poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)

Due to the size: 4 configuration above, 4 workers will perform jobs concurrently in the worker pool. Notice that despite triggering 10 tasks, only 4 jobs occur simultaneously.

tasks =
  Enum.map(1..10, fn _ ->
    Task.async(fn ->
      :poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)
    end)
  end)

Task.await_many(tasks)

Here, we’ll use Kino.Process.seq_trace/2 to visualize the Poolboy pool manager starting three worker processes.

Notice there are three perform_job messages. There are also several processes being used under the hood in Poolboy.

Kino.Process.render_seq_trace(fn ->
  tasks =
    Enum.map(1..3, fn _ ->
      Task.async(fn ->
        :poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)
      end)
    end)

  Task.await_many(tasks)
end)

Your Turn

Add the Poolboy dependency to a mix project and start a worker pool under your application’s supervision tree.

Further Reading

Consider the following resource(s) to deepen your understanding of the topic.

Commit Your Progress

DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.

Run git status to ensure there are no undesirable changes. Then run the following in your command line from the curriculum folder to commit your progress.

$ git add .
$ git commit -m "finish Worker Pools reading"
$ git push

We’re proud to offer our open-source curriculum free of charge for anyone to learn from at their own pace.

We also offer a paid course where you can learn from an instructor alongside a cohort of your peers. We will accept applications for the June-August 2023 cohort soon.

Navigation

Home Report An Issue QueuesRubix Cube