Chapter 12: How Classifiers Work (1 of 2)
Mix.install(
[
{:exla, "~> 0.5"},
{:nx, "~> 0.5"},
{:vega_lite, "~> 0.1.6"},
{:kino, "~> 0.8.1"},
{:kino_vega_lite, "~> 0.1.7"}
],
config: [nx: [default_backend: EXLA.Backend]]
)
Load the Data
filepath = Path.join(__DIR__, "./linearly_separable.txt") |> Path.expand()
[head | data] =
filepath
|> File.read!()
|> String.split("\r\n", trim: true)
inputs =
data
|> Enum.map(&String.split(&1, "\s", trim: true))
|> Enum.map(fn [input_a, input_b, label] ->
%{
"input_a" => String.to_float(input_a),
"input_b" => String.to_float(input_b),
"label" => String.to_integer(label)
}
end)
Kino.DataTable.new(inputs)
VegaLite.new(width: 600, height: 400)
|> VegaLite.data_from_values(inputs, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "input_a", type: :quantitative)
|> VegaLite.encode_field(:y, "input_b", type: :quantitative)
|> VegaLite.encode_field(:color, "label", type: :nominal)
Perceptron
Perceptron based on the C7.Classifier implementation.
defmodule C12.Perceptron do
import Nx.Defn
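# Sigmoid activation: squashes any real number into the (0, 1) range.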
defn sigmoid(z) do
Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
end
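# Forward pass: matrix product of inputs and weights, passed through the sigmoid.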
defn forward(x, weight) do
weighted_sum = Nx.dot(x, weight)
sigmoid(weighted_sum)
end
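# Classify each example by picking the class (column) with the highest activation.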
defn classify(x, weight) do
y_hat = forward(x, weight)
labels = Nx.argmax(y_hat, axis: 1)
Nx.reshape(labels, {:auto, 1})
end
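# Cross-entropy loss, averaged over the number of examples.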
defn loss(x, y, weight) do
y_hat = forward(x, weight)
first_term = y * Nx.log(y_hat)
second_term = Nx.subtract(1, y) * Nx.log(Nx.subtract(1, y_hat))
Nx.add(first_term, second_term)
|> Nx.sum()
|> Nx.divide(elem(Nx.shape(x), 0))
|> Nx.negate()
end
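# Gradient of the loss with respect to the weights: transpose(x) . (y_hat - y) / n.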
defn gradient(x, y, weight) do
predictions = forward(x, weight)
errors = Nx.subtract(predictions, y)
n_examples = elem(Nx.shape(x), 0)
Nx.transpose(x)
|> Nx.dot(errors)
|> Nx.divide(n_examples)
end
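# Print the training loss and the test-set accuracy at the given iteration,
# returning {iteration, loss, accuracy}.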
def report(iteration, x_train, y_train, x_test, y_test, weight) do
matches = matches(x_test, y_test, weight) |> Nx.to_number()
n_test_examples = elem(Nx.shape(y_test), 0)
matches = matches * 100.0 / n_test_examples
training_loss = loss(x_train, y_train, weight) |> Nx.to_number()
IO.puts("Iteration #{iteration} => Loss: #{training_loss}, #{matches}%")
{iteration, training_loss, matches}
end
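# Count how many test examples are classified with their expected label.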
defnp matches(x_test, y_test, weight) do
classify(x_test, weight)
|> Nx.equal(y_test)
|> Nx.sum()
end
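# Gradient descent: start from zeroed weights and repeatedly step against the
# gradient, reporting progress at each iteration.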
def train(x_train, y_train, x_test, y_test, iterations, lr) do
init_weight = init_weight(x_train, y_train)
final_weight =
Enum.reduce(0..(iterations - 1), init_weight, fn i, weight ->
report(i, x_train, y_train, x_test, y_test, weight)
step(x_train, y_train, weight, lr)
end)
report(iterations, x_train, y_train, x_test, y_test, final_weight)
final_weight
end
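# A single gradient descent update: move the weights against the gradient, scaled by lr.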
defnp step(x, y, weight, lr) do
Nx.subtract(weight, Nx.multiply(gradient(x, y, weight), lr))
end
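# Initial weights: a matrix of zeros with one row per input variable and one column per class.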
defnp init_weight(x, y) do
n_input_variables = elem(Nx.shape(x), 1)
n_classes = elem(Nx.shape(y), 1)
Nx.broadcast(0, {n_input_variables, n_classes})
end
end
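A quick sanity check of the module before training (the weight matrix and the input row below are arbitrary values, for illustration only):
# Arbitrary 3x2 weight matrix and one input row with the bias already prepended
toy_weight = Nx.tensor([[0.1, -0.2], [0.3, 0.05], [-0.4, 0.2]])
toy_x = Nx.tensor([[1.0, 0.5, 2.0]])
# sigmoid(0) is 0.5; classify returns the index of the most activated column
{C12.Perceptron.sigmoid(Nx.tensor(0.0)), C12.Perceptron.classify(toy_x, toy_weight)}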
Train Perceptron
# Function to prepend the bias column
prepend_bias_fn = fn x ->
bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})
# Insert a column of 1s at position 0 of x
# ("axis: 1" means: insert a column, not a row).
# In Python: `np.insert(X, 0, 1, axis=1)`
Nx.concatenate([bias, x], axis: 1)
end
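A quick check of the helper (the values below are arbitrary, for illustration only): a {2, 2} tensor gains a leading column of 1s and becomes {2, 3}.
prepend_bias_fn.(Nx.tensor([[2.0, 3.0], [4.0, 5.0]]))
# => [[1.0, 2.0, 3.0], [1.0, 4.0, 5.0]]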
# One-hot encode function
one_hot_encode_fn = fn y ->
Nx.equal(y, Nx.tensor([0, 1]))
end
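For example (with made-up labels), a label of 0 becomes the row [1, 0] and a label of 1 becomes [0, 1]:
one_hot_encode_fn.(Nx.tensor([[0], [1], [1]]))
# => [[1, 0], [0, 1], [0, 1]]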
# Create tensors out of the inputs.
# NOTE: the tensor type is double-precision float because, with a high
# number of iterations (> 7000), the loss becomes too small to be
# represented with single-precision floating point.
x_train =
x_test =
inputs
|> Enum.map(&[&1["input_a"], &1["input_b"]])
|> Nx.tensor(type: {:f, 64})
|> then(fn x -> prepend_bias_fn.(x) end)
y_train_unencoded =
y_test =
inputs
|> Enum.map(& &1["label"])
|> Nx.tensor()
|> Nx.reshape({:auto, 1})
y_train = one_hot_encode_fn.(y_train_unencoded)
# Train the system
iterations = 10_000
lr = 0.1
weight = C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations, lr)
Plot Decision Boundary
The idea:
- Generate a grid of points, using the min/max values from the initial dataset to compute the boundaries.
- Classify each point of the grid with the weights computed before on the initial dataset.
- Plot the result, highlighting the "decision boundary".
# Get the input_a column from the tensor (column 0 is the bias)
x =
x_train
|> Nx.slice_along_axis(1, 1, axis: 1)
# Get the input_b column from the tensor
y =
x_train
|> Nx.slice_along_axis(2, 1, axis: 1)
# Compute the grid boundaries
x_min =
x
|> Nx.to_flat_list()
|> Enum.min()
x_max =
x
|> Nx.to_flat_list()
|> Enum.max()
y_min =
y
|> Nx.to_flat_list()
|> Enum.min()
y_max =
y
|> Nx.to_flat_list()
|> Enum.max()
padding = 0.05
boundaries = %{
x_min: x_min - abs(x_min * padding),
x_max: x_max + abs(x_max * padding),
y_min: y_min - abs(y_min * padding),
y_max: y_max + abs(y_max * padding)
}
# Define the grid of data that will be classified
resolution = 200
x_step = (boundaries.x_max - boundaries.x_min) / resolution
y_step = (boundaries.y_max - boundaries.y_min) / resolution
grid =
for i <- 0..(resolution - 1), j <- 0..(resolution - 1) do
[boundaries.x_min + x_step * i, boundaries.y_min + y_step * j]
end
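# The grid has resolution * resolution points (here 200 * 200 = 40,000) to classify.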
# Classification
labels =
grid
|> Nx.tensor()
|> then(fn t -> prepend_bias_fn.(t) end)
|> C12.Perceptron.classify(weight)
# Add the labels to the grid dataset
data_with_labels =
Enum.zip_with([grid, Nx.to_flat_list(labels)], fn [[x, y], label] ->
%{x: x, y: y, label: label}
end)
alias VegaLite, as: Vl
Vl.new(width: 600, height: 400)
|> Vl.layers([
# Grid
Vl.new()
|> Vl.data_from_values(data_with_labels)
|> Vl.mark(:point)
|> Vl.encode_field(:x, "x", type: :quantitative)
|> Vl.encode_field(:y, "y", type: :quantitative)
|> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
# Inputs
Vl.new()
|> Vl.data_from_values(inputs)
|> Vl.mark(:point, filled: true, tooltip: true)
|> Vl.encode_field(:x, "input_a", type: :quantitative)
|> Vl.encode_field(:y, "input_b", type: :quantitative)
|> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
|> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]}),
# Threshold line
Vl.new()
|> Vl.data_from_values(data_with_labels)
|> Vl.transform(filter: "datum['label'] == 1")
|> Vl.mark(:line, stroke: "red", stroke_width: 3)
|> Vl.encode_field(:x, "x", type: :quantitative)
|> Vl.encode_field(:y, "y", type: :quantitative, aggregate: :max)
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)