Load Balancing
Mix.install([
{:kino, "~> 0.14.2"}
])
Introduction
The job of distributing a set of tasks across a set of resources is called load balancing. Here, the tasks represent the load. Typically, the load is not known beforehand but is revealed over time. The load balancing process thus deals with a – potentially infinite – stream of tasks. Improper load balancing leads to low resource utilization.
In Elixir, the resources will typically be GenServers, but they could also be physical machines. In this notebook, we will focus on GenServers.
Load balancing regimes are typically classified as being either static or dynamic. Dynamic regimes take into account the state of the available resources while static regimes do not. Static regimes can (usually) be implemented on the client side. Dynamic regimes, on the other hand, require some sort of feedback loop: information needs to flow from the resources to the decision-making logic. As there may be multiple clients, a separate load balancer component is often introduced to avoid a many-to-many flow. All of this comes with complexity and overhead. Still, it allows for better utilization.
Typically, all tasks are considered to be independent of each other. That is, any task can be satisfied by any resource. But that is not always the case. Sometimes one task leaves state at the resource that is needed for the completion of a different task. This could be the case if a session from a single client spans multiple tasks, or if a task or resource concern overlaps multiple clients.
One way of dealing with this is to share a persistence layer between the resources. Another is to make sure to route related tasks to the same resource. But that, obviously, requires knowledge of which tasks are related.
In this notebook we are going to refer to clients and workers. Clients produce tasks, and workers are the resources that complete these tasks. At times, we will also refer to a load balancer, which is then a process that sits in between the clients and the resources as a narrow waist.
Define Pool Sizes
These controls determine how many clients and how many workers are started in each implementation of the load balancing regimes below.
kinos = [
Kino.Input.number("Client pool size:", default: 3),
Kino.Input.number("Worker pool size:", default: 2),
]
Kino.Layout.grid(kinos)
[client_pool_size, worker_pool_size] = Enum.map(kinos, &Kino.Input.read/1)
Randomized Client-Side Balancing
We start out with perhaps the simplest possible solution. Here, each worker registers itself at a registry process with duplicate keys. The duplicate keys option lets a client look up a list of worker processes instead of a single process. That list will reflect the set of running worker processes. The client then chooses a random worker from this list and offloads the task to that worker. It looks a bit like this:
flowchart LR;
R<-.->W1;
R[Registry];
C1[Client 1];
C2[Client 2];
C3[Client 3];
W1[Worker 1];
W2[Worker 2];
R-->C1;
R-->C2;
R-->C3;
C1-->W1;
C1-->W2;
C2-->W1;
C2-->W2;
C3-->W1;
C3-->W2;
W2<-.->R;
Worker
The `RandomWorker` exposes one function for processing tasks, aptly named `process`. The logic for picking a concrete worker instance (what is called a provider in the code) is placed in a separate `call_random_provider` function. This means that it is trivial to extend the code with different forms of interface functions representing different types of tasks.
Task processing is simulated by a `@job_size` delay (which is measured in ms).
defmodule RandomWorker do
use GenServer
@job_size 1000
# (client) interface
def start_link(name) do
GenServer.start_link(__MODULE__, name)
end
def start(name) do
GenServer.start(__MODULE__, name)
end
def process(description, client_name) do
call_random_provider(fn pid -> GenServer.call(pid, {:process, description, client_name}) end)
end
# callbacks
@impl true
def init(state) do
Registry.register(RandomWorkerRegistry, _key = nil, _value = nil)
{:ok, state}
end
@impl true
def handle_call({:process, description, client_name}, _from, name = state) do
IO.puts("#{name} doing #{description} for #{client_name}")
:timer.sleep(@job_size)
{:reply, {:ok, description}, state}
end
# helpers
defp call_random_provider(callback) do
key = nil
case Registry.lookup(RandomWorkerRegistry, key) do
[] ->
{:error, "No provider for '#{key}'"}
workers ->
{pid, _} = Enum.random(workers)
callback.(pid)
:ok
end
end
end
Client
The client knows nothing about its requests being load balanced. From its perspective, it just asks `RandomWorker` to fulfill the task.
defmodule RandomClient do
use GenServer
# (client) interface
def start_link(name) do
GenServer.start_link(__MODULE__, name)
end
def start(name) do
GenServer.start(__MODULE__, name)
end
# callbacks
@impl true
def init(name) do
prime()
{:ok, %{name: name, index: 0}}
end
@impl true
def handle_cast({:work}, state) do
index = state.index
RandomWorker.process("Work package ##{index}", state.name)
prime()
{:noreply, Map.put(state, :index, index+1)}
end
# helpers
defp prime() do
GenServer.cast(self(), {:work})
end
end
Demo
Let's start a worker registry, some workers, and some clients:
{:ok, random_registry_pid} = Registry.start_link(name: RandomWorkerRegistry, keys: :duplicate)
random_worker_pids =
1..worker_pool_size
|> Enum.map(fn i ->
{:ok, pid} = RandomWorker.start("Worker ##{i}")
pid
end)
random_client_pids =
1..client_pool_size
|> Enum.map(fn i ->
{:ok, pid} = RandomClient.start("Client ##{i}")
pid
end)
Cleanup
Now we have a lot of processes printing out information. Let's kill them before continuing:
[random_worker_pids, random_client_pids]
|> List.flatten()
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)
GenServer.stop(random_registry_pid, :normal)
Round-Robin Balancing Component
While the last solution distributed the load, it did so randomly. That results in a fluctuating utilization. This inherent fluctuation can be removed by scheduling the tasks round-robin. That is, task $\textcolor{purple}{n}$ is scheduled to worker $\textcolor{purple}{n ~\mathrm{mod}~ c}$ where $\textcolor{purple}{c}$ is the number of workers. This requires us to keep track of $\textcolor{purple}{n}$, though.
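To make the mapping concrete, here is a tiny, purely illustrative snippet (not part of the implementation below) that evaluates the formula for two workers:

```elixir
# With c = 2 workers, task n is mapped to worker rem(n, c).
c = 2
Enum.map(0..5, fn n -> {n, rem(n, c)} end)
#=> [{0, 0}, {1, 1}, {2, 0}, {3, 1}, {4, 0}, {5, 1}]
```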
We place a load balancer as a narrow waist in between the clients and the workers. Its job is to keep track of that $\textcolor{purple}{n}$, and to forward requests to the appropriate worker.
The regime looks something like this:
graph LR;
R[Registry];
LB[Load Balancer];
C1[Client 1]-->LB;
C2[Client 2]-->LB;
C3[Client 3]-->LB;
LB-->W1[Worker 1];
LB<-.->R;
LB-->W2[Worker 2];
R<-.->W1;
R<-.->W2;
The `Worker` needs to register itself with the `Registry` when initializing. Ideally, the `Load Balancer` would not need to know about the `Registry`.
The `Load Balancer` needs to keep track of an ever-increasing index that is taken modulo the number of worker instances. Ideally, the `Worker` would not need to know about this index.
However, in one component, the knowledge about the number of worker instances (and thus the `Registry`) and the index will have to be combined. This is done either in the `Load Balancer` or in the `Worker`.
We choose to do this in the `Load Balancer`, for two reasons:
- In a more realistic scenario the `Worker` would be more complex while the `Load Balancer` would not. This choice limits the complexity of the largest component.
- This division of responsibility leaves the `Worker` with a clean external interface where the `process` function takes a pid. Accordingly, it can be used without the `Load Balancer`.
Worker
The worker is completely standard, except that it registers itself with `RoundRobinWorkerRegistry`:
defmodule RoundRobinWorker do
use GenServer
@job_size 1000
# (client) interface
def start_link(name) do
GenServer.start_link(__MODULE__, name)
end
def start(name) do
GenServer.start(__MODULE__, name)
end
def process(pid, description, client_name) do
GenServer.call(pid, {:process, description, client_name})
end
# callbacks
@impl true
def init(state) do
Registry.register(RoundRobinWorkerRegistry, _key = nil, _value = nil)
{:ok, state}
end
@impl true
def handle_call({:process, description, client_name}, _from, name = state) do
IO.puts("#{name} doing #{description} for #{client_name}")
:timer.sleep(@job_size)
{:reply, {:ok, description}, state}
end
end
Load Balancer
The load balancer relies on a few tricks:
- The private `engage_provider` function is the only function with an outgoing dependency. Its job is to issue a call to a worker (round-robin style).
- The details of that call are expressed as an anonymous function. That way, `engage_provider` is only responsible for picking which provider (i.e., worker) to use.
- The call itself is realized by essentially forwarding the call to the chosen provider. This is accomplished by:
  - not sending a response back, i.e., returning a `{:noreply, new_state}` tuple, and
  - sending an ordinary message of the `:"$gen_call"` type to the provider, thereby transferring the responsibility of answering the call.
- The `handle_call` function employs pattern matching to allow arbitrary calls to be forwarded. In this particular case, it is only used by the `process` interface function, but other such functions could reuse this functionality. This means that it is very quick to add interface functions to be load balanced.
defmodule LoadBalancer do
use GenServer
# (client) interface
def start_link() do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
def start() do
GenServer.start(__MODULE__, nil, name: __MODULE__)
end
def call(message) do
GenServer.call(__MODULE__, message)
end
def process(description, client_name) do
GenServer.call(__MODULE__, {:process, description, client_name})
end
# callbacks
@impl true
def init(nil) do
{:ok, 0}
end
@impl true
def handle_call(message, from, index = _state) do
IO.puts("load balancer forwarding work from #{inspect from} to #{index}: #{inspect message}")
engage_provider(index, fn pid ->
send(pid, {:"$gen_call", from, message})
end)
{:noreply, index+1}
end
# helpers
defp engage_provider(i, callback) do
key = nil
case Registry.lookup(RoundRobinWorkerRegistry, key) do
[] ->
{:error, "No provider for '#{key}'"}
workers ->
{pid, _} = Enum.at(workers, rem(i, length(workers)))
callback.(pid)
:ok
end
end
end
Client
The client is written exactly like `RandomClient` except that the `process` call now goes to the `LoadBalancer` module instead of `RandomWorker`.
defmodule RoundRobinClient do
use GenServer
# (client) interface
def start_link(name) do
GenServer.start_link(__MODULE__, name)
end
def start(name) do
GenServer.start(__MODULE__, name)
end
# callbacks
@impl true
def init(name) do
prime()
{:ok, %{name: name, index: 0}}
end
@impl true
def handle_cast({:work}, state) do
index = state.index
LoadBalancer.process("Work package ##{index}", state.name)
prime()
{:noreply, Map.put(state, :index, index+1)}
end
# helpers
defp prime() do
GenServer.cast(self(), {:work})
end
end
Start Basics
{:ok, rr_registry_pid} = Registry.start_link(name: RoundRobinWorkerRegistry, keys: :duplicate)
{:ok, balancer_pid} = LoadBalancer.start()
Test
Single Test:
{:ok, test_worker_pid} = RoundRobinWorker.start("Test Worker")
Kino.Process.render_seq_trace(fn ->
LoadBalancer.process("Test package", "Me")
|> inspect()
end)
|> IO.puts()
In the resulting diagram you should see five different processes. At times, other activity will be captured and you will see a lot more; if that is what you are seeing, please execute the cell again.
In this sequence diagram, you should be able to identify the load balancer and the pid of the worker (the one in `test_worker_pid`). You should also see a pid that receives "INFO: in_request" messages and sends "INFO: runtime_evaluation_output" messages. This process has the role of forwarding printouts to this web view.
But why is the registry nowhere to be seen? While it is a GenServer, it stores its data in an ETS table, and the lookup functions go straight to that table. There is thus no messaging happening.
Note: No response messages to the calls are displayed here. It is not clear to me whether this is a simplification choice or because a different mechanism is used. But the result is that we can’t see that it is the worker that responds to the client.
Let's clean up the test worker:
Process.exit(test_worker_pid, :kill)
Demo
Start the full thing up:
rr_worker_pids =
1..worker_pool_size
|> Enum.map(fn i ->
{:ok, pid} = RoundRobinWorker.start("Worker ##{i}")
pid
end)
rr_client_pids =
1..client_pool_size
|> Enum.map(fn i ->
{:ok, pid} = RoundRobinClient.start("Client ##{i}")
pid
end)
Cleanup
Finally, let's clean up the client and worker processes:
[rr_worker_pids, rr_client_pids]
|> List.flatten()
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)
Discussion
Bottlenecking the Load Balancer
By linking a set of clients to a set of workers through a single load balancer component, we introduce a bottleneck. The moment that load balancer can't keep up, performance will start to suffer, and – depending on how exactly that load balancer is used – its queue might also grow into what is effectively a memory leak.
If back pressure can be employed, then that would be a good solution. Other – more general – solutions involve some sort of partitioning or tiered load balancers.
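As a rough sketch of the partitioning idea (the module name and the way the balancer pids are passed around are assumptions made for illustration, not part of the code above), each client could be pinned to one of several balancer processes by hashing its own pid:

```elixir
defmodule PartitionedBalancer do
  # Sketch only: spread clients across several LoadBalancer-like processes so
  # that no single process sees all traffic. `balancers` is assumed to be a
  # list of already-started balancer pids.
  def process(balancers, description, client_name) do
    index = :erlang.phash2(self(), length(balancers))
    balancer = Enum.at(balancers, index)
    GenServer.call(balancer, {:process, description, client_name})
  end
end
```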
Predictable Client-to-Worker Mapping
In the above problem, we assumed that any request could (equally well) be satisfied by any worker. That is not always the case. There might be a need for per-client state, and in that case we need a consistent mapping from clients to workers.
One way of doing that would be to hash the pid of the client to an integer and then use the remainder when dividing by the number of workers. This solution, however, depends on the number of workers not changing.
If we need the ability to dynamically change the number of workers, it would still be relatively simple to double or halve the worker count by splitting each worker in two or joining every two workers. Beyond that, a full rehash would be required. And even that would rely on having no state that crosses the boundary of a single worker.
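A minimal sketch of such a mapping, reusing the duplicate-key registry from above: hash the client pid and use the remainder to pick a worker. The module name is made up, and the mapping is only stable as long as the list of registered workers does not change (in number or order):

```elixir
defmodule StickyRouting do
  # Sketch only: deterministically pick a worker for a given client pid.
  def worker_for(client_pid) do
    case Registry.lookup(RoundRobinWorkerRegistry, nil) do
      [] ->
        {:error, :no_workers}

      workers ->
        index = :erlang.phash2(client_pid, length(workers))
        {pid, _value} = Enum.at(workers, index)
        {:ok, pid}
    end
  end
end
```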
Knowledge of Actual Worker Load
In the above scenario, all tasks are the same and take equally long to finish. When that is the case, a round-robin strategy will keep things balanced. That premise is, however, by no means always given. When it is not, a round-robin strategy will still distribute the load, but the time to task completion will vary depending on the load of the resource each task is assigned to.
Measuring time to completion only helps us if we know how long a task is supposed to take, which is a rather special case. Having workers report their actual load or capacity is a more general option.
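One cheap approximation, sketched below with a made-up module name, is to let the balancer inspect each registered worker's message queue length and pick the least busy one. `Process.info(pid, :message_queue_len)` is a standard BEAM facility, but treating mailbox length as load is an assumption that only holds when pending tasks actually queue up in the worker's mailbox (as they do with the forwarding trick above):

```elixir
defmodule LeastLoaded do
  # Sketch only: pick the registered worker with the shortest message queue.
  def pick_worker() do
    case Registry.lookup(RoundRobinWorkerRegistry, nil) do
      [] ->
        {:error, :no_workers}

      workers ->
        {pid, _value} =
          Enum.min_by(workers, fn {pid, _value} ->
            case Process.info(pid, :message_queue_len) do
              {:message_queue_len, len} -> len
              # the worker just died; rank it last (atoms sort after integers)
              nil -> :infinity
            end
          end)

        {:ok, pid}
    end
  end
end
```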
Priority
It might be that certain tasks have higher priority than others. To account for that, some sort of priority queue should be implemented. A hacky solution would be to bypass the ordinary message dispatch mechanism of the GenServer and employ a selective receive construct. A better solution would likely be to simply let a load balancer process maintain a priority queue. That, however, comes with the potential for unbounded buffer growth unless addressed.
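A minimal sketch of the load-balancer-side priority queue, here reduced to two levels and built on Erlang's :queue module (the module and function names are made up for illustration). It makes the dequeue order explicit but, as noted, does nothing about unbounded growth:

```elixir
defmodule PriorityBuffer do
  # Sketch only: high-priority tasks are always dequeued before normal ones.
  def new(), do: %{high: :queue.new(), normal: :queue.new()}

  def push(buf, :high, task), do: %{buf | high: :queue.in(task, buf.high)}
  def push(buf, :normal, task), do: %{buf | normal: :queue.in(task, buf.normal)}

  def pop(buf) do
    case :queue.out(buf.high) do
      {{:value, task}, rest} ->
        {:ok, task, %{buf | high: rest}}

      {:empty, _} ->
        case :queue.out(buf.normal) do
          {{:value, task}, rest} -> {:ok, task, %{buf | normal: rest}}
          {:empty, _} -> :empty
        end
    end
  end
end
```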
Distribution
The elephant in the room is that all of these solutions are local in nature. That is in part because the Registry module is local to a single node, and in part because the latency penalty incurred by crossing the network should only be accepted after careful consideration. Such considerations are highly application specific.
Alternatives
In some use cases you have no control over the tasks being produced on the client side, and then you need either a buffer or load balancing. But in other cases (including the buffer case from the previous sentence) one can get away with implementing back pressure so that the workers drive the load by requesting work.
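The pull-based alternative can be sketched as a queue process from which workers explicitly request work; the module and message names below are invented for illustration, and a real implementation would also bound the queue (or push back on clients) to get true back pressure:

```elixir
defmodule WorkQueue do
  use GenServer

  # Sketch only: tasks are handed out when a worker asks, so workers set the pace.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :queue.new(), name: __MODULE__)

  def push(task), do: GenServer.cast(__MODULE__, {:push, task})

  # Called by a worker whenever it is ready for more work.
  def request_work(), do: GenServer.call(__MODULE__, :request_work)

  @impl true
  def init(queue), do: {:ok, queue}

  @impl true
  def handle_cast({:push, task}, queue), do: {:noreply, :queue.in(task, queue)}

  @impl true
  def handle_call(:request_work, _from, queue) do
    case :queue.out(queue) do
      {{:value, task}, rest} -> {:reply, {:ok, task}, rest}
      {:empty, rest} -> {:reply, :empty, rest}
    end
  end
end
```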
And yes, there already exist a number of Elixir libraries for load balancing. For most real projects, they should be used.