
05_discerning/classifier.livemd

Chapter 5: A Discerning Machine

Mix.install(
  [
    {:exla, "~> 0.5"},
    {:nx, "~> 0.5"},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:kino_vega_lite, "~> 0.1.7"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Invasion of the Sigmoids

$$ \sigma(z) = \cfrac{1}{1 + e^{-z}} $$

alias VegaLite, as: Vl

sigmoid_fn = fn z -> 1 / (1 + :math.exp(-z)) end

# Generate a sequence that will be used as `z`
# From -5 to 5, step 0.1
z = Enum.map(-50..50, &(&1 / 10))

# Compute the sigmoids
sigmoids = Enum.map(z, fn v -> sigmoid_fn.(v) end)

Vl.new(width: 600, height: 400)
|> Vl.data_from_values(z: z, sigmoids: sigmoids)
|> Vl.mark(:line)
|> Vl.encode_field(:x, "z", type: :quantitative)
|> Vl.encode_field(:y, "sigmoids", type: :quantitative, title: "sigmoid(z)")

Classification in Action
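
The `C5.Classifier` module below collects everything this chapter needs: the sigmoid, the `forward/2` pass, a binary `classify/2`, the log loss and its gradient, plus `train/4` and `test/3`. The log loss implemented in `loss/3` is, for $m$ examples:

$$ L(w) = -\cfrac{1}{m} \sum_{i=1}^{m} \left[ y_i \log \hat{y}_i + (1 - y_i) \log (1 - \hat{y}_i) \right] $$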

defmodule C5.Classifier do
  import Nx.Defn

  @doc """
  A sigmoid function is a mathematical function having
  a characteristic "S"-shaped curve or sigmoid curve.

  A sigmoid function:
  - is monotonic
  - has no local minima
  - has a non-negative derivative at each point

  More details: https://en.wikipedia.org/wiki/Sigmoid_function
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Returns the prediction tensor ŷ given the inputs and weight.
  The returned tensor is a matrix with the same dimensions as
  the weighted sum: one row per example and one column.
  Each element of the matrix is constrained between 0 and 1.
  """
  defn forward(x, weight) do
    weighted_sum = Nx.dot(x, weight)
    sigmoid(weighted_sum)
  end

  @doc """
  Returns the prediction rounded to a binary value (0 or 1):
  predictions below 0.5 become 0, the rest become 1.
  """
  defn classify(x, weight) do
    forward(x, weight)
    |> Nx.round()
  end

  @doc """
  Log loss function.
  """
  defn loss(x, y, weight) do
    # in python:
    # y_hat = forward(X, w)
    # first_term = Y * np.log(y_hat)
    # second_term = (1 - Y) * np.log(1 - y_hat)
    # return -np.average(first_term + second_term)

    y_hat = forward(x, weight)

    # Each label in `y` is either `0` or `1`:
    # - `first_term` vanishes when the label is 0
    # - `second_term` vanishes when the label is 1
    first_term = y * Nx.log(y_hat)
    second_term = Nx.subtract(1, y) * Nx.log(Nx.subtract(1, y_hat))

    Nx.add(first_term, second_term)
    |> Nx.mean()
    |> Nx.negate()
  end

  @doc """
  Returns the gradient of the loss with respect to the weights.
  """
  defn gradient(x, y, weight) do
    # in python:
    # np.matmul(X.T, (predict(X, w) - Y)) / X.shape[0]
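    # Equivalently: ∇L(w) = Xᵀ · (ŷ − y) / m,
    # where m is the number of examples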

    predictions = forward(x, weight)
    errors = Nx.subtract(predictions, y)
    n_examples = elem(Nx.shape(x), 0)

    Nx.transpose(x)
    |> Nx.dot(errors)
    |> Nx.divide(n_examples)
  end

  @doc """
  Trains the system: starts from a zero weight and performs
  `iterations` rounds of gradient descent over the inputs `x`
  and labels `y`, returning the final weight.
  """
  def train(x, y, iterations, lr) do
    Enum.reduce(0..(iterations - 1), init_weight(x), fn i, weight ->
      IO.puts("Iteration #{i} => Loss: #{Nx.to_number(loss(x, y, weight))}")

      step(x, y, weight, lr)
    end)
  end

  defnp step(x, y, weight, lr) do
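    # Gradient descent update: w ← w − lr · ∇L(w)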
    Nx.subtract(weight, Nx.multiply(gradient(x, y, weight), lr))
  end

  def test(x, y, weight) do
    total_examples = elem(Nx.shape(x), 0)

    correct_results =
      classify(x, weight)
      |> Nx.equal(y)
      |> Nx.sum()
      |> Nx.to_number()

    # Accuracy of the classifier
    success_percent = Float.round(correct_results * 100 / total_examples, 2)

    IO.puts("Success: #{correct_results}/#{total_examples} (#{success_percent}%)")
  end

  # Returns a weight tensor of shape {n, 1}, where `n` is the
  # number of input columns, with every element initialized to 0
  defnp init_weight(x) do
    Nx.broadcast(Nx.tensor([0]), {elem(Nx.shape(x), 1), 1})
  end
end
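
As a quick sanity check (a standalone sketch with made-up inputs, not the chapter's data): with an all-zero weight every weighted sum is 0, the sigmoid of 0 is 0.5, and the log loss is $-\log(0.5) \approx 0.693$ whatever the labels are.

x_check = Nx.tensor([[1, 2], [1, 3]])
y_check = Nx.tensor([[0], [1]])
w_check = Nx.broadcast(0, {2, 1})

# Expected: ~0.693
C5.Classifier.loss(x_check, y_check, w_check)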

Read the data

file =
  __DIR__
  |> Path.join("police.txt")
  |> Path.expand()
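
For reference, the parsing below expects `police.txt` to contain a header row followed by four whitespace-separated integer columns: reservations, temperature, tourists, and police calls (the binary 0/1 label we want to predict).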

# Read the data from the file, remove the header and return
# `[%{reservations: integer(), temperature: integer(), tourists: integer(), police: integer()}]`
data =
  File.read!(file)
  |> String.split("\n", trim: true)
  |> Enum.drop(1)
  |> Enum.map(&String.split(&1, ~r{\s+}, trim: true))
  |> Enum.map(fn [r, temp, tour, p] ->
    %{
      reservations: String.to_integer(r),
      temperature: String.to_integer(temp),
      tourists: String.to_integer(tour),
      police: String.to_integer(p)
    }
  end)

Kino.DataTable.new(data, keys: [:reservations, :temperature, :tourists, :police])

Prepare the data

# Transform the data to unpack the four columns `reservations`,
# `temperature`, `tourists` and `police` into separate lists
# called x1, x2, x3 and y
%{x1: x1, x2: x2, x3: x3, y: y} =
  Enum.reduce(data, %{x1: [], x2: [], x3: [], y: []}, fn item, %{x1: x1, x2: x2, x3: x3, y: y} ->
    %{
      x1: x1 ++ [item.reservations],
      x2: x2 ++ [item.temperature],
      x3: x3 ++ [item.tourists],
      y: y ++ [item.police]
    }
  end)

# Bias column: a column of 1s, so the first weight acts as the intercept
x0 = List.duplicate(1, length(x1))

x =
  [x0, x1, x2, x3]
  |> Nx.tensor()
  |> Nx.transpose()

# Same as `y.reshape(-1, 1)` used in the book
y = Nx.tensor(y) |> Nx.reshape({:auto, 1})
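
A quick check on the shapes: `x` should now have one row per example and four columns (bias, reservations, temperature, tourists), while `y` is a single column.

{Nx.shape(x), Nx.shape(y)}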

Our new model

Plot of the forward() function.

alias VegaLite, as: Vl

reservations_tensor = Nx.tensor([x0, x1]) |> Nx.transpose()

# It can take a bit of time
weight = C5.Classifier.train(reservations_tensor, y, iterations = 1_000_000, lr = 0.01)

predictions = C5.Classifier.forward(reservations_tensor, weight)
rounded_predictions = C5.Classifier.classify(reservations_tensor, weight)

:ok

Vl.new(width: 600, height: 400, title: "Model - forward()")
|> Vl.layers([
  Vl.new()
  |> Vl.data_from_values(
    reservations: x1,
    police_calls: Nx.to_flat_list(y)
  )
  |> Vl.mark(:circle)
  |> Vl.encode_field(:x, "reservations", type: :quantitative, title: "Reservations")
  |> Vl.encode_field(:y, "police_calls", type: :quantitative, title: "Police Calls"),
  Vl.new()
  |> Vl.data_from_values(
    reservations: x1,
    forward: Nx.to_flat_list(predictions)
  )
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "reservations", type: :quantitative, title: "Reservations")
  |> Vl.encode_field(:y, "forward", type: :quantitative, title: "forward(x, w)")
])

Vl.new(width: 600, height: 400, title: "Predictions based on binary classification - classify()")
|> Vl.layers([
  Vl.new()
  |> Vl.data_from_values(
    reservations: x1,
    police_calls: Nx.to_flat_list(y)
  )
  |> Vl.mark(:circle)
  |> Vl.encode_field(:x, "reservations", type: :quantitative, title: "Reservations")
  |> Vl.encode_field(:y, "police_calls", type: :quantitative, title: "Police Calls"),
  Vl.new()
  |> Vl.data_from_values(
    reservations: x1,
    classify: Nx.to_flat_list(rounded_predictions)
  )
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "reservations", type: :quantitative, title: "Reservations")
  |> Vl.encode_field(:y, "classify", type: :quantitative, title: "classify(x, w)")
])

Train the system

weight = C5.Classifier.train(x, y, iterations = 10_000, lr = 0.001)

Test the system

The percentage of correctly classified examples is called the accuracy of the classifier.
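
In formula form, this is what `test/3` above computes:

$$ \text{accuracy} = \cfrac{\text{correct predictions}}{\text{total examples}} \cdot 100 $$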

C5.Classifier.test(x, y, weight)

Kino.nothing()