Introduction NX - MNIST
# Install the notebook's dependencies and set EXLA.Backend as the default Nx backend.
Mix.install(
[
{:nx, "~> 0.4.1"},
# NOTE(review): "~> 0.2" accepts any 0.x release from 0.2 upward — confirm that the
# EXLA version this resolves to is the one meant to be paired with Nx 0.4.x.
{:exla, "~> 0.2"},
{:axon, "~> 0.3.0"},
{:tokenizers, "~> 0.2.0"},
{:vega_lite, "~> 0.1.6"},
{:kino, "~> 0.7.0"},
{:kino_vega_lite, "~> 0.1.7"},
{:scidata, "~> 0.1.9"}
],
config: [
# Application config handed to Nx: use EXLA.Backend as the default backend.
nx: [default_backend: EXLA.Backend]
]
)
# Short alias used when building the training chart.
alias VegaLite, as: Vl
Introduction
These are some ML experiments in Elixir / Livebook with Axon.
Data Preparation
Here we load the data files for both the train set and the test set.
# Fetch both MNIST splits in a single match. Each download is matched as an
# {images, labels} pair; the training split is requested first, then the test split.
[{train_images_data, train_labels_data}, {test_images_data, test_labels_data}] = [
  Scidata.MNIST.download(),
  Scidata.MNIST.download_test()
]

# Return the four raw payloads together so they can be inspected.
{train_images_data, train_labels_data, test_images_data, test_labels_data}
We then do some pattern-matching on the binary results to extract the relevant parts.
# Each payload is destructured as a {binary, type, shape} tuple. Image shapes are
# matched as {count, 1, width, height}; label shapes as {count}. The element type
# is ignored here.
{images, _type, {n_images, 1, width, height}} = train_images_data
{labels, _type, {n_labels}} = train_labels_data

# Pin width/height instead of silently rebinding them: the per-image byte count is
# derived from these values and reused to reshape the test images, so a test set
# with a different geometry should fail right here with a MatchError.
{test_images, _type, {n_test_images, 1, ^width, ^height}} = test_images_data
{test_labels, _type, {n_test_labels}} = test_labels_data
Now we convert the raw binaries into tensors: each image is flattened into a single row of pixels, each label is one-hot encoded, and both are split into batches of 30:
# Pixels are read as unsigned 8-bit values, so a flattened image is
# width * height bytes long.
bytes_per_image = width * height

# Flatten every training image into one row of u8 pixel values, then split the
# rows into batches of 30.
# NOTE(review): pixel values are left unscaled here — confirm that is intended.
train_images =
  Nx.from_binary(images, {:u, 8})
  |> Nx.reshape({n_images, bytes_per_image}, names: [:batch, :input])
  |> Nx.to_batched(30)

# Comparing the {n_labels, 1} label column against the row 0..9 broadcasts to an
# {n_labels, 10} one-hot encoding, which is batched the same way as the images.
train_labels =
  Nx.from_binary(labels, {:u, 8})
  |> Nx.reshape({n_labels, 1}, names: [:batch, :output])
  |> Nx.equal(Nx.tensor(Enum.to_list(0..9)))
  |> Nx.to_batched(30)
Let’s do the same for the test data, but without batching:
# Same conversion as for the training data, but kept as one tensor per split
# (no batching). Note that both names are rebound from raw binaries to tensors.
test_images =
  Nx.from_binary(test_images, {:u, 8})
  |> Nx.reshape({n_test_images, bytes_per_image}, names: [:batch, :input])

# One-hot encode the test labels by comparing each label against the row 0..9.
test_labels =
  Nx.from_binary(test_labels, {:u, 8})
  |> Nx.reshape({n_test_labels, 1}, names: [:batch, :output])
  |> Nx.equal(Nx.tensor(Enum.to_list(0..9)))
The ML model
Creating the model is really simple:
# Two dense layers: flattened image -> 128 ReLU units -> 10 softmax outputs
# (one per digit class).
model =
  Axon.input("input", shape: {nil, bytes_per_image})
  |> Axon.dense(128, activation: :relu)
  |> Axon.dense(10, activation: :softmax)

# Render the model graph for a template batch of 30 inputs. The feature size comes
# from bytes_per_image so the template always agrees with the input layer above,
# instead of repeating the literal 784.
Axon.Display.as_graph(model, Nx.template({30, bytes_per_image}, :f32))
Let’s define a graph that will dynamically show the progression of the training process:
# Frame used to display the current epoch.
epoch_frame = Kino.Frame.new()

# Red line for the loss; its y axis is fixed to 0..10.
loss_layer =
  Vl.new()
  |> Vl.mark(:line, color: :red)
  |> Vl.encode_field(:y, "loss",
    type: :quantitative,
    scale: [domain: [0, 10]],
    title: "Loss"
  )

# Blue line for the accuracy; its y axis is fixed to 0..1.
accuracy_layer =
  Vl.new()
  |> Vl.mark(:line, color: :blue)
  |> Vl.encode_field(:y, "accuracy",
    type: :quantitative,
    scale: [domain: [0, 1]],
    title: "Accuracy"
  )

