Hands On: The Smallest Batch
Mix.install(
[
{:nx, "~> 0.7.1"},
{:exla, "~> 0.7.1"},
{:kino_vega_lite, "~> 0.1.10"}
],
config: [nx: [default_backend: EXLA.Backend]]
)
MNIST Helpers
defmodule C7.MNIST do
@moduledoc "MNIST functions from Chapter 7"
def load_images(path) do
# Open and unzip the file of images then store header information into variables
<<_magic_number::32, n_images::32, n_rows::32, n_cols::32, images::binary>> =
path
|> File.read!()
|> :zlib.gunzip()
images
# Create 1D tensor of type unsigned 8-bit integer from binary
|> Nx.from_binary({:u, 8})
# Reshape the pixels into a matrix where each row is an image
|> Nx.reshape({n_images, n_cols * n_rows})
end
@doc """
Inserts a column of 1s at position 0 of tensor x, along axis 1 (one bias element per row)
"""
def prepend_bias(x) do
{row, _col} = Nx.shape(x)
bias = Nx.broadcast(Nx.tensor(1), {row, 1})
Nx.concatenate([bias, x], axis: 1)
end
def load_labels(filename) do
# Open and unzip the file of labels, then store the header information and the raw label bytes into variables
<<_magic_number::32, n_items::32, labels::binary>> =
filename
|> File.read!()
|> :zlib.gunzip()
labels
# Create 1D tensor of type unsigned 8-bit integer from binary
|> Nx.from_binary({:u, 8})
# Reshape the labels into a 1-column matrix
|> Nx.reshape({n_items, 1})
end
@doc "Flip hot values to 1"
def one_hot_encode(y) do
Nx.equal(y, Nx.tensor(Enum.to_list(0..9)))
end
end
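As a quick, hypothetical check (not part of the chapter's code), the two helpers can be exercised on a tiny tensor to confirm their shapes:
# A tiny 2x3 "image" matrix, made up for illustration
tiny_x = Nx.tensor([[1, 2, 3], [4, 5, 6]])
# prepend_bias/1 adds a column of 1s, turning shape {2, 3} into {2, 4}
C7.MNIST.prepend_bias(tiny_x)
#=> [[1, 1, 2, 3], [1, 4, 5, 6]]
# one_hot_encode/1 turns an {n, 1} column of labels into an {n, 10} matrix
C7.MNIST.one_hot_encode(Nx.tensor([[2], [0]]))
#=> row 0 has its 1 at index 2, row 1 at index 0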
files_path = __DIR__ |> Path.join("../files")
training_images_path = Path.join(files_path, "train-images-idx3-ubyte.gz")
training_labels_path = Path.join(files_path, "train-labels-idx1-ubyte.gz")
test_images_path = Path.join(files_path, "t10k-images-idx3-ubyte.gz")
test_labels_path = Path.join(files_path, "t10k-labels-idx1-ubyte.gz")
import C7.MNIST
y_train_unencoded = load_labels(training_labels_path)
y_train = one_hot_encode(y_train_unencoded)
y_test = load_labels(test_labels_path)
x_train = load_images(training_images_path)
x_test = load_images(test_images_path)
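As a sanity check (not in the original notebook), with the standard MNIST files the shapes should come out as 60,000 training and 10,000 test examples of 784 pixels each, with one-hot training labels of width 10:
Nx.shape(x_train) #=> {60000, 784}
Nx.shape(y_train) #=> {60000, 10}
Nx.shape(x_test)  #=> {10000, 784}
Nx.shape(y_test)  #=> {10000, 1}  # the test labels stay unencoded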
Neural Network
defmodule C11.NeuralNetwork do
import C7.MNIST, only: [prepend_bias: 1]
import Nx.Defn
@doc """
For w2_gradient:
- Swap the operands of the multiplication then transpose one of them to get a result with the
same dimension as w2.
- We also need to do a prepend_bias/1 since we did it with forward/3, we need to do the same
during backprop.
- The matrix multiplication needs to be divide by the rows of x (number of examples) to get
the average gradient
For w1_gradient:
- Use h as is, without a bias column since it has no effect on dL/dw1
- Since we ignored the first column of h, we also have to ignore the first row of w1 to match
columns by rows in matrix multiplication
- `sigmoid_gradient/1` calculates sigmoid's gradient from sigmoid's output `h`
- Lastly multiply x to the previous intermediate result with same rules when we calculate
w2_gradient
"""
def back(x, y, y_hat, w2, h) do
{num_samples, _} = Nx.shape(x)
w2_gradient =
h
|> prepend_bias()
|> Nx.transpose()
|> Nx.dot(Nx.subtract(y_hat, y))
|> Nx.divide(num_samples)
w1_gradient =
x
|> prepend_bias()
|> Nx.transpose()
|> Nx.dot(
y_hat
|> Nx.subtract(y)
|> Nx.dot(Nx.transpose(w2[1..-1//1]))
|> Nx.multiply(sigmoid_gradient(h))
)
|> Nx.divide(num_samples)
{w1_gradient, w2_gradient}
end
def initialize_weights(n_input_vars, n_hidden_nodes, n_classes) do
key = Nx.Random.key(1234)
mean = 0.0
standard_deviation = 0.01
w1_rows = n_input_vars + 1
{w1, new_key} =
Nx.Random.normal(key, mean, standard_deviation, shape: {w1_rows, n_hidden_nodes})
w2_rows = n_hidden_nodes + 1
{w2, _new_key} =
Nx.Random.normal(new_key, mean, standard_deviation, shape: {w2_rows, n_classes})
{w1, w2}
end
# Functions from `Ch 10: Building the Network` below
defn sigmoid(z) do
1 / (1 + Nx.exp(-z))
end
def softmax(logits) do
exponentials = Nx.exp(logits)
sum_of_exponentials_by_row =
exponentials
|> Nx.sum(axes: [1])
|> Nx.reshape({:auto, 1})
Nx.divide(exponentials, sum_of_exponentials_by_row)
end
def sigmoid_gradient(sigmoid) do
Nx.multiply(sigmoid, Nx.subtract(1, sigmoid))
end
@doc """
Cross-entropy loss
The loss formula specific to multiclass classifiers.
It measures the distance between the classifier's predictions and the labels.
A lower loss means a better classifier.
"""
def loss(y_train, y_hat) do
{rows, _} = Nx.shape(y_train)
y_train
|> Nx.multiply(Nx.log(y_hat))
|> Nx.sum()
|> Nx.multiply(-1)
|> Nx.divide(rows)
end
@doc """
Implements the operation called "forward propagation"
Steps:
1. We add a bias to the inputs
2. Compute the weighted sum using the 1st matrix of weights, w1
3. Pass the result to the activation function (sigmoid or softmax)
4. Repeat for all layers
"""
def forward(x, w1, w2) do
# Hidden layer
h =
x
|> prepend_bias()
|> then(&Nx.dot(&1, w1))
|> sigmoid()
# Output layer
y_hat =
h
|> prepend_bias()
|> then(&Nx.dot(&1, w2))
|> softmax()
{y_hat, h}
end
@doc """
Same classify function from Ch 7, but modified for the neural network (it now takes two weight matrices)
"""
def classify(x, w1, w2) do
x
|> forward(w1, w2)
|> elem(0)
|> Nx.argmax(axis: 1)
|> Nx.reshape({:auto, 1})
end
end
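Before wiring in batches, here is a minimal sketch of one gradient descent step on made-up random data (the shapes mirror MNIST: 784 inputs, 200 hidden nodes, 10 classes). It only illustrates how forward/3, loss/2, back/5 and the weight update fit together; it is not part of the chapter's training loop.
key = Nx.Random.key(42)
{x_demo, _key} = Nx.Random.uniform(key, shape: {5, 784})
y_demo = C7.MNIST.one_hot_encode(Nx.tensor([[0], [1], [2], [3], [4]]))
{w1, w2} = C11.NeuralNetwork.initialize_weights(784, 200, 10)
{y_hat, h} = C11.NeuralNetwork.forward(x_demo, w1, w2)
# With near-uniform softmax outputs, the loss starts around -log(1/10), roughly 2.3
C11.NeuralNetwork.loss(y_demo, y_hat) |> Nx.to_number()
# The gradients have the same shapes as the weights they update
{w1_gradient, w2_gradient} = C11.NeuralNetwork.back(x_demo, y_demo, y_hat, w2, h)
Nx.shape(w1_gradient) #=> {785, 200}
Nx.shape(w2_gradient) #=> {201, 10}
# One gradient descent step with a learning rate of 0.01
w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, 0.01))
w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, 0.01))
C11.NeuralNetwork.classify(x_demo, w1, w2) |> Nx.shape() #=> {5, 1}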
Implementing Batches
defmodule C13.NeuralNetwork.MiniBatch do
import C11.NeuralNetwork,
only: [back: 5, initialize_weights: 3, forward: 3, loss: 2, classify: 3]
@doc """
Steps through the training set in increments of batch_size and slices x_train to get a
batch of examples. Does the same with y_train, resulting in two lists that contain the
training set split into batches.
"""
def prepare_batches(x_train, y_train, batch_size) do
{n_examples, _} = Nx.shape(x_train)
{x_batches, y_batches} =
Range.new(0, n_examples, batch_size)
|> Enum.reduce_while({[], []}, fn batch_start, {x_batches, y_batches} ->
length =
if n_examples - batch_start < batch_size, do: n_examples - batch_start, else: batch_size
cond do
length == 0 ->
{:halt, {x_batches, y_batches}}
length < batch_size ->
x_batch = Nx.slice_along_axis(x_train, batch_start, length)
y_batch = Nx.slice_along_axis(y_train, batch_start, length)
{:halt, {[x_batch | x_batches], [y_batch | y_batches]}}
true ->
x_batch = Nx.slice_along_axis(x_train, batch_start, length)
y_batch = Nx.slice_along_axis(y_train, batch_start, length)
{:cont, {[x_batch | x_batches], [y_batch | y_batches]}}
end
end)
{Enum.reverse(x_batches), Enum.reverse(y_batches)}
end
def report(epoch, batch, x_train, y_train, x_test, y_test, w1, w2) do
{y_hat, _} = forward(x_train, w1, w2)
training_loss = loss(y_train, y_hat)
classifications = classify(x_test, w1, w2)
# y_test is not one-hot encoded.
# Measure how many classifications are correct by comparing them with y_test.
# The mean/1 call is effectively the sum of 1s (matches) divided by the total
# number of classifications.
accuracy =
classifications
|> Nx.equal(y_test)
|> Nx.mean()
|> Nx.multiply(100.0)
IO.puts(
"#{epoch}-#{batch} > Loss: #{Nx.to_number(training_loss)}, Accuracy: #{Nx.to_number(accuracy)}"
)
Nx.to_number(training_loss)
end
def train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
) do
{_, n_input_variables} = Nx.shape(x_train)
{_, n_classes} = Nx.shape(y_train)
{w1, w2} = initialize_weights(n_input_variables, n_hidden_nodes, n_classes)
{x_batches, y_batches} = prepare_batches(x_train, y_train, batch_size)
state = %{
w1: w1,
w2: w2,
x_batches: x_batches,
y_batches: y_batches,
start_time: Time.utc_now(),
epoch: 0,
batch: 0,
steps: 0,
losses: [],
elapsed_times: [],
num_batches: length(x_batches),
training_time: training_time
}
do_train(x_train, y_train, x_test, y_test, lr, state)
end
defp do_train(x_train, y_train, x_test, y_test, lr, state) do
%{
w1: w1,
w2: w2,
x_batches: x_batches,
y_batches: y_batches,
start_time: start_time,
epoch: epoch,
batch: batch,
steps: steps,
losses: losses,
elapsed_times: elapsed_times,
num_batches: num_batches,
training_time: training_time
} = state
elapsed_time = Time.diff(Time.utc_now(), start_time)
cond do
elapsed_time >= training_time ->
state
batch > num_batches - 1 ->
do_train(x_train, y_train, x_test, y_test, lr, %{state | epoch: epoch + 1, batch: 0})
true ->
x_train_batch = Enum.at(x_batches, batch)
y_train_batch = Enum.at(y_batches, batch)
{y_hat, h} = forward(x_train_batch, w1, w2)
{w1_gradient, w2_gradient} = back(x_train_batch, y_train_batch, y_hat, w2, h)
w1 = Nx.subtract(w1, Nx.multiply(w1_gradient, lr))
w2 = Nx.subtract(w2, Nx.multiply(w2_gradient, lr))
loss = report(epoch, batch, x_train, y_train, x_test, y_test, w1, w2)
state = %{
state
| w1: w1,
w2: w2,
losses: [loss | losses],
elapsed_times: [elapsed_time | elapsed_times],
batch: batch + 1,
steps: steps + 1
}
do_train(x_train, y_train, x_test, y_test, lr, state)
end
end
def plot_loss(training_fn, label) do
IO.puts("Training: #{label}")
%{losses: losses, elapsed_times: elapsed_times, steps: steps, epoch: epochs} = training_fn.()
IO.puts("Loss: #{List.first(losses)} (#{epochs} completed, #{steps} total steps)")
do_plot_loss(losses, elapsed_times, label)
end
defp do_plot_loss(losses, elapsed_times, label) do
[losses, elapsed_times]
|> Enum.zip_with(fn [loss, elapsed_time] ->
%{loss: loss, elapsed_time: elapsed_time, type: label}
end)
end
end
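A quick, hypothetical check of prepare_batches/3: with 10 examples and a batch size of 3, we expect three full batches plus one final partial batch of a single example.
demo_x = Nx.iota({10, 4})
demo_y = C7.MNIST.one_hot_encode(Nx.iota({10, 1}))
{x_batches, y_batches} = C13.NeuralNetwork.MiniBatch.prepare_batches(demo_x, demo_y, 3)
Enum.map(x_batches, &Nx.shape/1) #=> [{3, 4}, {3, 4}, {3, 4}, {1, 4}]
Enum.map(y_batches, &Nx.shape/1) #=> [{3, 10}, {3, 10}, {3, 10}, {1, 10}]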
What’s the smallest batch size that gives us better early feedback than plain old batch GD?
alias C13.NeuralNetwork.MiniBatch
# Training time is 20 minutes (1200 seconds)
training_time = 1_200
n_hidden_nodes = 200
lr = 0.01
batch_size_3_fn = fn ->
batch_size = 3
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
batch_size_5_fn = fn ->
batch_size = 5
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
batch_size_7_fn = fn ->
batch_size = 7
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
batch_size_12_fn = fn ->
batch_size = 12
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
batch_size_15_fn = fn ->
batch_size = 15
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
{num_examples, _} = Nx.shape(x_train)
batch_gd_fn = fn ->
batch_size = num_examples
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
lr,
training_time
)
end
batch_size_3 = MiniBatch.plot_loss(batch_size_3_fn, "Batch size 3")
batch_size_5 = MiniBatch.plot_loss(batch_size_5_fn, "Batch size 5")
batch_size_7 = MiniBatch.plot_loss(batch_size_7_fn, "Batch size 7")
batch_size_12 = MiniBatch.plot_loss(batch_size_12_fn, "Batch size 12")
batch_size_15 = MiniBatch.plot_loss(batch_size_15_fn, "Batch size 15")
batch_gd = MiniBatch.plot_loss(batch_gd_fn, "Batch GD")
batches = [batch_size_3, batch_size_5, batch_size_7, batch_size_12, batch_size_15, batch_gd]
VegaLite.new(width: 500, height: 500, title: "Training with Batches")
|> VegaLite.layers(
batches
|> Enum.map(fn batch ->
VegaLite.new()
|> VegaLite.data_from_values(batch)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "elapsed_time", type: :quantitative, title: "Seconds")
|> VegaLite.encode_field(:y, "loss", type: :quantitative, title: "Loss")
|> VegaLite.encode(:color, field: "type")
end)
)
As we can see in the diagram, the smallest batch size that gives us better early feedback is fifteen (15). Batch size 15 delivers faster early feedback than the smaller batches, and its loss is the closest to batch GD's.
Stochastic GD with Different Learning Rates
alias C13.NeuralNetwork.MiniBatch
# Training time is 20 minutes (1200 seconds)
training_time = 1_200
n_hidden_nodes = 200
batch_size = 35
tenths_lr_fn = fn ->
learning_rate = 0.1
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
learning_rate,
training_time
)
end
hundredths_lr_fn = fn ->
learning_rate = 0.01
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
learning_rate,
training_time
)
end
thousandths_lr_fn = fn ->
learning_rate = 0.001
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
learning_rate,
training_time
)
end
ten_thousandths_lr_fn = fn ->
learning_rate = 0.0001
MiniBatch.train(
x_train,
y_train,
x_test,
y_test,
n_hidden_nodes,
batch_size,
learning_rate,
training_time
)
end
tenths_lr = MiniBatch.plot_loss(tenths_lr_fn, "lr = 0.1")
hundredths_lr = MiniBatch.plot_loss(hundredths_lr_fn, "lr = 0.01")
thousandths_lr = MiniBatch.plot_loss(thousandths_lr_fn, "lr = 0.001")
ten_thousandths_lr = MiniBatch.plot_loss(ten_thousandths_lr_fn, "lr = 0.0001")
learning_rates = [tenths_lr, hundredths_lr, thousandths_lr, ten_thousandths_lr]
VegaLite.new(width: 500, height: 500, title: "Stochastic GD with Different Learning Rates")
|> VegaLite.layers(
learning_rates
|> Enum.map(fn learning_rate ->
VegaLite.new()
|> VegaLite.data_from_values(learning_rate)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "elapsed_time", type: :quantitative, title: "Seconds")
|> VegaLite.encode_field(:y, "loss", type: :quantitative, title: "Loss")
|> VegaLite.encode(:color, field: "type", title: "Learning Rates")
end)
)
Among the different learning rates, 0.0001 gives the smoothest loss curve; as observed, the smoothness of the loss is inversely proportional to the learning rate. However, the smoothest loss gives neither the fastest feedback nor the lowest loss. The learning rate of 0.01 gives both the best loss and the fastest feedback.