Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 12: How Classifiers Work (2 of 2)

12_classifiers/12_classifiers_02.livemd

Chapter 12: How Classifiers Work (2 of 2)

Mix.install(
  [
    {:exla, "~> 0.5"},
    {:nx, "~> 0.5"},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:kino_vega_lite, "~> 0.1.7"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Load the Data

# Absolute path of the dataset that ships next to this notebook.
filepath = Path.join(__DIR__, "./non_linearly_separable.txt") |> Path.expand()

# The first line is discarded (presumably a column header); every other line
# is one example. Splitting on both "\r\n" and "\n" makes the parsing
# independent of the file's line endings: splitting on "\r\n" alone turns an
# LF-terminated file into a single "line", i.e. a header and no data at all.
[_header | data] =
  filepath
  |> File.read!()
  |> String.split(["\r\n", "\n"], trim: true)

# Each row is `input_a input_b label`. `String.split/1` splits on any run of
# whitespace, so space- and tab-separated columns are both accepted.
# A row that does not have exactly three columns raises a `MatchError`.
inputs =
  Enum.map(data, fn row ->
    [input_a, input_b, label] = String.split(row)

    %{
      "input_a" => String.to_float(input_a),
      "input_b" => String.to_float(input_b),
      "label" => String.to_integer(label)
    }
  end)

Kino.DataTable.new(inputs)
# Scatter plot of the raw examples: one point per row, positioned by the two
# input variables and coloured by its label.
plotted_fields = ["input_a", "input_b", "label"]

VegaLite.new(width: 600, height: 400)
|> VegaLite.data_from_values(inputs, only: plotted_fields)
|> VegaLite.mark(:point)
|> VegaLite.encode(:x, field: "input_a", type: :quantitative)
|> VegaLite.encode(:y, field: "input_b", type: :quantitative)
|> VegaLite.encode(:color, field: "label", type: :nominal)

Binary classification with perceptron

Perceptron

Perceptron based on C7.Classifier implementation.

defmodule C12.Perceptron do
  @moduledoc """
  Single-layer classifier (perceptron) trained with batch gradient descent.

  The module never adds a bias column itself: callers that want a bias must
  prepend it to the inputs. `train/6` reads the number of classes from the
  second dimension of `y_train`, so the training labels must be a matrix with
  one column per class (e.g. one-hot encoded).
  """

  import Nx.Defn

  @doc """
  Element-wise logistic function, `1 / (1 + e^-z)`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Returns the predictions for `x`: the sigmoid of the weighted sum `x · weight`.
  """
  defn forward(x, weight) do
    weighted_sum = Nx.dot(x, weight)
    sigmoid(weighted_sum)
  end

  @doc """
  Classifies each row of `x`.

  Returns a one-column tensor holding, for every row, the index of the class
  with the highest prediction.
  """
  defn classify(x, weight) do
    y_hat = forward(x, weight)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Log loss of the predictions for `x` against the labels `y`.

  The per-element losses are summed and divided by the number of rows of `x`.
  """
  defn loss(x, y, weight) do
    y_hat = forward(x, weight)
    first_term = y * Nx.log(y_hat)
    second_term = Nx.subtract(1, y) * Nx.log(Nx.subtract(1, y_hat))

    Nx.add(first_term, second_term)
    |> Nx.sum()
    |> Nx.divide(elem(Nx.shape(x), 0))
    |> Nx.negate()
  end

  @doc """
  Gradient of `loss/3` with respect to `weight`: `xᵀ · (ŷ - y) / n_examples`.
  """
  defn gradient(x, y, weight) do
    predictions = forward(x, weight)
    errors = Nx.subtract(predictions, y)
    n_examples = elem(Nx.shape(x), 0)

    Nx.transpose(x)
    |> Nx.dot(errors)
    |> Nx.divide(n_examples)
  end

  @doc """
  Prints the training loss and the accuracy on the test set for `iteration`.

  `y_test` is compared directly with the output of `classify/2`, so it must
  hold class indices (not one-hot rows).

  Returns `{iteration, training_loss, accuracy_percentage}`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, weight) do
    n_test_examples = elem(Nx.shape(y_test), 0)
    correct = matches(x_test, y_test, weight) |> Nx.to_number()
    accuracy = correct * 100.0 / n_test_examples
    training_loss = loss(x_train, y_train, weight) |> Nx.to_number()

    # `IO.puts/1` prints the line as-is; `IO.inspect/1` would wrap it in quotes.
    IO.puts("Iteration #{iteration} => Loss: #{training_loss}, #{accuracy}%")

    {iteration, training_loss, accuracy}
  end

  # Number of rows of `x_test` whose predicted class equals `y_test`.
  defnp matches(x_test, y_test, weight) do
    classify(x_test, weight)
    |> Nx.equal(y_test)
    |> Nx.sum()
  end

  @doc """
  Trains the classifier for `iterations` gradient-descent steps with learning
  rate `lr`, reporting before every step and once more at the end.

  Returns the final weight tensor.
  """
  def train(x_train, y_train, x_test, y_test, iterations, lr) do
    init_weight = init_weight(x_train, y_train)

    # The explicit `//1` step keeps the range empty when `iterations` is 0.
    # Without it `0..-1` is a *descending* range and two steps would run.
    final_weight =
      Enum.reduce(0..(iterations - 1)//1, init_weight, fn i, weight ->
        report(i, x_train, y_train, x_test, y_test, weight)
        step(x_train, y_train, weight, lr)
      end)

    report(iterations, x_train, y_train, x_test, y_test, final_weight)

    final_weight
  end

  # One gradient-descent update: `weight - gradient * lr`.
  defnp step(x, y, weight, lr) do
    Nx.subtract(weight, Nx.multiply(gradient(x, y, weight), lr))
  end

  # All-zero weights shaped `{n_input_variables, n_classes}`.
  defnp init_weight(x, y) do
    n_input_variables = elem(Nx.shape(x), 1)
    n_classes = elem(Nx.shape(y), 1)
    Nx.broadcast(0, {n_input_variables, n_classes})
  end
end

Classification

# Returns `x` with an extra leading column filled with 1s (the bias input).
# Python equivalent: `np.insert(X, 0, 1, axis=1)`.
prepend_bias_fn = fn x ->
  n_rows = x |> Nx.shape() |> elem(0)
  ones = Nx.broadcast(1, {n_rows, 1})

  # `axis: 1` appends along the columns, so the 1s become column 0.
  Nx.concatenate([ones, x], axis: 1)
end

# One-hot encodes a column of 0/1 labels by comparing it against `[0, 1]`.
one_hot_encode_fn = &Nx.equal(&1, Nx.tensor([0, 1]))

# Build the tensors from the parsed inputs.
#
# NOTE: double precision is used on purpose. With a high number of iterations
# (> 7000) the loss becomes too small to be represented with
# single-precision floats.
feature_rows = for row <- inputs, do: [row["input_a"], row["input_b"]]

x_train =
  feature_rows
  |> Nx.tensor(type: {:f, 64})
  |> then(prepend_bias_fn)

# The same examples are used for training and for testing.
x_test = x_train

label_rows = for row <- inputs, do: row["label"]

y_train_unencoded =
  label_rows
  |> Nx.tensor()
  |> Nx.reshape({:auto, 1})

y_test = y_train_unencoded

y_train = one_hot_encode_fn.(y_train_unencoded)

# Train the system.
iterations = 10_000
lr = 0.1
weight = C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations, lr)

