Powered by AppSignal & Oban Pro

Rendezvous hashing ...

content/rendevouz_hashing.livemd

Rendezvous hashing …

Mix.install(
  [
    {:rendevous_hash, path: "#{__DIR__}/.."},
    {:combination, "~> 0.0.3"},
    {:kino, "~> 0.16.0"},
    {:benchee, "~> 1.4"}
  ],
  consolidate_protocols: false
)

Section

The %ComputeNode{} struct represents a single VM. It is identified by its compute :region (like Amsterdam or Dublin), its availability :zone within that region, and the :id of the virtual machine within that AZ.

defmodule ComputeNode do
  @moduledoc """
  A single virtual machine in the distributed system.

  Each node is identified by the combination of its region, its availability
  zone, and the id of the VM inside that zone.
  """

  @enforce_keys [:region, :zone, :id]
  defstruct @enforce_keys

  @type t :: %__MODULE__{
          region: String.t(),
          zone: String.t(),
          id: String.t()
        }

  @doc """
  Builds a compute node from a region, a zone and a VM id.

  An integer `id` is stored as a decimal string, left-padded with zeros to at
  least four characters. Any other `id` is stored unchanged.

  ## Examples

      iex> ComputeNode.new("Amsterdam", "AZ1", "VM1")
      %ComputeNode{region: "Amsterdam", zone: "AZ1", id: "VM1"}

      iex> ComputeNode.new("AMS", "A", 3).id
      "0003"
  """
  def new(region, zone, id) do
    %__MODULE__{region: region, zone: zone, id: normalize_id(id)}
  end

  @doc """
  Returns the `{region, zone}` pair identifying the node's availability zone.
  """
  def az_tuple(%__MODULE__{} = compute_node) do
    {compute_node.region, compute_node.zone}
  end

  # Integer ids become zero-padded strings; everything else passes through.
  defp normalize_id(id) when is_integer(id) do
    String.pad_leading(Integer.to_string(id), 4, "0")
  end

  defp normalize_id(id), do: id
end

defimpl String.Chars, for: ComputeNode do
  # Renders a node as "<region>-<zone>-<id>". Structs built directly (bypassing
  # ComputeNode.new/3) may still carry an integer id, so it is padded here too.
  def to_string(%ComputeNode{} = compute_node) do
    "#{compute_node.region}-#{compute_node.zone}-#{id_label(compute_node.id)}"
  end

  # Integer ids are zero-padded to at least four characters.
  defp id_label(id) when is_integer(id) do
    id |> Integer.to_string() |> String.pad_leading(4, "0")
  end

  defp id_label(id), do: id
end

# Integer VM ids are zero-padded by ComputeNode.new/3, so this prints "AMS-A-0003".
IO.puts(ComputeNode.new("AMS", "A", 3))

# Show the preferred servers for multiple users.
# 4 regions x 3 availability zones x 4 VMs = 48 compute nodes.
datacenters =
  for region <- ["AMS", "DUB", "NYC", "CGN"],
      az <- ["AZ1", "AZ2", "AZ3"],
      id <- 1..4 do
    ComputeNode.new(region, az, id)
  end

# Pre-computed once and reused for every lookup in the cells below.
bucket_hashes = RendevousHash.pre_compute_list(datacenters)

["Christian", "Joe", "Lance"]
|> Enum.map(fn user ->
  server_list =
    bucket_hashes
    |> RendevousHash.list(user)
    |> Enum.join(" / ")

  "#{String.pad_trailing(user, 12)}: #{server_list}"
end)
|> Enum.each(&IO.puts/1)


# bucket_hashes
# |> RendevousHash.list("Chris")
# #|> RendevousHash.sort_by_optimum_storage_resiliency()
# |> IO.inspect(limit: :infinity)
regions = ~w{Florida  Tennessee Texas Utah}
zones = ["Zone A", "Zone B"]
vms = Enum.map(1..8, &"VM #{&1}")

# Builds one ComputeNode per region/zone/VM combination, ranks the nodes for
# `name`, and re-orders that ranking with
# RendevousHash.sort_by_optimum_storage_resiliency/1.
# Both the raw ranking ("input") and the re-ordered one ("output") are
# inspected to stdout; the re-ordered list is returned.
determine_nodes = fn name, regions, zones, vms ->
  datacenters =
    for region <- regions,
        az <- zones,
        id <- vms do
      ComputeNode.new(region, az, id)
    end

  datacenters
  |> RendevousHash.pre_compute_list()
  |> RendevousHash.list(name)
  |> IO.inspect(label: "input", limit: :infinity)
  |> RendevousHash.sort_by_optimum_storage_resiliency()
  |> IO.inspect(label: "output", limit: :infinity)
end

text_input = Kino.Input.text("Customer name", default: "your_default_text_here", debounce: 30)

# Frames that are re-rendered whenever the customer name or the slider changes.
slider_frame = Kino.Frame.new()
value_frame = Kino.Frame.new()
image2_frame = Kino.Frame.new()

# Placeholder binding; nothing below reads or rebinds it.
current_slider = nil

