Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 3: Walking the Gradient

03_gradient/gradient_descend.livemd

Chapter 3: Walking the Gradient

Mix.install([
  {:vega_lite, "~> 0.1.6"},
  {:kino, "~> 0.8.1"},
  {:kino_vega_lite, "~> 0.1.7"}
])

Read the data

file =
  __DIR__
  |> Path.join("pizza.txt")
  |> Path.expand()

# Read the data from the file, remove the header and return
# `[%{reservations: integer(), pizzas: integer()}]`
data =
  file
  |> File.read!()
  |> String.split("\n", trim: true)
  |> Enum.slice(1..-1)
  |> Enum.map(&String.split(&1, ~r{\s+}, trim: true))
  |> Enum.map(fn [r, p] -> %{reservations: String.to_integer(r), pizzas: String.to_integer(p)} end)

Kino.DataTable.new(data)

Linear regression with bias

☝️ From chapter 2

defmodule C2.LinearRegressionWithBias do
  @doc """
  Returns a list of predictions.
  """
  def predict([item | rest], weight, bias) do
    [predict(item, weight, bias) | predict(rest, weight, bias)]
  end

  def predict([], _weight, _bias), do: []

  # The function predicts the pizzas from the reservations.
  # To be more precise, it takes the input variable, the weight
  # and the bias, and it uses them to calculate ŷ.
  def predict(x, weight, bias), do: x * weight + bias

  @doc """
  Returns the mean squared error.
  """
  def loss(x, y, weight, bias) when is_list(x) and is_list(y) do
    predictions = predict(x, weight, bias)
    errors = Enum.zip_with([predictions, y], fn [pr, y] -> pr - y end)
    squared_error = square(errors)
    avg(squared_error)
  end

  def train(x, y, iterations, lr) when is_list(x) and is_list(y) do
    Enum.reduce(0..(iterations - 1), %{weight: 0, bias: 0}, fn i, %{weight: w, bias: b} = acc ->
      current_loss = loss(x, y, w, b)

      IO.puts("Iteration #{i} => Loss: #{current_loss}")

      cond do
        loss(x, y, w + lr, b) < current_loss -> %{acc | weight: w + lr}
        loss(x, y, w - lr, b) < current_loss -> %{acc | weight: w - lr}
        loss(x, y, w, b + lr) < current_loss -> %{acc | bias: b + lr}
        loss(x, y, w, b - lr) < current_loss -> %{acc | bias: b - lr}
        true -> acc
      end
    end)
  end

  defp square(list) when is_list(list) do
    for i <- list, do: i * i
  end

  defp avg(list) when is_list(list) do
    Enum.sum(list) / length(list)
  end
end

Plot the loss curve

# Transform the data to unpack the 2 columns `reservations` and
# `pizzas` into separate arrays called x and y
%{x: x, y: y} =
  Enum.reduce(data, %{x: [], y: []}, fn item, %{x: x, y: y} ->
    %{x: x ++ [item.reservations], y: y ++ [item.pizzas]}
  end)
alias VegaLite, as: Vl

# Generate a sequence that will be used as `weight`
# From -1 to -4, step 0.01
weights = Enum.map(-100..400, &amp;(&amp;1 / 100))

# Compute the loss for each weight, with bias=0
losses = Enum.map(weights, &amp;C2.LinearRegressionWithBias.loss(x, y, &amp;1, 0))

# Get the min loss index
min_loss_index = Enum.find_index(losses, &amp;(&amp;1 == Enum.min(losses)))

Vl.new(width: 600, height: 400)
|> Vl.layers([
  Vl.new()
  |> Vl.data_from_values(weight: weights, loss: losses)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "weight", type: :quantitative)
  |> Vl.encode_field(:y, "loss", type: :quantitative),
  Vl.new()
  |> Vl.data_from_values(
    weight: [Enum.at(weights, min_loss_index)],
    min_loss: [Enum.at(losses, min_loss_index)]
  )
  |> Vl.mark(:circle, tooltip: true, size: "100", color: "red")
  |> Vl.encode_field(:x, "weight", type: :quantitative)
  |> Vl.encode_field(:y, "min_loss", type: :quantitative, title: "loss")
])