👆 As you can see, the accuracy is much lower than it was with the previous dataset.

Plot classified data with perceptron

The idea:

  • Generate a grid of points and use the min/max values from the initial dataset to compute the boundaries.
  • Classify each point using the weight computed before with the initial dataset
  • Plot the result highlighting the “decision boundary”
# Column 0 of `x_train` is the bias, so the two input variables are
# columns 1 and 2.
x = Nx.slice_along_axis(x_train, 1, 1, axis: 1)
y = Nx.slice_along_axis(x_train, 2, 1, axis: 1)

# Smallest and largest value of each input variable.
{x_min, x_max} = x |> Nx.to_flat_list() |> Enum.min_max()
{y_min, y_max} = y |> Nx.to_flat_list() |> Enum.min_max()

# Widen every side of the grid by 5% of the magnitude of its limit.
padding = 0.05
margin_fn = fn limit -> abs(limit * padding) end

boundaries = %{
  x_min: x_min - margin_fn.(x_min),
  x_max: x_max + margin_fn.(x_max),
  y_min: y_min - margin_fn.(y_min),
  y_max: y_max + margin_fn.(y_max)
}
# Build the grid of points to classify: `resolution` x `resolution` points,
# starting at the lower boundaries and advancing one step per index.
resolution = 200
%{x_min: grid_left, x_max: grid_right, y_min: grid_bottom, y_max: grid_top} = boundaries

