Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 12: How Classifiers Work (1 of 2)

12_classifiers/12_classifiers_01.livemd

Chapter 12: How Classifiers Work (1 of 2)

Mix.install(
  [
    {:exla, "~> 0.5"},
    {:nx, "~> 0.5"},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:kino_vega_lite, "~> 0.1.7"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Load the Data

# Resolve the dataset path relative to the directory of this notebook.
filepath = Path.join(__DIR__, "./linearly_separable.txt") |> Path.expand()

# The first line is bound to `head` and kept out of `data`; the remaining
# lines are the examples. Both "\r\n" and "\n" are accepted as line
# terminators so the file loads no matter which platform saved it
# (splitting on "\r\n" alone leaves an LF-only file as one single line).
[head | data] =
  filepath
  |> File.read!()
  |> String.split(["\r\n", "\n"], trim: true)

# Every example line must hold exactly three whitespace-separated columns:
# two floats followed by an integer label. `String.split/1` splits on any
# run of whitespace, so tabs or repeated spaces between columns are fine.
inputs =
  data
  |> Enum.map(&String.split/1)
  |> Enum.map(fn [input_a, input_b, label] ->
    %{
      "input_a" => String.to_float(input_a),
      "input_b" => String.to_float(input_b),
      "label" => String.to_integer(label)
    }
  end)

Kino.DataTable.new(inputs)
# Scatter plot of the dataset: one point per example, positioned by its
# two inputs and colored by its label.
VegaLite.new(width: 600, height: 400)
|> VegaLite.data_from_values(inputs, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode(:x, field: "input_a", type: :quantitative)
|> VegaLite.encode(:y, field: "input_b", type: :quantitative)
|> VegaLite.encode(:color, field: "label", type: :nominal)

Perceptron

Perceptron based on the C7.Classifier implementation.

defmodule C12.Perceptron do
  @moduledoc """
  Perceptron with one sigmoid output per class, trained with gradient
  descent computed over the whole training set at every iteration.

  The weight tensor has shape `{n_input_variables, n_classes}`, where
  `n_input_variables` is the size of axis 1 of `x` and `n_classes` is the
  size of axis 1 of `y`. This module does not add a bias column to `x`.
  """

  import Nx.Defn

  @doc """
  Applies the logistic function `1 / (1 + e^(-z))` element-wise to `z`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Returns the sigmoid of the dot product between `x` and `weight`.
  """
  defn forward(x, weight) do
    weighted_sum = Nx.dot(x, weight)
    sigmoid(weighted_sum)
  end

  @doc """
  Predicts a label for every row of `x`.

  The label is the index (along axis 1) of the largest output of
  `forward/2`. The result is reshaped into a single column.
  """
  defn classify(x, weight) do
    y_hat = forward(x, weight)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Computes the log loss of the predictions for `x` against `y`.

  It is the negated sum of `y * log(y_hat) + (1 - y) * log(1 - y_hat)` over
  all elements, divided by the number of rows of `x`.
  """
  defn loss(x, y, weight) do
    y_hat = forward(x, weight)
    first_term = y * Nx.log(y_hat)
    second_term = Nx.subtract(1, y) * Nx.log(Nx.subtract(1, y_hat))

    Nx.add(first_term, second_term)
    |> Nx.sum()
    |> Nx.divide(elem(Nx.shape(x), 0))
    |> Nx.negate()
  end

  @doc """
  Computes the gradient of the loss with respect to `weight`:
  `transpose(x) . (forward(x, weight) - y)` divided by the number of rows
  of `x`.
  """
  defn gradient(x, y, weight) do
    predictions = forward(x, weight)
    errors = Nx.subtract(predictions, y)
    n_examples = elem(Nx.shape(x), 0)

    Nx.transpose(x)
    |> Nx.dot(errors)
    |> Nx.divide(n_examples)
  end

  @doc """
  Prints the training loss and the percentage of correctly classified test
  examples for the given `iteration`.

  Returns `{iteration, training_loss, accuracy_percentage}`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, weight) do
    n_matches = matches(x_test, y_test, weight) |> Nx.to_number()
    n_test_examples = elem(Nx.shape(y_test), 0)
    accuracy = n_matches * 100.0 / n_test_examples
    training_loss = loss(x_train, y_train, weight) |> Nx.to_number()

    # IO.puts prints the line as plain text; IO.inspect would print the
    # string wrapped in double quotes.
    IO.puts("Iteration #{iteration} => Loss: #{training_loss}, #{accuracy}%")

    {iteration, training_loss, accuracy}
  end

  # Counts the rows of `x_test` whose predicted label equals `y_test`.
  defnp matches(x_test, y_test, weight) do
    classify(x_test, weight)
    |> Nx.equal(y_test)
    |> Nx.sum()
  end

  @doc """
  Trains the perceptron for `iterations` gradient-descent steps with
  learning rate `lr`, starting from an all-zero weight tensor.

  Progress is reported before every step and once more after the last one.
  Returns the final weight tensor.
  """
  def train(x_train, y_train, x_test, y_test, iterations, lr) do
    initial_weight = init_weight(x_train, y_train)

    final_weight =
      Enum.reduce(0..(iterations - 1), initial_weight, fn i, weight ->
        report(i, x_train, y_train, x_test, y_test, weight)
        step(x_train, y_train, weight, lr)
      end)

    report(iterations, x_train, y_train, x_test, y_test, final_weight)

    final_weight
  end

  # One gradient-descent update: weight - gradient * lr.
  defnp step(x, y, weight, lr) do
    Nx.subtract(weight, Nx.multiply(gradient(x, y, weight), lr))
  end

  # Zero-filled weight tensor of shape {n_input_variables, n_classes}, both
  # read from axis 1 of `x` and `y` respectively.
  defnp init_weight(x, y) do
    n_input_variables = elem(Nx.shape(x), 1)
    n_classes = elem(Nx.shape(y), 1)
    Nx.broadcast(0, {n_input_variables, n_classes})
  end
end

Train Perceptron

# Function that prepends the bias column: it returns `x` with a column of
# 1s inserted at position 0 (one 1 for every row of `x`).
# Python equivalent: `np.insert(X, 0, 1, axis=1)`
prepend_bias_fn = fn x ->
  n_rows = x |> Nx.shape() |> elem(0)
  ones_column = Nx.broadcast(1, {n_rows, 1})

  # "axis: 1" joins the tensors side by side (adds a column, not a row).
  Nx.concatenate([ones_column, x], axis: 1)
end

# One-hot encode function: compares `y` against the class ids 0 and 1.
# For a single column of labels the comparison broadcasts to two columns,
# with a 1 in the column whose class id equals the label.
one_hot_encode_fn = fn y ->
  class_ids = Nx.tensor([0, 1])
  Nx.equal(y, class_ids)
end

# Create tensors out of the inputs

# NOTE: the tensor type is double-precision float because, with a high
# number of iterations (> 7000), the loss is too small to be represented
# with single-precision floating points.
x_train =
  inputs
  |> Enum.map(fn row -> [row["input_a"], row["input_b"]] end)
  |> Nx.tensor(type: {:f, 64})
  |> then(prepend_bias_fn)

# The same examples are used for both training and testing.
x_test = x_train

# Labels as a single column of integers.
y_train_unencoded =
  inputs
  |> Enum.map(fn row -> row["label"] end)
  |> Nx.tensor()
  |> Nx.reshape({:auto, 1})

y_test = y_train_unencoded

y_train = one_hot_encode_fn.(y_train_unencoded)

# Train the system

iterations = 10_000
lr = 0.1

weight =
  C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations, lr)

Plot Decision Boundary

The idea:

  • Generate a grid of points and use the min/max values from the initial dataset to compute the boundaries.
  • Classify each point using the weight computed before with the initial dataset
  • Plot the result highlighting the “decision boundary”
# Column 0 of x_train is the bias; column 1 holds the x coordinates and
# column 2 holds the y coordinates of the examples.
x = Nx.slice_along_axis(x_train, 1, 1, axis: 1)
y = Nx.slice_along_axis(x_train, 2, 1, axis: 1)

# Compute the grid boundaries from the extremes of each coordinate.
xs = Nx.to_flat_list(x)
ys = Nx.to_flat_list(y)

x_min = Enum.min(xs)
x_max = Enum.max(xs)
y_min = Enum.min(ys)
y_max = Enum.max(ys)

# Each boundary is pushed outwards by 5% of its own absolute value.
padding = 0.05
pad = fn value -> abs(value * padding) end

boundaries = %{
  x_min: x_min - pad.(x_min),
  x_max: x_max + pad.(x_max),
  y_min: y_min - pad.(y_min),
  y_max: y_max + pad.(y_max)
}
# Define the grid of data that will be classified: `resolution` evenly
# spaced values per axis, starting at each axis minimum.

resolution = 200
x_step = (boundaries.x_max - boundaries.x_min) / resolution
y_step = (boundaries.y_max - boundaries.y_min) / resolution

indices = 0..(resolution - 1)

# The x index varies slowest: all y values are listed for the first x
# before moving on to the next x.
grid =
  Enum.flat_map(indices, fn i ->
    Enum.map(indices, fn j ->
      [boundaries.x_min + x_step * i, boundaries.y_min + y_step * j]
    end)
  end)
# Classification: predict a label for every grid point using the weight
# computed during training.

labels =
  grid
  |> Nx.tensor()
  |> then(prepend_bias_fn)
  |> C12.Perceptron.classify(weight)

# Add the labels to the grid dataset: one map per grid point.
data_with_labels =
  grid
  |> Enum.zip(Nx.to_flat_list(labels))
  |> Enum.map(fn {[x, y], label} -> %{x: x, y: y, label: label} end)
alias VegaLite, as: Vl

# Grid: every classified grid point, colored by its predicted label.
grid_layer =
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]})

# Inputs: the original examples, with color and shape chosen by label.
inputs_layer =
  Vl.new()
  |> Vl.data_from_values(inputs)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode_field(:x, "input_a", type: :quantitative)
  |> Vl.encode_field(:y, "input_b", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]})

# Threshold line: keeps only the grid points labeled 1 and draws, for
# each x, the maximum y among them.
threshold_layer =
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.transform(filter: "datum['label'] == 1")
  |> Vl.mark(:line, stroke: "red", stroke_width: 3)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative, aggregate: :max)

# The layers share the x and y scales; each keeps its own color scale.
Vl.new(width: 600, height: 400)
|> Vl.layers([grid_layer, inputs_layer, threshold_layer])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)