Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Voting

voting.livemd

Voting

Prepare

Mix.install([
  {:kino, github: "elixir-nx/kino"},
  {:png, ">= 0.0.0"},
  {:nx, "~> 0.1.0-dev", github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
  {:exla, "~> 0.1.0-dev", github: "elixir-nx/nx", sparse: "exla"}
])

Define behaviour for ranked voting methods. These methods requires each voter to order candidates by their preference. Such methods are currently the most popular ones in current use in most countries that have elections.

defmodule Voting do
  import Nx.Defn

  @defn_compiler {EXLA, keep_on_device: true}

  def candidates(list) do
    list
    |> Nx.tensor(names: [:candidate, :pos])
    |> Nx.new_axis(1, :y)
    |> Nx.new_axis(1, :x)
    |> Nx.new_axis(1, :voter)
  end

  def coordinates(size) do
    for j <- 0..(size - 1), i <- 0..(size - 1) do
      [1.5 * i / size - 0.25, 1.5 * j / size - 0.25]
    end
    |> Nx.tensor()
    |> Nx.reshape({size, size, 2}, names: [:x, :y, :pos])
  end

  def votes(pos, candidates, voters) do
    Nx.random_normal({voters, 2}, 0.0, 0.5, names: [:voter, :pos])
    |> build_votes(pos, candidates)
  end

  defnp build_votes(scatter, pos, candidates) do
    scatter
    |> Nx.new_axis(1, :y)
    |> Nx.new_axis(1, :x)
    |> Nx.add(pos)
    |> Nx.new_axis(0, :candidate)
    |> Nx.subtract(candidates)
    |> Nx.power(2)
    |> Nx.sum(axes: [:pos])
    |> Nx.transpose(axes: [:x, :y, :voter, :candidate])
    |> Nx.argsort(axis: :candidate)
  end
end

First Past The Post is the simplest election method. We count which candidate was placed first by most of the voters and pick them as a winner.

defmodule Hare do
  @behaviour Voting.Ranked

  def pick_winner(votes), do: round(votes, length(votes) / 2, [])

  def round(votes, threshold, eliminated) do
    results =
      votes
      # Remove discarded candidates
      |> Enum.map(&amp;(&amp;1 -- eliminated))
      |> Enum.frequencies_by(&amp;hd/1)
      |> Enum.sort_by(&amp;elem(&amp;1, 1), :desc)

    case results do
      [{winner, votes} | _rest] when votes >= threshold ->
        winner

      _ ->
        {candidate, _} = List.last(results)

        round(votes, threshold, [candidate | eliminated])
    end
  end
end
defmodule Voting.Graph do
  @palette [
    {0x66, 0xC2, 0xA5},
    {0xFC, 0x8D, 0x62},
    {0x8D, 0xA0, 0xCB},
    {0xE7, 0x8A, 0xC3},
    {0xA6, 0xD8, 0x54}
  ]

  def draw(%Nx.Tensor{shape: {size, size}} = results) do
    draw(results, size)
  end

  def draw(results, size) do
    {:ok, file} = StringIO.open(<<>>, encoding: :latin1)

    png =
      :png.create(%{
        size: {size, size},
        mode: {:indexed, 8},
        call: &amp;IO.binwrite(file, &amp;1),
        palette: {:rgb, 8, @palette}
      })

    each_row(results, size, &amp;:png.append(png, {:row, &amp;1}))

    :ok = :png.close(png)

    file
    |> StringIO.contents()
    |> elem(1)
  end

  defp each_row(%Nx.Tensor{shape: {size, size}} = t, size, cb) do
    t
    |> Nx.to_batched_list(1)
    |> Enum.each(&amp;cb.(Nx.to_flat_list(&amp;1)))
  end

  defp each_row(stream, size, cb) do
    stream
    |> Stream.chunk_every(size)
    |> Stream.take(size)
    |> Enum.each(cb)
  end
end
candidates = [{0.5, 0.99}, {0.07, 0.25}, {0.93, 0.25}]
candidates = Voting.candidates([[0.5, 0.99], [0.07, 0.25], [0.93, 0.25]])
coordinates = Voting.coordinates(100)
voters =
  Stream.repeatedly(fn -> Voting.votes(coordinates, candidates, 1000) end)
  |> Enum.take(2)

:ok
# Borda

defmodule NxBorda do
  import Nx.Defn

  @defn_compiler {EXLA, keep_on_device: true}

  defn run(votes) do
    a = score(votes, 0)
    b = score(votes, 1)
    c = score(votes, 2)

    [a, b, c]
    |> Nx.concatenate(axis: :score)
  end

  defnp score(sorted, candidate) do
    Nx.sum(sorted == candidate, axes: [:voter])
    |> Nx.multiply(Nx.iota({3}) + 1)
    |> Nx.sum(axes: [:candidate])
    |> Nx.new_axis(-1, :score)
  end
end
voters
|> Task.async_stream(&amp;NxBorda.run/1)
|> Stream.map(fn {:ok, t} -> t end)
|> Enum.reduce(&amp;Nx.add/2)
|> Nx.argmin(axis: :score)
|> Voting.Graph.draw()
|> Kino.Image.new(:png)
defmodule NxFPTP do
  import Nx.Defn

  @defn_compiler {EXLA, keep_on_device: true}

  defn run(votes) do
    votes
    |> Nx.slice([0, 0, 0, 0], [100, 100, 1000, 1])
    |> Nx.equal(Nx.iota({3}))
    |> Nx.sum(axes: [:voter])
  end
end

voters
|> Enum.map(&amp;NxFPTP.run/1)
|> Enum.take(1)
|> Enum.reduce(&amp;Nx.add/2)
|> Nx.argmax(axis: :candidate)
|> Voting.Graph.draw()
|> Kino.Image.new(:png)
defmodule NxSTV do
  import Kernel, except: [round: 1]
  import Nx.Defn

  @defn_compiler {EXLA, keep_on_device: true}

  def run(votes, candidates) do
    for perm <- permutations(Enum.to_list(0..(candidates - 1))), into: %{} do
      {perm, score(votes, Nx.tensor(perm))}
    end
  end

  defp permutations([]), do: [[]]

  defp permutations(list) do
    for h <- list, t <- permutations(list -- [h]), do: [h | t]
  end

  defnp score(sorted, set) do
    (sorted == set)
    |> Nx.all?(axes: [:candidate])
    |> Nx.sum(axes: [:voter])
    |> Nx.new_axis(-1, :score)
  end

  def reduce(results, agg) do
    results
    |> Enum.reduce(agg, fn {c, tensor}, agg ->
      Map.update(agg, c, tensor, &amp;Nx.add(&amp;1, tensor))
    end)
  end

  def winner(results) do
    # results = Nx.concatenate(tensors, axis: :score)
    half =
      results
      |> Map.values()
      |> Nx.concatenate(axis: -1)
      |> Nx.sum(axes: [:score])
      |> Nx.divide(2)
      |> Nx.new_axis(-1)

    round(results, half)

    # |> Map.new(fn {k, v} -> {k, Nx.select(Nx.greater(v, half), Nx.tensor(k), Nx.tensor(-1))} end)
  end

  defp round(results, half) do
    totals =
      results
      |> Enum.group_by(&amp;hd(elem(&amp;1, 0)))
      |> Enum.map(fn {k, v} ->
        total =
          v
          |> Enum.map(&amp;elem(&amp;1, 1))
          |> Enum.reduce(&amp;Nx.add/2)

        total
      end)
      |> Nx.concatenate(axis: :score)

    # |> IO.inspect()

    # compute_round(totals, half)
  end

  defnp compute_round(totals, half) do
    losers = Nx.argmin(totals, axis: :score)
    winners = totals |> Nx.greater(half)
    winners = Nx.select(Nx.sum(winners, axes: [:score]), Nx.argmax(winners, axis: :score), -1)
    empty = Nx.equal(winners, -1)

    {winners, losers, empty}
  end
end

voters
|> Stream.map(&amp;NxSTV.run(&amp;1, 3))
|> Enum.take(2)
|> Enum.reduce(&amp;NxSTV.reduce/2)
|> NxSTV.winner()
s = 10
g = Nx.iota({s})
candidates = [{0.07, 0.17}, {0.49, 0.01}]
candidates = candidates ++ [{0.41, 0.02}]
candidates = [{0.93, 0.49}, {0.79, 0.42}, {0.27, 0.45}]