x_step = (grid_right - grid_left) / resolution
y_step = (grid_top - grid_bottom) / resolution
grid_indices = 0..(resolution - 1)

grid =
  Enum.flat_map(grid_indices, fn column ->
    Enum.map(grid_indices, fn row ->
      [grid_left + x_step * column, grid_bottom + y_step * row]
    end)
  end)
# Classify every grid point with the weight learned by the perceptron.
# The perceptron expects the bias column, so it is prepended first.
grid_with_bias =
  grid
  |> Nx.tensor()
  |> then(prepend_bias_fn)

labels = C12.Perceptron.classify(grid_with_bias, weight)

# Pair each grid point with its predicted label.
data_with_labels =
  grid
  |> Enum.zip(Nx.to_flat_list(labels))
  |> Enum.map(fn {[x, y], label} -> %{x: x, y: y, label: label} end)
alias VegaLite, as: Vl

# Background layer: every classified grid point, coloured by predicted label.
grid_layer =
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["lightblue", "aquamarine"]})

# Foreground layer: the original examples, with colour and shape set by label.
inputs_layer =
  Vl.new()
  |> Vl.data_from_values(inputs)
  |> Vl.mark(:circle, tooltip: true)
  |> Vl.encode(:x, field: "input_a", type: :quantitative)
  |> Vl.encode(:y, field: "input_b", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode_field(:shape, "label", scale: %{"range" => ["square", "triangle-up"]})

# Threshold line: for every x, the highest y among the grid points labelled 1.
threshold_layer =
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.transform(filter: "datum['label'] == 1")
  |> Vl.mark(:line, stroke: "red", stroke_width: 3)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative, aggregate: :max)

Vl.new(width: 600, height: 400)
|> Vl.layers([grid_layer, inputs_layer, threshold_layer])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)

👆 The perceptron tried to divide the zeroes from the ones, but a straight line is not enough to separate this dataset.

Neural Network

Bending the line: classification with Neural Network

The C12.NeuralNetwork is based on the C11.Classifier.

