Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 13: Batchin' Up

13_batching/13_batching.livemd

Chapter 13: Batchin’ Up

Mix.install(
  [
    {:exla, "~> 0.5"},
    {:nx, "~> 0.5"},
    {:vega_lite, "~> 0.1.6"},
    {:kino, "~> 0.8.1"},
    {:kino_vega_lite, "~> 0.1.7"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Learning, Visualized

Load MNIST images

The module that loads the MNIST data is the same one used in Chapter 11, where `y_train` is already one-hot encoded.

defmodule C11.MNIST do
  @moduledoc """
  Loads the MNIST database (train/test images and labels) from gzipped IDX files.

  MNIST dataset specifications can be found here: http://yann.lecun.com/exdb/mnist/
  """

  @data_path Path.join(__DIR__, "../data/mnist") |> Path.expand()

  @train_images_filename Path.join(@data_path, "train-images-idx3-ubyte.gz")
  @test_images_filename Path.join(@data_path, "t10k-images-idx3-ubyte.gz")
  @train_labels_filename Path.join(@data_path, "train-labels-idx1-ubyte.gz")
  @test_labels_filename Path.join(@data_path, "t10k-labels-idx1-ubyte.gz")

  defstruct [:x_train, :x_test, :y_train, :y_test]

  @doc """
  Loads the MNIST database and returns a `%C11.MNIST{}` struct with the
  train and test images and labels.

  Only `y_train` is one-hot encoded; `y_test` keeps the raw digit labels.

  Raises `File.Error` if any of the four dataset files cannot be read.
  """
  def load() do
    %__MODULE__{
      # 60000 images, each 784 elements (28 * 28 pixels)
      x_train:
        @train_images_filename
        |> load_images()
        |> ensure_loaded!(@train_images_filename),
      # 10000 images, each 784 elements, with the same structure as `x_train`
      x_test:
        @test_images_filename
        |> load_images()
        |> ensure_loaded!(@test_images_filename),
      # 60000 labels, one-hot encoded (one row of 10 flags per label)
      y_train:
        @train_labels_filename
        |> load_labels()
        |> ensure_loaded!(@train_labels_filename)
        |> one_hot_encode(),
      # 10000 labels, kept as raw digits in a one-column matrix
      # (NOT one-hot encoded, unlike `y_train`)
      y_test:
        @test_labels_filename
        |> load_labels()
        |> ensure_loaded!(@test_labels_filename)
    }
  end

  @doc """
  One-hot encodes the given tensor of labels (classes: from 0 to 9).

  Each label is compared against the vector `[0, 1, ..., 9]`, so a one-column
  matrix of labels broadcasts to one row of ten 0/1 flags per label.
  """
  def one_hot_encode(y) do
    Nx.equal(y, Nx.tensor(Enum.to_list(0..9)))
  end

  @doc """
  Loads the MNIST labels from the given gzipped file and returns them
  as a one-column matrix.

  Returns `{:error, reason}` unchanged if the file cannot be read.
  """
  def load_labels(filename) do
    # Open and unzip the file of labels
    with {:ok, binary} <- File.read(filename) do
      # Header: 32 bits that are skipped, then the number of labels;
      # everything after the header is one unsigned byte per label.
      <<_::32, n_labels::32, labels_binary::binary>> = :zlib.gunzip(binary)

      # Create a tensor from the binary and
      # reshape the list of labels into a one-column matrix.
      labels_binary
      |> Nx.from_binary({:u, 8})
      |> Nx.reshape({n_labels, 1})
    end
  end

  @doc """
  Loads the MNIST images from the given gzipped file and returns a matrix
  with one image per row.

  Returns `{:error, reason}` unchanged if the file cannot be read.
  """
  def load_images(filename) do
    # Open and unzip the file of images
    with {:ok, binary} <- File.read(filename) do
      # Header: 32 bits that are skipped, then the image count and the
      # number of rows and columns per image; the pixels follow.
      <<_::32, n_images::32, n_rows::32, n_cols::32, images_binary::binary>> =
        :zlib.gunzip(binary)

      # Create a tensor from the binary and
      # reshape the pixels into a matrix where each line is an image.
      images_binary
      |> Nx.from_binary({:u, 8})
      |> Nx.reshape({n_images, n_cols * n_rows})
    end
  end

  # Turns the `{:error, reason}` fall-through of the loaders into a
  # `File.Error` naming the offending file, so `load/0` never stores an
  # error tuple in the struct nor feeds one to `one_hot_encode/1`.
  defp ensure_loaded!({:error, reason}, filename) do
    raise File.Error, reason: reason, action: "read file", path: filename
  end

  defp ensure_loaded!(tensor, _filename), do: tensor
end

Load the data

# Load the dataset through the public API and bind each split to its own variable
%C11.MNIST{
  x_train: x_train,
  x_test: x_test,
  y_train: y_train,
  y_test: y_test
} = C11.MNIST.load()

Classifier

Based on the one developed in Chapter 11.

defmodule C13.Classifier do
  @moduledoc """
  Three-layer neural network classifier trained with full-batch
  gradient descent: every iteration uses the whole training set.
  """

  import Nx.Defn

  @doc """
  Element-wise logistic function: `1 / (1 + exp(-z))`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Row-wise softmax: each row of `logits` is exponentiated and divided
  by the sum of its exponentials.
  """
  defn softmax(logits) do
    # NOTE(review): the logits are exponentiated as-is (no max subtraction),
    # so very large logits could overflow - confirm this is acceptable.
    exponentials = Nx.exp(logits)

    Nx.divide(
      exponentials,
      Nx.sum(exponentials, axes: [1]) |> Nx.reshape({:auto, 1})
    )
  end

  @doc """
  Derivative of the sigmoid, expressed in terms of the sigmoid's output.
  """
  defn sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, 1 - sigmoid)
  end

  @doc """
  Cross-entropy loss between the labels `y` and the predictions `y_hat`,
  averaged over the number of rows of `y`.
  """
  defn loss(y, y_hat) do
    -Nx.sum(y * Nx.log(y_hat)) / elem(Nx.shape(y), 0)
  end

  @doc """
  Prepends a column of 1s (the bias) to the matrix `x`.
  """
  defn prepend_bias(x) do
    bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})

    # Insert a column of 1s in the position 0 of x.
    # ("axis: 1" stands for: "insert a column, not a row")
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc """
  Forward propagation. Returns `{y_hat, h}`: the network's predictions and
  the hidden layer's activations (needed later by `back/5`).
  """
  defn forward(x, weight1, weight2) do
    h = sigmoid(Nx.dot(prepend_bias(x), weight1))
    y_hat = softmax(Nx.dot(prepend_bias(h), weight2))

    {y_hat, h}
  end

  @doc """
  Backpropagation. Returns `{w1_gradient, w2_gradient}`, both averaged over
  the number of rows of `x`.
  """
  defn back(x, y, y_hat, weight2, h) do
    w2_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(h)),
        Nx.subtract(y_hat, y)
      ) / elem(Nx.shape(x), 0)

    # `weight2[1..-1//1]` drops the first row of weight2, i.e. the one that
    # multiplies the bias column prepended to `h`.
    w1_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(x)),
        Nx.dot(y_hat - y, Nx.transpose(weight2[1..-1//1])) * sigmoid_gradient(h)
      ) / elem(Nx.shape(x), 0)

    {w1_gradient, w2_gradient}
  end

  @doc """
  Classifies each row of `x`: returns a one-column matrix holding the index
  of the highest prediction of every row.
  """
  defn classify(x, weight1, weight2) do
    {y_hat, _h} = forward(x, weight1, weight2)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Returns `{weight1, weight2}` drawn from a normal distribution
  (mean 0.0, standard deviation 0.01) with the shapes given by the
  `:w1_shape` and `:w2_shape` options.

  The random key is fixed, so the same shapes always yield the same weights.
  """
  defn initialize_weights(opts \\ []) do
    opts = keyword!(opts, [:w1_shape, :w2_shape])
    mean = 0.0
    std_deviation = 0.01

    prng_key = Nx.Random.key(1234)

    {weight1, new_prng_key} =
      Nx.Random.normal(prng_key, mean, std_deviation, shape: opts[:w1_shape])

    {weight2, _new_prng_key} =
      Nx.Random.normal(new_prng_key, mean, std_deviation, shape: opts[:w2_shape])

    {weight1, weight2}
  end

  @doc """
  Prints and returns `{training_loss, accuracy}` for the given weights.

  The loss is computed on the training set; the accuracy (a percentage) is
  computed by comparing the classifications of `x_test` with `y_test`.
  """
  def report(iteration, x_train, y_train, x_test, y_test, weight1, weight2) do
    {y_hat, _h} = forward(x_train, weight1, weight2)
    training_loss = loss(y_train, y_hat) |> Nx.to_number()
    classifications = classify(x_test, weight1, weight2)
    accuracy = Nx.multiply(Nx.mean(Nx.equal(classifications, y_test)), 100.0) |> Nx.to_number()

    IO.puts("Iteration #{iteration}, Loss: #{training_loss}, Accuracy: #{accuracy}%")

    {training_loss, accuracy}
  end

  @doc """
  Trains the network for `iterations` full-batch gradient descent steps.

  Returns `{{w1, w2}, {loss_history, accuracy_history}}`, where the two
  histories hold one entry per iteration, in chronological order.
  With `iterations` set to 0 the initial weights and empty histories
  are returned.
  """
  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, iterations, lr) do
    n_input_variables = elem(Nx.shape(x_train), 1)
    n_classes = elem(Nx.shape(y_train), 1)

    # The "+ 1" accounts for the bias column prepended by `prepend_bias/1`.
    {initial_weight_1, initial_weight_2} =
      initialize_weights(
        w1_shape: {n_input_variables + 1, n_hidden_nodes},
        w2_shape: {n_hidden_nodes + 1, n_classes}
      )

    initial_accumulator = {{initial_weight_1, initial_weight_2}, {[], []}}

    # The explicit `//1` step keeps the range empty when `iterations` is 0
    # (`0..-1` alone would be a descending range and run two iterations).
    {weights, {reversed_losses, reversed_accuracies}} =
      Enum.reduce(0..(iterations - 1)//1, initial_accumulator, fn i,
                                                                   {{w1, w2},
                                                                    {loss_hist, accuracy_hist}} ->
        {updated_w1, updated_w2} = step(x_train, y_train, w1, w2, lr)
        {loss, accuracy} = report(i, x_train, y_train, x_test, y_test, updated_w1, updated_w2)

        # Prepend (constant time) and reverse once at the end,
        # instead of appending with `++` at every iteration.
        {{updated_w1, updated_w2}, {[loss | loss_hist], [accuracy | accuracy_hist]}}
      end)

    {weights, {Enum.reverse(reversed_losses), Enum.reverse(reversed_accuracies)}}
  end

  # One gradient descent step: forward pass, backpropagation and
  # weights update scaled by the learning rate `lr`.
  defnp step(x_train, y_train, w1, w2, lr) do
    {y_hat, h} = forward(x_train, w1, w2)
    {w1_gradient, w2_gradient} = back(x_train, y_train, y_hat, w2, h)
    w1 = w1 - w1_gradient * lr
    w2 = w2 - w2_gradient * lr

    {w1, w2}
  end
end

Loss and Accuracy

# Hyperparameters of the full-batch training run
hidden_nodes = 200
# (alternative value noted by the author: 1000)
iterations = 30
learning_rate = 0.01

# Only the two histories are needed for the charts; the trained weights are discarded.
{{_weight1, _weight2}, {loss_history, accuracy_history}} =
  C13.Classifier.train(x_train, y_train, x_test, y_test, hidden_nodes, iterations, learning_rate)
alias VegaLite, as: Vl

# One chart row per history entry, with iterations numbered from 1.
loss_inputs =
  for {loss, iteration} <- Enum.with_index(loss_history, 1) do
    %{loss: loss, iteration: iteration}
  end

accuracy_inputs =
  for {accuracy, iteration} <- Enum.with_index(accuracy_history, 1) do
    %{accuracy: accuracy, iteration: iteration}
  end

# Loss per iteration
loss_chart =
  Vl.new()
  |> Vl.data_from_values(loss_inputs)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "iteration", type: :quantitative)
  |> Vl.encode_field(:y, "loss", type: :quantitative)

# Accuracy per iteration
accuracy_chart =
  Vl.new()
  |> Vl.data_from_values(accuracy_inputs)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "iteration", type: :quantitative)
  |> Vl.encode_field(:y, "accuracy", type: :quantitative)

# The two charts are concatenated into a single view
Vl.new(width: 600, height: 400)
|> Vl.concat([loss_chart, accuracy_chart])

Batch by Batch

Update the Classifier to accept the data divided into batches.

defmodule C13.ClassifierWithBatches do
  @moduledoc """
  Three-layer neural network classifier trained with mini-batch
  gradient descent: every step uses one batch of the training set.
  """

  import Nx.Defn

  @doc """
  Element-wise logistic function: `1 / (1 + exp(-z))`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Row-wise softmax: each row of `logits` is exponentiated and divided
  by the sum of its exponentials.
  """
  defn softmax(logits) do
    exponentials = Nx.exp(logits)

    Nx.divide(
      exponentials,
      Nx.sum(exponentials, axes: [1]) |> Nx.reshape({:auto, 1})
    )
  end

  @doc """
  Derivative of the sigmoid, expressed in terms of the sigmoid's output.
  """
  defn sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, 1 - sigmoid)
  end

  @doc """
  Cross-entropy loss between the labels `y` and the predictions `y_hat`,
  averaged over the number of rows of `y`.
  """
  defn loss(y, y_hat) do
    -Nx.sum(y * Nx.log(y_hat)) / elem(Nx.shape(y), 0)
  end

  @doc """
  Prepends a column of 1s (the bias) to the matrix `x`.
  """
  defn prepend_bias(x) do
    bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})

    # Insert a column of 1s in the position 0 of x.
    # ("axis: 1" stands for: "insert a column, not a row")
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc """
  Forward propagation. Returns `{y_hat, h}`: the network's predictions and
  the hidden layer's activations (needed later by `back/5`).
  """
  defn forward(x, weight1, weight2) do
    h = sigmoid(Nx.dot(prepend_bias(x), weight1))
    y_hat = softmax(Nx.dot(prepend_bias(h), weight2))

    {y_hat, h}
  end

  @doc """
  Backpropagation. Returns `{w1_gradient, w2_gradient}`, both averaged over
  the number of rows of `x`.
  """
  defn back(x, y, y_hat, weight2, h) do
    w2_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(h)),
        Nx.subtract(y_hat, y)
      ) / elem(Nx.shape(x), 0)

    # `weight2[1..-1//1]` drops the first row of weight2, i.e. the one that
    # multiplies the bias column prepended to `h`.
    w1_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(x)),
        Nx.dot(y_hat - y, Nx.transpose(weight2[1..-1//1])) * sigmoid_gradient(h)
      ) / elem(Nx.shape(x), 0)

    {w1_gradient, w2_gradient}
  end

  @doc """
  Classifies each row of `x`: returns a one-column matrix holding the index
  of the highest prediction of every row.
  """
  defn classify(x, weight1, weight2) do
    {y_hat, _h} = forward(x, weight1, weight2)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Returns `{weight1, weight2}` drawn from a normal distribution
  (mean 0.0, standard deviation 0.01) with the shapes given by the
  `:w1_shape` and `:w2_shape` options.

  The random key is fixed, so the same shapes always yield the same weights.
  """
  defn initialize_weights(opts \\ []) do
    opts = keyword!(opts, [:w1_shape, :w2_shape])
    mean = 0.0
    std_deviation = 0.01

    prng_key = Nx.Random.key(1234)

    {weight1, new_prng_key} =
      Nx.Random.normal(prng_key, mean, std_deviation, shape: opts[:w1_shape])

    {weight2, _new_prng_key} =
      Nx.Random.normal(new_prng_key, mean, std_deviation, shape: opts[:w2_shape])

    {weight1, weight2}
  end

  @doc """
  Splits the training examples and their labels in batches of `batch_size`
  rows. Returns `{x_batches, y_batches}` as produced by `Nx.to_batched/2`.
  """
  def prepare_batches(x_train, y_train, batch_size) do
    x_batches = Nx.to_batched(x_train, batch_size)
    y_batches = Nx.to_batched(y_train, batch_size)

    {x_batches, y_batches}
  end

  @doc """
  Prints and returns `{training_loss, accuracy}` for the given weights.

  The loss is computed on the whole training set; the accuracy (a percentage)
  is computed by comparing the classifications of `x_test` with `y_test`.
  """
  def report(epoch, batch, x_train, y_train, x_test, y_test, weight1, weight2) do
    {y_hat, _h} = forward(x_train, weight1, weight2)
    training_loss = loss(y_train, y_hat) |> Nx.to_number()
    classifications = classify(x_test, weight1, weight2)
    accuracy = Nx.multiply(Nx.mean(Nx.equal(classifications, y_test)), 100.0) |> Nx.to_number()

    IO.puts("#{epoch}-#{batch} > Loss: #{training_loss}, Accuracy: #{accuracy}%")

    {training_loss, accuracy}
  end

  @doc """
  Trains the network for `epochs` passes over the training set, performing
  one gradient descent step (followed by a report) per batch.

  Returns the trained weights as `{w1, w2}`. With `epochs` set to 0 the
  initial weights are returned untouched.
  """
  def train(x_train, y_train, x_test, y_test, n_hidden_nodes, epochs, batch_size, lr) do
    n_input_variables = elem(Nx.shape(x_train), 1)
    n_classes = elem(Nx.shape(y_train), 1)

    # The "+ 1" accounts for the bias column prepended by `prepend_bias/1`.
    {initial_weight_1, initial_weight_2} =
      initialize_weights(
        w1_shape: {n_input_variables + 1, n_hidden_nodes},
        w2_shape: {n_hidden_nodes + 1, n_classes}
      )

    {x_batches, y_batches} = prepare_batches(x_train, y_train, batch_size)

    # `Nx.to_batched/2` yields lazy streams: slice every batch exactly once
    # here, instead of walking the streams again (`Enum.at/2`) at every step.
    indexed_batches =
      x_batches
      |> Enum.zip(y_batches)
      |> Enum.with_index()

    # The explicit `//1` step keeps the range empty when `epochs` is 0
    # (`0..-1` alone would be a descending range and run two epochs).
    for epoch <- 0..(epochs - 1)//1,
        {{x_batch, y_batch}, batch} <- indexed_batches,
        reduce: {initial_weight_1, initial_weight_2} do
      {w1, w2} ->
        {updated_w1, updated_w2} = step(x_batch, y_batch, w1, w2, lr)

        {_loss, _accuracy} =
          report(epoch, batch, x_train, y_train, x_test, y_test, updated_w1, updated_w2)

        {updated_w1, updated_w2}
    end
  end

  # One gradient descent step on a single batch: forward pass,
  # backpropagation and weights update scaled by the learning rate `lr`.
  defnp step(x_batch, y_batch, w1, w2, lr) do
    {y_hat, h} = forward(x_batch, w1, w2)
    {w1_gradient, w2_gradient} = back(x_batch, y_batch, y_hat, w2, h)
    w1 = w1 - w1_gradient * lr
    w2 = w2 - w2_gradient * lr

    {w1, w2}
  end
end
# Hyperparameters of the mini-batch training run
hidden_nodes = 200
epochs = 2
batch_size = 20_000
learning_rate = 0.01

# The trained weights are not used by the rest of the notebook:
# the match only asserts that the training returned a pair.
{_weight1, _weight2} =
  C13.ClassifierWithBatches.train(
    x_train,
    y_train,
    x_test,
    y_test,
    hidden_nodes,
    epochs,
    batch_size,
    learning_rate
  )

# Keep the cell output short instead of showing the weights
:ok

Understanding Batches

This train function is different from the previous one in a few ways:

  • it keeps training until a specified amount of time has passed, rather than stopping after a specified number of epochs;
  • it does its job quietly instead of reporting the loss and accuracy at each step;
  • it stores the loss and the time passed after each step, so that it can return that history to the caller;
  • it also returns the number of training epochs and the total number of gradient descent steps.
defmodule C13.ClassifierWithTimeSeries do
  @moduledoc """
  Three-layer neural network classifier trained with mini-batch gradient
  descent for a fixed amount of time, recording the training loss (and the
  moment it was measured) before every step.
  """

  import Nx.Defn

  @doc """
  Element-wise logistic function: `1 / (1 + exp(-z))`.
  """
  defn sigmoid(z) do
    Nx.divide(1, Nx.add(1, Nx.exp(Nx.negate(z))))
  end

  @doc """
  Row-wise softmax: each row of `logits` is exponentiated and divided
  by the sum of its exponentials.
  """
  defn softmax(logits) do
    exponentials = Nx.exp(logits)

    Nx.divide(
      exponentials,
      Nx.sum(exponentials, axes: [1]) |> Nx.reshape({:auto, 1})
    )
  end

  @doc """
  Derivative of the sigmoid, expressed in terms of the sigmoid's output.
  """
  defn sigmoid_gradient(sigmoid) do
    Nx.multiply(sigmoid, 1 - sigmoid)
  end

  @doc """
  Cross-entropy loss between the labels `y` and the predictions `y_hat`,
  averaged over the number of rows of `y`.
  """
  defn loss(y, y_hat) do
    -Nx.sum(y * Nx.log(y_hat)) / elem(Nx.shape(y), 0)
  end

  @doc """
  Prepends a column of 1s (the bias) to the matrix `x`.
  """
  defn prepend_bias(x) do
    bias = Nx.broadcast(1, {elem(Nx.shape(x), 0), 1})

    # Insert a column of 1s in the position 0 of x.
    # ("axis: 1" stands for: "insert a column, not a row")
    Nx.concatenate([bias, x], axis: 1)
  end

  @doc """
  Forward propagation. Returns `{y_hat, h}`: the network's predictions and
  the hidden layer's activations (needed later by `back/5`).
  """
  defn forward(x, weight1, weight2) do
    h = sigmoid(Nx.dot(prepend_bias(x), weight1))
    y_hat = softmax(Nx.dot(prepend_bias(h), weight2))

    {y_hat, h}
  end

  @doc """
  Backpropagation. Returns `{w1_gradient, w2_gradient}`, both averaged over
  the number of rows of `x`.
  """
  defn back(x, y, y_hat, weight2, h) do
    w2_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(h)),
        Nx.subtract(y_hat, y)
      ) / elem(Nx.shape(x), 0)

    # `weight2[1..-1//1]` drops the first row of weight2, i.e. the one that
    # multiplies the bias column prepended to `h`.
    w1_gradient =
      Nx.dot(
        Nx.transpose(prepend_bias(x)),
        Nx.dot(y_hat - y, Nx.transpose(weight2[1..-1//1])) * sigmoid_gradient(h)
      ) / elem(Nx.shape(x), 0)

    {w1_gradient, w2_gradient}
  end

  @doc """
  Classifies each row of `x`: returns a one-column matrix holding the index
  of the highest prediction of every row.
  """
  defn classify(x, weight1, weight2) do
    {y_hat, _h} = forward(x, weight1, weight2)
    labels = Nx.argmax(y_hat, axis: 1)
    Nx.reshape(labels, {:auto, 1})
  end

  @doc """
  Returns `{weight1, weight2}` drawn from a normal distribution
  (mean 0.0, standard deviation 0.01) with the shapes given by the
  `:w1_shape` and `:w2_shape` options.

  The random key is fixed, so the same shapes always yield the same weights.
  """
  defn initialize_weights(opts \\ []) do
    opts = keyword!(opts, [:w1_shape, :w2_shape])
    mean = 0.0
    std_deviation = 0.01

    prng_key = Nx.Random.key(1234)

    {weight1, new_prng_key} =
      Nx.Random.normal(prng_key, mean, std_deviation, shape: opts[:w1_shape])

    {weight2, _new_prng_key} =
      Nx.Random.normal(new_prng_key, mean, std_deviation, shape: opts[:w2_shape])

    {weight1, weight2}
  end

  @doc """
  Splits the training examples and their labels in batches of `batch_size`
  rows. Returns `{x_batches, y_batches}` as produced by `Nx.to_batched/2`.
  """
  def prepare_batches(x_train, y_train, batch_size) do
    x_batches = Nx.to_batched(x_train, batch_size)
    y_batches = Nx.to_batched(y_train, batch_size)

    {x_batches, y_batches}
  end

  @doc """
  Computes the loss of the weights `w1`/`w2` on the whole training set and
  returns `state` with that loss appended to `:losses` and the current UTC
  time appended to `:times`.
  """
  def compute_loss_and_update_state(state, x_train, y_train, w1, w2) do
    {y_hat, _h} = forward(x_train, w1, w2)
    training_loss = loss(y_train, y_hat) |> Nx.to_number()

    %{
      state
      | losses: state.losses ++ [training_loss],
        times: state.times ++ [DateTime.utc_now()]
    }
  end

  @doc """
  Trains the network with batches of `batch_size` rows until
  `timeout_after_in_seconds` seconds have passed.

  The test set arguments are accepted but not used.

  Returns the final training state, a map with the keys:

    * `:w1`, `:w2` - the trained weights;
    * `:losses` - the training loss measured before each step;
    * `:times` - the `DateTime` at which each loss was recorded;
    * `:steps` - the number of gradient descent steps performed;
    * `:epochs` - the number of completed passes over the training set;
    * `:batch` - the index of the next batch within the current epoch;
    * `:start_time`, `:timeout_at` - when the training started and when it
      was due to stop.
  """
  def train(
        x_train,
        y_train,
        _x_test,
        _y_test,
        n_hidden_nodes,
        batch_size,
        lr,
        timeout_after_in_seconds
      ) do
    n_input_variables = elem(Nx.shape(x_train), 1)
    n_classes = elem(Nx.shape(y_train), 1)

    # The "+ 1" accounts for the bias column prepended by `prepend_bias/1`.
    {initial_weight_1, initial_weight_2} =
      initialize_weights(
        w1_shape: {n_input_variables + 1, n_hidden_nodes},
        w2_shape: {n_hidden_nodes + 1, n_classes}
      )

    {x_batches, y_batches} = prepare_batches(x_train, y_train, batch_size)

    # `Nx.to_batched/2` yields lazy streams: slice every batch exactly once,
    # before the clock starts, instead of walking the streams again at every
    # step. The match also asserts there is at least one batch, which the
    # epoch roll-over in `do_train/6` relies on.
    [_ | _] = batches = Enum.zip(x_batches, y_batches)

    now = DateTime.utc_now()

    initial_state = %{
      w1: initial_weight_1,
      w2: initial_weight_2,
      losses: [],
      times: [],
      steps: 0,
      epochs: 0,
      batch: 0,
      start_time: now,
      timeout_at: DateTime.add(now, timeout_after_in_seconds, :second)
    }

    do_train(x_train, y_train, batches, batches, lr, initial_state)
  end

  # Training loop. `batches` is the full list of `{x_batch, y_batch}` pairs,
  # while the fourth argument holds the batches still to be processed in the
  # current epoch.

  # Epoch completed: start over from the first batch. No loss is recorded
  # here, since the weights have not changed since the last step.
  defp do_train(x_train, y_train, batches, [], lr, state) do
    do_train(
      x_train,
      y_train,
      batches,
      batches,
      lr,
      %{state | batch: 0, epochs: state.epochs + 1}
    )
  end

  # Record the current loss, then either stop (time is up) or perform one
  # gradient descent step on the next batch and continue.
  defp do_train(x_train, y_train, batches, [{x_batch, y_batch} | remaining_batches], lr, state) do
    %{w1: w1, w2: w2, timeout_at: timeout_at} = state

    updated_state = compute_loss_and_update_state(state, x_train, y_train, w1, w2)

    if DateTime.compare(DateTime.utc_now(), timeout_at) == :gt do
      updated_state
    else
      {updated_w1, updated_w2} = step(x_batch, y_batch, w1, w2, lr)

      do_train(
        x_train,
        y_train,
        batches,
        remaining_batches,
        lr,
        %{
          updated_state
          | w1: updated_w1,
            w2: updated_w2,
            batch: state.batch + 1,
            steps: state.steps + 1
        }
      )
    end
  end

  # One gradient descent step on a single batch: forward pass,
  # backpropagation and weights update scaled by the learning rate `lr`.
  defnp step(x_batch, y_batch, w1, w2, lr) do
    {y_hat, h} = forward(x_batch, w1, w2)
    {w1_gradient, w2_gradient} = back(x_batch, y_batch, y_hat, w2, h)
    w1 = w1 - w1_gradient * lr
    w2 = w2 - w2_gradient * lr

    {w1, w2}
  end
end
hidden_nodes = 200
learning_rate = 0.01

# The book trains for 1800 seconds per batch size:
# timeout_after_in_seconds = 1800

timeout_after_in_seconds = 60

# Every run shares the same data and hyperparameters: only the batch size changes.
train_with_batch_size = fn batch_size ->
  C13.ClassifierWithTimeSeries.train(
    x_train,
    y_train,
    x_test,
    y_test,
    hidden_nodes,
    batch_size,
    learning_rate,
    timeout_after_in_seconds
  )
end

# Stochastic GD: one example per batch
stochastic_gd_size = 1
stochastic_gd_result = train_with_batch_size.(stochastic_gd_size)

# Batch size 32
batch32_result = train_with_batch_size.(32)

# Batch size 128
batch128_result = train_with_batch_size.(128)

# Batch GD: a single batch holding every training example
batch_gd_size = elem(Nx.shape(x_train), 0)
batch_gd_result = train_with_batch_size.(batch_gd_size)
alias VegaLite, as: Vl

# Converts a training result into chart rows: one row per recorded loss,
# with the time expressed in seconds elapsed since the start of the training.
plot_loss_fn = fn %{losses: losses, times: times, start_time: start_time}, type ->
  losses
  |> Enum.zip(times)
  |> Enum.map(fn {loss, recorded_at} ->
    elapsed_seconds = DateTime.diff(recorded_at, start_time, :millisecond) / 1000

    %{type: type, loss: loss, time: elapsed_seconds}
  end)
end

stochastic_gd_inputs = plot_loss_fn.(stochastic_gd_result, "Stochastic GD")
batch32_inputs = plot_loss_fn.(batch32_result, "Batch 32")
batch128_inputs = plot_loss_fn.(batch128_result, "Batch 128")
batch_gd_inputs = plot_loss_fn.(batch_gd_result, "Batch GD")

# All the layers share the same encoding: only the data changes.
loss_over_time_layer = fn inputs ->
  Vl.new()
  |> Vl.data_from_values(inputs)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "time", title: "time (seconds)", type: :quantitative)
  |> Vl.encode_field(:y, "loss", title: "loss", type: :quantitative)
  |> Vl.encode(:color, field: "type")
end

layers =
  Enum.map(
    [stochastic_gd_inputs, batch32_inputs, batch128_inputs, batch_gd_inputs],
    loss_over_time_layer
  )

Vl.new(width: 600, height: 400)
|> Vl.layers(layers)

1800 seconds trainings each

In the book, the training for each batch size lasts 1800 seconds, so the four runs take about 2 hours in total. The final result is shown below.