# Renders the textual summary and the SVG drawing for `nodes` at `scale_factor`.
render_content = fn nodes, scale_factor ->
  node_count = length(nodes)
  # Same lower bound of 1 that the slider uses for its maximum.
  upper = max(node_count - 1, 1)

  summary =
    "**Scale Factor:** #{scale_factor} | **Nodes:** #{node_count} | **Max:** #{upper}"

  Kino.Frame.render(value_frame, Kino.Markdown.new(summary))

  svg = RendevousHash.Helpers.Drawing.generate_svg(nodes, scale_factor)
  Kino.Frame.render(image2_frame, Kino.Image.new(svg, :svg))
end

# Builds a range slider sized for `nodes`, subscribes to its changes, and
# returns `{slider, default_scale}`.
create_slider = fn nodes ->
  # The slider maximum is never allowed to drop below 1.
  upper = max(length(nodes) - 1, 1)

  # Default to the number of regions, capped at the slider maximum.
  default_scale = min(length(regions), upper)

  slider =
    Kino.Input.range("Scale Factor",
      min: 1,
      max: upper,
      default: default_scale,
      step: 1,
      debounce: 100
    )

  # Every slider change re-renders the content for the same node list.
  Kino.listen(Kino.Control.stream(slider), fn event ->
    render_content.(nodes, trunc(event.value))
  end)

  {slider, default_scale}
end

# Recomputes the node ranking for `text_value`, swaps in a fresh slider, and
# renders the content at that slider's default scale.
update_from_text = fn text_value ->
  nodes = determine_nodes.(text_value, regions, zones, vms)
  {slider, default_scale} = create_slider.(nodes)

  Kino.Frame.render(slider_frame, slider)
  render_content.(nodes, default_scale)
end

# Single-column layout: input, slider, summary, drawing.
layout = Kino.Layout.grid([text_input, slider_frame, value_frame, image2_frame], columns: 1)

# React to edits of the customer name.
Kino.listen(Kino.Control.stream(text_input), fn event ->
  update_from_text.(event.value)
end)

# Initial render, using the same default text as the input above.
update_from_text.("your_default_text_here")

layout
# Same region / zone / VM labels as the interactive cell above, rebound here so
# this cell can be evaluated on its own.
regions = ~w{Florida  Tennessee Texas Utah}
zones = ["Zone A", "Zone B"]
vms = Enum.map(1..8, &"VM #{&1}")

Kino.Datacenter.render(regions, zones, vms)

Show preferred server order for multiple actors

# Twenty actor keys ("actor_0001" .. "actor_0020") with pre-computed hashes.
actors =
  1..20
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 4, "0") end)
  |> RendevousHash.pre_compute_list()

# `bucket_hashes` is the top-level binding built from `datacenters` earlier in
# the notebook.
actors
|> Enum.map(fn {key, key_hash} ->
  order =
    bucket_hashes
    |> RendevousHash.list(key_hash)
    |> RendevousHash.sort_by_optimum_storage_resiliency()
    |> Enum.join(" / ")

  "#{String.pad_trailing(key, 15)}: #{order}"
end)
|> Enum.each(&IO.puts/1)

Pre-compute the hashes of 1 million actors

# NOTE(review): the notebook text mentions 1 million actors, but only 2_000 are
# generated here - presumably reduced to keep the cell fast; confirm.
actor_hashes =
  1..2_000
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 8, "0") end)
  |> RendevousHash.pre_compute_list()

# Maps each complete bucket ordering to the number of actors that share it.
histogram =
  Enum.frequencies_by(actor_hashes, fn {_key, key_hash} ->
    RendevousHash.list(bucket_hashes, key_hash)
  end)

With 8 VMs and 1 million actors, we ideally expect to see around 125k per VM, i.e. 1/8 of 1 million.

# NOTE(review): looks like a pasted sample result (actor count per bucket) for
# the 8-VM / 1-million-actor case described in the text above; the bucket names
# do not match the nodes built in this notebook - confirm it is still wanted.
%{
  "Amsterdam" => 124845,
  "Berlin" => 124519,
  "Düsseldorf" => 125169,
  "Köln" => 124833,
  "Magdeburg" => 124802,
  "München" => 125554,
  "Seattle" => 125070,
  "Stuttgart" => 125208
}
# Number of actors per primary (first-choice) bucket.
# Swap `hd(order)` for e.g. `Enum.at(order, 3)` to look at a lower-ranked choice.
frequencies =
  histogram
  |> Enum.map(fn {order, count} -> {hd(order), count} end)
  |> Enum.reduce(%{}, fn {bucket, count}, acc ->
    Map.update(acc, bucket, count, &(&1 + count))
  end)

total_entries = Enum.sum_by(frequencies, &elem(&1, 1))

# bucket => {absolute count, share of all actors in percent}
percentage_frequencies =
  Map.new(frequencies, fn {bucket, entries} ->
    {bucket, {entries, 100.0 * entries / total_entries}}
  end)

