Machine Learning in Elixir (Ch 6+)
# Install notebook dependencies and configure Nx/EXLA to run on CUDA.
# Fixes: removed the trailing comma after `XLA_TARGET: "cuda120"` (a syntax
# error in Elixir keyword lists) and dropped a duplicated commented-out dep.
Mix.install(
  [
    # {:axon_onnx, github: "mortont/axon_onnx", override: true},
    # {:axon, "~> 0.5"},
    # {:bumblebee, ">= 0.0.0"},
    {:bumblebee, github: "elixir-nx/bumblebee", override: true},
    {:nx, "~> 0.5"},
    {:polaris, "~> 0.1"},
    {:explorer, "~> 0.5"},
    {:kino, "~> 0.8"},
    {:kino_bumblebee, ">= 0.0.0"},
    {:scholar, "~> 0.3.1"},
    {:exla, "~> 0.6"},
    {:benchee, github: "bencheeorg/benchee", override: true},
    {:table_rex, "~> 3.1.1"},
    {:scidata, "~> 0.1"},
    {:stb_image, "~> 0.6"},
    {:vega_lite, "~> 0.1"},
    {:kino_vega_lite, "~> 0.1"}
  ],
  config: [
    nx: [
      default_backend: {EXLA.Backend, client: :cuda},
      default_defn_options: [compiler: EXLA, client: :cuda]
    ],
    exla: [
      clients: [
        cuda: [platform: :cuda, preallocate: false]
      ]
    ]
  ],
  system_env: [
    XLA_TARGET: "cuda120"
  ]
)
Setup v2
# Short aliases used throughout the notebook.
require Explorer.DataFrame, as: DF
alias VegaLite, as: Vl
Chapter 6
# Smoke test: element-wise addition on the configured Nx backend.
Nx.add(Nx.tensor([1, 2, 3]), Nx.tensor([1, 2, 3]))
Neural network
# A hand-rolled two-layer network built from Nx numerical definitions (defn).
defmodule NeuralNetwork do
import Nx.Defn
# Hidden layer: affine transform followed by sigmoid.
defn hidden(input, weight, bias) do
input
|> dense(weight, bias)
|> activation()
end
# Output layer: same dense + sigmoid structure as the hidden layer.
defn output(input, weight, bias) do
input
|> dense(weight, bias)
|> activation()
end
# Full forward pass: hidden layer (w1/b1), then output layer (w2/b2).
defn predict(input, w1, b1, w2, b2) do
input
|> hidden(w1, b1)
|> output(w2, b2)
end
# Affine transform: input . weight + bias.
defn dense(input, weight, bias) do
input
|> Nx.dot(weight)
|> Nx.add(bias)
end
# Sigmoid non-linearity.
defn activation(input) do
Nx.sigmoid(input)
end
end
# Draw random scalar parameters and run a single forward pass.
key = Nx.Random.key(42)
{w1, new_key} = Nx.Random.uniform(key)
{b1, new_key} = Nx.Random.uniform(new_key)
{w2, new_key} = Nx.Random.uniform(new_key)
{b2, new_key} = Nx.Random.uniform(new_key)
{input, _new_key} = Nx.Random.uniform(new_key, shape: {})
# Fix: was `predict(input, w1, b2, w2, b2)` — the first-layer bias must be b1.
input
|> NeuralNetwork.predict(w1, b1, w2, b2)
Axon
# Download MNIST: images/labels come as {binary, type, shape} tuples.
{images, labels} = Scidata.MNIST.download()
{image_data, image_type, image_shape} = images
{label_data, label_type, label_shape} = labels
# Decode pixels, scale to [0, 1], flatten each 28x28 image to 784 features.
images =
image_data
|> Nx.from_binary(image_type)
|> Nx.divide(255)
|> Nx.reshape({60000, :auto})
# One-hot encode the digit labels by comparing each label against 0..9.
labels =
label_data
|> Nx.from_binary(label_type)
|> Nx.reshape(label_shape)
|> Nx.new_axis(-1)
|> Nx.equal(Nx.iota({1, 10}))
# 50k train / 10k test split.
train_range = 0..49_999//1
test_range = 50_000..-1//1
train_images = images[train_range]
train_labels = labels[train_range]
test_images = images[test_range]
test_labels = labels[test_range]
batch_size = 64
# Lazily pair image batches with their label batches.
train_data =
train_images
|> Nx.to_batched(batch_size)
|> Stream.zip(Nx.to_batched(train_labels, batch_size))
test_data =
test_images
|> Nx.to_batched(batch_size)
|> Stream.zip(Nx.to_batched(test_labels, batch_size))
Building the model with Axon
# A 784 -> 128 -> 128 -> 10 MLP with a softmax output.
model =
Axon.input("images", shape: {nil, 784})
|> Axon.dense(128, activation: :relu)
|> Axon.dense(128, activation: :relu)
|> Axon.dense(10, activation: :softmax)
template = Nx.template({1, 784}, :f32)
Axon.Display.as_graph(model, template)
Axon.Display.as_table(model, template)
|> IO.puts()
IO.inspect(model, structs: false)
Training the model
# 10 epochs of SGD on categorical cross-entropy, JIT-compiled with EXLA.
trained_model_state =
model
|> Axon.Loop.trainer(:categorical_cross_entropy, :sgd)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_data, %{}, epochs: 10, compiler: EXLA)
Evaluating the model
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_data, trained_model_state, compiler: EXLA)
Executing models with Axon
# Inspect one test image and run a single prediction by hand.
{test_batch, _} = Enum.at(test_data, 0)
test_image = test_batch[0]
test_image
|> Nx.reshape({28, 28})
|> Nx.to_heatmap()
{_, predict_fn} = Axon.build(model, compiler: EXLA)
probabilities =
test_image
|> Nx.new_axis(0)
|> then(&predict_fn.(trained_model_state, &1))
probabilities |> Nx.argmax()
Chapter 7
Creating a pipeline
# Data pipelines for the cats-vs-dogs images (channels-last tensors).
# Refactor: pipeline/4 and pipeline_with_aug/4 previously duplicated the
# decode/filter/tensorize and batching stages; both now share private helpers.
defmodule CatsAndDogs do
  # Batched {images, labels} stream without augmentation.
  def pipeline(paths, batch_size, target_height, target_width) do
    paths
    |> image_stream(target_height, target_width)
    |> batch(batch_size)
  end

  # Same as pipeline/4, plus random flips along both axes for augmentation.
  def pipeline_with_aug(paths, batch_size, target_height, target_width) do
    paths
    |> image_stream(target_height, target_width)
    |> Stream.map(&random_flip(&1, :height))
    |> Stream.map(&random_flip(&1, :width))
    |> batch(batch_size)
  end

  # Shuffle, decode in parallel, drop unreadable files, convert to tensors.
  defp image_stream(paths, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
    end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
  end

  # Group into fixed-size batches, stacking images and labels; :discard drops
  # any trailing partial batch.
  defp batch(stream, batch_size) do
    stream
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end

  # Label from filename: "cat" -> 0, otherwise 1. :error for unreadable files.
  defp parse_image(path) do
    base = Path.basename(path)
    label = if String.contains?(base, "cat"), do: 0, else: 1

    case StbImage.read_file(path) do
      {:ok, img} -> {img, label}
      _error -> :error
    end
  end

  # Resize and scale pixels to [0, 1]; labels become shape-{1} tensors.
  defp to_tensors({:ok, {img, label}}, target_height, target_width) do
    img_tensor =
      img
      |> StbImage.resize(target_height, target_width)
      |> StbImage.to_nx()
      |> Nx.divide(255)

    label_tensor = Nx.tensor([label])
    {img_tensor, label_tensor}
  end

  # Flip the image along the named axis with probability 0.5.
  defp random_flip({image, label}, axis) do
    if :rand.uniform() < 0.5 do
      {Nx.reverse(image, axes: [axis]), label}
    else
      {image, label}
    end
  end
end
batch_size = 128
target_height = 96
target_width = 96
# Hold out 1000 images; of those, 750 test and 250 validation.
{test_paths, train_paths} =
Path.wildcard("train/cats_dogs/*.jpg")
|> Enum.shuffle()
|> Enum.split(1000)
{test_paths, val_paths} = test_paths |> Enum.split(750)
train_pipeline = CatsAndDogs.pipeline_with_aug(
train_paths, batch_size, target_height, target_width
)
val_pipeline = CatsAndDogs.pipeline(
val_paths, batch_size, target_height, target_width
)
test_pipeline = CatsAndDogs.pipeline(
test_paths, batch_size, target_height, target_width
)
Enum.take(train_pipeline, 1)
Training the MLP
# Baseline MLP over flattened pixels; sigmoid output for binary cat/dog.
mlp_model =
Axon.input("images", shape: {nil, target_height, target_width, 3})
|> Axon.flatten()
|> Axon.dense(256, activation: :relu)
|> Axon.dense(128, activation: :relu)
|> Axon.dense(1, activation: :sigmoid)
mlp_trained_model_state =
mlp_model
|> Axon.Loop.trainer(:binary_cross_entropy, :adam)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_pipeline, %{}, epochs: 5, compiler: EXLA)
mlp_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, mlp_trained_model_state, compiler: EXLA)
Convolutional Networks
# Demonstrate a hand-built convolution (edge detection) on a single image.
path = "train/cats_dogs/dog.5.jpg"
img =
path
|> StbImage.read_file!()
|> StbImage.to_nx()
|> Nx.transpose(axes: [:channels, :height, :width])
|> Nx.new_axis(0)
# 3x3 horizontal-gradient kernel, broadcast across all input/output channels.
kernel = Nx.tensor([
[-1, 0, 1],
[-1, 0, 1],
[-1, 0, 1]
])
kernel = kernel |> Nx.reshape({1, 1, 3, 3}) |> Nx.broadcast({3, 3, 3, 3})
img
|> Nx.conv(kernel)
|> Nx.as_type({:u, 8})
|> Nx.squeeze(axes: [0])
|> Nx.transpose(axes: [:height, :width, :channels])
|> Kino.Image.new()
Implementing CNNs
# Three conv/pool stages with batch norm, then a dense head with dropout.
cnn_model =
Axon.input("images", shape: {nil, 96, 96, 3})
|> Axon.conv(32, kernel_size: {3, 3}, activation: :relu, padding: :same)
|> Axon.batch_norm()
|> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
|> Axon.conv(64, kernel_size: {3, 3}, activation: :relu, padding: :same)
|> Axon.batch_norm()
|> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
|> Axon.conv(128, kernel_size: {3, 3}, activation: :relu, padding: :same)
|> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
|> Axon.flatten()
|> Axon.dense(128, activation: :relu)
|> Axon.dropout(rate: 0.5)
|> Axon.dense(1, activation: :sigmoid)
template = Nx.template({1, 96, 96, 3}, :f32)
Axon.Display.as_graph(cnn_model, template)
# Train with early stopping on validation loss (up to 100 epochs).
cnn_trained_model_state =
cnn_model
|> Axon.Loop.trainer(:binary_cross_entropy, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.validate(cnn_model, val_pipeline)
|> Axon.Loop.early_stop("validation_loss", mode: :min)
|> Axon.Loop.run(train_pipeline, %{}, epochs: 100, compiler: EXLA)
cnn_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, cnn_trained_model_state, compiler: EXLA)
Chapter 8 - Vision (Convolutional NNs)
# NOTE(review): redefines the Chapter 7 CatsAndDogs module. Functional
# differences from that version: the augmented pipeline is named
# pipeline_with_augs/4, and to_tensors/3 transposes images to channels-first
# ({channels, height, width}) for the ONNX MobileNet base used below.
defmodule CatsAndDogs do
# Batched {images, labels} stream without augmentation.
def pipeline(paths, batch_size, target_height, target_width) do
paths
|> Enum.shuffle()
|> Task.async_stream(&parse_image/1)
|> Stream.filter(fn
{:ok, {%StbImage{}, _}} -> true
_ -> false
end)
|> Stream.map(&to_tensors(&1, target_height, target_width))
|> Stream.chunk_every(batch_size, batch_size, :discard)
|> Stream.map(fn chunks ->
{img_chunk, label_chunk} = Enum.unzip(chunks)
{Nx.stack(img_chunk), Nx.stack(label_chunk)}
end)
end
# Same as pipeline/4, plus random flips along both axes for augmentation.
def pipeline_with_augs(paths, batch_size, target_height, target_width) do
paths
|> Enum.shuffle()
|> Task.async_stream(&parse_image/1)
|> Stream.filter(fn
{:ok, {%StbImage{}, _}} -> true
_ -> false
end)
|> Stream.map(&to_tensors(&1, target_height, target_width))
|> Stream.map(&random_flip(&1, :height))
|> Stream.map(&random_flip(&1, :width))
|> Stream.chunk_every(batch_size, batch_size, :discard)
|> Stream.map(fn chunks ->
{img_chunk, label_chunk} = Enum.unzip(chunks)
{Nx.stack(img_chunk), Nx.stack(label_chunk)}
end)
end
# Flip the image along the named axis with probability 0.5.
defp random_flip({image, label}, axis) do
if :rand.uniform() < 0.5 do
{Nx.reverse(image, axes: [axis]), label}
else
{image, label}
end
end
# Label from filename: "cat" -> 0, otherwise 1. :error for unreadable files.
defp parse_image(path) do
base = Path.basename(path)
label = if String.contains?(base, "cat"), do: 0, else: 1
case StbImage.read_file(path) do
{:ok, img} -> {img, label}
_error -> :error
end
end
# Resize, scale to [0, 1], and transpose to channels-first layout.
defp to_tensors({:ok, {img, label}}, target_height, target_width) do
img_tensor =
img
|> StbImage.resize(target_height, target_width)
|> StbImage.to_nx()
|> Nx.divide(255)
|> Nx.transpose(axes: [:channels, :height, :width])
label_tensor = Nx.tensor([label])
{img_tensor, label_tensor}
end
end
batch_size = 32
target_height = 160
target_width = 160
# Hold out 1000 images; of those, 750 test and 250 validation.
{test_paths, train_paths} =
Path.wildcard("/data/train/cats_dogs/*.jpg")
|> Enum.shuffle()
|> Enum.split(1000)
{test_paths, val_paths} = test_paths |> Enum.split(750)
train_pipeline =
CatsAndDogs.pipeline_with_augs(
train_paths,
batch_size,
target_height,
target_width
)
test_pipeline =
CatsAndDogs.pipeline(
test_paths,
batch_size,
target_height,
target_width
)
val_pipeline =
CatsAndDogs.pipeline(
val_paths,
batch_size,
target_height,
target_width
)
Enum.take(train_pipeline, 1)
# Load a pre-trained MobileNetV2 base from ONNX.
# NOTE(review): this needs the :axon_onnx dependency, which is commented out
# in Mix.install above — re-enable it before running this section.
{cnn_base, cnn_base_params} = AxonOnnx.import(
"/data/train/mobilenetv2-7.onnx", batch_size: batch_size
)
input_template = Nx.template({1, 3, target_height, target_width}, :f32)
Axon.Display.as_graph(cnn_base, input_template)
### Extract the original classification head
# Pop the final two graph nodes (the original ImageNet classification head).
{_popped, cnn_base} = cnn_base |> Axon.pop_node()
{_popped, cnn_base} = cnn_base |> Axon.pop_node()
Wrap convolutional base into its own namespace
cnn_base = cnn_base |> Axon.namespace("feature_extractor")
Freeze the convolutional base so that we don’t use it for training
cnn_base = cnn_base |> Axon.freeze()
Flatten the features or use a global pooling layer. Also add some regularization via dropout
# New binary-classification head on top of the frozen base (logit output).
model =
cnn_base
|> Axon.global_avg_pool(channels: :first)
|> Axon.dropout(rate: 0.2)
|> Axon.dense(1)
Create training loop
# from_logits: true pairs with the logit-valued dense output above.
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
reduction: :mean,
from_logits: true
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)
# Seed the loop with the pre-trained base params under the namespace key.
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.validate(model, val_pipeline)
|> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
|> Axon.Loop.run(
train_pipeline,
%{"feature_extractor" => cnn_base_params},
epochs: 10,
compiler: EXLA
)
# Attach a sigmoid for evaluation, since the model outputs raw logits.
eval_model = model |> Axon.sigmoid()
eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)
Fine-tuning
# Unfreeze the top 50 layers and continue with a much smaller learning rate.
model = model |> Axon.unfreeze(up: 50)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
reduction: :mean,
from_logits: true)
optimizer = Polaris.Optimizers.rmsprop(learning_rate: 1.0e-5)
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.validate(model, val_pipeline)
|> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
|> Axon.Loop.run(
train_pipeline,
trained_model_state,
epochs: 1,
compiler: EXLA
)
eval_model = model |> Axon.sigmoid()
eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)
Chapter 9 - Text (RNNs)
data = Scidata.IMDBReviews.download()
# Pair each review with its sentiment label; 23k train / 2k test.
{train_data, test_data} =
data.review
|> Enum.zip(data.sentiment)
|> Enum.shuffle()
|> Enum.split(23_000)
# Word frequencies over the lower-cased, punctuation-stripped training set.
frequencies =
Enum.reduce(train_data, %{}, fn {review, _}, tokens ->
review
|> String.downcase()
|> String.replace(~r/[\p{P}\p{S}]/, "")
|> String.split()
|> Enum.reduce(tokens, &Map.update(&2, &1, 1, fn x -> x + 1 end))
end)
num_tokens = 1024
# Vocabulary: the 1024 most frequent words, mapped word -> index.
tokens =
frequencies
|> Enum.sort_by(&elem(&1, 1), :desc)
|> Enum.take(num_tokens)
|> Enum.with_index(fn {token, _}, i -> {token, i} end)
|> Map.new()
# First tokenizer draft: out-of-vocabulary words map to nil (fixed below).
tokenize = fn review ->
review
|> String.downcase()
|> String.replace(~r/[\p{P}\p{S}]/, "")
|> String.split()
|> Enum.map(&Map.get(tokens, &1))
end
Example:
# Multi-line string literal; the embedded newline is part of the review text.
review = "The Departed is Martin Scorsese's best work, and anybody who disagrees
is wrong. This movie is amazing."
tokenize.(review)
`nil` represents an out-of-vocabulary token. Let's account for these explicitly, reserving the value 0 for padding and 1 for unknown tokens.
# Add padding
# Reserve 0 for padding and 1 for unknown (out-of-vocabulary) tokens;
# vocabulary indices therefore start at 2.
pad_token = 0
unknown_token = 1
max_seq_len = 64
tokens =
frequencies
|> Enum.sort_by(&elem(&1, 1), :desc)
|> Enum.take(num_tokens)
|> Enum.with_index(fn {token, _}, i -> {token, i + 2} end)
|> Map.new()
# Tokenize and right-pad every review to exactly max_seq_len tokens.
tokenize = fn review ->
review
|> String.downcase()
|> String.replace(~r/[\p{P}\p{S}]/, "")
|> String.split()
|> Enum.map(&Map.get(tokens, &1, unknown_token))
|> Nx.tensor()
|> then(&Nx.pad(&1, pad_token, [{0, max_seq_len - Nx.size(&1), 0}]))
end
tokenize.(review)
Testing out how padding works
ten = Nx.tensor([[1, 2, 3], [4, 5, 6]]) |> IO.inspect(label: "ten")
Nx.pad(ten, 0, [{1, 1, 0}, {1, 0, 1}])
Create the input pipeline
batch_size = 64
# Batches of {tokenized_reviews, labels}; labels get a trailing axis of 1.
train_pipeline =
train_data
|> Stream.map(fn {review, label} ->
{tokenize.(review), Nx.tensor(label)}
end)
|> Stream.chunk_every(batch_size, batch_size, :discard)
|> Stream.map(fn reviews_and_labels ->
{reviews, labels} = Enum.unzip(reviews_and_labels)
{Nx.stack(reviews), Nx.stack(labels) |> Nx.new_axis(-1)}
end)
test_pipeline =
test_data
|> Stream.map(fn {review, label} ->
{tokenize.(review), Nx.tensor(label)}
end)
|> Stream.chunk_every(batch_size, batch_size, :discard)
|> Stream.map(fn reviews_and_labels ->
{reviews, labels} = Enum.unzip(reviews_and_labels)
{Nx.stack(reviews), Nx.stack(labels) |> Nx.new_axis(-1)}
end)
Enum.take(train_pipeline, 1)
Create simple MLP model
# Embedding (+2 for pad/unknown ids) -> flatten -> dense -> logit output.
model =
Axon.input("review")
|> Axon.embedding(num_tokens + 2, 64)
|> Axon.flatten()
|> Axon.dense(64, activation: :relu)
|> Axon.dense(1)
input_template = Nx.template({64, 64}, :s64)
IO.puts(Axon.Display.as_table(model, input_template))
# from_logits: true pairs with the logit-valued dense output above.
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
from_logits: true,
reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)
RNN implementation
sequence = Axon.input("review") # {batch_size, sequence_length}
embedded = sequence |> Axon.embedding(num_tokens + 2, 64)
mask = Axon.mask(sequence, 0) # Ignore token = 0, the padding token
{rnn_sequence, _state} = Axon.lstm(embedded, 64, mask: mask, unroll: :static)
# Keep only the LSTM output at the final timestep.
final_token = Axon.nx(rnn_sequence, fn seq ->
Nx.squeeze(seq[[0..-1//1, -1, 0..-1//1]])
end)
model =
final_token
|> Axon.dense(64, activation: :relu)
|> Axon.dense(1)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
from_logits: true,
reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)
Implementing bidirectional RNNs
sequence = Axon.input("review")
mask = Axon.mask(sequence, 0)
embedded = Axon.embedding(sequence, num_tokens + 2, 64)
# Run the LSTM in both directions and concatenate the two outputs.
{rnn_sequence, _state} = Axon.bidirectional(
embedded, &Axon.lstm(&1, 64, mask: mask, unroll: :static),
&Axon.concatenate/2
)
final_token = Axon.nx(rnn_sequence, fn seq ->
Nx.squeeze(seq[[0..-1//1, -1, 0..-1//1]])
end)
model =
final_token
|> Axon.dense(64, activation: :relu)
|> Axon.dense(1)
loss = &Axon.Losses.binary_cross_entropy(&1, &2,
from_logits: true,
reduction: :mean
)
optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_pipeline, %{}, epochs: 10, compiler: EXLA)
Chapter 10 - Time-series
csv_file = "/data/train/djia_30_stock/all_stocks_2006-01-01_to_2018-01-01.csv"
df = Explorer.DataFrame.from_csv!(csv_file, parse_dates: true)
df = Explorer.DataFrame.select(df, ["Date", "Close", "Name"])
# Plot every DJIA ticker's closing price over time.
Vl.new(title: "DJIA Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
# Focus on Apple only.
aapl_df = Explorer.DataFrame.filter_with(df, fn df ->
Explorer.Series.equal(df["Name"], "AAPL")
end)
Vl.new(title: "AAPL Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(aapl_df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
# Center and scale the closing prices.
# NOTE(review): this divides by the *variance*, while the de-normalization
# later in the notebook multiplies by sqrt(variance) (the std deviation) —
# one of the two is off; confirm which was intended.
normalized_aapl_df = Explorer.DataFrame.mutate_with(aapl_df, fn df ->
var = Explorer.Series.variance(df["Close"])
mean = Explorer.Series.mean(df["Close"])
centered = Explorer.Series.subtract(df["Close"], mean)
norm = Explorer.Series.divide(centered, var)
["Close": norm]
end)
Vl.new(title: "AAPL Stock Prices", width: 640, height: 480)
|> Vl.data_from_values(Explorer.DataFrame.to_columns(normalized_aapl_df))
|> Vl.mark(:line)
|> Vl.encode_field(:x, "Date", type: :temporal)
|> Vl.encode_field(:y, "Close", type: :quantitative)
|> Vl.encode_field(:color, "Name", type: :nominal)
|> Kino.VegaLite.new()
# Sliding-window and batching helpers for time-series training data.
defmodule Data do
  # Slide a window of `window_size` inputs plus `target_window_size` targets
  # over the series (step 1, partial windows discarded), yielding
  # {features, targets} tensor pairs, each with a trailing axis of size 1.
  def window(inputs, window_size, target_window_size) do
    chunk_len = window_size + target_window_size

    inputs
    |> Stream.chunk_every(chunk_len, 1, :discard)
    |> Stream.map(fn win ->
      {feature_vals, target_vals} = Enum.split(win, window_size)
      {to_series_tensor(feature_vals), to_series_tensor(target_vals)}
    end)
  end

  # Stack windows into {batch, window, 1} / {batch, targets, 1} batches,
  # discarding any trailing partial batch.
  def batch(inputs, batch_size) do
    inputs
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn windows ->
      {features, targets} = Enum.unzip(windows)
      {Nx.stack(features), Nx.stack(targets)}
    end)
  end

  # List of numbers -> tensor with a trailing feature axis of size 1.
  defp to_series_tensor(values) do
    values
    |> Nx.tensor()
    |> Nx.new_axis(-1)
  end
end
Splitting the data
# Train on data before 2016, test on 2016 onward.
train_df = Explorer.DataFrame.filter_with(normalized_aapl_df, fn df ->
Explorer.Series.less(df["Date"], Date.new!(2016, 1, 1))
end)
test_df = Explorer.DataFrame.filter_with(normalized_aapl_df, fn df ->
Explorer.Series.greater_equal(df["Date"], Date.new!(2016, 1, 1))
end)
window_size = 10
batch_size = 32
train_prices = Explorer.Series.to_list(train_df["Close"])
test_prices = Explorer.Series.to_list(test_df["Close"])
# 10 days of prices in, the next day's price out.
single_step_train_data =
train_prices
|> Data.window(window_size, 1)
|> Data.batch(batch_size)
single_step_test_data =
test_prices
|> Data.window(window_size, 1)
|> Data.batch(batch_size)
Enum.take(single_step_train_data, 1)
Create the convolutional neural network
# 1-D convolution across the whole window, then a small dense head.
cnn_model =
Axon.input("stock_price")
|> Axon.conv(batch_size, kernel_size: window_size, activation: :relu)
|> Axon.dense(batch_size, activation: :relu)
|> Axon.dense(1)
template = Nx.template({batch_size, window_size, 1}, :f32)
Axon.Display.as_graph(cnn_model, template)
# IO.puts(Axon.Display.as_table(cnn_model, template))
cnn_trained_model_state =
cnn_model
|> Axon.Loop.trainer(:mean_squared_error, :adam)
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(single_step_train_data, %{}, epochs: 10, compiler: EXLA)
single_test_evaluation =
cnn_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(single_step_test_data, cnn_trained_model_state, compiler: EXLA)
Let’s use the mean and variance of AAPL stock prices to get a more accurate estimate.
The result means that over the course of two years, the model's prediction of the next day's closing stock price was off by this mean absolute error on average, across each batch.
# Convert the normalized MAE back to dollars: scale by sqrt(variance) (the
# std deviation) and shift by the mean.
# NOTE(review): the normalization step above divided by the *variance*, not
# the std, so this is not its exact inverse — confirm which was intended.
single_test_evaluation[0]["mean_absolute_error"] |> Nx.to_number()
|> Kernel.*(
:math.sqrt(Explorer.Series.variance(aapl_df["Close"]))
)
|> Kernel.+(
Explorer.Series.mean(aapl_df["Close"])
)
# Visualization of model predictions against the actual price series.
defmodule Analysis do
  # Build predictions over sliding windows of `prices` and plot them next to
  # the actual series on one chart.
  def visualize_predictions(
        model,
        model_state,
        prices,
        window_size,
        target_window_size,
        batch_size
      ) do
    {_, predict_fn} = Axon.build(model, compiler: EXLA)

    # Batched feature windows only (targets discarded).
    windows =
      prices
      |> Data.window(window_size, target_window_size)
      |> Data.batch(batch_size)
      |> Stream.map(&elem(&1, 0))

    predicted =
      Enum.flat_map(windows, fn window ->
        predict_fn.(model_state, window) |> Nx.to_flat_list()
      end)

    # The first prediction targets day `window_size`, so left-pad with nils to
    # align the two series. (Was a hard-coded 10, which only matched the
    # notebook's window_size; parameterized so other window sizes plot right.)
    predicted = List.duplicate(nil, window_size) ++ predicted

    # NOTE(review): `predicted` is generally shorter than `prices` because
    # :discard batching drops trailing windows, so the tail of the
    # "Predicted" series is empty — behavior unchanged from the original.
    types =
      List.duplicate("AAPL", length(prices))
      ++ List.duplicate("Predicted", length(prices))

    days =
      Enum.to_list(0..length(prices) - 1)
      ++ Enum.to_list(0..length(prices) - 1)

    prices = prices ++ predicted

    plot(%{
      "day" => days,
      "prices" => prices,
      "types" => types
    }, "AAPL Stock Price vs. Predicted, CNN Single-Shot")
  end

  # Line chart of prices-by-day, one line per series type.
  defp plot(values, title) do
    Vl.new(title: title, width: 640, height: 480)
    |> Vl.data_from_values(values)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "day", type: :temporal)
    |> Vl.encode_field(:y, "prices", type: :quantitative)
    |> Vl.encode_field(:color, "types", type: :nominal)
    |> Kino.VegaLite.new()
  end
end
# Plot CNN predictions against the actual (unnormalized) AAPL series.
Analysis.visualize_predictions(
cnn_model,
cnn_trained_model_state,
Explorer.Series.to_list(aapl_df["Close"]),
window_size,
1,
batch_size
)
Training a RNN for time-series
# LSTM over the window; keep only the final timestep's output.
rnn_model =
Axon.input("stock_prices")
|> Axon.lstm(window_size)
|> elem(0)
|> Axon.nx(& &1[[0..-1//1, -1, 0..-1//1]])
|> Axon.dense(1)
template = Nx.template({batch_size, window_size, 1}, :f32)
Axon.Display.as_graph(rnn_model, template)
rnn_trained_model_state =
rnn_model
|> Axon.Loop.trainer(:mean_squared_error, :adam)
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(single_step_train_data, %{}, epochs: 50, compiler: EXLA)
rnn_single_test_eval =
rnn_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(single_step_test_data, rnn_trained_model_state, compiler: EXLA)
# De-normalize the MAE back to dollars (scale by std, shift by mean).
rnn_single_test_eval[0]["mean_absolute_error"] |> Nx.to_number()
|> Kernel.*(
:math.sqrt(Explorer.Series.variance(aapl_df["Close"]))
)
|> Kernel.+(
Explorer.Series.mean(aapl_df["Close"])
)
Analysis.visualize_predictions(
rnn_model,
rnn_trained_model_state,
Explorer.Series.to_list(aapl_df["Close"]),
window_size,
1,
batch_size
)
Chapter 11 - Transformers
# ViT image classifier served through Bumblebee + Nx.Serving.
auth_token = System.get_env("HF_TOKEN")
repo = {:hf, "google/vit-base-patch16-224", auth_token: auth_token}
{:ok, model_info} = Bumblebee.load_model(repo)
{:ok, featurizer} = Bumblebee.load_featurizer(repo)
serving =
Bumblebee.Vision.image_classification(model_info, featurizer,
top_k: 1,
compile: [batch_size: 1],
defn_options: [compiler: EXLA]
)
# Kino form: upload an image, classify it, render the top prediction.
image_input = Kino.Input.image("Image", size: {224, 224})
form = Kino.Control.form([image: image_input], submit: "Run")
frame = Kino.Frame.new()
form
|> Kino.Control.stream()
|> Stream.filter(& &1.data.image)
|> Kino.listen(fn %{data: %{image: image}} ->
Kino.Frame.render(frame, Kino.Markdown.new("Running..."))
# Raw RGB bytes from the uploaded file -> {height, width, 3} u8 tensor.
image =
image.file_ref
|> Kino.Input.file_path()
|> File.read!()
|> Nx.from_binary(:u8)
|> Nx.reshape({image.height, image.width, 3})
output = Nx.Serving.run(serving, image)
output.predictions
|> Enum.map(&{&1.label, &1.score})
|> Kino.Bumblebee.ScoredList.new()
|> then(&Kino.Frame.render(frame, &1))
end)
Kino.Layout.grid([form, frame], boxed: true, gap: 16)
# Zero-shot text classification with BART MNLI.
{:ok, model} = Bumblebee.load_model(
{:hf, "facebook/bart-large-mnli"}
)
{:ok, tokenizer} = Bumblebee.load_tokenizer(
{:hf, "facebook/bart-large-mnli"}
)
model.model
labels = ["New booking", "Update booking", "Cancel booking", "Refund"]
zero_shot_serving =
Bumblebee.Text.zero_shot_classification(
model,
tokenizer,
labels
)
input = "I need to book a new flight"
Nx.Serving.run(zero_shot_serving, input)
Passing batches of input to a Serving
inputs = [
"I want to change my existing flight",
"I want to cancel my current flight",
"I demand my money back"
]
Nx.Serving.run(zero_shot_serving, inputs)
Fine tuning pre-trained models (distilbert)
# Load DistilBERT with a fresh 5-class sequence-classification head.
{:ok, spec} = Bumblebee.load_spec({:hf, "distilbert-base-cased"},
module: Bumblebee.Text.Distilbert,
architecture: :for_sequence_classification
)
spec = Bumblebee.configure(spec, num_labels: 5)
{:ok, %{model: model, params: params}} = Bumblebee.load_model(
{:hf, "distilbert-base-cased"},
spec: spec
)
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "distilbert-base-cased"})
Create the input pipeline
batch_size = 32
max_length = 128
# Stream the Yelp CSV in batches; each line looks like "label","review".
train_data =
File.stream!("/data/train/yelp_review_full/train.csv")
|> Stream.chunk_every(batch_size)
|> Stream.map(fn inputs ->
{labels, reviews} =
inputs
|> Enum.map(fn line ->
[label, review] = String.split(line, "\",\"")
{String.trim(label, "\""), String.trim(review, "\"")}
end)
|> Enum.unzip()
# NOTE(review): yelp_review_full labels are typically 1..5, while 5-class
# sparse cross-entropy expects 0..4 — confirm whether a `- 1` shift is needed.
labels = labels |> Enum.map(&String.to_integer/1) |> Nx.tensor()
tokens = Bumblebee.apply_tokenizer(tokenizer, reviews, length: max_length)
{tokens, labels}
end)
Enum.take(train_data, 1)
Check model output
Axon.get_output_shape(model, %{"input_ids" => Nx.template({32, 128}, :s64)})
Extract logits
# The Bumblebee model returns a map; train on the raw logits only.
model = Axon.nx(model, fn %{logits: logits} -> logits end)
optimizer = Polaris.Optimizers.adamw(learning_rate: 5.0e-5)
# sparse: true -> integer class labels instead of one-hot targets.
loss = &Axon.Losses.categorical_cross_entropy(&1, &2,
from_logits: true,
sparse: true,
reduction: :mean
)
# Start from the pre-trained params and fine-tune for 3 epochs.
trained_model_state =
model
|> Axon.Loop.trainer(loss, optimizer, log: 1)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(train_data, params, epochs: 3, compiler: EXLA)
Chapter 12 - Unsupervised learning
An autoencoder - an NN that learns to compress data, then a separate NN that is trained to decompress the compressed form
Composed of an encoder and a decoder.
Encoders learn latent representation of input data. Decoders learn to reconstruct input data from the latent representation and back with minimal info loss.
batch_size = 64
# MNIST pixels as {batch, 28, 28, 1} tensors in [0, 1]; labels are unused.
{{data, type, shape}, _} = Scidata.MNIST.download()
train_data =
data
|> Nx.from_binary(type)
|> Nx.reshape({:auto, 28, 28, 1})
|> Nx.divide(255)
|> Nx.to_batched(batch_size)
# A simple dense autoencoder for 28x28x1 MNIST images.
defmodule Autoencoder do
  # Compress a {batch, 28, 28, 1} image down to a 128-dimensional code.
  def encoder(input) do
    flat = Axon.flatten(input)
    hidden = Axon.dense(flat, 256, activation: :relu, name: "encoder_dense_0")
    Axon.dense(hidden, 128, activation: :relu, name: "encoder_dense_1")
  end

  # Reconstruct a {batch, 28, 28, 1} image from the 128-dimensional code.
  def decoder(input) do
    hidden = Axon.dense(input, 256, activation: :relu, name: "decoder_dense_0")
    pixels = Axon.dense(hidden, 784, activation: :sigmoid, name: "decoder_dense_1")
    Axon.reshape(pixels, {:batch, 28, 28, 1})
  end
end
# Autoencoder: train the model to reproduce its own input.
model =
Axon.input("image")
|> Autoencoder.encoder()
|> Autoencoder.decoder()
model
# Input and target are the same stream: reconstruction loss is MSE.
trained_model_state =
model
|> Axon.Loop.trainer(:mean_squared_error, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
|> Axon.Loop.run(
Stream.zip(train_data, train_data),
%{},
epochs: 5,
compiler: EXLA
)
test_batch = Enum.at(train_data, 0)
test_image = test_batch[0] |> Nx.new_axis(0)
# Loop handler: render the current reconstruction of one fixed test image
# after every epoch, then continue training.
visualize_test_image = fn
%Axon.Loop.State{step_state: step_state} = state ->
out_image = Axon.predict(
model,
step_state[:model_state],
test_image,
compiler: EXLA
)
# [0, 1] floats -> u8 pixels for rendering.
out_image =
out_image
|> Nx.multiply(255)
|> Nx.as_type(:u8)
|> Nx.reshape({28, 28, 1})
Kino.Image.new(out_image) |> Kino.render()
{:continue, state}
end
trained_model_state =
model
|> Axon.Loop.trainer(:mean_squared_error, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
|> Axon.Loop.handle_event(:epoch_completed, visualize_test_image)
|> Axon.Loop.run(
Stream.zip(train_data, train_data),
%{},
epochs: 5,
compiler: EXLA
)
Let’s try only with decoder
# Feed random noise straight into the trained decoder and render the output.
decoder_only =
Axon.input("noise")
|> Autoencoder.decoder()
key = Nx.Random.key(42)
{noise, key} = Nx.Random.normal(key, shape: {1, 128})
out_image = Axon.predict(decoder_only, trained_model_state, noise)
upsampled = Axon.Layers.resize(out_image, size: {512, 512})
out_image =
upsampled
|> Nx.reshape({512, 512, 1})
|> Nx.multiply(255)
|> Nx.as_type(:u8)
Kino.Image.new(out_image)
Variational autoencoders force models to learn a structured latent space
# Variational autoencoder with a custom training step driven by Axon.Loop.
defmodule VAE do
import Nx.Defn
# Convolutional encoder producing {z_mean, z_log_var, z}, where z is drawn
# via the reparameterization trick (see sample/3).
def encoder(input) do
encoded =
input
|> Axon.conv(32, kernel_size: 3, activation: :relu, strides: 2, padding: :same)
|> Axon.conv(32, kernel_size: 3, activation: :relu, strides: 2, padding: :same)
|> Axon.flatten()
|> Axon.dense(16, activation: :relu)
z_mean = Axon.dense(encoded, 2)
z_log_var = Axon.dense(encoded, 2)
z = Axon.layer(&sample/3, [z_mean, z_log_var], op_name: :sample)
Axon.container({z_mean, z_log_var, z})
end
# Transposed-convolution decoder: 2-d latent -> {batch, 28, 28, 1} image.
def decoder(input) do
input
|> Axon.dense(7 * 7 * 64, activation: :relu)
|> Axon.reshape({:batch, 7, 7, 64})
|> Axon.conv_transpose(64,
kernel_size: {3, 3},
activation: :relu,
strides: [2, 2],
padding: :same
)
|> Axon.conv_transpose(32,
kernel_size: {3, 3},
activation: :relu,
strides: [2, 2],
padding: :same
)
|> Axon.conv_transpose(1,
kernel_size: {3, 3},
activation: :sigmoid,
padding: :same
)
end
# Epoch handler: decode three fixed latent points and render them upscaled,
# then continue the loop.
def display_sample(
%Axon.Loop.State{step_state: state} = out_state,
decoder_fn
) do
latent = Nx.tensor([[0.0, 0.0], [0.5, 0.5], [1.0, 1.0]])
%{prediction: out} = decoder_fn.(state[:model_state]["decoder"], latent)
out_image = Nx.multiply(out, 255) |> Nx.as_type(:u8)
upsample =
Axon.Layers.resize(
out_image,
size: {512, 512},
channels: :first
)
for i <- 0..2 do
Kino.Image.new(Nx.reshape(upsample[i], {512, 512, 1})) |> Kino.render()
end
{:continue, out_state}
end
# One custom training step: compute joint encoder+decoder gradients on the
# ELBO objective, apply optimizer updates, and keep a running-average loss.
defn train_step(encoder_fn, decoder_fn, optimizer_fn, batch, state) do
{batch_loss, joint_param_grads} =
value_and_grad(
state[:model_state],
&joint_objective(encoder_fn, decoder_fn, batch, &1)
)
{scaled_updates, new_optimizer_state} =
optimizer_fn.(
joint_param_grads,
state[:optimizer_state],
state[:model_state]
)
new_model_state =
Polaris.Updates.apply_updates(
state[:model_state],
scaled_updates
)
# Incremental mean: loss_new = (loss * i + batch_loss) / (i + 1).
new_loss =
state[:loss]
|> Nx.multiply(state[:i])
|> Nx.add(batch_loss)
|> Nx.divide(Nx.add(state[:i], 1))
%{
state
| i: Nx.add(state[:i], 1),
loss: new_loss,
model_state: new_model_state,
optimizer_state: new_optimizer_state
}
end
# Initialize joint encoder/decoder params and optimizer state for the loop.
# NOTE(review): the decoder template here is the {tensor, key} tuple returned
# by Nx.Random.uniform/2, not the tensor alone — confirm the init fn accepts
# that shape; destructuring the tensor first may be intended.
defn init_step(
encoder_init_fn,
decoder_init_fn,
optimizer_init_fn,
batch,
init_state
) do
encoder_params = encoder_init_fn.(batch, init_state)
{decoder_params, _key} =
decoder_init_fn.(Nx.Random.uniform(Nx.Random.key(42), shape: {64, 2}), init_state)
joint_params = %{
"encoder" => encoder_params,
"decoder" => decoder_params
}
optimizer_state = optimizer_init_fn.(joint_params)
%{
i: Nx.tensor(0),
loss: Nx.tensor(0.0),
model_state: joint_params,
optimizer_state: optimizer_state
}
end
# ELBO objective: reconstruction BCE plus KL divergence to the unit Gaussian.
defnp joint_objective(encoder_fn, decoder_fn, batch, joint_params) do
%{prediction: preds} = encoder_fn.(joint_params["encoder"], batch)
{z_mean, z_log_var, z} = preds
%{prediction: reconstruction} = decoder_fn.(joint_params["decoder"], z)
recon_loss =
Axon.Losses.binary_cross_entropy(
batch,
reconstruction,
reduction: :mean
)
kl_loss = -0.5 * (1 + z_log_var - Nx.pow(z_mean, 2) - Nx.exp(z_log_var))
kl_loss = Nx.mean(Nx.sum(kl_loss, axes: [1]))
recon_loss + kl_loss
end
# Reparameterization trick: z = mean + exp(0.5 * log_var) * epsilon.
# NOTE(review): the RNG key is fixed at 42, so every call draws the same
# noise — confirm whether a threaded key was intended.
defnp sample(z_mean, z_log_var, _opts \\ []) do
noise_shape = Nx.shape(z_mean)
{epsilon, _key} = Nx.Random.normal(Nx.Random.key(42), shape: noise_shape)
z_mean + Nx.exp(0.5 * z_log_var) * epsilon
end
end
Let’s see what Axon loop under the hood looks like:
# Build raw init/predict functions and drive Axon.Loop with the custom
# VAE.init_step / VAE.train_step instead of Axon.Loop.trainer.
encoder = Axon.input("image") |> VAE.encoder()
decoder = Axon.input("latent") |> VAE.decoder()
{encoder_init_fn, encoder_fn} = Axon.build(encoder, mode: :train)
{decoder_init_fn, decoder_fn} = Axon.build(decoder, mode: :train)
{optimizer_init_fn, optimizer_fn} = Polaris.Optimizers.adam(learning_rate: 1.0e-3)
# Partially apply the built functions; the loop supplies batch and state.
init_fn = &VAE.init_step(
encoder_init_fn,
decoder_init_fn,
optimizer_init_fn,
&1,
&2
)
step_fn = &VAE.train_step(
encoder_fn,
decoder_fn,
optimizer_fn,
&1,
&2
)
step_fn
|> Axon.Loop.loop(init_fn)
|> Axon.Loop.handle_event(:epoch_completed, &VAE.display_sample(&1, decoder_fn))
|> Axon.Loop.log(fn
%Axon.Loop.State{epoch: epoch, iteration: iter, step_state: state} ->
"\rEpoch: #{epoch}, batch: #{iter}, loss: #{Nx.to_number(state[:loss])}"
end, device: :stdio, event: :iteration_completed)
|> Axon.Loop.run(train_data, %{}, compiler: EXLA, epochs: 10)