Voting
Prepare
Mix.install([
{:kino, github: "elixir-nx/kino"},
{:png, ">= 0.0.0"},
{:nx, "~> 0.1.0-dev", github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
{:exla, "~> 0.1.0-dev", github: "elixir-nx/nx", sparse: "exla"}
])
Define behaviour for ranked voting methods. These methods requires each voter to order candidates by their preference. Such methods are currently the most popular ones in current use in most countries that have elections.
defmodule Voting do
import Nx.Defn
@defn_compiler {EXLA, keep_on_device: true}
def candidates(list) do
list
|> Nx.tensor(names: [:candidate, :pos])
|> Nx.new_axis(1, :y)
|> Nx.new_axis(1, :x)
|> Nx.new_axis(1, :voter)
end
def coordinates(size) do
for j <- 0..(size - 1), i <- 0..(size - 1) do
[1.5 * i / size - 0.25, 1.5 * j / size - 0.25]
end
|> Nx.tensor()
|> Nx.reshape({size, size, 2}, names: [:x, :y, :pos])
end
def votes(pos, candidates, voters) do
Nx.random_normal({voters, 2}, 0.0, 0.5, names: [:voter, :pos])
|> build_votes(pos, candidates)
end
defnp build_votes(scatter, pos, candidates) do
scatter
|> Nx.new_axis(1, :y)
|> Nx.new_axis(1, :x)
|> Nx.add(pos)
|> Nx.new_axis(0, :candidate)
|> Nx.subtract(candidates)
|> Nx.power(2)
|> Nx.sum(axes: [:pos])
|> Nx.transpose(axes: [:x, :y, :voter, :candidate])
|> Nx.argsort(axis: :candidate)
end
end
First Past The Post is the simplest election method. We count which candidate was placed first by most of the voters and pick them as a winner.
defmodule Hare do
@behaviour Voting.Ranked
def pick_winner(votes), do: round(votes, length(votes) / 2, [])
def round(votes, threshold, eliminated) do
results =
votes
# Remove discarded candidates
|> Enum.map(&(&1 -- eliminated))
|> Enum.frequencies_by(&hd/1)
|> Enum.sort_by(&elem(&1, 1), :desc)
case results do
[{winner, votes} | _rest] when votes >= threshold ->
winner
_ ->
{candidate, _} = List.last(results)
round(votes, threshold, [candidate | eliminated])
end
end
end
defmodule Voting.Graph do
@palette [
{0x66, 0xC2, 0xA5},
{0xFC, 0x8D, 0x62},
{0x8D, 0xA0, 0xCB},
{0xE7, 0x8A, 0xC3},
{0xA6, 0xD8, 0x54}
]
def draw(%Nx.Tensor{shape: {size, size}} = results) do
draw(results, size)
end
def draw(results, size) do
{:ok, file} = StringIO.open(<<>>, encoding: :latin1)
png =
:png.create(%{
size: {size, size},
mode: {:indexed, 8},
call: &IO.binwrite(file, &1),
palette: {:rgb, 8, @palette}
})
each_row(results, size, &:png.append(png, {:row, &1}))
:ok = :png.close(png)
file
|> StringIO.contents()
|> elem(1)
end
defp each_row(%Nx.Tensor{shape: {size, size}} = t, size, cb) do
t
|> Nx.to_batched_list(1)
|> Enum.each(&cb.(Nx.to_flat_list(&1)))
end
defp each_row(stream, size, cb) do
stream
|> Stream.chunk_every(size)
|> Stream.take(size)
|> Enum.each(cb)
end
end
candidates = [{0.5, 0.99}, {0.07, 0.25}, {0.93, 0.25}]
candidates = Voting.candidates([[0.5, 0.99], [0.07, 0.25], [0.93, 0.25]])
coordinates = Voting.coordinates(100)
voters =
Stream.repeatedly(fn -> Voting.votes(coordinates, candidates, 1000) end)
|> Enum.take(2)
:ok
# Borda
defmodule NxBorda do
import Nx.Defn
@defn_compiler {EXLA, keep_on_device: true}
defn run(votes) do
a = score(votes, 0)
b = score(votes, 1)
c = score(votes, 2)
[a, b, c]
|> Nx.concatenate(axis: :score)
end
defnp score(sorted, candidate) do
Nx.sum(sorted == candidate, axes: [:voter])
|> Nx.multiply(Nx.iota({3}) + 1)
|> Nx.sum(axes: [:candidate])
|> Nx.new_axis(-1, :score)
end
end
voters
|> Task.async_stream(&NxBorda.run/1)
|> Stream.map(fn {:ok, t} -> t end)
|> Enum.reduce(&Nx.add/2)
|> Nx.argmin(axis: :score)
|> Voting.Graph.draw()
|> Kino.Image.new(:png)
defmodule NxFPTP do
import Nx.Defn
@defn_compiler {EXLA, keep_on_device: true}
defn run(votes) do
votes
|> Nx.slice([0, 0, 0, 0], [100, 100, 1000, 1])
|> Nx.equal(Nx.iota({3}))
|> Nx.sum(axes: [:voter])
end
end
voters
|> Enum.map(&NxFPTP.run/1)
|> Enum.take(1)
|> Enum.reduce(&Nx.add/2)
|> Nx.argmax(axis: :candidate)
|> Voting.Graph.draw()
|> Kino.Image.new(:png)
defmodule NxSTV do
import Kernel, except: [round: 1]
import Nx.Defn
@defn_compiler {EXLA, keep_on_device: true}
def run(votes, candidates) do
for perm <- permutations(Enum.to_list(0..(candidates - 1))), into: %{} do
{perm, score(votes, Nx.tensor(perm))}
end
end
defp permutations([]), do: [[]]
defp permutations(list) do
for h <- list, t <- permutations(list -- [h]), do: [h | t]
end
defnp score(sorted, set) do
(sorted == set)
|> Nx.all?(axes: [:candidate])
|> Nx.sum(axes: [:voter])
|> Nx.new_axis(-1, :score)
end
def reduce(results, agg) do
results
|> Enum.reduce(agg, fn {c, tensor}, agg ->
Map.update(agg, c, tensor, &Nx.add(&1, tensor))
end)
end
def winner(results) do
# results = Nx.concatenate(tensors, axis: :score)
half =
results
|> Map.values()
|> Nx.concatenate(axis: -1)
|> Nx.sum(axes: [:score])
|> Nx.divide(2)
|> Nx.new_axis(-1)
round(results, half)
# |> Map.new(fn {k, v} -> {k, Nx.select(Nx.greater(v, half), Nx.tensor(k), Nx.tensor(-1))} end)
end
defp round(results, half) do
totals =
results
|> Enum.group_by(&hd(elem(&1, 0)))
|> Enum.map(fn {k, v} ->
total =
v
|> Enum.map(&elem(&1, 1))
|> Enum.reduce(&Nx.add/2)
total
end)
|> Nx.concatenate(axis: :score)
# |> IO.inspect()
# compute_round(totals, half)
end
defnp compute_round(totals, half) do
losers = Nx.argmin(totals, axis: :score)
winners = totals |> Nx.greater(half)
winners = Nx.select(Nx.sum(winners, axes: [:score]), Nx.argmax(winners, axis: :score), -1)
empty = Nx.equal(winners, -1)
{winners, losers, empty}
end
end
voters
|> Stream.map(&NxSTV.run(&1, 3))
|> Enum.take(2)
|> Enum.reduce(&NxSTV.reduce/2)
|> NxSTV.winner()
s = 10
g = Nx.iota({s})
candidates = [{0.07, 0.17}, {0.49, 0.01}]
candidates = candidates ++ [{0.41, 0.02}]
candidates = [{0.93, 0.49}, {0.79, 0.42}, {0.27, 0.45}]