
Hands On: The Smallest Batch

the_smallest_batch.livemd

Mix.install(
  [
    {:nx, "~> 0.7.1"},
    {:exla, "~> 0.7.1"},
    {:kino_vega_lite, "~> 0.1.10"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

MNIST Helpers

defmodule C7.MNIST do
  @moduledoc "MNIST functions from Chapter 7"
  def load_images(path) do
    # Open and unzip the file of images then store header information into variables
    <<_magic_number::32, n_images::32, n_rows::32, n_cols::32, images::binary>> =
      path
      |> File.read!()
      |> :zlib.gunzip()

    images
    # Create 1D tensor of type unsigned 8-bit integer from binary
    |> Nx.from_binary({:u, 8})
    # Reshape the pixels into a matrix where each row is an image
    |> Nx.reshape({n_images, n_cols * n_rows})
  end

  @doc """
  Inserts a column of 1's at position 0 of tensor x, along axis 1 (the columns)
  """
  def prepend_bias(x) do
    {row, _col} = Nx.shape(x)
    bias = Nx.broadcast(Nx.tensor(1), {row, 1})
    Nx.concatenate([bias, x], axis: 1)
  end

  def load_labels(filename) do
    # Open and unzip the file of labels, then store header information into variables
    <<_magic_number::32, n_items::32, labels::binary>> =
      filename
      |> File.read!()
      |> :zlib.gunzip()

    labels
    # Create 1D tensor of type unsigned 8-bit integer from binary
    |> Nx.from_binary({:u, 8})
    # Reshape the labels into a 1-column matrix
    |> Nx.reshape({n_items, 1})
  end

  @doc "Flip hot values to 1"
  def one_hot_encode(y) do
    Nx.equal(y, Nx.tensor(Enum.to_list(0..9)))
  end
end
files_path = __DIR__ |> Path.join("../files")

training_images_path = Path.join(files_path, "train-images-idx3-ubyte.gz")
training_labels_path = Path.join(files_path, "train-labels-idx1-ubyte.gz")

test_images_path = Path.join(files_path, "t10k-images-idx3-ubyte.gz")
test_labels_path = Path.join(files_path, "t10k-labels-idx1-ubyte.gz")

import C7.MNIST
y_train_unencoded = load_labels(training_labels_path)
y_train = one_hot_encode(y_train_unencoded)
y_test = load_labels(test_labels_path)

x_train = load_images(training_images_path)
x_test = load_images(test_images_path)
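
To see what one_hot_encode/1 does, here is a small illustrative check (the three sample labels below are made up for demonstration). Nx.equal/2 broadcasts the {n, 1} column of labels against the {10} row of digits 0..9, producing an {n, 10} matrix with a single 1 per row, placed at the label's index.

# Illustrative example: one-hot encoding three made-up labels
sample_labels = Nx.tensor([[2], [0], [9]])
one_hot_encode(sample_labels)
# => a {3, 10} tensor of 0/1 values; row 0 is [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]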

Neural Network

defmodule C11.NeuralNetwork do
  import C7.MNIST, only: [prepend_bias: 1]
  import Nx.Defn

  @doc """
  For w2_gradient:
  - Swap the operands of the multiplication, then transpose one of them, to get a result with
  the same dimensions as w2.
  - We also need to call prepend_bias/1: since we did it in forward/3, we have to do the same
  during backprop.
  - The matrix multiplication is divided by the number of rows of x (the number of examples)
  to get the average gradient.

  For w1_gradient:
  - Use h as is, without a bias column, since the bias has no effect on dL/dw1.
  - Since we ignored the bias column of h, we also have to ignore the first row of w2 to match
  columns by rows in the matrix multiplication.
  - `sigmoid_gradient/1` calculates sigmoid's gradient from sigmoid's output `h`.
  - Lastly, multiply x by the previous intermediate result, following the same rules as for
  w2_gradient.
  """
  def back(x, y, y_hat, w2, h) do
    {num_samples, _} = Nx.shape(x)

    w2_gradient =
      h
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(Nx.subtract(y_hat, y))
      |> Nx.divide(num_samples)

    w1_gradient =
      x
      |> prepend_bias()
      |> Nx.transpose()
      |> Nx.dot(
        y_hat
        |> Nx.subtract(y)
        |> Nx.dot(Nx.transpose(w2[1..-1//1]))
        |> Nx.multiply(sigmoid_gradient(h))
      )
      |> Nx.divide(num_samples)

    {w1_gradient, w2_gradient}
  end

  def initialize_weights(n_input_vars, n_hidden_nodes, n_classes) do
    key = Nx.Random.key(1234)
    mean = 0.0
    standard_deviation = 0.01

    w1_rows = n_input_vars + 1

    {w1, new_key} =
      Nx.Random.normal(key, mean, standard_deviation, shape: {w1_rows, n_hidden_nodes})

    w2_rows = n_hidden_nodes + 1

    {w2, _new_key} =
      Nx.Random.normal(new_key, mean, standard_deviation, shape: {w2_rows, n_classes})

    {w1, w2}
  end

  # Functions from `Ch 10: Building the Network` below

  defn sigmoid(z) do
    1 / (1 + Nx.exp(-z))
  end

  def softmax(logits) do
    exponentials = Nx.exp(logits)

    sum_of_exponentials_by_row =
      exponentials
      |> Nx.sum(axes: [1])
      |> Nx.reshape({:auto, 1})

    Nx.divide(exponentials, sum_of_exponentials_by_row)
  end

  def sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, Nx.subtract(1, sigmoid))
  end

  @doc """
  Cross-entropy loss.
  A loss formula specific to multiclass classifiers.
  It measures the distance between the classifier's predictions and the labels.
  A lower loss means a better classifier.
  """
  def loss(y_train, y_hat) do
    {rows, _} = Nx.shape(y_train)

    y_train
    |> Nx.multiply(Nx.log(y_hat))
    |> Nx.sum()
    |> Nx.multiply(-1)
    |> Nx.divide(rows)
  end

  @doc """
  Implements the operation called "forward propagation"
  Steps:
  1. We add a bias to the inputs
  2. Compute the weighted sum using the 1st matrix of weights, w1
  3. Pass the result to the activation function (sigmoid or softmax)
  4. Repeat for all layers
  """
  def forward(x, w1, w2) do
    # Hidden layer
    h =
      x
      |> prepend_bias()
      |> then(&Nx.dot(&1, w1))
      |> sigmoid()

    # Output layer
    y_hat =
      h
      |> prepend_bias()
      |> then(&Nx.dot(&1, w2))
      |> softmax()

    {y_hat, h}
  end

  @doc """
  Same classify function from Ch 7, but modified for the neural network: it is now classify/3 and takes both weight matrices.
  """
  def classify(x, w1, w2) do
    x
    |> forward(w1, w2)
    |> elem(0)
    |> Nx.argmax(axis: 1)
    |> Nx.reshape({:auto, 1})
  end
end
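
As a quick sanity check of how these pieces fit together (illustrative only: the 32-example random batch and labels below are made up), we can run one forward and one backward pass and confirm that each gradient has the same shape as the weight matrix it updates, as described in the back/5 doc.

# Illustrative shape check on a small random batch
n_input_vars = 784
n_hidden_nodes = 200
n_classes = 10

{w1, w2} = C11.NeuralNetwork.initialize_weights(n_input_vars, n_hidden_nodes, n_classes)
# w1: {785, 200} (784 inputs + 1 bias row), w2: {201, 10} (200 hidden nodes + 1 bias row)

key = Nx.Random.key(42)
{x_sample, _key} = Nx.Random.uniform(key, shape: {32, n_input_vars})
y_sample = C7.MNIST.one_hot_encode(Nx.remainder(Nx.iota({32, 1}), 10))

{y_hat, h} = C11.NeuralNetwork.forward(x_sample, w1, w2)
{w1_gradient, w2_gradient} = C11.NeuralNetwork.back(x_sample, y_sample, y_hat, w2, h)

{Nx.shape(w1_gradient) == Nx.shape(w1), Nx.shape(w2_gradient) == Nx.shape(w2)}
# => {true, true}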

Implementing Batches

defmodule C13.NeuralNetwork.MiniBatch do
  import C11.NeuralNetwork,
    only: [back: 5, initialize_weights: 3, forward: 3, loss: 2, classify: 3]

  @doc """
  Steps from 0 to the number of examples in increments of batch_size, slicing x_train to get
  a batch of examples. The same is done with y_train, resulting in two lists containing the
  training set split into batches.
  """
  def prepare_batches(x_train, y_train, batch_size) do
    {n_examples, _} = Nx.shape(x_train)

    {x_batches, y_batches} =
      Range.new(0, n_examples, batch_size)
      |> Enum.reduce_while({[], []}, fn batch_start, {x_batches, y_batches} ->
        length =
          if n_examples - batch_start < batch_size, do: n_examples - batch_start, else: batch_size

        cond do
          length == 0 ->
            {:halt, {x_batches, y_batches}}

          length < batch_size ->
            x_batch = Nx.slice_along_axis(x_train, batch_start, length)
            y_batch = Nx.slice_along_axis(y_train, batch_start, length)
            {:halt, {[x_batch | x_batches], [y_batch | y_batches]}}

          true ->
            x_batch = Nx.slice_along_axis(x_train, batch_start, length)
            y_batch = Nx.slice_along_axis(y_train, batch_start, length)
            {:cont, {[x_batch | x_batches], [y_batch | y_batches]}}
        end
      end)

    {Enum.reverse(x_batches), Enum.reverse(y_batches)}
  end

  def report(epoch, batch, x_train, y_train, x_test, y_test, w1, w2) do
    {y_hat, _} = forward(x_train, w1, w2)
    training_loss = loss(y_train, y_hat)
    classifications = classify(x_test, w1, w2)
    # y_test is not one-hot encoded

    # Measure how many classifications are correct by comparing them with y_test.
    # Nx.mean/1 effectively gives the sum of 1's (matches) divided by the total
    # number of classifications
    accuracy =
      classifications
      |> Nx.equal(y_test)
      |> Nx.mean()
      |> Nx.multiply(100.0)

    IO.puts(
      "#{epoch}-#{batch} > Loss: #{Nx.to_number(training_loss)}, Accuracy: #{Nx.to_number(accuracy)}"
    )

    Nx.to_number(training_loss)
  end

  def train(
        x_train,
        y_train,
        x_test,
        y_test,
        n_hidden_nodes,
        batch_size,
        lr,
        training_time
      ) do
    {_, n_input_variables} = Nx.shape(x_train)
    {_, n_classes} = Nx.shape(y_train)
    {w1, w2} = initialize_weights(n_input_variables, n_hidden_nodes, n_classes)
    {x_batches, y_batches} = prepare_batches(x_train, y_train, batch_size)

    state = %{
      w1: w1,
      w2: w2,
      x_batches: x_batches,
      y_batches: y_batches,
      start_time: Time.utc_now(),
      epoch: 0,
      batch: 0,
      steps: 0,
      losses: [],
      elapsed_times: [],
      num_batches: length(x_batches),
      training_time: training_time
    }

    do_train(x_train, y_train, x_test, y_test, lr, state)
  end

  defp do_train(x_train, y_train, x_test, y_test, lr, state) do
    state =
      %{
        w1: w1,
        w2: w2,
        x_batches: x_batches,
        y_batches: y_batches,
        start_time: start_time,
        epoch: epoch,
        batch: batch,
        steps: steps,
        losses: losses,
        elapsed_times: elapsed_times,
        num_batches: num_batches,
        training_time: training_time
      } = state

    elapsed_time = Time.diff(Time.utc_now(), start_time)

    cond do
      elapsed_time >= training_time ->
        state

      batch > num_batches - 1 ->
        do_train(x_train, y_train, x_test, y_test, lr, %{state | epoch: epoch + 1, batch: 0})

      true ->
        x_train_batch = Enum.at(x_batches, batch)
        y_train_batch = Enum.at(y_batches, batch)

        {y_hat, h} = forward(x_train_batch, w1, w2)
        {w1_gradient, w2_gradient} = back(x_train_batch, y_train_batch, y_hat, w2, h)
        w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, lr))
        w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, lr))
        loss = report(epoch, batch, x_train, y_train, x_test, y_test, w1, w2)

        state = %{
          state
          | w1: w1,
            w2: w2,
            losses: [loss | losses],
            elapsed_times: [elapsed_time | elapsed_times],
            batch: batch + 1,
            steps: steps + 1
        }

        do_train(x_train, y_train, x_test, y_test, lr, state)
    end
  end

  def plot_loss(training_fn, label) do
    IO.puts("Training: #{label}")
    %{losses: losses, elapsed_times: elapsed_times, steps: steps, epoch: epochs} = training_fn.()
    IO.puts("Loss: #{List.first(losses)} (#{epochs} completed, #{steps} total steps)")
    do_plot_loss(losses, elapsed_times, label)
  end

  defp do_plot_loss(losses, elapsed_times, label) do
    [losses, elapsed_times]
    |> Enum.zip_with(fn [loss, elapsed_time] ->
      %{loss: loss, elapsed_time: elapsed_time, type: label}
    end)
  end
end
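
Before running the experiments, here is a small illustrative check of prepare_batches/3 (the tiny iota tensors are made up for demonstration): with 10 examples and a batch size of 3, we expect three full batches plus a leftover batch holding a single example.

# Illustrative example: 10 rows split with batch_size 3 -> batches of 3, 3, 3 and 1 rows
x_small = Nx.iota({10, 4})
y_small = Nx.iota({10, 2})

{x_batches, _y_batches} = C13.NeuralNetwork.MiniBatch.prepare_batches(x_small, y_small, 3)

Enum.map(x_batches, &Nx.axis_size(&1, 0))
# => [3, 3, 3, 1]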

What’s the smallest batch size that gives us better early feedback than plain old batch GD?

alias C13.NeuralNetwork.MiniBatch
# Training time is 20 minutes (1200 seconds)
training_time = 1_200
n_hidden_nodes = 200
lr = 0.01

batch_size_3_fn = fn ->
  batch_size = 3

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

batch_size_5_fn = fn ->
  batch_size = 5

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

batch_size_7_fn = fn ->
  batch_size = 7

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

batch_size_12_fn = fn ->
  batch_size = 12

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

batch_size_15_fn = fn ->
  batch_size = 15

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

{num_examples, _} = Nx.shape(x_train)

batch_gd_fn = fn ->
  batch_size = num_examples

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    lr,
    training_time
  )
end

batch_size_3 = MiniBatch.plot_loss(batch_size_3_fn, "Batch size 3")
batch_size_5 = MiniBatch.plot_loss(batch_size_5_fn, "Batch size 5")
batch_size_7 = MiniBatch.plot_loss(batch_size_7_fn, "Batch size 7")
batch_size_12 = MiniBatch.plot_loss(batch_size_12_fn, "Batch size 12")
batch_size_15 = MiniBatch.plot_loss(batch_size_15_fn, "Batch size 15")
batch_gd = MiniBatch.plot_loss(batch_gd_fn, "Batch GD")
batches = [batch_size_3, batch_size_5, batch_size_7, batch_size_12, batch_size_15, batch_gd]

VegaLite.new(width: 500, height: 500, title: "Training with Batches")
|> VegaLite.layers(
  batches
  |> Enum.map(fn batch ->
    VegaLite.new()
    |> VegaLite.data_from_values(batch)
    |> VegaLite.mark(:line)
    |> VegaLite.encode_field(:x, "elapsed_time", type: :quantitative, title: "Seconds")
    |> VegaLite.encode_field(:y, "loss", type: :quantitative, title: "Loss")
    |> VegaLite.encode(:color, field: "type")
  end)
)

As we can see in the diagram, the smallest batch size that gives us better early feedback is fifteen (15): it gives faster feedback than the smaller batch sizes, and its loss stays the closest to batch GD's.

Stochastic GD with Different Learning Rates

alias C13.NeuralNetwork.MiniBatch
# Training time is 20 minutes (1200 seconds)
training_time = 1_200
n_hidden_nodes = 200
batch_size = 35

tenths_lr_fn = fn ->
  learning_rate = 0.1

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    learning_rate,
    training_time
  )
end

hundredths_lr_fn = fn ->
  learning_rate = 0.01

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    learning_rate,
    training_time
  )
end

thousandths_lr_fn = fn ->
  learning_rate = 0.001

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    learning_rate,
    training_time
  )
end

ten_thousandths_lr_fn = fn ->
  learning_rate = 0.0001

  MiniBatch.train(
    x_train,
    y_train,
    x_test,
    y_test,
    n_hidden_nodes,
    batch_size,
    learning_rate,
    training_time
  )
end

tenths_lr = MiniBatch.plot_loss(tenths_lr_fn, "lr = 0.1")
hundredths_lr = MiniBatch.plot_loss(hundredths_lr_fn, "lr = 0.01")
thousandths_lr = MiniBatch.plot_loss(thousandths_lr_fn, "lr = 0.001")
ten_thousandths_lr = MiniBatch.plot_loss(ten_thousandths_lr_fn, "lr = 0.0001")
learning_rates = [tenths_lr, hundredths_lr, thousandths_lr, ten_thousandths_lr]

VegaLite.new(width: 500, height: 500, title: "Stochastic GD with Different Learning Rates")
|> VegaLite.layers(
  learning_rates
  |> Enum.map(fn learning_rate ->
    VegaLite.new()
    |> VegaLite.data_from_values(learning_rate)
    |> VegaLite.mark(:line)
    |> VegaLite.encode_field(:x, "elapsed_time", type: :quantitative, title: "Seconds")
    |> VegaLite.encode_field(:y, "loss", type: :quantitative, title: "Loss")
    |> VegaLite.encode(:color, field: "type", title: "Learning Rates")
  end)
)

Among the different learning rates, 0.0001 gives the smoothest loss curve; as observed, the smoothness of the loss is inversely proportional to the learning rate. However, the smoothest loss gives neither the fastest feedback nor the lowest loss. The learning rate of 0.01 gives both the lowest loss and the fastest feedback.