Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Hands On: Data from Hell

data_from_hell.livemd

Hands On: Data from Hell

Mix.install(
  [
    {:nx, "~> 0.7.1"},
    {:exla, "~> 0.7.1"},
    {:kino_vega_lite, "~> 0.1.10"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Plotting the Data

path = __DIR__ |> Path.join("files/circles.txt") |> Path.expand()

dataset =
  path
  |> File.stream!()
  |> Stream.map(&String.split/1)
  # Drop header
  |> Stream.drop(1)
  |> Stream.map(fn [input_a, input_b, label] ->
    %{
      input_a: String.to_float(input_a),
      input_b: String.to_float(input_b),
      label: String.to_integer(label)
    }
  end)
  |> Enum.into([])
VegaLite.new(width: 300, height: 300, title: "Linearly Separable Datasets")
|> VegaLite.data_from_values(dataset, only: ["input_a", "input_b", "label"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "input_a", type: :quantitative)
|> VegaLite.encode_field(:y, "input_b", type: :quantitative)
|> VegaLite.encode_field(:color, "label", type: :nominal)
defmodule C12.DataPrep do
  @doc """
  Inserts a column of 1's into position 0 of tensor X along the the x-axis
  """
  def prepend_bias(x) do
    {row, _col} = Nx.shape(x)
    bias = Nx.broadcast(Nx.tensor(1), {row, 1})
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc "Flip hot values to 1"
  def one_hot_encode(y) do
    Nx.equal(y, Nx.tensor([0, 1]))
  end
end
import C12.DataPrep

# Return 2 axis
# Use f64 as type so we can represent losses that are too small to be represented with f32
input_a = dataset |> Enum.map(& &1.input_a) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
input_b = dataset |> Enum.map(& &1.input_b) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})
labels = dataset |> Enum.map(& &1.label) |> Nx.tensor(type: :f64) |> Nx.reshape({:auto, 1})

x_train = x_test = prepend_bias(Nx.concatenate([input_a, input_b], axis: 1))
y_train_unencoded = y_test = labels
y_train = one_hot_encode(y_train_unencoded)
x_train
defmodule C12.Perceptron do
  import Nx.Defn

  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  def forward(x, w) do
    weighted_sum = Nx.dot(x, w)
    sigmoid(weighted_sum)
  end

  def classify(x, w) do
    x
    |> forward(w)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  def loss(x, y, w) do
    y_hat = forward(x, w)
    first_term = Nx.multiply(y, Nx.log(y_hat))
    second_term = Nx.multiply(Nx.subtract(1, y), Nx.log(Nx.subtract(1, y_hat)))

    first_term
    |> Nx.add(second_term)
    |> Nx.sum()
    |> Nx.divide(elem(Nx.shape(x), 0))
    |> Nx.negate()
  end

  def gradient(x, y, w) do
    {num_samples, _} = Nx.shape(x)

    x
    |> forward(w)
    |> Nx.subtract(y)
    |> then(&Nx.dot(Nx.transpose(x), &1))
    |> Nx.divide(num_samples)
  end

  def report(iteration, x_train, y_train, x_test, y_test, w) do
    matches =
      x_test
      |> classify(w)
      |> Nx.equal(y_test)
      |> Nx.sum()
      |> Nx.to_number()

    n_test_examples = Nx.shape(y_test) |> elem(0)
    matches = matches * 100.0 / n_test_examples
    training_loss = loss(x_train, y_train, w)

    IO.puts("#{iteration} - Loss: #{Nx.to_number(training_loss)}, #{matches}%")
  end

  def train(x_train, y_train, x_test, y_test, iterations, lr) do
    {_, x_cols} = Nx.shape(x_train)
    {_, y_cols} = Nx.shape(y_train)
    w = Nx.broadcast(0, {x_cols, y_cols})

    Enum.reduce(0..iterations, w, fn i, w ->
      report(i, x_train, y_train, x_test, y_test, w)
      gradient = gradient(x_train, y_train, w)
      Nx.subtract(w, Nx.multiply(gradient, lr))
    end)
  end
end
weight = C12.Perceptron.train(x_train, y_train, x_test, y_test, iterations = 10_000, lr = 0.1)

Decision Boundary

defmodule C12.DecisionBoundary do
  def build_grid_dataset(x, y, classify_fn) do
    # Compute the grid boundaries
    x_min = Nx.reduce_min(x) |> Nx.to_number()
    x_max = Nx.reduce_max(x) |> Nx.to_number()
    y_min = Nx.reduce_min(x) |> Nx.to_number()
    y_max = Nx.reduce_max(y) |> Nx.to_number()

    # Define the grid of data that will be classified
    resolution = 200
    x_step = (x_max - x_min) / resolution
    y_step = (y_max - y_min) / resolution

    grid =
      for i <- 0..resolution, j <- 0..resolution do
        [x_min + x_step * i, y_min + y_step * j]
      end

    # Classification
    labels =
      grid
      |> Nx.tensor()
      |> classify_fn.()

    # Add the labels to the grid dataset
    Enum.zip_with([grid, Nx.to_flat_list(labels)], fn [[x, y], label] ->
      %{x: x, y: y, label: label}
    end)
  end
end

Plotting the Perceptron Boundary

# Get input A
x = x_train[[0..-1//1, 1..1]]
# Get input B
y = x_train[[0..-1//1, 2..2]]

data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(x, y, fn grid ->
    grid
    |> C12.DataPrep.prepend_bias()
    |> C12.Perceptron.classify(weight)
  end)
alias VegaLite, as: Vl

Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode_field(:x, "input_a", type: :quantitative)
  |> Vl.encode_field(:y, "input_b", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]}),
  # Threshold line
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.transform(filter: "datum['label'] == 1")
  |> Vl.mark(:line, stroke: "red", stroke_width: 3)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative, aggregate: :max)
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)

What does the perceptron’s boundary look like?

The decision boundary of the perceptron cuts the circle formed by the data points into half. This perfectly describes the 49% accuracy produced by the model because half of the points are misclassified as half of the data points are seen on the area they don’t belong.

Plotting the Neural Network Boundary

defmodule C11.NeuralNetwork do
  import Nx.Defn

  @doc """
  For w2_gradient:
  - Swap the operands of the multiplication then transpose one of them to get a result with the
  same dimension as w2.
  - We also need to do a prepend_bias/1 since we did it with forward/3, we need to do the same
  during backprop.
  - The matrix multiplication needs to be divide by the rows of x (number of examples) to get
  the average gradient

  For w1_gradient:
  - Use h as is, without a bias column since it has no effect on dL/dw1
  - Since we ignored the first column of h, we also have to ignore the first row of w1 to match
  columns by rows in matrix multiplication
  - `sigmoid_gradient/1` calculates sigmoid's gradient from sigmoid's output `h`
  - Lastly multiply x to the previous intermediate result with same rules when we calculate
  w2_gradient
  """
  def back(x, y, y_hat, w2, h) do
    {num_samples, _} = Nx.shape(x)

    w2_gradient =
      h
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(Nx.subtract(y_hat, y))
      |> Nx.divide(num_samples)

    w1_gradient =
      x
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(
        y_hat
        |> Nx.subtract(y)
        |> Nx.dot(Nx.transpose(w2[1..-1//1]))
        |> Nx.multiply(sigmoid_gradient(h))
      )
      |> Nx.divide(num_samples)

    {w1_gradient, w2_gradient}
  end

  def initialize_weights(n_input_vars, n_hidden_nodes, n_classes) do
    key = Nx.Random.key(1234)
    mean = 0.0
    standard_deviation = 0.01

    w1_rows = n_input_vars + 1

    {normal, new_key} =
      Nx.Random.normal(key, mean, standard_deviation, shape: {w1_rows, n_hidden_nodes})

    w1 = Nx.multiply(normal, Nx.sqrt(1 / w1_rows))

    w2_rows = n_hidden_nodes + 1

    {normal, _new_key} =
      Nx.Random.normal(new_key, mean, standard_deviation, shape: {w2_rows, n_classes})

    w2 = Nx.multiply(normal, Nx.sqrt(1 / w2_rows))

    {w1, w2}
  end

  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr) do
    {_, n_input_variables} = Nx.shape(x_train)
    {_, n_classes} = Nx.shape(y_train)
    {w1, w2} = initialize_weights(n_input_variables, n_hidden_nodes, n_classes)

    Enum.reduce(1..iterations, {w1, w2}, fn iteration, {w1, w2} ->
      {y_hat, h} = forward(x_train, w1, w2)
      {w1_gradient, w2_gradient} = back(x_train, y_train, y_hat, w2, h)
      w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, lr))
      w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, lr))
      report(iteration, x_train, y_train, x_test, y_test, w1, w2)
      {w1, w2}
    end)
  end

  # Functions from `Ch 10: Building the Network` below

  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  def softmax(logits) do
    exponentials = Nx.exp(logits)

    sum_of_exponentials_by_row =
      exponentials
      |> Nx.sum(axes: [1])
      |> Nx.reshape({:auto, 1})

    Nx.divide(exponentials, sum_of_exponentials_by_row)
  end

  def sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, Nx.subtract(1, sigmoid))
  end

  @doc """
  Cross-entropy loss
  Loss formula specific for multiclass classifiers.
  Measures the distance between the classifier's predictions and the labels.
  Lower loss means better classifier
  """
  def loss(y_train, y_hat) do
    {rows, _} = Nx.shape(y_train)

    y_train
    |> Nx.multiply(Nx.log(y_hat))
    |> Nx.sum()
    |> Nx.multiply(-1)
    |> Nx.divide(rows)
  end

  @doc """
  Implements the operation called "forward propagation"
  Steps:
  1. We add a bias to the inputs
  2. Compute the weighted sum using the 1st matrix of weights, w1
  3. Pass the result to the activation function (sigmoid or softmax)
  4. Repeat for all layers
  """
  def forward(x, w1, w2) do
    # Hidden layer
    h =
      x
      |> prepend_bias()
      |> then(&amp;Nx.dot(&amp;1, w1))
      |> sigmoid()

    # Output layer
    y_hat =
      h
      |> prepend_bias()
      |> then(&amp;Nx.dot(&amp;1, w2))
      |> softmax()

    {y_hat, h}
  end

  @doc """
  Same classify/2 function from Ch 7 but modified for neutral network
  """
  def classify(x, w1, w2) do
    x
    |> forward(w1, w2)
    |> elem(0)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end

  def report(iteration, x_train, y_train, x_test, y_test, w1, w2) do
    {y_hat, _} = forward(x_train, w1, w2)
    training_loss = loss(y_train, y_hat)
    classifications = classify(x_test, w1, w2)
    # y_test is not one-hot encoded

    # Measure how many classifications were gotten correctly by comparing
    # with y_test. The mean/1 function essentially will get the the sum of 1's (matches)
    # divided by the total number of classifications
    accuracy =
      classifications
      |> Nx.equal(y_test)
      |> Nx.mean()
      |> Nx.multiply(100.0)

    IO.puts(
      "Iteration: #{iteration}, Loss: #{Nx.to_number(training_loss)}, Accuracy: #{Nx.to_number(accuracy)}"
    )
  end
