Rendezvous hashing …
# Notebook setup: installs the local rendevous_hash package (the directory one
# level above this notebook) plus the :combination, :kino and :benchee Hex packages.
# Kino drives the interactive widgets and Benchee the benchmarks further down;
# :combination is not referenced in the cells visible here.
Mix.install(
[
# Local checkout of the library under test.
{:rendevous_hash, path: "#{__DIR__}/.."},
{:combination, "~> 0.0.3"},
{:kino, "~> 0.16.0"},
{:benchee, "~> 1.4"}
],
# Protocol consolidation is disabled, presumably so that the
# `defimpl String.Chars, for: ComputeNode` defined later in this notebook is
# dispatched (consolidated protocols ignore implementations defined afterwards).
consolidate_protocols: false
)
Section
- https://en.wikipedia.org/wiki/Rendezvous_hashing
- https://www.npiontko.pro/2024/12/23/computation-efficient-rendezvous-hashing
The `%ComputeNode{}` struct represents a VM, identified by its compute `:region` (like Amsterdam or Dublin), its availability `:zone`, and the `:id` of the virtual machine within that AZ.
defmodule ComputeNode do
  @moduledoc """
  Represents a compute node in the distributed system.

  A compute node is uniquely identified by its region, availability zone, and VM ID.
  """

  @enforce_keys [:region, :zone, :id]
  defstruct [:region, :zone, :id]

  @typedoc """
  A compute node.

  `new/3` always stores `id` as a string (integers are zero-padded), but the
  struct literal can also be built directly with an integer `id`, so both are allowed.
  """
  @type t :: %__MODULE__{
          region: String.t(),
          zone: String.t(),
          id: String.t() | integer()
        }

  @doc """
  Creates a new compute node.

  Integer IDs are converted to strings left-padded with `"0"` to at least four
  characters; any other ID is stored unchanged.

  ## Examples

      iex> ComputeNode.new("Amsterdam", "AZ1", "VM1")
      %ComputeNode{region: "Amsterdam", zone: "AZ1", id: "VM1"}

      iex> ComputeNode.new("AMS", "A", 3)
      %ComputeNode{region: "AMS", zone: "A", id: "0003"}
  """
  @spec new(String.t(), String.t(), integer() | String.t()) :: t()
  def new(region, zone, id) do
    %__MODULE__{region: region, zone: zone, id: normalize_id(id)}
  end

  @doc """
  Returns the `{region, zone}` availability-zone tuple for the node.
  """
  @spec az_tuple(t()) :: {String.t(), String.t()}
  def az_tuple(%__MODULE__{region: region, zone: zone}) do
    {region, zone}
  end

  # Integer IDs become zero-padded strings ("0003"); String.pad_leading never
  # truncates, so IDs with more than four digits are kept whole.
  defp normalize_id(id) when is_integer(id) do
    id |> Integer.to_string() |> String.pad_leading(4, "0")
  end

  defp normalize_id(id), do: id
end
defimpl String.Chars, for: ComputeNode do
  # Renders a node as "region-zone-id". Integer ids (possible when the struct
  # is built as a literal rather than through ComputeNode.new/3) are
  # left-padded with "0" to four characters; other ids are interpolated as-is.
  def to_string(%ComputeNode{region: region, zone: zone, id: id}) do
    "#{region}-#{zone}-#{format_id(id)}"
  end

  defp format_id(id) when is_integer(id) do
    String.pad_leading(Integer.to_string(id), 4, "0")
  end

  defp format_id(id), do: id
end
"AMS" |> ComputeNode.new("A", 3) |> IO.puts()

# Show the preferred servers for multiple users
# 4 regions x 3 AZs x 4 VMs
datacenters =
  for region <- ~w(AMS DUB NYC CGN),
      az <- ~w(AZ1 AZ2 AZ3),
      id <- 1..4 do
    ComputeNode.new(region, az, id)
  end

bucket_hashes = RendevousHash.pre_compute_list(datacenters)

# One line per user: the user name padded to 12 columns, then the full
# rendezvous preference order joined with " / ".
Enum.each(["Christian", "Joe", "Lance"], fn user ->
  server_list = bucket_hashes |> RendevousHash.list(user) |> Enum.join(" / ")
  IO.puts("#{String.pad_trailing(user, 12)}: #{server_list}")
end)
# bucket_hashes
# |> RendevousHash.list("Chris")
# #|> RendevousHash.sort_by_optimum_storage_resiliency()
# |> IO.inspect(limit: :infinity)
regions = ~w{Florida Tennessee Texas Utah}
zones = ["Zone A", "Zone B"]
vms = for n <- 1..8, do: "VM #{n}"

# Builds every region x zone x vm node, ranks them for `name` with rendezvous
# hashing, prints the raw order (label "input") and the order after
# RendevousHash.sort_by_optimum_storage_resiliency/1 (label "output"), and
# returns the latter.
determine_nodes = fn name, regions, zones, vms ->
  nodes =
    for region <- regions, zone <- zones, vm <- vms do
      ComputeNode.new(region, zone, vm)
    end

  ranked =
    nodes
    |> RendevousHash.pre_compute_list()
    |> RendevousHash.list(name)

  IO.inspect(ranked, label: "input", limit: :infinity)

  ranked
  |> RendevousHash.sort_by_optimum_storage_resiliency()
  |> IO.inspect(label: "output", limit: :infinity)
end
# The name typed here is the key whose node order is visualised below.
# Bound once so the input's default and the initial render cannot drift apart.
default_name = "your_default_text_here"
text_input = Kino.Input.text("Customer name", default: default_name, debounce: 30)

# Placeholder frames; the callbacks below re-render their contents.
slider_frame = Kino.Frame.new()
value_frame = Kino.Frame.new()
image2_frame = Kino.Frame.new()

# Upper bound for the scale slider: node count minus one, but never below 1.
slider_max = fn nodes -> max(length(nodes) - 1, 1) end

# Renders the summary line and the SVG drawing for `nodes` at `scale_factor`.
render_content = fn nodes, scale_factor ->
  summary =
    "**Scale Factor:** #{scale_factor} | **Nodes:** #{length(nodes)} | **Max:** #{slider_max.(nodes)}"

  Kino.Frame.render(value_frame, Kino.Markdown.new(summary))

  image =
    nodes
    |> RendevousHash.Helpers.Drawing.generate_svg(scale_factor)
    |> Kino.Image.new(:svg)

  Kino.Frame.render(image2_frame, image)
end

# Builds a fresh slider sized for `nodes`, subscribes it to render_content,
# and returns {slider, default_scale}.
create_slider = fn nodes ->
  max_scale = slider_max.(nodes)
  # Default to the number of regions, capped by the slider maximum.
  default_scale = min(length(regions), max_scale)

  slider =
    Kino.Input.range("Scale Factor",
      min: 1,
      max: max_scale,
      default: default_scale,
      step: 1,
      debounce: 100
    )

  slider
  |> Kino.Control.stream()
  |> Kino.listen(fn event -> render_content.(nodes, trunc(event.value)) end)

  {slider, default_scale}
end

# Recomputes the node order for `name`, swaps in a new slider, and redraws
# the summary and image with the slider's default value.
update_from_text = fn name ->
  nodes = determine_nodes.(name, regions, zones, vms)
  {slider, default_scale} = create_slider.(nodes)
  Kino.Frame.render(slider_frame, slider)
  render_content.(nodes, default_scale)
end

layout =
  Kino.Layout.grid([text_input, slider_frame, value_frame, image2_frame], columns: 1)

# Re-render everything whenever the name changes.
text_input
|> Kino.Control.stream()
|> Kino.listen(fn event -> update_from_text.(event.value) end)

# Populate the frames for the default name; frames keep their content, so it
# is visible as soon as the layout is shown.
update_from_text.(default_name)

# Returning the layout as the cell's last expression displays it.
layout
# Rebind the 4 regions x 2 zones x 8 VMs topology and draw it.
vms = Enum.map(1..8, fn n -> "VM #{n}" end)
zones = ["Zone A", "Zone B"]
regions = ["Florida", "Tennessee", "Texas", "Utah"]

Kino.Datacenter.render(regions, zones, vms)
Show the preferred server order for multiple actors
# Twenty actors named "actor_0001" .. "actor_0020", each paired with its
# precomputed hash by RendevousHash.pre_compute_list/1.
# (The previous `rendevouz_hash` variable was never bound anywhere in the
# notebook, which made this cell fail to compile.)
actors =
  1..20
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 4, "0") end)
  |> RendevousHash.pre_compute_list()

# For each actor print its rendezvous order over `bucket_hashes`, re-sorted
# by RendevousHash.sort_by_optimum_storage_resiliency/1.
Enum.each(actors, fn {key, key_hash} ->
  order =
    bucket_hashes
    |> RendevousHash.list(key_hash)
    |> RendevousHash.sort_by_optimum_storage_resiliency()
    |> Enum.join(" / ")

  IO.puts("#{String.pad_trailing(key, 15)}: #{order}")
end)
Pre-compute the hashes of the actors (2,000 here to keep the notebook fast; use 1 million to reproduce the numbers shown below)
# Actors "actor_00000001" .. "actor_00002000", each paired with its hash.
actor_hashes =
  1..2_000
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 8, "0") end)
  |> RendevousHash.pre_compute_list()

# Maps each full preference order (list of nodes from `bucket_hashes`) to the
# number of actors that received exactly that order.
histogram =
  Enum.frequencies_by(actor_hashes, fn {_key, key_hash} ->
    RendevousHash.list(bucket_hashes, key_hash)
  end)
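With 8 VMs and 1 million actors, we would ideally expect around 125k actors per VM, i.e. 1/8 of 1 million. The map below shows the result of such a run. (The cells above use 48 nodes and 2,000 actors, so there the ideal is about 2,000 / 48 ≈ 42 actors per node.)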
# Recorded result of a run with 8 city-named VMs and ~1 million actors (see
# the note above). It is not produced by the current cells, whose nodes are
# named like "AMS-AZ1-0001".
%{
"Amsterdam" => 124845,
"Berlin" => 124519,
"Düsseldorf" => 125169,
"Köln" => 124833,
"Magdeburg" => 124802,
"München" => 125554,
"Seattle" => 125070,
"Stuttgart" => 125208
}
# Collapse the histogram onto primary buckets: for each preference order take
# its first node and add that order's actor count to the node's total.
frequencies =
  Enum.reduce(histogram, %{}, fn {order, count}, totals ->
    primary_bucket = hd(order)
    # primary_bucket = order |> Enum.at(3)
    Map.update(totals, primary_bucket, count, &(&1 + count))
  end)
# Total number of actors counted across all primary buckets.
total_entries = Enum.sum_by(frequencies, &elem(&1, 1))

# bucket => {entries, share of all entries in percent}
percentage_frequencies =
  Map.new(frequencies, fn {bucket, entries} ->
    {bucket, {entries, 100.0 * entries / total_entries}}
  end)

# Markdown table, one row per bucket. Rows are sorted by bucket name: with
# more than 32 keys a map iterates in hash order, which left the table unordered.
percentage_frequencies
|> Enum.sort_by(fn {bucket, _stats} -> to_string(bucket) end)
|> Enum.map_join("\n", fn {bucket, {entries, percentage}} ->
  bucket = bucket |> to_string() |> String.pad_trailing(15)
  percentage = :erlang.float_to_binary(percentage, decimals: 3)
  "| `#{bucket}` | #{entries} | `#{percentage}%` |"
end)
|> then(fn body ->
  """
  | Bucket | Entries | Percentage |
  | ------ | ------: | ---------: |
  #{body}
  """
end)
|> Kino.Markdown.new()
# Four single-VM nodes (2 regions x 2 AZs), hashed by their string form,
# e.g. "Amsterdam-AZ1-0001". ComputeNode.new/3 yields the same string as the
# former struct literal with an integer id.
dc_names =
  for region <- ["Amsterdam", "Dublin"],
      az <- ["AZ1", "AZ2"],
      id <- 1..1 do
    region |> ComputeNode.new(az, id) |> to_string()
  end

# `RendevousHash` replaces the never-bound `rendevouz_hash` variable.
dcs = RendevousHash.pre_compute_list(dc_names)

# Twenty actors "actor_0001" .. "actor_0020" paired with their hashes.
actors =
  1..20
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 4, "0") end)
  |> RendevousHash.pre_compute_list()

# Print each actor's rendezvous order over the four nodes.
Enum.each(actors, fn {user, user_hash} ->
  server_list = dcs |> RendevousHash.list(user_hash) |> Enum.join(" / ")
  IO.puts("#{String.pad_trailing(user, 12)}: #{server_list}")
end)
# 32 VMs * 1M actors (by default)
defmodule HashBenchmark do
  @moduledoc """
  Benchee comparisons of the RendevousHash implementations.

  Both benchmarks rank every actor against 32 compute nodes
  (4 regions x 2 AZs x 4 VMs). By default they use `1024 * 1024` actors.

  ## Options (accepted by both `run_list_benchmark/1` and `run_full_benchmark/1`)

    * `:actor_count` - number of actors to rank (default `1024 * 1024`)
    * `:time` - Benchee run time in seconds (default `10`)
    * `:memory_time` - Benchee memory-measurement time in seconds (default `2`)
  """

  @default_actor_count 1024 * 1024

  @doc """
  Compares `RendevousHash.Elixir.list/2` ("elixir") with
  `RendevousHash.Native.list/2` ("rustler").
  """
  def run_list_benchmark(opts \\ []) do
    {dcs_hashes, actor_hashes} = build_inputs(opts)

    run_benchee(
      %{
        "elixir" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor, RendevousHash.Elixir.list(dcs_hashes, actor_hash)}
          end)
        end,
        "rustler" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor, RendevousHash.Native.list(dcs_hashes, actor_hash)}
          end)
        end
      },
      opts
    )
  end

  @doc """
  Compares list + `sort_by_optimum_storage_resiliency/1` pipelines:
  native list with Elixir sort ("rust_plus_elixir"), native list with native
  sort ("pure_rust"), and the top-level `RendevousHash` API ("official").
  """
  def run_full_benchmark(opts \\ []) do
    {dcs_hashes, actor_hashes} = build_inputs(opts)

    run_benchee(
      %{
        "rust_plus_elixir" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.Native.list(actor_hash)
             |> RendevousHash.Elixir.sort_by_optimum_storage_resiliency()}
          end)
        end,
        "pure_rust" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.Native.list(actor_hash)
             |> RendevousHash.Native.sort_by_optimum_storage_resiliency()}
          end)
        end,
        "official" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.list(actor_hash)
             |> RendevousHash.sort_by_optimum_storage_resiliency()}
          end)
        end
      },
      opts
    )
  end

  # Builds the shared inputs: hashes of the 32 compute nodes and of the actors
  # "actor_0000000001" .. "actor_<actor_count>", both via the native hasher.
  defp build_inputs(opts) do
    actor_count = Keyword.get(opts, :actor_count, @default_actor_count)

    # 4 regions x 2 AZs x 4 VMs -> 32 VMs
    nodes =
      for region <- ["DC1", "DC2", "DC3", "DC4"],
          az <- ["AZ1", "AZ2"],
          id <- 1..4 do
        ComputeNode.new(region, az, id)
      end

    dcs_hashes = RendevousHash.Native.pre_compute_list(nodes)

    # An explicit step keeps actor_count = 0 an empty range (1..0 would count down).
    actor_hashes =
      Range.new(1, actor_count, 1)
      |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 10, "0") end)
      |> RendevousHash.Native.pre_compute_list()

    {dcs_hashes, actor_hashes}
  end

  # Runs the given Benchee jobs with console output and the configured timings.
  defp run_benchee(jobs, opts) do
    Benchee.run(jobs,
      time: Keyword.get(opts, :time, 10),
      memory_time: Keyword.get(opts, :memory_time, 2),
      formatters: [Benchee.Formatters.Console]
    )
  end
end

HashBenchmark.run_full_benchmark()