Rate Limiting
Mix.install([
  {:kino, "~> 0.14.2"}
])
Introduction
A rate limiter is a component that sits in front of a resource (or a pool of resources) and restricts the rate of requests to it. For the sake of simplicity, we will be working with a single resource in the form of a Worker GenServer:
graph LR;
RL[Rate Limiter];
C1[Client 1]-->RL;
C2[Client 2]-->RL;
C3[Client 3]-->RL;
RL-->W[Worker];
The rate limiter needs to be informed about the maximum allowed rate. This can take a number of forms. Some of these are:
- A feedback mechanism allowing the resource to control the flow.
- A period length and a number of requests to allow during this period.
- A number of requests to allow during a fixed period.
- A minimum period in between requests.
For simplicity, I have chosen to go for the last option. In a real-world scenario, I would go for a pluggable strategy that can be decided at initialization time, as sketched below.
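If we were to go down that route, one way to express it is as a behaviour that the rate limiter consults before forwarding each request. The following is only a sketch; the module name, the callback names, and the MinimumPeriod example are my own inventions for illustration:
defmodule RateLimiter.Strategy do
  # Hypothetical behaviour for pluggable rate-limiting strategies:
  # an implementation decides how long to wait before the next request.
  @callback init(opts :: keyword()) :: term()
  @callback delay_before_next(state :: term()) :: {non_neg_integer(), term()}
end

defmodule MinimumPeriod do
  @behaviour RateLimiter.Strategy

  # The strategy this notebook hard-codes: a fixed minimum period (in ms).
  @impl true
  def init(opts), do: Keyword.fetch!(opts, :period)

  @impl true
  def delay_before_next(period), do: {period, period}
end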
Design
I have decided on a design where
- Clients don’t know about the rate limiter, but the workers do. Client developers will thus only need to care about the interface of the worker.
- The interface side of the worker calls a generic interface function on the rate limiter. This means that the rate limiter’s interface is independent of the functionality offered by the worker: adding a service to the worker does not affect the rate limiter.
For a cast message the design looks like this:
sequenceDiagram
Client-->>Worker: Specific interface function call
Worker-->>Rate Limiter: Generic interface function call
Rate Limiter->>Rate Limiter: cast
Rate Limiter->>Worker: cast
Here, the role of the cast in the Rate Limiter is to queue the request, so that the rate limiting can be implemented using the message inbox as a queue. This is a volatile design choice: the queue lives in the rate limiter’s mailbox, so any queued requests are lost if the process crashes.
While this setup protects the worker, the client is oblivious to the rate limiting. At times that may be fine, but it does come with a risk: if the client produces work packages faster than the rate limit allows, the inbox of the rate limiter will grow. This is the practical equivalent of a memory leak, and it violates the client’s expectation of near-immediate processing. Some sort of flow control (e.g., back pressure) is needed; one cheap safeguard is sketched below. Calls are immune to this issue, as they are synchronized by design.
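As a sketch of such a safeguard (not part of the implementation below; the threshold and the :overloaded error are assumptions made for illustration), the enqueueing interface function could inspect the rate limiter’s mailbox and refuse to accept more work beyond a bound:
defmodule BoundedCast do
  # Illustrative only: refuse to enqueue once the RateLimiter mailbox
  # already holds @max_queue_len messages. Assumes a process registered
  # under the RateLimiter name, as in the implementation below.
  @max_queue_len 100

  def cast(server, payload) do
    pid = Process.whereis(RateLimiter)
    {:message_queue_len, len} = Process.info(pid, :message_queue_len)

    if len < @max_queue_len do
      GenServer.cast(RateLimiter, {:cast, server, payload})
    else
      {:error, :overloaded}
    end
  end
end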
For a call message the design looks like this:
sequenceDiagram
Client-->>Worker: Specific interface function call
Worker-->>Rate Limiter: Generic interface function call
Rate Limiter->>Rate Limiter: call
Rate Limiter->>Worker: call
Worker->>Client: response
The interesting part of this setup is that the call being issued from the client side (at the top of the diagram) is being forwarded from the Rate Limiter to the Worker. The responsibility of responding is thus delegated to the Worker process. This has two implications:
- The callback handler in the Rate Limiter returns with a :noreply tuple.
- The callback handler in the Worker process mocks a response to the client process by constructing and sending a :"$gen_call" message.
The effect of this is that the initial call by the client process waits for the worker (which it reached by proxy of the rate limiter) rather than for the rate limiter itself.
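Constructing a raw :"$gen_call" tuple leans on an OTP implementation detail. An equivalent approach, sketched here with message shapes of my own choosing, is to pass the from tuple along explicitly and let the worker answer through GenServer.reply/2:
defmodule RateLimiterAlt do
  use GenServer

  @impl true
  def init(period), do: {:ok, period}

  @impl true
  def handle_call({:call, server, payload}, from, period) do
    # Hand the caller's `from` to the worker; we never reply ourselves.
    GenServer.cast(server, {:forwarded, from, payload})
    :timer.sleep(period)
    {:noreply, period}
  end
end

defmodule WorkerAlt do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:forwarded, from, payload}, state) do
    # Reply directly to the original client via the forwarded `from`.
    GenServer.reply(from, {:ok, payload})
    {:noreply, state}
  end
end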
For demonstration purposes, printouts have been sprinkled throughout the code. This allows us to follow the order of execution and verify that the rate of the calls to the worker is limited.
Define Client Pool Size and Rate Limiter Period
kinos = [
  Kino.Input.number("Client pool size:", default: 5),
  Kino.Input.number("Rate limiter period (ms):", default: 500)
]

Kino.Layout.grid(kinos)

[client_pool_size, rate_limiter_period] =
  kinos
  |> Enum.map(&Kino.Input.read/1)
Implementation
defmodule RateLimiter do
  use GenServer

  # (client) interface
  def start_link(period) do
    GenServer.start_link(__MODULE__, period, name: __MODULE__)
  end

  def start(period) do
    GenServer.start(__MODULE__, period, name: __MODULE__)
  end

  def cast(server, payload) do
    GenServer.cast(__MODULE__, {:cast, server, payload})
  end

  def call(server, payload) do
    GenServer.call(__MODULE__, {:call, server, payload})
  end

  # callbacks
  @impl true
  def init(period) do
    {:ok, period}
  end

  @impl true
  def handle_cast({:cast, server, payload}, period = state) do
    IO.puts("RateLimiter[cast]: #{inspect payload}")
    GenServer.cast(server, payload)
    # Block this process to enforce the minimum period between requests;
    # incoming requests queue up in the mailbox in the meantime.
    :timer.sleep(period)
    {:noreply, state}
  end

  @impl true
  def handle_call({:call, server, payload}, from, period = state) do
    IO.puts("RateLimiter[call]: #{inspect payload}")
    # Forward the call, including the original `from`, so that the worker
    # replies directly to the client. We deliberately return :noreply.
    send(server, {:"$gen_call", from, payload})
    :timer.sleep(period)
    {:noreply, state}
  end
end
defmodule Worker do
  use GenServer

  # (client) interface
  def start_link() do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def start() do
    GenServer.start(__MODULE__, nil, name: __MODULE__)
  end

  def process(description, client_name) do
    # RateLimiter.cast(__MODULE__, {:process, description, client_name})
    RateLimiter.call(__MODULE__, {:process, description, client_name})
  end

  # callbacks
  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_cast({:process, description, client_name}, state) do
    IO.puts("Worker[cast]: worker doing #{description} for #{client_name}")
    {:noreply, state}
  end

  @impl true
  def handle_call({:process, description, client_name} = message, _from, state) do
    IO.puts("Worker[call]: worker doing #{description} for #{client_name}")
    # `_from` is the `from` forwarded by the rate limiter, so this reply
    # goes straight back to the original client.
    {:reply, {:ok, message}, state}
  end
end
defmodule Client do
  use GenServer

  # (client) interface
  def start_link(name) do
    GenServer.start_link(__MODULE__, name)
  end

  def start(name) do
    GenServer.start(__MODULE__, name)
  end

  # callbacks
  @impl true
  def init(name) do
    prime()
    {:ok, %{name: name, index: 0}}
  end

  @impl true
  def handle_cast({:work}, state) do
    index = state.index
    IO.puts("#{state.name}: sending package ##{index}")
    result = Worker.process("Work package ##{index}", state.name)
    IO.puts("#{state.name}: received result #{inspect result}")
    # Queue the next work package for ourselves.
    prime()
    {:noreply, Map.put(state, :index, index + 1)}
  end

  # helpers
  defp prime() do
    GenServer.cast(self(), {:work})
  end
end
Demo
{:ok, worker_pid} = Worker.start()
{:ok, rl_pid} = RateLimiter.start(rate_limiter_period)
client_pids =
  1..client_pool_size
  |> Enum.map(fn i ->
    {:ok, pid} = Client.start("Client ##{i}")
    pid
  end)
Cleanup:
client_pids
|> Enum.concat([worker_pid, rl_pid])
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)
Note: in order to repeat the demo, you must re-execute all of the cells in this section, since the cleanup step kills the named processes.
Discussion
This form of rate limiting is producer-driven: the producer decides when to emit a value. The alternative is a consumer-driven design, where the consumer side requests work to be emitted by the producer side. That is not always an option, though. When it is, we should consider GenStage or some system that builds on it. Otherwise, one should consider relying on a queue.
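As an illustration of the consumer-driven alternative, a minimal GenStage pipeline could look roughly like the sketch below. It assumes that {:gen_stage, "~> 1.2"} has been added to Mix.install, and the module names are invented for the example:
defmodule WorkProducer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, 0, name: __MODULE__)

  @impl true
  def init(counter), do: {:producer, counter}

  # Events are only emitted when the consumer asks for them.
  @impl true
  def handle_demand(demand, counter) do
    events = Enum.map(counter..(counter + demand - 1), &"Work package ##{&1}")
    {:noreply, events, counter + demand}
  end
end

defmodule WorkConsumer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, nil)

  @impl true
  def init(state) do
    # max_demand bounds how many work packages are in flight at a time.
    {:consumer, state, subscribe_to: [{WorkProducer, max_demand: 5}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, &IO.puts/1)
    {:noreply, [], state}
  end
end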