Machine Learning in Elixir (Ch 6+)

Mix.install(
  [
    # {:axon_onnx, github: "mortont/axon_onnx", override: true},
    # {:axon, "~> 0.5"},
    # {:axon, "~> 0.5"},
    # {:bumblebee, ">= 0.0.0"},
    {:bumblebee, github: "elixir-nx/bumblebee", override: true},
    {:nx, "~> 0.5"},
    {:polaris, "~> 0.1"},
    {:explorer, "~> 0.5"},
    {:kino, "~> 0.8"},
    {:kino_bumblebee, ">= 0.0.0"},
    {:scholar, "~> 0.3.1"},
    {:exla, "~> 0.6"},
    {:benchee, github: "bencheeorg/benchee", override: true},
    {:table_rex, "~> 3.1.1"},
    {:scidata, "~> 0.1"},
    {:stb_image, "~> 0.6"},
    {:vega_lite, "~> 0.1"},
    {:kino_vega_lite, "~> 0.1"}
  ],
  config: [
    nx: [
      default_backend: {EXLA.Backend, client: :cuda},
      default_defn_options: [compiler: EXLA, client: :cuda]
    ],
    exla: [
      clients: [
        cuda: [platform: :cuda, preallocate: false]
      ]
    ]
  ],
  system_env: [
    XLA_TARGET: "cuda120"
  ]
)

Setup v2

require Explorer.DataFrame, as: DF
alias VegaLite, as: Vl

Chapter 6

Nx.add(Nx.tensor([1, 2, 3]), Nx.tensor([1, 2, 3]))

Neural network

defmodule NeuralNetwork do
  import Nx.Defn

  defn hidden(input, weight, bias) do
    input
    |> dense(weight, bias)
    |> activation()
  end

  defn output(input, weight, bias) do
    input
    |> dense(weight, bias)
    |> activation()
  end

  defn predict(input, w1, b1, w2, b2) do
    input
    |> hidden(w1, b1)
    |> output(w2, b2)
  end

  defn dense(input, weight, bias) do
    input
    |> Nx.dot(weight)
    |> Nx.add(bias)
  end

  defn activation(input) do
    Nx.sigmoid(input)
  end
end
key = Nx.Random.key(42)
{w1, new_key} = Nx.Random.uniform(key)
{b1, new_key} = Nx.Random.uniform(new_key)
{w2, new_key} = Nx.Random.uniform(new_key)
{b2, new_key} = Nx.Random.uniform(new_key)

{input, _new_key} = Nx.Random.uniform(new_key, shape: {})

input
|> NeuralNetwork.predict(w1, b1, w2, b2)

Axon

{images, labels} = Scidata.MNIST.download()
{image_data, image_type, image_shape} = images
{label_data, label_type, label_shape} = labels

images =
  image_data
  |> Nx.from_binary(image_type)
  |> Nx.divide(255)
  |> Nx.reshape({60000, :auto})

labels =
  label_data
  |> Nx.from_binary(label_type)
  |> Nx.reshape(label_shape)
  |> Nx.new_axis(-1)
  |> Nx.equal(Nx.iota({1, 10}))
train_range = 0..49_999//1
test_range = 50_000..-1//1

train_images = images[train_range]
train_labels = labels[train_range]

test_images = images[test_range]
test_labels = labels[test_range]
batch_size = 64

train_data =
  train_images
  |> Nx.to_batched(batch_size)
  |> Stream.zip(Nx.to_batched(train_labels, batch_size))

test_data =
  test_images
  |> Nx.to_batched(batch_size)
  |> Stream.zip(Nx.to_batched(test_labels, batch_size))

Building the model with Axon

model =
  Axon.input("images", shape: {nil, 784})
  |> Axon.dense(128, activation: :relu)
  |> Axon.dense(128, activation: :relu)
  |> Axon.dense(10, activation: :softmax)
template = Nx.template({1, 784}, :f32)
Axon.Display.as_graph(model, template)
Axon.Display.as_table(model, template)
|> IO.puts()
IO.inspect(model, structs: false)

Training the model

trained_model_state =
  model
  |> Axon.Loop.trainer(:categorical_cross_entropy, :sgd)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_data, %{}, epochs: 10, compiler: EXLA)

Evaluating the model

model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_data, trained_model_state, compiler: EXLA)

Executing models with Axon

{test_batch, _} = Enum.at(test_data, 0)
test_image = test_batch[0]

test_image
|> Nx.reshape({28, 28})
|> Nx.to_heatmap()
{_, predict_fn} = Axon.build(model, compiler: EXLA)

probabilities =
  test_image
  |> Nx.new_axis(0)
  |> then(&predict_fn.(trained_model_state, &1))
probabilities |> Nx.argmax()

Chapter 7

Creating a pipeline

defmodule CatsAndDogs do
  def pipeline(paths, batch_size, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
    end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end

  def pipeline_with_aug(paths, batch_size, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
    end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.map(&random_flip(&1, :height))
    |> Stream.map(&random_flip(&1, :width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end

  defp parse_image(path) do
    base = Path.basename(path)
    label = if String.contains?(base, "cat"), do: 0, else: 1

    case StbImage.read_file(path) do
      {:ok, img} -> {img, label}
      _error -> :error
    end
  end

  defp to_tensors({:ok, {img, label}}, target_height, target_width) do
    img_tensor =
      img
      |> StbImage.resize(target_height, target_width)
      |> StbImage.to_nx()
      |> Nx.divide(255)

    label_tensor = Nx.tensor([label])

    {img_tensor, label_tensor}
  end

  defp random_flip({image, label}, axis) do
    if :rand.uniform() < 0.5 do
      {Nx.reverse(image, axes: [axis]), label}
    else
      {image, label}
    end
  end
end
batch_size = 128
target_height = 96
target_width = 96

{test_paths, train_paths} =
  Path.wildcard("train/cats_dogs/*.jpg")
  |> Enum.shuffle()
  |> Enum.split(1000)

{test_paths, val_paths} = test_paths |> Enum.split(750)

train_pipeline = CatsAndDogs.pipeline_with_aug(
  train_paths, batch_size, target_height, target_width
)
val_pipeline = CatsAndDogs.pipeline(
  val_paths, batch_size, target_height, target_width
)
test_pipeline = CatsAndDogs.pipeline(
  test_paths, batch_size, target_height, target_width
)

Enum.take(train_pipeline, 1)

Training the MLP

mlp_model =
  Axon.input("images", shape: {nil, target_height, target_width, 3})
  |> Axon.flatten()
  |> Axon.dense(256, activation: :relu)
  |> Axon.dense(128, activation: :relu)
  |> Axon.dense(1, activation: :sigmoid)
mlp_trained_model_state =
  mlp_model
  |> Axon.Loop.trainer(:binary_cross_entropy, :adam)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 5, compiler: EXLA)
mlp_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, mlp_trained_model_state, compiler: EXLA)

Convolutional Networks

path = "train/cats_dogs/dog.5.jpg"
img =
  path
  |> StbImage.read_file!()
  |> StbImage.to_nx()
  |> Nx.transpose(axes: [:channels, :height, :width])
  |> Nx.new_axis(0)

kernel = Nx.tensor([
  [-1, 0, 1],
  [-1, 0, 1],
  [-1, 0, 1]
])
kernel = kernel |> Nx.reshape({1, 1, 3, 3}) |> Nx.broadcast({3, 3, 3, 3})

img
|> Nx.conv(kernel)
|> Nx.as_type({:u, 8})
|> Nx.squeeze(axes: [0])
|> Nx.transpose(axes: [:height, :width, :channels])
|> Kino.Image.new()

Implementing CNNs

cnn_model =
  Axon.input("images", shape: {nil, 96, 96, 3})
  |> Axon.conv(32, kernel_size: {3, 3}, activation: :relu, padding: :same)
  |> Axon.batch_norm()
  |> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
  |> Axon.conv(64, kernel_size: {3, 3}, activation: :relu, padding: :same)
  |> Axon.batch_norm()
  |> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
  |> Axon.conv(128, kernel_size: {3, 3}, activation: :relu, padding: :same)
  |> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
  |> Axon.flatten()
  |> Axon.dense(128, activation: :relu)
  |> Axon.dropout(rate: 0.5)
  |> Axon.dense(1, activation: :sigmoid)
template = Nx.template({1, 96, 96, 3}, :f32)
Axon.Display.as_graph(cnn_model, template)
cnn_trained_model_state =
  cnn_model
  |> Axon.Loop.trainer(:binary_cross_entropy, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.validate(cnn_model, val_pipeline)
  |> Axon.Loop.early_stop("validation_loss", mode: :min)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 100, compiler: EXLA)
cnn_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, cnn_trained_model_state, compiler: EXLA)

Chapter 8 - Vision (Convolutional NNs)

defmodule CatsAndDogs do
  def pipeline(paths, batch_size, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
    end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end
  
  def pipeline_with_augs(paths, batch_size, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
    end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.map(&random_flip(&1, :height))
    |> Stream.map(&random_flip(&1, :width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end

  defp random_flip({image, label}, axis) do
    if :rand.uniform() < 0.5 do
      {Nx.reverse(image, axes: [axis]), label}
    else
      {image, label}
    end
  end

  defp parse_image(path) do
    base = Path.basename(path)
    label = if String.contains?(base, "cat"), do: 0, else: 1

    case StbImage.read_file(path) do
      {:ok, img} -> {img, label}
      _error -> :error
    end
  end

  defp to_tensors({:ok, {img, label}}, target_height, target_width) do
    img_tensor =
      img
      |> StbImage.resize(target_height, target_width)
      |> StbImage.to_nx()
      |> Nx.divide(255)
      |> Nx.transpose(axes: [:channels, :height, :width])

    label_tensor = Nx.tensor([label])
    {img_tensor, label_tensor}
  end
end
batch_size = 32
target_height = 160
target_width = 160

{test_paths, train_paths} =
  Path.wildcard("/data/train/cats_dogs/*.jpg")
  |> Enum.shuffle()
  |> Enum.split(1000)

{test_paths, val_paths} = test_paths |> Enum.split(750)

train_pipeline =
  CatsAndDogs.pipeline_with_augs(
    train_paths,
    batch_size,
    target_height,
    target_width
  )

test_pipeline =
  CatsAndDogs.pipeline(
    test_paths,
    batch_size,
    target_height,
    target_width
  )

val_pipeline =
  CatsAndDogs.pipeline(
    val_paths,
    batch_size,
    target_height,
    target_width
  )

Enum.take(train_pipeline, 1)
{cnn_base, cnn_base_params} = AxonOnnx.import(
  "/data/train/mobilenetv2-7.onnx", batch_size: batch_size
)
input_template = Nx.template({1, 3, target_height, target_width}, :f32)
Axon.Display.as_graph(cnn_base, input_template)
### Extract the original classification head
{_popped, cnn_base} = cnn_base |> Axon.pop_node()
{_popped, cnn_base} = cnn_base |> Axon.pop_node()

Wrap the convolutional base in its own namespace

cnn_base = cnn_base |> Axon.namespace("feature_extractor")

Freeze the convolutional base so its parameters are not updated during training

cnn_base = cnn_base |> Axon.freeze()

Condense the extracted features, either by flattening them or with a global pooling layer (used here), and add some regularization via dropout

model =
  cnn_base
  |> Axon.global_avg_pool(channels: :first)
  |> Axon.dropout(rate: 0.2)
  |> Axon.dense(1)

Create training loop

loss = &Axon.Losses.binary_cross_entropy(&1, &2,
  reduction: :mean,
  from_logits: true
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.validate(model, val_pipeline)
  |> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
  |> Axon.Loop.run(
    train_pipeline,
    %{"feature_extractor" => cnn_base_params},
    epochs: 10,
    compiler: EXLA
  )
eval_model = model |> Axon.sigmoid()

eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)

Fine-tuning

model = model |> Axon.unfreeze(up: 50)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
  reduction: :mean,
  from_logits: true)

optimizer = Polaris.Optimizers.rmsprop(learning_rate: 1.0e-5)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.validate(model, val_pipeline)
  |> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
  |> Axon.Loop.run(
    train_pipeline,
    trained_model_state,
    epochs: 1,
    compiler: EXLA
  )
eval_model = model |> Axon.sigmoid()

eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)

Chapter 9 - Text (RNNs)

data = Scidata.IMDBReviews.download()
{train_data, test_data} =
  data.review
  |> Enum.zip(data.sentiment)
  |> Enum.shuffle()
  |> Enum.split(23_000)
frequencies =
  Enum.reduce(train_data, %{}, fn {review, _}, tokens ->
    review
    |> String.downcase()
    |> String.replace(~r/[\p{P}\p{S}]/, "")
    |> String.split()
    |> Enum.reduce(tokens, &Map.update(&2, &1, 1, fn x -> x + 1 end))
  end)
num_tokens = 1024
tokens =
  frequencies
  |> Enum.sort_by(&elem(&1, 1), :desc)
  |> Enum.take(num_tokens)
  |> Enum.with_index(fn {token, _}, i -> {token, i} end)
  |> Map.new()
tokenize = fn review ->
  review
  |> String.downcase()
  |> String.replace(~r/[\p{P}\p{S}]/, "")
  |> String.split()
  |> Enum.map(&Map.get(tokens, &1))
end

Example:

review = "The Departed is Martin Scorsese's best work, and anybody who disagrees
is wrong. This movie is amazing."

tokenize.(review)

nil represents an out-of-vocabulary token. Let's account for those with a dedicated unknown token, and reserve the value 0 for padding

# Add padding
pad_token = 0
unknown_token = 1

max_seq_len = 64

tokens =
  frequencies
  |> Enum.sort_by(&elem(&1, 1), :desc)
  |> Enum.take(num_tokens)
  |> Enum.with_index(fn {token, _}, i -> {token, i + 2} end)
  |> Map.new()

tokenize = fn review ->
  review
  |> String.downcase()
  |> String.replace(~r/[\p{P}\p{S}]/, "")
  |> String.split()
  |> Enum.map(&Map.get(tokens, &1, unknown_token))
  |> Nx.tensor()
  |> then(&Nx.pad(&1, pad_token, [{0, max_seq_len - Nx.size(&1), 0}]))
end
tokenize.(review)

Testing out how padding works

# Each axis takes {pad_before, pad_after, interior_padding}
ten = Nx.tensor([[1, 2, 3], [4, 5, 6]]) |> IO.inspect(label: "ten")
# One zero row before and after; one zero column before and one zero
# inserted between every pair of columns
Nx.pad(ten, 0, [{1, 1, 0}, {1, 0, 1}])

Create the input pipeline

batch_size = 64

train_pipeline =
  train_data
  |> Stream.map(fn {review, label} ->
    {tokenize.(review), Nx.tensor(label)}
  end)
  |> Stream.chunk_every(batch_size, batch_size, :discard)
  |> Stream.map(fn reviews_and_labels ->
    {reviews, labels} = Enum.unzip(reviews_and_labels)
    {Nx.stack(reviews), Nx.stack(labels) |> Nx.new_axis(-1)}
  end)

test_pipeline =
  test_data
  |> Stream.map(fn {review, label} ->
    {tokenize.(review), Nx.tensor(label)}
  end)
  |> Stream.chunk_every(batch_size, batch_size, :discard)
  |> Stream.map(fn reviews_and_labels ->
    {reviews, labels} = Enum.unzip(reviews_and_labels)
    {Nx.stack(reviews), Nx.stack(labels) |> Nx.new_axis(-1)}
  end)

Enum.take(train_pipeline, 1)

Create a simple MLP model

model =
  Axon.input("review")
  |> Axon.embedding(num_tokens + 2, 64)
  |> Axon.flatten()
  |> Axon.dense(64, activation: :relu)
  |> Axon.dense(1)
input_template = Nx.template({64, 64}, :s64)
IO.puts(Axon.Display.as_table(model, input_template))
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
  from_logits: true,
  reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)
model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)

RNN implementation

sequence = Axon.input("review") # {batch_size, sequence_length}
embedded = sequence |> Axon.embedding(num_tokens + 2, 64)
mask = Axon.mask(sequence, 0) # Ignore token = 0, the padding token

{rnn_sequence, _state} = Axon.lstm(embedded, 64, mask: mask, unroll: :static)
# Take the RNN output at the final timestep for each batch element
final_token = Axon.nx(rnn_sequence, fn seq ->
  Nx.squeeze(seq[[0..-1//1, -1, 0..-1//1]])
end)

model =
  final_token
  |> Axon.dense(64, activation: :relu)
  |> Axon.dense(1)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
  from_logits: true,
  reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)
model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)

Implementing bidirectional RNNs

sequence = Axon.input("review")
mask = Axon.mask(sequence, 0)
embedded = Axon.embedding(sequence, num_tokens + 2, 64)

{rnn_sequence, _state} = Axon.bidirectional(
  embedded, &Axon.lstm(&1, 64, mask: mask, unroll: :static),
  &Axon.concatenate/2
)

final_token = Axon.nx(rnn_sequence, fn seq ->
  Nx.squeeze(seq[[0..-1//1, -1, 0..-1//1]])
end)

model =
  final_token
  |> Axon.dense(64, activation: :relu)
  |> Axon.dense(1)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
  from_logits: true,
  reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)

Chapter 10 - Time-series

csv_file = "/data/train/djia_30_stock/all_stocks_2006-01-01_to_2018-01-01.csv"
df = Explorer.DataFrame.from_csv!(csv_file, parse_dates: true)
df = Explorer.DataFrame.select(df, ["Date", "Close", "Name"])

Vl.new(title: "DJIA Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
aapl_df = Explorer.DataFrame.filter_with(df, fn df ->
  Explorer.Series.equal(df["Name"], "AAPL")
end)

Vl.new(title: "AAPL Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(aapl_df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
normalized_aapl_df = Explorer.DataFrame.mutate_with(aapl_df, fn df ->
  # Standardize the closing prices; dividing by the standard deviation keeps this
  # consistent with the denormalization (sqrt of the variance) applied later
  std = Explorer.Series.standard_deviation(df["Close"])
  mean = Explorer.Series.mean(df["Close"])
  centered = Explorer.Series.subtract(df["Close"], mean)
  norm = Explorer.Series.divide(centered, std)
  ["Close": norm]
end)
Vl.new(title: "AAPL Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(normalized_aapl_df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
defmodule Data do
  def window(inputs, window_size, target_window_size) do
    inputs
    |> Stream.chunk_every(window_size + target_window_size, 1, :discard)
    |> Stream.map(fn window ->
      features =
        window
        |> Enum.take(window_size)
        |> Nx.tensor()
        |> Nx.new_axis(-1)

      targets =
        window
        |> Enum.drop(window_size)
        |> Nx.tensor()
        |> Nx.new_axis(-1)

      {features, targets}
    end)
  end

  def batch(inputs, batch_size) do
    inputs
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn windows ->
      {features, targets} = Enum.unzip(windows)
      {Nx.stack(features), Nx.stack(targets)}
    end)
  end
end

Splitting the data

train_df = Explorer.DataFrame.filter_with(normalized_aapl_df, fn df ->
  Explorer.Series.less(df["Date"], Date.new!(2016, 1, 1))
end)

test_df = Explorer.DataFrame.filter_with(normalized_aapl_df, fn df ->
  Explorer.Series.greater_equal(df["Date"], Date.new!(2016, 1, 1))
end)
window_size = 10
batch_size = 32

train_prices = Explorer.Series.to_list(train_df["Close"])
test_prices = Explorer.Series.to_list(test_df["Close"])

single_step_train_data =
  train_prices
  |> Data.window(window_size, 1)
  |> Data.batch(batch_size)

single_step_test_data =
  test_prices
  |> Data.window(window_size, 1)
  |> Data.batch(batch_size)
Enum.take(single_step_train_data, 1)

Create the convolutional neural network

cnn_model =
  Axon.input("stock_price")
  |> Axon.conv(batch_size, kernel_size: window_size, activation: :relu)
  |> Axon.dense(batch_size, activation: :relu)
  |> Axon.dense(1)
template = Nx.template({batch_size, window_size, 1}, :f32)
Axon.Display.as_graph(cnn_model, template)
# IO.puts(Axon.Display.as_table(cnn_model, template))
cnn_trained_model_state =
  cnn_model
  |> Axon.Loop.trainer(:mean_squared_error, :adam)
  |> Axon.Loop.metric(:mean_absolute_error)
  |> Axon.Loop.run(single_step_train_data, %{}, epochs: 10, compiler: EXLA)
single_test_evaluation =
  cnn_model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(:mean_absolute_error)
  |> Axon.Loop.run(single_step_test_data, cnn_trained_model_state, compiler: EXLA)

Let's use the mean and standard deviation of AAPL closing prices to convert the error back to the original price scale.

The result is the model's mean absolute error when predicting the next day's closing price, averaged across batches over roughly two years of test data.

single_test_evaluation[0]["mean_absolute_error"] |> Nx.to_number()
|> Kernel.*(
  :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))
)
|> Kernel.+(
  Explorer.Series.mean(aapl_df["Close"])
)
defmodule Analysis do
  def visualize_predictions(
    model,
    model_state,
    prices,
    window_size,
    target_window_size,
    batch_size
  ) do
    {_, predict_fn} = Axon.build(model, compiler: EXLA)
    
    windows =
      prices
      |> Data.window(window_size, target_window_size)
      |> Data.batch(batch_size)
      |> Stream.map(&elem(&1, 0))

    predicted = Enum.flat_map(windows, fn window ->
      predict_fn.(model_state, window) |> Nx.to_flat_list()
    end)

    # Shift predictions right by one input window so they line up with actual prices
    predicted = List.duplicate(nil, window_size) ++ predicted

    types =
      List.duplicate("AAPL", length(prices))
      ++ List.duplicate("Predicted", length(prices))

    days =
      Enum.to_list(0..length(prices) - 1)
      ++ Enum.to_list(0..length(prices) - 1)

    prices = prices ++ predicted

    plot(%{
      "day" => days,
      "prices" => prices,
      "types" => types
    }, "AAPL Stock Price vs. Predicted, CNN Single-Shot")
  end

  defp plot(values, title) do
    Vl.new(title: title, width: 640, height: 480)
    |> Vl.data_from_values(values)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "day", type: :temporal)
    |> Vl.encode_field(:y, "prices", type: :quantitative)
    |> Vl.encode_field(:color, "types", type: :nominal)
    |> Kino.VegaLite.new()
  end
end
Analysis.visualize_predictions(
  cnn_model,
  cnn_trained_model_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)

Training an RNN for time series

rnn_model =
  Axon.input("stock_prices")
  |> Axon.lstm(window_size)
  |> elem(0)
  |> Axon.nx(& &1[[0..-1//1, -1, 0..-1//1]])
  |> Axon.dense(1)
template = Nx.template({batch_size, window_size, 1}, :f32)
Axon.Display.as_graph(rnn_model, template)
rnn_trained_model_state =
  rnn_model
  |> Axon.Loop.trainer(:mean_squared_error, :adam)
  |> Axon.Loop.metric(:mean_absolute_error)
  |> Axon.Loop.run(single_step_train_data, %{}, epochs: 50, compiler: EXLA)
rnn_single_test_eval =
  rnn_model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(:mean_absolute_error)
  |> Axon.Loop.run(single_step_test_data, rnn_trained_model_state, compiler: EXLA)
rnn_single_test_eval[0]["mean_absolute_error"] |> Nx.to_number()
|> Kernel.*(
  :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))
)
|> Kernel.+(
  Explorer.Series.mean(aapl_df["Close"])
)
Analysis.visualize_predictions(
  rnn_model,
  rnn_trained_model_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)

Chapter 11 - Transformers

auth_token = System.get_env("HF_TOKEN")
repo = {:hf, "google/vit-base-patch16-224", auth_token: auth_token}
{:ok, model_info} = Bumblebee.load_model(repo)
{:ok, featurizer} = Bumblebee.load_featurizer(repo)

serving =
  Bumblebee.Vision.image_classification(model_info, featurizer,
    top_k: 1,
    compile: [batch_size: 1],
    defn_options: [compiler: EXLA]
  )
image_input = Kino.Input.image("Image", size: {224, 224})
form = Kino.Control.form([image: image_input], submit: "Run")
frame = Kino.Frame.new()

form
|> Kino.Control.stream()
|> Stream.filter(& &1.data.image)
|> Kino.listen(fn %{data: %{image: image}} ->
  Kino.Frame.render(frame, Kino.Markdown.new("Running..."))
  image =
    image.file_ref
    |> Kino.Input.file_path()
    |> File.read!()
    |> Nx.from_binary(:u8)
    |> Nx.reshape({image.height, image.width, 3})
  output = Nx.Serving.run(serving, image)

  output.predictions
  |> Enum.map(&{&1.label, &1.score})
  |> Kino.Bumblebee.ScoredList.new()
  |> then(&Kino.Frame.render(frame, &1))
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)
{:ok, model} = Bumblebee.load_model(
  {:hf, "facebook/bart-large-mnli"}
)
{:ok, tokenizer} = Bumblebee.load_tokenizer(
  {:hf, "facebook/bart-large-mnli"}
)
model.model
labels = ["New booking", "Update booking", "Cancel booking", "Refund"]
zero_shot_serving =
  Bumblebee.Text.zero_shot_classification(
    model,
    tokenizer,
    labels
  )
input = "I need to book a new flight"
Nx.Serving.run(zero_shot_serving, input)

Passing batches of input to a Serving

inputs = [
  "I want to change my existing flight",
  "I want to cancel my current flight",
  "I demand my money back"
]

Nx.Serving.run(zero_shot_serving, inputs)

Fine-tuning pre-trained models (DistilBERT)

{:ok, spec} = Bumblebee.load_spec({:hf, "distilbert-base-cased"},
  module: Bumblebee.Text.Distilbert,
  architecture: :for_sequence_classification
)
spec = Bumblebee.configure(spec, num_labels: 5)

{:ok, %{model: model, params: params}} = Bumblebee.load_model(
  {:hf, "distilbert-base-cased"},
  spec: spec
)
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "distilbert-base-cased"})

Create the input pipeline

batch_size = 32
max_length = 128

train_data =
  File.stream!("/data/train/yelp_review_full/train.csv")
  |> Stream.chunk_every(batch_size)
  |> Stream.map(fn inputs ->
    {labels, reviews} =
      inputs
      |> Enum.map(fn line ->
        [label, review] = String.split(line, "\",\"")
        {String.trim(label, "\""), String.trim(review, "\"")}
      end)
      |> Enum.unzip()

    labels = labels |> Enum.map(&String.to_integer/1) |> Nx.tensor()
    tokens = Bumblebee.apply_tokenizer(tokenizer, reviews, length: max_length)

    {tokens, labels}
  end)
Enum.take(train_data, 1)

Check model output

Axon.get_output_shape(model, %{"input_ids" => Nx.template({32, 128}, :s64)})

Extract logits

model = Axon.nx(model, fn %{logits: logits} -> logits end)
optimizer = Polaris.Optimizers.adamw(learning_rate: 5.0e-5)
loss = &Axon.Losses.categorical_cross_entropy(&1, &2,
  from_logits: true,
  sparse: true,
  reduction: :mean
)

trained_model_state =
  model
  |> Axon.Loop.trainer(loss, optimizer, log: 1)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_data, params, epochs: 3, compiler: EXLA)

Chapter 12 - Unsupervised learning

An autoencoder is a neural network that learns to compress data into a lower-dimensional representation, paired with a second network that learns to decompress that representation back into the original data.

It is composed of an encoder and a decoder.

The encoder learns a latent representation of the input data; the decoder learns to reconstruct the input from that latent representation with minimal information loss.
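
Concretely, in the MNIST setup below the encoder maps a flattened 28x28 image (784 values) down to a 128-dimensional latent vector, and the decoder maps that vector back to a 28x28x1 image. Training minimizes the reconstruction error between input and output; with the mean-squared-error loss used in the trainer below, that objective is:

$$
\min_{\theta,\, \phi}\; \mathbb{E}_{x}\left[\left\lVert x - d_{\phi}(e_{\theta}(x))\right\rVert^{2}\right]
$$

where $e_{\theta}$ is the encoder and $d_{\phi}$ is the decoder.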

batch_size = 64
{{data, type, shape}, _} = Scidata.MNIST.download()

train_data =
  data
  |> Nx.from_binary(type)
  |> Nx.reshape({:auto, 28, 28, 1})
  |> Nx.divide(255)
  |> Nx.to_batched(batch_size)
defmodule Autoencoder do
  def encoder(input) do
    input
    |> Axon.flatten()
    |> Axon.dense(256, activation: :relu, name: "encoder_dense_0")
    |> Axon.dense(128, activation: :relu, name: "encoder_dense_1")
  end

  def decoder(input) do
    input
    |> Axon.dense(256, activation: :relu, name: "decoder_dense_0")
    |> Axon.dense(784, activation: :sigmoid, name: "decoder_dense_1")
    |> Axon.reshape({:batch, 28, 28, 1})
  end
end
model =
  Axon.input("image")
  |> Autoencoder.encoder()
  |> Autoencoder.decoder()

model
trained_model_state =
  model
  |> Axon.Loop.trainer(:mean_squared_error, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
  |> Axon.Loop.run(
    Stream.zip(train_data, train_data),
    %{},
    epochs: 5,
    compiler: EXLA
  )
test_batch = Enum.at(train_data, 0)
test_image = test_batch[0] |> Nx.new_axis(0)

visualize_test_image = fn
  %Axon.Loop.State{step_state: step_state} = state ->
    out_image = Axon.predict(
      model,
      step_state[:model_state],
      test_image,
      compiler: EXLA
    )
    out_image =
      out_image
      |> Nx.multiply(255)
      |> Nx.as_type(:u8)
      |> Nx.reshape({28, 28, 1})

    Kino.Image.new(out_image) |> Kino.render()
    {:continue, state}
end
trained_model_state =
  model
  |> Axon.Loop.trainer(:mean_squared_error, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
  |> Axon.Loop.handle_event(:epoch_completed, visualize_test_image)
  |> Axon.Loop.run(
    Stream.zip(train_data, train_data),
    %{},
    epochs: 5,
    compiler: EXLA
  )

Let's try the decoder on its own, feeding it random noise

decoder_only =
  Axon.input("noise")
  |> Autoencoder.decoder()

key = Nx.Random.key(42)
{noise, key} = Nx.Random.normal(key, shape: {1, 128})

out_image = Axon.predict(decoder_only, trained_model_state, noise)
upsampled = Axon.Layers.resize(out_image, size: {512, 512})
out_image =
  upsampled
  |> Nx.reshape({512, 512, 1})
  |> Nx.multiply(255)
  |> Nx.as_type(:u8)

Kino.Image.new(out_image)

Variational autoencoders force the model to learn a structured latent space.
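
The objective in the module below combines a reconstruction term with a Kullback-Leibler term that pulls each latent Gaussian toward a standard normal. Written out, it matches the joint_objective and sample functions that follow:

$$
\mathcal{L} = \mathrm{BCE}\left(x, \hat{x}\right) + \mathbb{E}\left[-\frac{1}{2}\sum_{j}\left(1 + \log \sigma_{j}^{2} - \mu_{j}^{2} - \sigma_{j}^{2}\right)\right]
$$

where $\mu$ and $\log \sigma^{2}$ are the encoder's z_mean and z_log_var outputs, and the decoder receives $z = \mu + \sigma\,\epsilon$ with $\epsilon \sim \mathcal{N}(0, 1)$ (the reparameterization trick).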

defmodule VAE do
  import Nx.Defn

  def encoder(input) do
    encoded =
      input
      |> Axon.conv(32, kernel_size: 3, activation: :relu, strides: 2, padding: :same)
      |> Axon.conv(32, kernel_size: 3, activation: :relu, strides: 2, padding: :same)
      |> Axon.flatten()
      |> Axon.dense(16, activation: :relu)

    z_mean = Axon.dense(encoded, 2)
    z_log_var = Axon.dense(encoded, 2)
    z = Axon.layer(&sample/3, [z_mean, z_log_var], op_name: :sample)
    Axon.container({z_mean, z_log_var, z})
  end

  def decoder(input) do
    input
    |> Axon.dense(7 * 7 * 64, activation: :relu)
    |> Axon.reshape({:batch, 7, 7, 64})
    |> Axon.conv_transpose(64,
      kernel_size: {3, 3},
      activation: :relu,
      strides: [2, 2],
      padding: :same
    )
    |> Axon.conv_transpose(32,
      kernel_size: {3, 3},
      activation: :relu,
      strides: [2, 2],
      padding: :same
    )
    |> Axon.conv_transpose(1,
      kernel_size: {3, 3},
      activation: :sigmoid,
      padding: :same
    )
  end

  def display_sample(
        %Axon.Loop.State{step_state: state} = out_state,
        decoder_fn
      ) do
    latent = Nx.tensor([[0.0, 0.0], [0.5, 0.5], [1.0, 1.0]])
    %{prediction: out} = decoder_fn.(state[:model_state]["decoder"], latent)
    out_image = Nx.multiply(out, 255) |> Nx.as_type(:u8)

    upsample =
      Axon.Layers.resize(
        out_image,
        size: {512, 512},
        channels: :first
      )

    for i <- 0..2 do
      Kino.Image.new(Nx.reshape(upsample[i], {512, 512, 1})) |> Kino.render()
    end

    {:continue, out_state}
  end

  defn train_step(encoder_fn, decoder_fn, optimizer_fn, batch, state) do
    {batch_loss, joint_param_grads} =
      value_and_grad(
        state[:model_state],
        &joint_objective(encoder_fn, decoder_fn, batch, &1)
      )

    {scaled_updates, new_optimizer_state} =
      optimizer_fn.(
        joint_param_grads,
        state[:optimizer_state],
        state[:model_state]
      )

    new_model_state =
      Polaris.Updates.apply_updates(
        state[:model_state],
        scaled_updates
      )

    new_loss =
      state[:loss]
      |> Nx.multiply(state[:i])
      |> Nx.add(batch_loss)
      |> Nx.divide(Nx.add(state[:i], 1))

    %{
      state
      | i: Nx.add(state[:i], 1),
        loss: new_loss,
        model_state: new_model_state,
        optimizer_state: new_optimizer_state
    }
  end

  defn init_step(
         encoder_init_fn,
         decoder_init_fn,
         optimizer_init_fn,
         batch,
         init_state
       ) do
    encoder_params = encoder_init_fn.(batch, init_state)

    # Build the decoder params from a latent-shaped template rather than the image batch
    {latent_template, _key} = Nx.Random.uniform(Nx.Random.key(42), shape: {64, 2})
    decoder_params = decoder_init_fn.(latent_template, init_state)

    joint_params = %{
      "encoder" => encoder_params,
      "decoder" => decoder_params
    }

    optimizer_state = optimizer_init_fn.(joint_params)

    %{
      i: Nx.tensor(0),
      loss: Nx.tensor(0.0),
      model_state: joint_params,
      optimizer_state: optimizer_state
    }
  end

  defnp joint_objective(encoder_fn, decoder_fn, batch, joint_params) do
    %{prediction: preds} = encoder_fn.(joint_params["encoder"], batch)
    {z_mean, z_log_var, z} = preds
    %{prediction: reconstruction} = decoder_fn.(joint_params["decoder"], z)

    recon_loss =
      Axon.Losses.binary_cross_entropy(
        batch,
        reconstruction,
        reduction: :mean
      )

    kl_loss = -0.5 * (1 + z_log_var - Nx.pow(z_mean, 2) - Nx.exp(z_log_var))
    kl_loss = Nx.mean(Nx.sum(kl_loss, axes: [1]))

    recon_loss + kl_loss
  end

  defnp sample(z_mean, z_log_var, _opts \\ []) do
    # Reparameterization trick: z = mean + std * epsilon
    noise_shape = Nx.shape(z_mean)
    {epsilon, _key} = Nx.Random.normal(Nx.Random.key(42), shape: noise_shape)
    z_mean + Nx.exp(0.5 * z_log_var) * epsilon
  end
end

Let's see what an Axon training loop looks like under the hood:

encoder = Axon.input("image") |> VAE.encoder()
decoder = Axon.input("latent") |> VAE.decoder()

{encoder_init_fn, encoder_fn} = Axon.build(encoder, mode: :train)
{decoder_init_fn, decoder_fn} = Axon.build(decoder, mode: :train)

{optimizer_init_fn, optimizer_fn} = Polaris.Optimizers.adam(learning_rate: 1.0e-3)

init_fn = &VAE.init_step(
  encoder_init_fn,
  decoder_init_fn,
  optimizer_init_fn,
  &1,
  &2
)

step_fn = &VAE.train_step(
  encoder_fn,
  decoder_fn,
  optimizer_fn,
  &1,
  &2
)
step_fn
|> Axon.Loop.loop(init_fn)
|> Axon.Loop.handle_event(:epoch_completed, &VAE.display_sample(&1, decoder_fn))
|> Axon.Loop.log(fn
  %Axon.Loop.State{epoch: epoch, iteration: iter, step_state: state} ->
    "\rEpoch: #{epoch}, batch: #{iter}, loss: #{Nx.to_number(state[:loss])}"
end, device: :stdio, event: :iteration_completed)
|> Axon.Loop.run(train_data, %{}, compiler: EXLA, epochs: 10)