# Render the distribution as a markdown table, one row per bucket.
percentage_frequencies
|> Enum.map_join("\n", fn {bucket, {entries, percentage}} ->
  bucket = bucket |> to_string() |> String.pad_trailing(15)
  percentage = :erlang.float_to_binary(percentage, decimals: 3)

  "| `#{bucket}` | #{entries} | `#{percentage}%` |"
end)
|> then(fn body ->
  """
  | Bucket | Entries | Percentage |
  | ------ | ------: | ---------: |
  #{body}
  """
end)
|> Kino.Markdown.new()
# Four buckets (2 regions x 2 zones x 1 VM), keyed by their string form.
# The struct is built directly with an integer id, so the String.Chars
# implementation pads it: e.g. "Amsterdam-AZ1-0001".
dcs =
  for region <- ["Amsterdam", "Dublin"],
      az <- ["AZ1", "AZ2"],
      id <- 1..1 do
    to_string(%ComputeNode{region: region, zone: az, id: id})
  end
  |> RendevousHash.pre_compute_list()

# Twenty actor keys ("actor_0001" .. "actor_0020") with pre-computed hashes.
actors =
  1..20
  |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 4, "0") end)
  |> RendevousHash.pre_compute_list()

# Print each actor followed by its bucket preference order.
actors
|> Enum.map(fn {user, user_hash} ->
  server_list = dcs |> RendevousHash.list(user_hash) |> Enum.join(" / ")

  "#{String.pad_trailing(user, 12)}: #{server_list}"
end)
|> Enum.each(&IO.puts/1)
# 32 VMs * 1M actors

defmodule HashBenchmark do
  @moduledoc """
  Benchee comparisons of the `RendevousHash` implementations.

  Both benchmarks use the same fixture: 32 compute nodes
  (4 regions x 2 AZs x 4 VMs) and 1024 * 1024 actor keys, all pre-hashed with
  `RendevousHash.Native.pre_compute_list/1` before measurement starts.
  """

  # Passed to Benchee.run/2 by both benchmarks. Benchee documents `time` and
  # `memory_time` as seconds per scenario.
  @benchee_opts [
    time: 10,
    memory_time: 2,
    formatters: [Benchee.Formatters.Console]
  ]

  @doc """
  Compares `RendevousHash.Elixir.list/2` with `RendevousHash.Native.list/2`.

  Each scenario ranks all buckets for every actor and collects the result in a
  map keyed by actor. Returns whatever `Benchee.run/2` returns.
  """
  def run_list_benchmark do
    dcs_hashes = dcs_hashes()
    actor_hashes = actor_hashes()

    Benchee.run(
      %{
        "elixir" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor, RendevousHash.Elixir.list(dcs_hashes, actor_hash)}
          end)
        end,
        "rustler" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor, RendevousHash.Native.list(dcs_hashes, actor_hash)}
          end)
        end
      },
      @benchee_opts
    )
  end

  @doc """
  Compares three ways of ranking buckets and then applying
  `sort_by_optimum_storage_resiliency/1`:

    * `"rust_plus_elixir"` - `Native.list/2` followed by the `Elixir` sort
    * `"pure_rust"` - `Native.list/2` followed by the `Native` sort
    * `"official"` - the top-level `RendevousHash` functions

  Returns whatever `Benchee.run/2` returns.
  """
  def run_full_benchmark do
    dcs_hashes = dcs_hashes()
    actor_hashes = actor_hashes()

    Benchee.run(
      %{
        "rust_plus_elixir" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.Native.list(actor_hash)
             |> RendevousHash.Elixir.sort_by_optimum_storage_resiliency()}
          end)
        end,
        "pure_rust" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.Native.list(actor_hash)
             |> RendevousHash.Native.sort_by_optimum_storage_resiliency()}
          end)
        end,
        "official" => fn ->
          Map.new(actor_hashes, fn {actor, actor_hash} ->
            {actor,
             dcs_hashes
             |> RendevousHash.list(actor_hash)
             |> RendevousHash.sort_by_optimum_storage_resiliency()}
          end)
        end
      },
      @benchee_opts
    )
  end

  # Pre-hashed buckets: 4 regions, 2 AZs, 4 VMs -> 32 VMs.
  defp dcs_hashes do
    nodes =
      for region <- ["DC1", "DC2", "DC3", "DC4"],
          az <- ["AZ1", "AZ2"],
          id <- 1..4 do
        ComputeNode.new(region, az, id)
      end

    RendevousHash.Native.pre_compute_list(nodes)
  end

  # Pre-hashed actor keys "actor_0000000001" .. "actor_0001048576".
  defp actor_hashes do
    1..(1024 * 1024)
    |> Enum.map(fn n -> "actor_" <> String.pad_leading(Integer.to_string(n), 10, "0") end)
    |> RendevousHash.Native.pre_compute_list()
  end
end
# Runs the three-scenario comparison (bucket ranking followed by the
# storage-resiliency sort). The fixture of 1024 * 1024 actor hashes is built
# before any measurement starts.
HashBenchmark.run_full_benchmark()