defmodule C12.NeuralNetwork do
  @moduledoc """
  Neural network with one hidden layer (sigmoid activation) and a softmax
  output layer, trained with batch gradient descent and hand-written
  backpropagation.

  Unlike a bare perceptron, the bias column is added internally (see
  `prepend_bias/1`), so inputs must be passed *without* it.
  """

  import Nx.Defn

  @doc """
  Element-wise logistic function, `1 / (1 + e^-z)`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Row-wise softmax of `logits`: every row of the result sums to 1.
  """
  defn softmax(logits) do
    # Subtracting the row-wise maximum does not change the result
    # mathematically, but it keeps `Nx.exp/1` from overflowing on large
    # logits (which would produce Inf / Inf = NaN).
    row_max = Nx.reduce_max(logits, axes: [1]) |> Nx.reshape({:auto, 1})
    exponentials = Nx.exp(logits - row_max)

    Nx.divide(
      exponentials,
      Nx.sum(exponentials, axes: [1]) |> Nx.reshape({:auto, 1})
    )
  end

  @doc """
  Derivative of the sigmoid, expressed in terms of the sigmoid's *output*.
  """
  defn sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, 1 - sigmoid)
  end

  @doc """
  Cross-entropy loss of the predictions `y_hat` against the labels `y`,
  averaged over the rows of `y`.
  """
  defn loss(y, y_hat) do
    -Nx.sum(y * Nx.log(y_hat)) / elem(Nx.shape(y), 0)
  end

  @doc """
  Returns `x` with an extra leading column filled with 1s (the bias input).
  """
  defn prepend_bias(x) do
    bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})

    Nx.concatenate([bias, x], axis: 1)
  end

  @doc """
  Forward propagation.

  Returns `{y_hat, h}`: the softmax output and the hidden layer activations
  (without the bias column).
  """
  defn forward(x, weight1, weight2) do
    h = sigmoid(Nx.dot(prepend_bias(x), weight1))
    y_hat = softmax(Nx.dot(prepend_bias(h), weight2))

    {y_hat, h}
  end

  @doc """
  Backpropagation.

  Returns `{w1_gradient, w2_gradient}`, the gradients of the loss with respect
  to the two weight matrices, averaged over the rows of `x`.
  """
  defn back(x, y, y_hat, weight2, h) do
    w2_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(h)),
        Nx.subtract(y_hat, y)
      ) / elem(Nx.shape(x), 0)

    # `weight2[1..-1//1]` drops the first row of `weight2`, i.e. the weights
    # of the bias column that `prepend_bias/1` puts in front of `h`.
    w1_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(x)),
        Nx.dot(y_hat - y, Nx.transpose(weight2[1..-1//1])) * sigmoid_gradient(h)
      ) / elem(Nx.shape(x), 0)

    {w1_gradient, w2_gradient}
  end

  @doc """
  Classifies each row of `x`.

  Returns a one-column tensor holding, for every row, the index of the class
  with the highest probability.
  """
  defn classify(x, weight1, weight2) do
    {y_hat, _h} = forward(x, weight1, weight2)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Returns `{weight1, weight2}` drawn from a normal distribution with mean 0.0
  and standard deviation 0.01.

  The PRNG key is fixed, so the same shapes always produce the same weights.

  ## Options

    * `:w1_shape` - shape of the first weight matrix
    * `:w2_shape` - shape of the second weight matrix
  """
  defn initialize_weights(opts \\ []) do
    opts = keyword!(opts, [:w1_shape, :w2_shape])
    mean = 0.0
    std_deviation = 0.01

    prng_key = Nx.Random.key(1234)

    {weight1, new_prng_key} =
      Nx.Random.normal(prng_key, mean, std_deviation, shape: opts[:w1_shape])

    {weight2, _new_prng_key} =
      Nx.Random.normal(new_prng_key, mean, std_deviation, shape: opts[:w2_shape])

    {weight1, weight2}
  end

  @doc """
  Prints the training loss and the accuracy on the test set for `iteration`.

  `y_test` is compared directly with the output of `classify/3`, so it must
  hold class indices (not one-hot rows). Returns `:ok`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, weight1, weight2) do
    {y_hat, _h} = forward(x_train, weight1, weight2)
    training_loss = loss(y_train, y_hat) |> Nx.to_number()
    classifications = classify(x_test, weight1, weight2)
    accuracy = Nx.multiply(Nx.mean(Nx.equal(classifications, y_test)), 100.0) |> Nx.to_number()

    IO.puts("Iteration #{iteration}, Loss: #{training_loss}, Accuracy: #{accuracy}%")
  end

  @doc """
  Trains a network with `n_hidden_nodes` hidden nodes for `iterations`
  gradient-descent steps with learning rate `lr`, reporting after every step.

  The number of classes is read from the second dimension of `y_train`, so
  the training labels must have one column per class (e.g. one-hot encoded).

  Returns the trained weights as `{weight1, weight2}`.
  """
  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr) do
    n_input_variables = elem(Nx.shape(x_train), 1)
    n_classes = elem(Nx.shape(y_train), 1)

    # The `+ 1` rows hold the weights of the bias columns added by
    # `prepend_bias/1` in front of the inputs and of the hidden layer.
    {initial_weight_1, initial_weight_2} =
      initialize_weights(
        w1_shape: {n_input_variables + 1, n_hidden_nodes},
        w2_shape: {n_hidden_nodes + 1, n_classes}
      )

    # The explicit `//1` step keeps the range empty when `iterations` is 0.
    # Without it `0..-1` is a *descending* range and two steps would run.
    Enum.reduce(0..(iterations - 1)//1, {initial_weight_1, initial_weight_2}, fn i, {w1, w2} ->
      {updated_w1, updated_w2} = step(x_train, y_train, w1, w2, lr)
      report(i, x_train, y_train, x_test, y_test, updated_w1, updated_w2)
      {updated_w1, updated_w2}
    end)
  end

  # One gradient-descent update of both weight matrices.
  defnp step(x_train, y_train, w1, w2, lr) do
    {y_hat, h} = forward(x_train, w1, w2)
    {w1_gradient, w2_gradient} = back(x_train, y_train, y_hat, w2, h)
    w1 = w1 - w1_gradient * lr
    w2 = w2 - w2_gradient * lr

    {w1, w2}
  end
end

Classification

# Returns `x` with an extra leading column filled with 1s (the bias input).
# Kept for parity with the perceptron section; the neural network adds the
# bias on its own, so this function is not used to build `x_train` below.
prepend_bias_fn = fn x ->
  bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})

  # Insert a column of 1s in the position 0 of x.
  # ("axis: 1" stands for: "insert a column, not a row")
  # in python: `np.insert(X, 0, 1, axis=1)`
  Nx.concatenate([bias, x], axis: 1)
