The Partition Supervisor
Mix.install([
{:kino, "~> 0.9.0"}
])
Navigation
Home Introduction to dynamic supervisorScaling dynamic supervisorIntroduction
A PartitionSupervisor functions similarly to a regular supervisor, but with the added capability of creating partitions.
When a PartitionSupervisor is started, it will create multiple partitions and will start a process under each of the partitions.
This feature becomes particularly valuable when certain processes within a system have the potential to become bottlenecks. If these processes can easily partition their state without any interdependencies, the PartitionSupervisor can be used.
By starting multiple instances of such processes across different partitions, the workload can be distributed and potential bottlenecks can be avoided.
Usage
Once a PartitionSupervisor is started, we can dispatch messages to its children using the {:via, PartitionSupervisor, {name, key}}
. Here, name
refers to the name of the PartitionSupervisor, and key
is used for routing the message.
The PartitionSupervisor uses a routing strategy to determine the appropriate partition to which a message should be dispatched. When sending a message to a child process under a PartitionSupervisor, we provide a key
. Depending on the routing strategy in place, the PartitionSupervisor will utilize this key to select the specific partition to which the message should be sent.
Let’s explore an example to gain a better understanding of this concept.
Lets create a simple GenServer which we can start under the partition supervisor
defmodule EchoServer do
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
@impl true
def init(args) do
IO.inspect("EchoServer #{inspect(self())} started with args: #{inspect(args)}")
{:ok, :noop}
end
@impl true
def handle_call({:echo, msg}, _from, state) do
IO.inspect("EchoServer(#{inspect(self())}) echoing: #{inspect(msg)}")
{:reply, msg, state}
end
end
Nows lets start a partition supervisor
{:ok, supervisor_pid} =
PartitionSupervisor.start_link(
name: EchoServerPartitionSupervisor,
# Use the default child_spec/1 function of the GenServer
child_spec: EchoServer.child_spec(:test_arg)
)
Now lets visualize the supervision tree.
Kino.Process.render_sup_tree(supervisor_pid)
From the above output we can now see that multiple processes of the EchoServer
GenServer were started by the Partition Supervisor. A separate instance of the EchoServer
was started for each partition that was created.
By default the number of partitions a PartitionSupervisor will create is equal to System.schedulers_online()
(typically the number of CPU cores).
System.schedulers_online()
The number of processes(partitions) we see in the supervision tree must match the above output returned from System.schedulers_online()
.
The PartitionSupervisor provides additional options that can be passed during its initialization:
-
:partitions
- This option accepts a positive integer value that represents the number of partitions to create. By default, it is set toSystem.schedulers_online()
, which corresponds to the number of online schedulers in the system. -
:with_arguments
- A two-argument anonymous function that allows the partition to be given to the child starting function.
In addition to these specific options, other common options such as :name
, :child_spec
, :max_restarts
, and :max_seconds
can be used with the PartitionSupervisor, and they function as they do in regular supervisors.
Now lets restart our PartitionSupervisor with some of these options to customize its behaviour…
# Defining a new echo server GenServer with a start_link/2 function
# to also receive the partition number as an argument.
defmodule EchoServerV2 do
use GenServer
def start_link(args, partition_number) do
GenServer.start_link(__MODULE__, [args, partition_number])
end
@impl true
def init([args, partition_number]) do
IO.inspect(
"EchoServer #{inspect(self())} started on partition #{partition_number} with args: #{inspect(args)}"
)
# We save the partition number in the GenServer state
{:ok, partition_number}
end
@impl true
def handle_call({:echo, msg}, _from, partition_number) do
IO.inspect(
"EchoServer(#{inspect(self())})(partition=#{partition_number}) echoing: #{inspect(msg)}"
)
{:reply, msg, partition_number}
end
end
# Stop the existing supervisor
:ok = PartitionSupervisor.stop(EchoServerPartitionSupervisor)
# Start the EchoServerPartitionSupervisor again with added options
{:ok, supervisor_pid} =
PartitionSupervisor.start_link(
name: EchoServerPartitionSupervisor,
child_spec: EchoServerV2.child_spec(:test_arg),
# We explicitly specify the number of partitions to create
partitions: 3,
with_arguments: fn [existing_args], partition ->
# Inject the partition number into the args given to the child process
# This will be passed to the child process when it is started via the
# `start_link(args, partition_number)` function.
[existing_args, partition]
end
)
Kino.Process.render_sup_tree(supervisor_pid)
Notice that this time only 3 partitions were created and 3 child processes were started.
Also notice how the partition number was passed as an argument to every child process, this is due to the use of the with_arguments
option.
The with_arguments
option allows us to customize the arguments passed to child processes in a partitioned supervision setup. By providing a two-argument anonymous function, we can include the partition number in the arguments used to start each child process. This allows each process to have knowledge of the partition_number on which it is running.
Sending messages
To send a message to a child process under a PartitionSupervisor, we can use the {:via, PartitionSupervisor, {name, key}}
tuple. Here key is used for routing the message to the appropriate partition.
By using this message dispatching method, we can effectively send messages to specific child processes running under the PartitionSupervisor based on the key that we pass.
# Send a message to the EchoServer running on partition 0
:hi =
GenServer.call(
{:via, PartitionSupervisor, {EchoServerPartitionSupervisor, 0}},
{:echo, :hi}
)
# Send a message to the EchoServer running on partition 1
:ola =
GenServer.call(
{:via, PartitionSupervisor, {EchoServerPartitionSupervisor, 1}},
{:echo, :ola}
)
# Send a message to the EchoServer running on partition 2
:adios =
GenServer.call(
{:via, PartitionSupervisor, {EchoServerPartitionSupervisor, 2}},
{:echo, :adios}
)
# Send a message to the EchoServer running on partition 1
# (the routing key 1000 results in partition 1 to be selected)
GenServer.call(
{:via, PartitionSupervisor, {EchoServerPartitionSupervisor, 1000}},
{:echo, :boom}
)
When using integer keys with the PartitionSupervisor, the routing strategy is determined by the formula rem(abs(key), partitions)
. In the example we provided, the message with the key 1000
was sent to partition 1 because rem(abs(1000), 3) = rem(1000, 3) = 1
.
However, if the routing key is not an integer, the :erlang.phash2(key, partitions)
hash function is used as the routing strategy. This function calculates a hash value based on the key and the number of partitions, resulting in the selection of the appropriate partition to which the message should be dispatched.
:erlang.phash2("1000", 3) |> IO.inspect(label: "Partition")
GenServer.call(
{:via, PartitionSupervisor, {EchoServerPartitionSupervisor, "1000"}},
{:echo, :hello_world}
)
If we want to retrieve the PID of the process running on a partition for a certain key, we can use GenServer.whereis({:via, PartitionSupervisor, {name, key}})
# Get the PID of the process running in the partition that would be
# selected when using "1000" as the key
GenServer.whereis({:via, PartitionSupervisor, {EchoServerPartitionSupervisor, "1000"}})
Implementation detail
The PartitionSupervisor uses either an ETS table or a Registry
to manage all of the partitions. Under the hood, the PartitionSupervisor generates a child spec for each partition and then acts as a regular supervisor. The ID of each child spec is the partition number.