

Rate Limiting

Mix.install([
  {:kino, "~> 0.14.2"}
])

Introduction

A rate limiter is a component that sits in front of a resource (or a pool of resources) and restricts the rate of requests to it. For the sake of simplicity, we will be working with a single resource in the form of a Worker GenServer:

graph LR;
  RL[Rate Limiter];
  C1[Client 1]-->RL;
  C2[Client 2]-->RL;
  C3[Client 3]-->RL;
  RL-->W[Worker];

The rate limiter needs to be informed about the maximum allowed rate. This can take a number of forms. Some of these are:

  • A feedback mechanism allowing the resource to control the flow.
  • A period length and a number of requests to allow during this period.
  • A number of requests to allow during a fixed period.
  • A minimum period in between requests.

For simplicity, I have chosen the last option: a minimum period between requests. In a real-world scenario, I would go for a pluggable strategy that can be decided at initialization time, along the lines of the sketch below.
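To make the idea concrete, here is a minimal sketch of what such a pluggable strategy could look like. The RateLimiter.Strategy behaviour and the allow?/1 contract are my own invention for illustration, not part of the implementation below:

defmodule RateLimiter.Strategy do
  # Hypothetical behaviour for pluggable rate-limiting strategies.
  @callback init(opts :: keyword()) :: term()
  @callback allow?(state :: term()) :: {boolean(), term()}
end

defmodule MinimumPeriod do
  @behaviour RateLimiter.Strategy

  @impl true
  def init(opts), do: %{period: Keyword.fetch!(opts, :period), last: nil}

  # Allow a request only if at least `period` milliseconds have passed
  # since the previous allowed request.
  @impl true
  def allow?(%{period: period, last: last} = state) do
    now = System.monotonic_time(:millisecond)

    if last == nil or now - last >= period do
      {true, %{state | last: now}}
    else
      {false, state}
    end
  end
end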

Design

I have decided on a design where:

  1. Clients don’t know about the rate limiter, but the workers do. Client developers thus only need to care about the interface of the worker.
  2. The interface side of the worker calls a generic interface function on the rate limiter. This means that the rate limiter’s interface is independent of the functionality offered by the worker: adding a service to the worker does not affect the rate limiter.

For a cast message the design looks like this:

sequenceDiagram
    Client-->>Worker: Specific interface function call
    Worker-->>Rate Limiter: Generic interface function call
    Rate Limiter->>Rate Limiter: cast
    Rate Limiter->>Worker: cast

Here, the role of the cast into the Rate Limiter is to queue the request, so that the rate limiting can be implemented using the message inbox as a queue. Note that this choice makes the queue volatile: pending requests live only in the rate limiter’s mailbox and are lost if the process dies.

While this setup protects the worker, the client is oblivious to the rate limiting. At times that may be fine, but it comes with a risk: if the client produces work packages faster than the rate limit allows, the inbox of the rate limiter will grow without bound. This is the practical equivalent of a memory leak, and it violates the client’s expectation of near-immediate processing. Some form of flow control (e.g., back pressure, or load shedding as sketched below) is needed. Calls are immune to this issue as they are synchronized by design.
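A cheap form of load shedding is to check the mailbox length before accepting a cast and to reject the request when the queue is already too long. A minimal sketch, with a made-up module name and threshold:

defmodule ShedLoad do
  # Illustrative guard: refuse new work when the target process's mailbox
  # is already long, instead of letting it grow without bound.
  @max_queue_len 100

  def cast_or_reject(server_name, message) do
    pid = Process.whereis(server_name)
    {:message_queue_len, len} = Process.info(pid, :message_queue_len)

    if len < @max_queue_len do
      GenServer.cast(pid, message)
    else
      {:error, :overloaded}
    end
  end
end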

For a call message the design looks like this:

sequenceDiagram
    Client-->>Worker: Specific interface function call
    Worker-->>Rate Limiter: Generic interface function call
    Rate Limiter->>Rate Limiter: call
    Rate Limiter->>Worker: call
    Worker->>Client: response

The interesting part of this setup is that the call issued from the client side (at the top of the diagram) is forwarded from the Rate Limiter to the Worker. The responsibility of responding is thus delegated to the Worker process. This has two implications:

  1. The callback handler in the Rate Limiter returns a :noreply tuple, leaving the client waiting.
  2. The Rate Limiter impersonates the client by constructing and sending a :"$gen_call" message to the Worker that carries the client’s original from reference; the Worker’s ordinary {:reply, ...} then goes straight back to the client.

The effect of this is that the client’s initial call ends up waiting for the Worker, which it reached by proxy of the Rate Limiter, rather than for the Rate Limiter itself.
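Note that :"$gen_call" is a private detail of OTP’s gen protocol rather than a documented API. For comparison, here is a sketch of the same hand-off built only on documented functions: the rate limiter forwards the client’s from inside an ordinary cast, and the worker answers with GenServer.reply/2. The module and message shapes are invented for illustration:

defmodule ReplyForwarding do
  use GenServer

  def init(arg), do: {:ok, arg}

  # Rate limiter side: keep the client waiting and pass its `from` along.
  def handle_call({:call, server, payload}, from, state) do
    GenServer.cast(server, {:forwarded, from, payload})
    {:noreply, state}
  end

  # Worker side: answer the original client via the documented API.
  def handle_cast({:forwarded, from, payload}, state) do
    GenServer.reply(from, {:ok, payload})
    {:noreply, state}
  end
end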

For demonstration purposes, printouts have been scattered throughout the code. They allow us to follow the order of execution and verify that the rate of calls to the worker is indeed limited.

Define Client Pool Size and Rate Limiter Period

kinos = [
  Kino.Input.number("Client pool size:", default: 5),
  Kino.Input.number("Rate limiter period (ms):", default: 500)
]

Kino.Layout.grid(kinos)

[client_pool_size, rate_limiter_period] =
  Enum.map(kinos, &Kino.Input.read/1)

Implementation

defmodule RateLimiter do
  use GenServer

  # (client) interface

  def start_link(period) do
    GenServer.start_link(__MODULE__, period, name: __MODULE__)
  end

  def start(period) do
    GenServer.start(__MODULE__, period, name: __MODULE__)
  end

  # Ask the rate limiter to forward `payload` to `server` as a cast.
  def cast(server, payload) do
    GenServer.cast(__MODULE__, {:cast, server, payload})
  end

  # Ask the rate limiter to forward `payload` to `server` as a call.
  def call(server, payload) do
    GenServer.call(__MODULE__, {:call, server, payload})
  end
  
  # callbacks

  # The state is simply the minimum period (in milliseconds) that must
  # pass between two consecutive requests to the worker.
  @impl true
  def init(period) do
    {:ok, period}
  end
  
  @impl true
  def handle_cast({:cast, server, payload}, period = state) do
    IO.puts("RateLimiter[cast]: #{inspect(payload)}")
    # Forward the cast, then sleep. While we sleep, further requests pile
    # up in our mailbox, which is what enforces the minimum period.
    GenServer.cast(server, payload)
    :timer.sleep(period)
    {:noreply, state}
  end
  
  @impl true
  def handle_call({:call, server, payload}, from, period = state) do
    IO.puts("RateLimiter[call]: #{inspect(payload)}")
    # Forward the call by hand-crafting the :"$gen_call" message that
    # GenServer.call/3 would have sent, keeping the client's original
    # `from`. The worker's reply then goes directly to the client, so we
    # return :noreply and never block on the worker ourselves.
    send(server, {:"$gen_call", from, payload})
    :timer.sleep(period)
    {:noreply, state}
  end
end

defmodule Worker do
  use GenServer

  # (client) interface

  def start_link() do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end
  
  def start() do
    GenServer.start(__MODULE__, nil, name: __MODULE__)
  end

  # Route requests through the rate limiter instead of calling the worker
  # directly. Swap the two lines below to try the cast-based variant.
  def process(description, client_name) do
    # RateLimiter.cast(__MODULE__, {:process, description, client_name})
    RateLimiter.call(__MODULE__, {:process, description, client_name})
  end
  
  # callbacks

  @impl true
  def init(state) do
    {:ok, state}
  end
  
  @impl true
  def handle_cast({:process, description, client_name}, state) do
    IO.puts("Worker[cast]: worker doing #{description} for #{client_name}")
    {:noreply, state}
  end
  
  @impl true
  def handle_call({:process, description, client_name} = message, _from, state) do
    IO.puts("Worker[call]: worker doing #{description} for #{client_name}")
    # The `from` behind this reply is the client's original reference,
    # forwarded by the rate limiter, so the response goes straight back
    # to the client.
    {:reply, {:ok, message}, state}
  end
end

defmodule Client do
  use GenServer

  # (client) interface

  def start_link(name) do
    GenServer.start_link(__MODULE__, name)
  end

  def start(name) do
    GenServer.start(__MODULE__, name)
  end

  # callbacks

  @impl true
  def init(name) do
    # Prime the pump: queue the first work package to ourselves.
    prime()
    {:ok, %{name: name, index: 0}}
  end

  @impl true
  def handle_cast(:work, state) do
    index = state.index
    IO.puts("#{state.name}: sending package ##{index}")
    result = Worker.process("Work package ##{index}", state.name)
    IO.puts("#{state.name}: received result #{inspect(result)}")
    # Queue the next package only once the current call has returned.
    prime()
    {:noreply, Map.put(state, :index, index + 1)}
  end

  # helpers

  defp prime() do
    GenServer.cast(self(), :work)
  end
end

Demo

{:ok, worker_pid} = Worker.start()
{:ok, rl_pid} = RateLimiter.start(rate_limiter_period)

client_pids =
  1..client_pool_size
  |> Enum.map(fn i ->
    {:ok, pid} = Client.start("Client ##{i}")
    pid
  end)

Cleanup:

client_pids
|> Enum.concat([worker_pid, rl_pid])
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)

Note: in order to repeat the demo, you must re-execute all of the cells in this section.

Discussion

This form of rate limiting is producer-driven: the producer decides when to emit a value. The alternative is a consumer-driven design, in which the consumer requests work from the producer. That is not always an option. When it is, we should consider GenStage or a system that builds on it; otherwise, one should consider relying on a queue.
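For completeness, here is a minimal consumer-driven sketch using GenStage. It assumes adding {:gen_stage, "~> 1.2"} to the Mix.install list above and is not part of the demo; it only illustrates that the consumer’s demand, not the producer, drives the flow:

defmodule CounterProducer do
  use GenStage

  def init(counter), do: {:producer, counter}

  # Emit exactly as many events as the consumer has asked for.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule PrintingConsumer do
  use GenStage

  def init(:ok), do: {:consumer, :ok}

  def handle_events(events, _from, state) do
    IO.inspect(events, label: "consumed")
    {:noreply, [], state}
  end
end

# {:ok, producer} = GenStage.start_link(CounterProducer, 0)
# {:ok, consumer} = GenStage.start_link(PrintingConsumer, :ok)
# GenStage.sync_subscribe(consumer, to: producer, max_demand: 10)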