end

# One-hot encodes a column of 0/1 labels by comparing it against `[0, 1]`.
one_hot_encode_fn = fn y ->
  Nx.equal(y, Nx.tensor([0, 1]))
end

# Create tensors out of the inputs.

# NOTE: for the neural network, there is no need to prepend the bias.
# The capture operator must be a plain `&`: the HTML entity `&amp;` is not
# valid Elixir and would prevent this cell from compiling.
x_train =
  x_test =
  inputs
  |> Enum.map(&[&1["input_a"], &1["input_b"]])
  |> Nx.tensor(type: {:f, 64})

y_train_unencoded =
  y_test =
  inputs
  |> Enum.map(& &1["label"])
  |> Nx.tensor()
  |> Nx.reshape({:auto, 1})

y_train = one_hot_encode_fn.(y_train_unencoded)

# Hyperparameters taken from the book code examples.
hidden_nodes = 10
iterations = 100_000
learning_rate = 0.3

{w1, w2} =
  C12.NeuralNetwork.train(
    x_train,
    y_train,
    x_test,
    y_test,
    hidden_nodes,
    iterations,
    learning_rate
  )

Plot classified data with neural network

Same steps used with the perceptron

The idea:

  • Generate a grid of points and use the min/max values from the initial dataset to compute the boundaries.
  • Classify each point using the weight computed before with the initial dataset
  • Plot the result highlighting the “decision boundary”
# This time `x_train` has no bias column, so the two input variables are
# columns 0 and 1.
x = Nx.slice_along_axis(x_train, 0, 1, axis: 1)
y = Nx.slice_along_axis(x_train, 1, 1, axis: 1)

# Smallest and largest value of each input variable.
{x_min, x_max} = x |> Nx.to_flat_list() |> Enum.min_max()
{y_min, y_max} = y |> Nx.to_flat_list() |> Enum.min_max()

# Widen every side of the grid by 5% of the magnitude of its limit.
padding = 0.05
margin_fn = fn limit -> abs(limit * padding) end

boundaries = %{
  x_min: x_min - margin_fn.(x_min),
  x_max: x_max + margin_fn.(x_max),
  y_min: y_min - margin_fn.(y_min),
  y_max: y_max + margin_fn.(y_max)
}
# Build the grid of points to classify: `resolution` x `resolution` points,
# starting at the lower boundaries and advancing one step per index.
resolution = 200
%{x_min: grid_left, x_max: grid_right, y_min: grid_bottom, y_max: grid_top} = boundaries

x_step = (grid_right - grid_left) / resolution
y_step = (grid_top - grid_bottom) / resolution
grid_indices = 0..(resolution - 1)

grid =
  Enum.flat_map(grid_indices, fn column ->
    Enum.map(grid_indices, fn row ->
      [grid_left + x_step * column, grid_bottom + y_step * row]
    end)
  end)
# Classify every grid point with the weights learned by the neural network.
# No bias column is prepended here: the network adds it internally.
grid_tensor = Nx.tensor(grid)
labels = C12.NeuralNetwork.classify(grid_tensor, w1, w2)

# Pair each grid point with its predicted label.
data_with_labels =
  grid
  |> Enum.zip(Nx.to_flat_list(labels))
  |> Enum.map(fn {[x, y], label} -> %{x: x, y: y, label: label} end)
# Plot the classified grid with the original examples drawn on top of it.

# Background layer: every classified grid point, coloured by predicted label.
grid_layer =
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode(:x, field: "x", type: :quantitative)
  |> Vl.encode(:y, field: "y", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["lightblue", "aquamarine"]})

# Foreground layer: the original examples, with colour and shape set by label.
inputs_layer =
  Vl.new()
  |> Vl.data_from_values(inputs)
  |> Vl.mark(:circle, tooltip: true, opacity: 1.0)
  |> Vl.encode(:x, field: "input_a", type: :quantitative)
  |> Vl.encode(:y, field: "input_b", type: :quantitative)
  |> Vl.encode_field(:color, "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode_field(:shape, "label", scale: %{"range" => ["square", "triangle-up"]})

Vl.new(width: 600, height: 400)
|> Vl.layers([grid_layer, inputs_layer])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)