Gradient Descent

defmodule C3.LinearRegressionWithoutBias do
  def predict([item | rest], weight, bias) do
    [predict(item, weight, bias) | predict(rest, weight, bias)]
  end

  def predict([], _weight, _bias), do: []
  def predict(x, weight, bias), do: x * weight + bias

  @doc """
  Returns the mean squared error.
  """
  def loss(x, y, weight, bias) when is_list(x) and is_list(y) do
    predictions = predict(x, weight, bias)
    errors = Enum.zip_with([predictions, y], fn [pr, y] -> pr - y end)
    squared_error = square(errors)
    avg(squared_error)
  end

  @doc """
  Returns the derivative of the loss curve
  """
  def gradient(x, y, weight) do
    predictions = predict(x, weight, 0)
    errors = Enum.zip_with([predictions, y], fn [pr, y] -> pr - y end)
    2 * avg(Enum.zip_with([x, errors], fn [x_item, error] -> x_item * error end))
  end

  def train(x, y, iterations, lr) when is_list(x) and is_list(y) do
    Enum.reduce(0..(iterations - 1), 0, fn i, weight ->
      IO.puts("Iteration #{i} => Loss: #{loss(x, y, weight, 0)}")
      weight - gradient(x, y, weight) * lr
    end)
  end

  defp square(list) when is_list(list) do
    for i <- list, do: i * i
  end

  defp avg(list) when is_list(list) do
    Enum.sum(list) / length(list)
  end
end

Train the system

iterations = Kino.Input.number("iterations", default: 100)
lr = Kino.Input.number("lr (learning rate)", default: 0.001)
iterations = Kino.Input.read(iterations)
lr = Kino.Input.read(lr)

weight = C3.LinearRegressionWithoutBias.train(x, y, iterations = 100, lr = 0.001)

Putting Gradient Descent to the Test

defmodule C3.LinearRegressionWithBias do
  def predict([item | rest], weight, bias) do
    [predict(item, weight, bias) | predict(rest, weight, bias)]
  end

  def predict([], _weight, _bias), do: []
  def predict(x, weight, bias), do: x * weight + bias

  @doc """
  Returns the mean squared error.
  """
  def loss(x, y, weight, bias) when is_list(x) and is_list(y) do
    predictions = predict(x, weight, bias)
    errors = Enum.zip_with([predictions, y], fn [pr, y] -> pr - y end)
    squared_error = square(errors)
    avg(squared_error)
  end

  @doc """
  Returns the derivative of the loss curve
  """
  def gradient(x, y, weight, bias) do
    predictions = predict(x, weight, bias)
    errors = Enum.zip_with([predictions, y], fn [pr, y] -> pr - y end)

    w_gradient = 2 * avg(Enum.zip_with([x, errors], fn [x_item, error] -> x_item * error end))
    b_gradient = 2 * avg(errors)

    {w_gradient, b_gradient}
  end

  def train(x, y, iterations, lr) when is_list(x) and is_list(y) do
    Enum.reduce(0..(iterations - 1), %{weight: 0, bias: 0}, fn i, %{weight: weight, bias: bias} ->
      IO.puts("Iteration #{i} => Loss: #{loss(x, y, weight, bias)}")

      {w_gradient, b_gradient} = gradient(x, y, weight, bias)
      %{weight: weight - w_gradient * lr, bias: bias - b_gradient * lr}
    end)
  end

  defp square(list) when is_list(list) do
    for i <- list, do: i * i
  end

  defp avg(list) when is_list(list) do
    Enum.sum(list) / length(list)
  end
end

Train the system

iterations = Kino.Input.number("iterations", default: 20_000)
lr = Kino.Input.number("lr (learning rate)", default: 0.001)
iterations = Kino.Input.read(iterations)
lr = Kino.Input.read(lr)

%{weight: weight, bias: bias} =
  C3.LinearRegressionWithBias.train(x, y, iterations = iterations, lr = lr)

Predict the number of pizzas

n_reservations = Kino.Input.number("number of reservations", default: 20)
n = Kino.Input.read(n_reservations)

C3.LinearRegressionWithBias.predict(n, weight, bias)