# Live chart: both layers share the iteration x axis (0..2000) but get
# independent y scales. It starts empty; points are pushed during training.
chart =
  Vl.new(width: 400, height: 400)
  |> Vl.data_from_values([])
  |> Vl.encode_field(:x, "iteration",
    type: :quantitative,
    title: "Iteration",
    scale: [domain: [0, 2000]]
  )
  |> Vl.layers([loss_layer, accuracy_layer])
  |> Vl.resolve(:scale, y: :independent)
  |> Kino.VegaLite.new()
# Push one data point to the chart every 50th iteration.
# `loss` and `acc` may be scalar tensors or plain numbers; they are converted to
# numbers only when a point is actually plotted, so the other iterations do no
# conversion work at all.
add_chart_data = fn iteration, loss, acc ->
  if rem(iteration, 50) == 0 do
    Kino.VegaLite.push(chart, %{
      iteration: iteration,
      # Clamp to 10 so large losses stay inside the chart's 0..10 loss axis.
      loss: min(Nx.to_number(loss), 10),
      accuracy: Nx.to_number(acc)
    })
  end
end

# Replace the frame's content with the current epoch number.
update_epoch = fn epoch ->
  Kino.Frame.render(epoch_frame, Kino.Markdown.new("### Epoch #{epoch}"))
end

# :epoch_started handler — show the epoch and start the chart from scratch.
start_epoch = fn %{epoch: epoch} = state ->
  update_epoch.(epoch)
  Kino.VegaLite.clear(chart)
  {:continue, state}
end

# :iteration_completed handler — forward the running metrics to the chart.
# The metrics are passed through untouched; add_chart_data decides whether they
# need to be converted.
complete_iteration = fn %{metrics: %{"loss" => loss, "accuracy" => acc}, iteration: iteration} =
                          state ->
  add_chart_data.(iteration, loss, acc)
  {:continue, state}
end

# Placeholder shown until the first epoch starts, followed by the layout that
# displays the frame together with the chart.
Kino.Frame.render(epoch_frame, Kino.Markdown.new("Training not yet started..."))
Kino.Layout.grid([epoch_frame, chart])
Training the model using Axon is just a matter of specifying the loss function and the optimizer (the Adam optimizer in this case):
# Pair each image batch with its label batch; Stream.zip keeps this lazy.
training_data = Stream.zip(train_images, train_labels)

# Supervised training loop: categorical cross-entropy loss, Adam optimizer, an
# extra accuracy metric, and the two handlers that drive the live chart.
training_loop =
  model
  |> Axon.Loop.trainer(:categorical_cross_entropy, :adam)
  |> Axon.Loop.metric(:accuracy, "accuracy")
  |> Axon.Loop.handle(:epoch_started, start_epoch)
  |> Axon.Loop.handle(:iteration_completed, complete_iteration)

# Run for 10 epochs, starting from an empty initial state and compiling with EXLA.
params = Axon.Loop.run(training_loop, training_data, %{}, epochs: 10, compiler: EXLA)
Now we just need to test the model by running the prediction on every image in the test set:
# Run the trained model over the whole (unbatched) test set in a single call.
output = model |> Axon.predict(params, test_images)
We get an output vector for each image, but we want to know which element of that vector has the highest value. We use `Nx.argmax` for that, and then compare the result against the expected labels to compute the accuracy:
# The index of the highest score in each output row is the predicted digit.
predictions = Nx.argmax(output, axis: 1)

# The test labels are one-hot rows, so argmax recovers the digit they encode.
expected = Nx.argmax(test_labels, axis: 1)

# Accuracy: number of matching predictions divided by the number of test images.
predictions |> Nx.equal(expected) |> Nx.sum() |> Nx.divide(n_test_images)

# Restore the 2-D layout of every test image for display, using the dimensions
# read from the dataset instead of a hard-coded 28x28.
test_images |> Nx.reshape({n_test_images, width, height}) |> Nx.to_heatmap()
Tokenizers
# Load the pretrained XLM-RoBERTa tokenizer and try it on a short sentence.
{:ok, tokenizer} = Tokenizers.Tokenizer.from_pretrained("xlm-roberta-base")
{:ok, encoding} = Tokenizers.Tokenizer.encode(tokenizer, "Hello there!")
Tokenizers.Tokenizer.get_vocab_size(tokenizer)

{:ok, tokens} =
  Tokenizers.Tokenizer.encode(tokenizer, "Quelle est la date limite de cotisation a un REER!")

# Pad to 20 tokens with the model's real padding token. Without explicit options
# the padding defaults to id 0 / "[PAD]", but in the XLM-RoBERTa vocabulary id 0
# is "<s>" and the padding token is "<pad>" with id 1.
padded = Tokenizers.Encoding.pad(tokens, 20, pad_id: 1, pad_token: "<pad>")
Tokenizers.Encoding.get_ids(padded) |> Nx.tensor()