Hands On: Data from Hell
# Notebook dependencies: Nx, EXLA and the VegaLite Kino integration.
# The config entry makes EXLA.Backend the default backend for Nx tensors.
Mix.install(
[
{:nx, "~> 0.7.1"},
{:exla, "~> 0.7.1"},
{:kino_vega_lite, "~> 0.1.10"}
],
config: [nx: [default_backend: EXLA.Backend]]
)
Plotting the Data
# Absolute path of the dataset file, resolved relative to this notebook's directory.
path = Path.expand(Path.join(__DIR__, "files/circles.txt"))

# Parse the whitespace-separated file into a list of maps, one map per example.
dataset =
  path
  |> File.stream!()
  # The first line is the header, not an example
  |> Stream.drop(1)
  |> Stream.map(&String.split/1)
  |> Enum.map(fn [input_a, input_b, label] ->
    %{
      input_a: String.to_float(input_a),
      input_b: String.to_float(input_b),
      label: String.to_integer(label)
    }
  end)
# Scatter plot of the two inputs; the point color shows each example's label.
VegaLite.new(width: 300, height: 300, title: "Linearly Separable Datasets")
|> VegaLite.data_from_values(dataset, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode(:x, field: "input_a", type: :quantitative)
|> VegaLite.encode(:y, field: "input_b", type: :quantitative)
|> VegaLite.encode(:color, field: "label", type: :nominal)
defmodule C12.DataPrep do
  @moduledoc """
  Helpers that prepare input and label tensors for training.
  """

  @doc """
  Prepends a bias column of 1's to `x`.

  `x` must be a 2-D tensor of shape `{rows, cols}`. The result has shape
  `{rows, cols + 1}`, with the column of 1's at index 0 of axis 1.
  """
  def prepend_bias(x) do
    {rows, _cols} = Nx.shape(x)
    bias = Nx.broadcast(Nx.tensor(1), {rows, 1})
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc """
  One-hot encodes the class labels in `y`.

  Each label is compared with the class indices `0..n_classes - 1`. For a `y`
  of shape `{rows, 1}` the result has shape `{rows, n_classes}` and holds 1 in
  the column whose index equals the label and 0 everywhere else.

  `n_classes` defaults to 2 (labels 0 and 1).
  """
  def one_hot_encode(y, n_classes \\ 2) do
    # Nx.iota({n}) is the tensor [0, 1, ..., n - 1]; broadcasting does the rest
    Nx.equal(y, Nx.iota({n_classes}))
  end
end
import C12.DataPrep

# Turn each field of the dataset into a 2-axis column tensor of shape {n, 1}.
# f64 is used so that losses too small for f32 can still be represented.
input_a = Nx.reshape(Nx.tensor(Enum.map(dataset, & &1.input_a), type: :f64), {:auto, 1})
input_b = Nx.reshape(Nx.tensor(Enum.map(dataset, & &1.input_b), type: :f64), {:auto, 1})
labels = Nx.reshape(Nx.tensor(Enum.map(dataset, & &1.label), type: :f64), {:auto, 1})

# The same examples serve as both the training set and the test set.
x_train = prepend_bias(Nx.concatenate([input_a, input_b], axis: 1))
x_test = x_train

y_test = labels
y_train_unencoded = labels
y_train = one_hot_encode(y_train_unencoded)

x_train
defmodule C12.Perceptron do
  @moduledoc """
  Single-layer classifier (weighted sum followed by a sigmoid) trained with
  gradient descent on the log loss.
  """

  import Nx.Defn

  @doc "Element-wise logistic function `1 / (1 + e^-z)`."
  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  @doc """
  Computes the predictions for inputs `x` with weights `w`: the sigmoid of
  the matrix product `x . w`.
  """
  def forward(x, w) do
    weighted_sum = Nx.dot(x, w)
    sigmoid(weighted_sum)
  end

  @doc """
  Returns, for each row of `x`, the index of the largest prediction along
  axis 1, as a column tensor of shape `{rows, 1}`.
  """
  def classify(x, w) do
    x
    |> forward(w)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  @doc """
  Log loss of the predictions for `x` against the labels `y`: the negated sum
  of `y * log(y_hat) + (1 - y) * log(1 - y_hat)`, divided by the number of
  rows of `x`.
  """
  def loss(x, y, w) do
    y_hat = forward(x, w)
    first_term = Nx.multiply(y, Nx.log(y_hat))
    second_term = Nx.multiply(Nx.subtract(1, y), Nx.log(Nx.subtract(1, y_hat)))

    first_term
    |> Nx.add(second_term)
    |> Nx.sum()
    |> Nx.divide(elem(Nx.shape(x), 0))
    |> Nx.negate()
  end

  @doc """
  Gradient of the loss with respect to `w`: `transpose(x) . (forward(x, w) - y)`
  divided by the number of rows of `x`.
  """
  def gradient(x, y, w) do
    {num_samples, _} = Nx.shape(x)

    x
    |> forward(w)
    |> Nx.subtract(y)
    |> then(&Nx.dot(Nx.transpose(x), &1))
    |> Nx.divide(num_samples)
  end

  @doc """
  Prints the training loss and the percentage of test examples whose
  classification equals `y_test` for the given `iteration`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, w) do
    matches =
      x_test
      |> classify(w)
      |> Nx.equal(y_test)
      |> Nx.sum()
      |> Nx.to_number()

    n_test_examples = Nx.shape(y_test) |> elem(0)
    matches = matches * 100.0 / n_test_examples
    training_loss = loss(x_train, y_train, w)

    IO.puts("#{iteration} - Loss: #{Nx.to_number(training_loss)}, #{matches}%")
  end

  @doc """
  Trains the weights with `iterations` steps of gradient descent at learning
  rate `lr`, starting from all-zero weights, and returns the final weights.

  Before each step the current loss and accuracy are printed via `report/6`.
  """
  def train(x_train, y_train, x_test, y_test, iterations, lr) do
    {_, x_cols} = Nx.shape(x_train)
    {_, y_cols} = Nx.shape(y_train)
    w = Nx.broadcast(0, {x_cols, y_cols})

    # Exactly `iterations` steps, numbered from 0. The explicit `//1` step keeps
    # the range empty (instead of descending) when `iterations` is 0.
    Enum.reduce(0..(iterations - 1)//1, w, fn i, w ->
      report(i, x_train, y_train, x_test, y_test, w)
      gradient = gradient(x_train, y_train, w)
      Nx.subtract(w, Nx.multiply(gradient, lr))
    end)
  end
end
# Hyperparameters for the perceptron run
iterations = 10_000
lr = 0.1

weight = C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations, lr)
Decision Boundary
defmodule C12.DecisionBoundary do
  @moduledoc """
  Builds a labelled grid of points that can be plotted to visualize a
  classifier's decision boundary.
  """

  @doc """
  Classifies a regular grid of points spanning the ranges of `x` and `y`.

  `x` and `y` are tensors holding the two input coordinates; their minimum
  and maximum values define the grid boundaries. `classify_fn` receives the
  grid as a tensor with one `[x, y]` point per row and must return one label
  per row.

  Returns a list of `%{x: x, y: y, label: label}` maps, one per grid point.
  """
  def build_grid_dataset(x, y, classify_fn) do
    # Compute the grid boundaries, each axis from its own tensor
    x_min = Nx.reduce_min(x) |> Nx.to_number()
    x_max = Nx.reduce_max(x) |> Nx.to_number()
    y_min = Nx.reduce_min(y) |> Nx.to_number()
    y_max = Nx.reduce_max(y) |> Nx.to_number()

    # Define the grid of data that will be classified
    resolution = 200
    x_step = (x_max - x_min) / resolution
    y_step = (y_max - y_min) / resolution

    grid =
      for i <- 0..resolution, j <- 0..resolution do
        [x_min + x_step * i, y_min + y_step * j]
      end

    # Classification
    labels =
      grid
      |> Nx.tensor()
      |> classify_fn.()

    # Add the labels to the grid dataset
    Enum.zip_with([grid, Nx.to_flat_list(labels)], fn [[grid_x, grid_y], label] ->
      %{x: grid_x, y: grid_y, label: label}
    end)
  end
end
Plotting the Perceptron Boundary
# x_train carries the bias in column 0, so input A is column 1 and input B is column 2.
x = x_train[[0..-1//1, 1..1]]
y = x_train[[0..-1//1, 2..2]]

# Label every grid point with the trained perceptron. The grid holds raw
# inputs only, so the bias column is prepended before classifying.
data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(x, y, fn grid ->
    C12.Perceptron.classify(C12.DataPrep.prepend_bias(grid), weight)
  end)
alias VegaLite, as: Vl

# Three layers on shared axes: the classified grid, the training examples,
# and a red line tracing the top edge of the region labelled 1.
Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode(:x, field: "input_a", type: :quantitative)
  |> Vl.encode(:y, field: "input_b", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode_field(:shape, "label", scale: %{"range" => ["square", "triangle-up"]}),
  # Threshold line
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.transform(filter: "datum['label'] == 1")
  |> Vl.mark(:line, stroke: "red", stroke_width: 3)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative, aggregate: :max)
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)
What does the perceptron’s boundary look like?
The perceptron's decision boundary is a straight line that cuts the circle formed by the data points in half. This explains the model's 49% accuracy: about half of the data points fall on the side of the boundary where they do not belong, so they are misclassified.
Plotting the Neural Network Boundary
defmodule C11.NeuralNetwork do
  @moduledoc """
  Neural network with one sigmoid hidden layer and a softmax output layer,
  trained with backpropagation and gradient descent.
  """

  import Nx.Defn
  # Imported explicitly so the module does not rely on an import that happens
  # to be active in the enclosing scope.
  import C12.DataPrep, only: [prepend_bias: 1]

  @doc """
  Backpropagation: returns `{w1_gradient, w2_gradient}`.

  For w2_gradient:

    - Swap the operands of the multiplication, then transpose one of them to get a result with
      the same dimensions as w2.
    - We also need to do a `prepend_bias/1` since we did it in `forward/3`; we need to do the
      same during backprop.
    - The matrix multiplication needs to be divided by the rows of x (number of examples) to
      get the average gradient.

  For w1_gradient:

    - Use h as is, without a bias column, since it has no effect on dL/dw1.
    - Since we ignored the first column of h, we also have to ignore the first row of w2 to
      match columns by rows in the matrix multiplication.
    - `sigmoid_gradient/1` calculates sigmoid's gradient from sigmoid's output `h`.
    - Lastly, multiply x by the previous intermediate result with the same rules as when we
      calculate w2_gradient.
  """
  def back(x, y, y_hat, w2, h) do
    {num_samples, _} = Nx.shape(x)

    w2_gradient =
      h
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(Nx.subtract(y_hat, y))
      |> Nx.divide(num_samples)

    w1_gradient =
      x
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(
        y_hat
        |> Nx.subtract(y)
        |> Nx.dot(Nx.transpose(w2[1..-1//1]))
        |> Nx.multiply(sigmoid_gradient(h))
      )
      |> Nx.divide(num_samples)

    {w1_gradient, w2_gradient}
  end

  @doc """
  Returns `{w1, w2}` with small random initial weights.

  `w1` has shape `{n_input_vars + 1, n_hidden_nodes}` and `w2` has shape
  `{n_hidden_nodes + 1, n_classes}`; the extra row in each accounts for the
  bias. Values are drawn from a normal distribution (fixed seed) and scaled
  by `sqrt(1 / rows)`.
  """
  def initialize_weights(n_input_vars, n_hidden_nodes, n_classes) do
    key = Nx.Random.key(1234)
    mean = 0.0
    standard_deviation = 0.01

    w1_rows = n_input_vars + 1

    {normal, new_key} =
      Nx.Random.normal(key, mean, standard_deviation, shape: {w1_rows, n_hidden_nodes})

    w1 = Nx.multiply(normal, Nx.sqrt(1 / w1_rows))

    w2_rows = n_hidden_nodes + 1

    {normal, _new_key} =
      Nx.Random.normal(new_key, mean, standard_deviation, shape: {w2_rows, n_classes})

    w2 = Nx.multiply(normal, Nx.sqrt(1 / w2_rows))

    {w1, w2}
  end

  @doc """
  Trains the network for `iterations` steps of gradient descent at learning
  rate `lr` and returns the final `{w1, w2}`.

  After each step the loss and accuracy are printed via `report/7`.
  """
  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr) do
    {_, n_input_variables} = Nx.shape(x_train)
    {_, n_classes} = Nx.shape(y_train)
    {w1, w2} = initialize_weights(n_input_variables, n_hidden_nodes, n_classes)

    # The explicit `//1` step keeps the range empty (instead of descending)
    # when `iterations` is 0.
    Enum.reduce(1..iterations//1, {w1, w2}, fn iteration, {w1, w2} ->
      {y_hat, h} = forward(x_train, w1, w2)
      {w1_gradient, w2_gradient} = back(x_train, y_train, y_hat, w2, h)
      w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, lr))
      w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, lr))
      report(iteration, x_train, y_train, x_test, y_test, w1, w2)
      {w1, w2}
    end)
  end

  # Functions from `Ch 10: Building the Network` below

  @doc "Element-wise logistic function `1 / (1 + e^-z)`."
  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  @doc """
  Row-wise softmax of a 2-D tensor of `logits`.

  Each row is shifted by its maximum before exponentiating. This leaves the
  result mathematically unchanged but keeps `exp` from overflowing on large
  logits.
  """
  def softmax(logits) do
    row_max = Nx.reduce_max(logits, axes: [1], keep_axes: true)
    exponentials = Nx.exp(Nx.subtract(logits, row_max))
    sum_of_exponentials_by_row = Nx.sum(exponentials, axes: [1], keep_axes: true)

    Nx.divide(exponentials, sum_of_exponentials_by_row)
  end

  @doc "Gradient of the sigmoid, computed from the sigmoid's output: `s * (1 - s)`."
  def sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, Nx.subtract(1, sigmoid))
  end

  @doc """
  Cross-entropy loss

  Loss formula specific for multiclass classifiers.
  Measures the distance between the classifier's predictions and the labels.

  Lower loss means better classifier
  """
  def loss(y_train, y_hat) do
    {rows, _} = Nx.shape(y_train)

    y_train
    |> Nx.multiply(Nx.log(y_hat))
    |> Nx.sum()
    |> Nx.multiply(-1)
    |> Nx.divide(rows)
  end

  @doc """
  Implements the operation called "forward propagation"

  Steps:

    1. We add a bias to the inputs
    2. Compute the weighted sum using the 1st matrix of weights, w1
    3. Pass the result to the activation function (sigmoid or softmax)
    4. Repeat for all layers

  Returns `{y_hat, h}`: the output-layer predictions and the hidden-layer
  activations.
  """
  def forward(x, w1, w2) do
    # Hidden layer
    h =
      x
      |> prepend_bias()
      |> then(&Nx.dot(&1, w1))
      |> sigmoid()

    # Output layer
    y_hat =
      h
      |> prepend_bias()
      |> then(&Nx.dot(&1, w2))
      |> softmax()

    {y_hat, h}
  end

  @doc """
  Same classify/2 function from Ch 7 but modified for the neural network.

  Returns the index of the largest prediction of each row as a column tensor
  of shape `{rows, 1}`.
  """
  def classify(x, w1, w2) do
    x
    |> forward(w1, w2)
    |> elem(0)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  @doc """
  Prints the training loss and the test accuracy (in percent) for the given
  `iteration`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, w1, w2) do
    {y_hat, _} = forward(x_train, w1, w2)
    training_loss = loss(y_train, y_hat)
    classifications = classify(x_test, w1, w2)

    # y_test is not one-hot encoded.
    # Measure how many classifications were correct by comparing them with
    # y_test. Nx.mean/1 is the sum of 1's (matches) divided by the total
    # number of classifications.
    accuracy =
      classifications
      |> Nx.equal(y_test)
      |> Nx.mean()
      |> Nx.multiply(100.0)

    IO.puts(
      "Iteration: #{iteration}, Loss: #{Nx.to_number(training_loss)}, Accuracy: #{Nx.to_number(accuracy)}"
    )
  end
end
# The network's code prepends the bias column itself, so the raw inputs are used as-is.
x_train = Nx.concatenate([input_a, input_b], axis: 1)
x_test = x_train

# Hyperparameters for the neural network run
n_hidden_nodes = 10
iterations = 100_000
lr = 0.3

{weight_1, weight_2} =
  C11.NeuralNetwork.train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr)
# x_train has no bias column here, so input A is column 0 and input B is column 1.
x = x_train[[0..-1//1, 0..0]]
y = x_train[[0..-1//1, 1..1]]

# Label every grid point with the trained neural network.
data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(
    x,
    y,
    &C11.NeuralNetwork.classify(&1, weight_1, weight_2)
  )
alias VegaLite, as: Vl

# Two layers on shared axes: the classified grid in the background and the
# training examples on top.
Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode(:x, field: "input_a", type: :quantitative)
  |> Vl.encode(:y, field: "input_b", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode_field(:shape, "label", scale: %{"range" => ["square", "triangle-up"]})
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)
How does the neural network fare on these data?
The neural network's decision boundary almost perfectly encloses the green triangles inside the green area, while nearly all of the blue squares lie in the blue area outside the boundary. A few blue squares still fall inside the green area, which is consistent with the model's 98% accuracy.