
Chapter 12: How Classifiers Work

Mix.install(
  [
    {:nx, "~> 0.7.1"},
    {:exla, "~> 0.7.1"},
    {:kino_vega_lite, "~> 0.1.10"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Perceptron on Linearly Separable Data

path = __DIR__ |> Path.join("files/linearly_separable.txt") |> Path.expand()

dataset =
  path
  |> File.stream!()
  |> Stream.map(&String.split/1)
  # Drop header
  |> Stream.drop(1)
  |> Stream.map(fn [input_a, input_b, label] ->
    %{
      input_a: String.to_float(input_a),
      input_b: String.to_float(input_b),
      label: String.to_integer(label)
    }
  end)
  |> Enum.into([])
VegaLite.new(width: 300, height: 300, title: "Linearly Separable Datasets")
|> VegaLite.data_from_values(dataset, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "input_a", type: :quantitative)
|> VegaLite.encode_field(:y, "input_b", type: :quantitative)
|> VegaLite.encode_field(:color, "label", type: :nominal)
defmodule C12.DataPrep do
  @doc """
  Inserts a column of 1's into position 0 of tensor X along the the x-axis
  """
  def prepend_bias(x) do
    {row, _col} = Nx.shape(x)
    bias = Nx.broadcast(Nx.tensor(1), {row, 1})
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc "Flip hot values to 1"
  def one_hot_encode(y) do
    Nx.equal(y, Nx.tensor([0, 1]))
  end
end
import C12.DataPrep

# Reshape each feature into a 2-axis column tensor ({n, 1})
# Use f64 as type so we can represent losses that are too small to be represented with f32
input_a = dataset |> Enum.map(& &1.input_a) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
input_b = dataset |> Enum.map(& &1.input_b) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
labels = dataset |> Enum.map(& &1.label) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})

x_train = x_test = prepend_bias(Nx.concatenate([input_a, input_b], axis: 1))
y_train_unencoded = y_test = labels
y_train = one_hot_encode(y_train_unencoded)
x_train
defmodule C12.Perceptron do
  import Nx.Defn

  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  def forward(x, w) do
    weighted_sum = Nx.dot(x, w)
    sigmoid(weighted_sum)
  end

  def classify(x, w) do
    x
    |> forward(w)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  def loss(x, y, w) do
    y_hat = forward(x, w)
    first_term = Nx.multiply(y, Nx.log(y_hat))
    second_term = Nx.multiply(Nx.subtract(1, y), Nx.log(Nx.subtract(1, y_hat)))

    first_term
    |> Nx.add(second_term)
    |> Nx.sum()
    |> Nx.divide(elem(Nx.shape(x), 0))
    |> Nx.negate()
  end

  def gradient(x, y, w) do
    {num_samples, _} = Nx.shape(x)

    x
    |> forward(w)
    |> Nx.subtract(y)
    |> then(&Nx.dot(Nx.transpose(x), &1))
    |> Nx.divide(num_samples)
  end

  def report(iteration, x_train, y_train, x_test, y_test, w) do
    matches =
      x_test
      |> classify(w)
      |> Nx.equal(y_test)
      |> Nx.sum()
      |> Nx.to_number()

    n_test_examples = Nx.shape(y_test) |> elem(0)
    matches = matches * 100.0 / n_test_examples
    training_loss = loss(x_train, y_train, w)

    IO.puts("#{iteration} - Loss: #{Nx.to_number(training_loss)}, #{matches}%")
  end

  def train(x_train, y_train, x_test, y_test, iterations, lr) do
    {_, x_cols} = Nx.shape(x_train)
    {_, y_cols} = Nx.shape(y_train)
    w = Nx.broadcast(0, {x_cols, y_cols})

    Enum.reduce(0..iterations, w, fn i, w ->
      report(i, x_train, y_train, x_test, y_test, w)
      gradient = gradient(x_train, y_train, w)
      Nx.subtract(w, Nx.multiply(gradient, lr))
    end)
  end
end
weight = C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations = 10_000, lr = 0.1)

Here’s what we get:
0 - Loss: 1.3862943611198901, 50.0%
1 - Loss: 1.2927451108167307, 100.0%

10000 - Loss: 0.00702079210910519, 100.0%

After just one iteration, we already get 100% accuracy.
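We can double-check that number ourselves (a quick sanity check, re-using the `weight`, `x_test`, and `y_test` bindings from the cells above):

# Re-classify the test set with the trained weights and recompute the accuracy
C12.Perceptron.classify(x_test, weight)
|> Nx.equal(y_test)
|> Nx.mean()
|> Nx.multiply(100)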

Decision Boundary

The decision boundary is the invisible line that separates the two classes:

  • During training, the classifier finds the boundary
  • During classification, it uses the boundary to decide whether a data point is blue or orange (as per the graph above)

Credits to https://github.com/nickgnd/programming-machine-learning-livebooks for the idea on how to plot the decision boundary. As mentioned there:

> The idea:
>
> * Generate a grid of points and use the min/max values from the initial dataset to compute the boundaries.
> * Classify each point using the weight computed before with the initial dataset
> * Plot the result highlighting the "decision boundary"

defmodule C12.DecisionBoundary do
  def build_grid_dataset(x, y, classify_fn) do
    # Compute the grid boundaries
    x_min = Nx.reduce_min(x) |> Nx.to_number()
    x_max = Nx.reduce_max(x) |> Nx.to_number()
    y_min = Nx.reduce_min(y) |> Nx.to_number()
    y_max = Nx.reduce_max(y) |> Nx.to_number()

    # Define the grid of data that will be classified
    resolution = 200
    x_step = (x_max - x_min) / resolution
    y_step = (y_max - y_min) / resolution

    grid =
      for i <- 0..resolution, j <- 0..resolution do
        [x_min + x_step * i, y_min + y_step * j]
      end

    # Classification
    labels =
      grid
      |> Nx.tensor()
      |> classify_fn.()

    # Add the labels to the grid dataset
    Enum.zip_with([grid, Nx.to_flat_list(labels)], fn [[x, y], label] ->
      %{x: x, y: y, label: label}
    end)
  end
end

Plotting the Perceptron Decision Boundary

# Get input A
x = x_train[[0..-1//1, 1..1]]
# Get input B
y = x_train[[0..-1//1, 2..2]]

data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(x, y, fn grid ->
    grid
    |> C12.DataPrep.prepend_bias()
    |> C12.Perceptron.classify(weight)
  end)
alias VegaLite, as: Vl

Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode_field(:x, "input_a", type: :quantitative)
  |> Vl.encode_field(:y, "input_b", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]}),
  # Threshold line
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.transform(filter: "datum['label'] == 1")
  |> Vl.mark(:line, stroke: "red", stroke_width: 3)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative, aggregate: :max)
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)

As we can see in the diagram, the decision boundary perfectly separates the green triangles from the blue squares. Each class sits entirely in its own area, hence the 100% accuracy.

No matter how much you change the perceptron's weights, the decision boundary is always a straight line in 2D (or the higher-dimensional equivalent of a line, a hyperplane, in more dimensions). Perceptrons don't do curves.
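A quick way to convince ourselves of this (a minimal sketch, re-using the `weight` and `x_test` bindings from the cells above): the sigmoid is monotonic, so the argmax of the sigmoid output is the same as the argmax of the raw weighted sums, and the boundary is simply the set of points where the two linear scores are equal, which is the equation of a straight line.

# Sigmoid is monotonic, so argmax(sigmoid(x · w)) equals argmax(x · w):
# the boundary lies where the two linear scores x · w[:, 0] and x · w[:, 1] are equal
linear_scores = Nx.dot(x_test, weight)
sigmoid_scores = C12.Perceptron.forward(x_test, weight)

Nx.argmax(linear_scores, axis: 1)
|> Nx.equal(Nx.argmax(sigmoid_scores, axis: 1))
|> Nx.all()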

Perceptron on Non Linearly Separable Data

path = __DIR__ |> Path.join("files/non_linearly_separable.txt") |> Path.expand()

dataset =
  path
  |> File.stream!()
  |> Stream.map(&String.split/1)
  # Drop header
  |> Stream.drop(1)
  |> Stream.map(fn [input_a, input_b, label] ->
    %{
      input_a: String.to_float(input_a),
      input_b: String.to_float(input_b),
      label: String.to_integer(label)
    }
  end)
  |> Enum.into([])
VegaLite.new(width: 300, height: 300, title: "Non Linearly Separable Datasets")
|> VegaLite.data_from_values(dataset, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "input_a", type: :quantitative)
|> VegaLite.encode_field(:y, "input_b", type: :quantitative)
|> VegaLite.encode_field(:color, "label", type: :nominal)
import C12.DataPrep

# Reshape each feature into a 2-axis column tensor ({n, 1})
# Use f64 as type so we can represent losses that are too small to be represented with f32
input_a = dataset |> Enum.map(& &1.input_a) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
input_b = dataset |> Enum.map(& &1.input_b) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
labels = dataset |> Enum.map(& &1.label) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})

x_train = x_test = prepend_bias(Nx.concatenate([input_a, input_b], axis: 1))
y_train_unencoded = y_test = labels
y_train = one_hot_encode(y_train_unencoded)
x_train
C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations = 10_000, lr = 0.1)

Here’s what we get:
0 - Loss: 1.3862943611198901, 36.0%
1 - Loss: 1.3809206533861333, 64.0%

10000 - Loss: 1.0974531594775727, 73.33333333333333%

Even with 10,000 iterations, we only achieve 73.33% accuracy. As we can see, the perceptron struggles with non-linearly separable data.
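To see this limit in its purest form, here is a minimal illustration (not part of the chapter's dataset): XOR is the smallest non-linearly separable problem, and no straight line can classify more than 3 of its 4 points correctly, so the perceptron can never reach 100% on it, no matter how long it trains.

# XOR: the classic non-linearly separable toy problem
xor_inputs = Nx.tensor([[0, 0], [0, 1], [1, 0], [1, 1]], type: :f64)
xor_labels = Nx.tensor([[0], [1], [1], [0]], type: :f64)

xor_x = prepend_bias(xor_inputs)
xor_y = one_hot_encode(xor_labels)

# Train and evaluate on the same 4 points; accuracy never reaches 100%
C12.Perceptron.train(xor_x, xor_y, xor_x, xor_labels, 1_000, 0.1)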

Neural Network from Chapter 11

defmodule C11.NeuralNetwork do
  import Nx.Defn
  # Re-use prepend_bias/1 and friends from the data-prep module defined earlier in this notebook
  import C12.DataPrep

  @doc """
  For w2_gradient:
  - Swap the operands of the multiplication then transpose one of them to get a result with the
  same dimension as w2.
  - We also need to do a prepend_bias/1 since we did it with forward/3, we need to do the same
  during backprop.
  - The matrix multiplication needs to be divide by the rows of x (number of examples) to get
  the average gradient

  For w1_gradient:
  - Use h as is, without a bias column since it has no effect on dL/dw1
  - Since we ignored the first column of h, we also have to ignore the first row of w1 to match
  columns by rows in matrix multiplication
  - `sigmoid_gradient/1` calculates sigmoid's gradient from sigmoid's output `h`
  - Lastly multiply x to the previous intermediate result with same rules when we calculate
  w2_gradient
  """
  def back(x, y, y_hat, w2, h) do
    {num_samples, _} = Nx.shape(x)

    w2_gradient =
      h
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(Nx.subtract(y_hat, y))
      |> Nx.divide(num_samples)

    w1_gradient =
      x
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(
        y_hat
        |> Nx.subtract(y)
        |> Nx.dot(Nx.transpose(w2[1..-1//1]))
        |> Nx.multiply(sigmoid_gradient(h))
      )
      |> Nx.divide(num_samples)

    {w1_gradient, w2_gradient}
  end

  def initialize_weights(n_input_vars, n_hidden_nodes, n_classes) do
    key = Nx.Random.key(1234)
    mean = 0.0
    standard_deviation = 0.01

    w1_rows = n_input_vars + 1

    {normal, new_key} =
      Nx.Random.normal(key, mean, standard_deviation, shape: {w1_rows, n_hidden_nodes})

    w1 = Nx.multiply(normal, Nx.sqrt(1 / w1_rows))

    w2_rows = n_hidden_nodes + 1

    {normal, _new_key} =
      Nx.Random.normal(new_key, mean, standard_deviation, shape: {w2_rows, n_classes})

    w2 = Nx.multiply(normal, Nx.sqrt(1 / w2_rows))

    {w1, w2}
  end

  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr) do
    {_, n_input_variables} = Nx.shape(x_train)
    {_, n_classes} = Nx.shape(y_train)
    {w1, w2} = initialize_weights(n_input_variables, n_hidden_nodes, n_classes)

    Enum.reduce(1..iterations, {w1, w2}, fn iteration, {w1, w2} ->
      {y_hat, h} = forward(x_train, w1, w2)
      {w1_gradient, w2_gradient} = back(x_train, y_train, y_hat, w2, h)
      w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, lr))
      w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, lr))
      report(iteration, x_train, y_train, x_test, y_test, w1, w2)
      {w1, w2}
    end)
  end

  # Functions from `Ch 10: Building the Network` below

  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  def softmax(logits) do
    exponentials = Nx.exp(logits)

    sum_of_exponentials_by_row =
      exponentials
      |> Nx.sum(axes: [1])
      |> Nx.reshape({:auto, 1})

    Nx.divide(exponentials, sum_of_exponentials_by_row)
  end

  def sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, Nx.subtract(1, sigmoid))
  end

  @doc """
  Cross-entropy loss
  Loss formula specific for multiclass classifiers.
  Measures the distance between the classifier's predictions and the labels.
  Lower loss means better classifier
  """
  def loss(y_train, y_hat) do
    {rows, _} = Nx.shape(y_train)

    y_train
    |> Nx.multiply(Nx.log(y_hat))
    |> Nx.sum()
    |> Nx.multiply(-1)
    |> Nx.divide(rows)
  end

  @doc """
  Implements the operation called "forward propagation"
  Steps:
  1. We add a bias to the inputs
  2. Compute the weighted sum using the 1st matrix of weights, w1
  3. Pass the result to the activation function (sigmoid or softmax)
  4. Repeat for all layers
  """
  def forward(x, w1, w2) do
    # Hidden layer
    h =
      x
      |> prepend_bias()
      |> then(&Nx.dot(&1, w1))
      |> sigmoid()

    # Output layer
    y_hat =
      h
      |> prepend_bias()
      |> then(&Nx.dot(&1, w2))
      |> softmax()

    {y_hat, h}
  end

  @doc """
  Same classify/2 function from Ch 7, but adapted for the neural network (it takes two weight matrices)
  """
  def classify(x, w1, w2) do
    x
    |> forward(w1, w2)
    |> elem(0)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  def report(iteration, x_train, y_train, x_test, y_test, w1, w2) do
    {y_hat, _} = forward(x_train, w1, w2)
    training_loss = loss(y_train, y_hat)
    classifications = classify(x_test, w1, w2)
    # y_test is not one-hot encoded

    # Measure how many classifications were correct by comparing them with y_test.
    # The mean/1 function is essentially the sum of 1's (matches) divided by the
    # total number of classifications
    accuracy =
      classifications
      |> Nx.equal(y_test)
      |> Nx.mean()
      |> Nx.multiply(100.0)

    IO.puts(
      "Iteration: #{iteration}, Loss: #{Nx.to_number(training_loss)}, Accuracy: #{Nx.to_number(accuracy)}"
    )
  end
end
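
As a quick sanity check of the backpropagation docstring above, here is a minimal sketch with made-up shapes (4 examples, 2 input variables, 3 hidden nodes, 2 classes): the gradients returned by back/5 must have exactly the same shapes as w1 and w2, otherwise the weight update in train/7 would not line up.

# Tiny fabricated batch, just to check that the gradient shapes match the weight shapes
{w1, w2} = C11.NeuralNetwork.initialize_weights(2, 3, 2)

dummy_x = Nx.iota({4, 2}, type: :f64)
dummy_y = Nx.tensor([[1, 0], [0, 1], [1, 0], [0, 1]])

{y_hat, h} = C11.NeuralNetwork.forward(dummy_x, w1, w2)
{w1_gradient, w2_gradient} = C11.NeuralNetwork.back(dummy_x, dummy_y, y_hat, w2, h)

{Nx.shape(w1_gradient) == Nx.shape(w1), Nx.shape(w2_gradient) == Nx.shape(w2)}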

Neural Network on Non Linearly Separable Data

# No prepend_bias on the input data, as the network's code takes care of that
x_train = x_test = Nx.concatenate([input_a, input_b], axis: 1)

{weight_1, weight_2} =
  C11.NeuralNetwork.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes = 10,
    iterations = 100_000,
    lr = 0.3
  )

Here we get:
Iteration: 1, Loss: 0.6627994495760408, Accuracy: 64.0%
Iteration: 2, Loss: 0.655730658562772, Accuracy: 64.0%

Iteration: 100000, Loss: 0.05194653477044733, Accuracy: 98.33%

The reason for this high accuracy is the neural network's ability to bend its decision boundary, thanks to its multiple layers.
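We can peek at what the hidden layer does (a small probe, assuming the `weight_1` and `weight_2` bindings returned by the training cell above): every hidden node applies a sigmoid to its own weighted sum, so the output layer works on features that are already non-linear functions of input_a and input_b, which is what lets the overall boundary curve.

# Run a few training rows through the network and inspect the hidden activations
{y_hat, hidden} = C11.NeuralNetwork.forward(x_train[0..4], weight_1, weight_2)

# `hidden` has one column per hidden node; every value is a sigmoid output, i.e. between 0 and 1
{hidden, y_hat}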

Plotting the Neural Network Decision Boundary

# Get input A
x = x_train[[0..-1//1, 0..0]]
# Get input B
y = x_train[[0..-1//1, 1..1]]

data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(x, y, fn grid ->
    grid
    |> C11.NeuralNetwork.classify(weight_1, weight_2)
  end)
alias VegaLite, as: Vl

# Plot the grid with the labels

Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode_field(:x, "input_a", type: :quantitative)
  |> Vl.encode_field(:y, "input_b", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]})
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)

As seen in the diagram, the neural network can bend its model and decision boundary, while a perceptron can't. There are still a few misclassified points, such as the three blue squares in the green area, hence the accuracy below 100%.
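
We can count those misclassified points directly (re-using the weights and the `x_test`/`y_test` bindings from above):

# Number of test points whose predicted class differs from the label
C11.NeuralNetwork.classify(x_test, weight_1, weight_2)
|> Nx.not_equal(y_test)
|> Nx.sum()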