end
# No prepend bias on input data as the network's code take care of that
x_train = x_test = Nx.concatenate([input_a, input_b], axis: 1)

{weight_1, weight_2} =
  C11.NeuralNetwork.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes = 10,
    iterations = 100_000,
    lr = 0.3
  )
# Get input A
x = x_train[[0..-1//1, 0..0]]
# Get input B
y = x_train[[0..-1//1, 1..1]]

data_with_labels =
  C12.DecisionBoundary.build_grid_dataset(x, y, fn grid ->
    grid
    |> C11.NeuralNetwork.classify(weight_1, weight_2)
  end)
alias VegaLite, as: Vl

# Plot the grid with the labels

Vl.new(width: 600, height: 400)
|> Vl.layers([
  # Grid
  Vl.new()
  |> Vl.data_from_values(data_with_labels)
  |> Vl.mark(:point)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["lightblue", "aquamarine"]}),
  # Inputs
  Vl.new()
  |> Vl.data_from_values(dataset)
  |> Vl.mark(:point, filled: true, tooltip: true)
  |> Vl.encode_field(:x, "input_a", type: :quantitative)
  |> Vl.encode_field(:y, "input_b", type: :quantitative)
  |> Vl.encode(:color, field: "label", scale: %{"range" => ["blue", "green"]})
  |> Vl.encode(:shape, field: "label", scale: %{"range" => ["square", "triangle-up"]})
])
|> Vl.resolve(:scale, x: :shared, y: :shared, color: :independent)

How does the neural network fare on these data?

Neural network’s decision boundary almost perfectly encloses the green triangles on the green area while mostly all blue squares are in the blue area outside the decision boundary. There are few blue squares in the green area which was evident with the 98% accuracy of the model.