# Ch7: CNNs

Notebook: `Ch7 - CNN.livemd`

Mix.install([
  {:axon, "~> 0.5"},
  {:nx, "~> 0.5"},
  {:exla, "~> 0.5"},
  {:stb_image, "~> 0.6"},
  {:kino, "~> 0.8"}
])

Default backend

# Use EXLA (Google's XLA compiler) as the default backend for all Nx
# tensor operations, so tensor math runs as compiled native code
# rather than in pure Elixir.
Nx.global_default_backend(EXLA.Backend)

Input pipeline

defmodule CatsAndDogs do
  @moduledoc """
  Lazy input pipeline that turns cat/dog JPEG file paths into batched
  `{image_tensor, label_tensor}` pairs suitable for Axon training.
  """

  @doc """
  Streams `paths` into `{image_batch, label_batch}` tensor tuples.

  Each image is decoded in parallel, resized to
  `target_height` x `target_width`, and rescaled to the `[0, 1]` range;
  unreadable files are silently dropped. Pairs are grouped into batches
  of `batch_size`, discarding any incomplete trailing batch. When
  `augment` is `true`, images are randomly flipped along both spatial
  axes (each with probability 0.5).
  """
  def pipeline(paths, batch_size, target_height, target_width, augment \\ false) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(&decoded?/1)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> augment_data(augment)
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(&stack_batch/1)
  end

  # Keep only `Task.async_stream` results that hold a decoded image.
  defp decoded?({:ok, {%StbImage{}, _label}}), do: true
  defp decoded?(_other), do: false

  # Combine a chunk of {image, label} pairs into one batched pair.
  defp stack_batch(pairs) do
    {images, labels} = Enum.unzip(pairs)
    {Nx.stack(images), Nx.stack(labels)}
  end

  # Apply the random-flip augmentations only when requested.
  defp augment_data(stream, false), do: stream

  defp augment_data(stream, true) do
    stream
    |> Stream.map(&random_flip(&1, :height))
    |> Stream.map(&random_flip(&1, :width))
  end

  # Resize, convert to Nx, and rescale pixels into [0, 1];
  # wrap the integer label in a rank-1 tensor.
  defp to_tensors({:ok, {img, label}}, target_height, target_width) do
    image_tensor =
      img
      |> StbImage.resize(target_height, target_width)
      |> StbImage.to_nx()
      |> Nx.divide(255)

    {image_tensor, Nx.tensor([label])}
  end

  # Derive the label from the filename (0 = cat, 1 = dog) and decode
  # the file; returns :error for unreadable images.
  defp parse_image(path) do
    label =
      if path |> Path.basename(".jpg") |> String.contains?("cat"), do: 0, else: 1

    case StbImage.read_file(path) do
      {:ok, img} -> {img, label}
      _error -> :error
    end
  end

  # Flip the image along `axis` with probability 0.5; leaves the
  # label untouched either way.
  defp random_flip({image, label} = pair, axis) do
    if :rand.uniform() < 0.5 do
      {Nx.reverse(image, axes: [axis]), label}
    else
      pair
    end
  end
end

Notice you have to specify the additional target_height and target_width arguments. The choice of target_height and target_width is arbitrary. Just remember that lower resolutions encode less information than higher resolutions and might be more difficult for your neural network to train on, whereas higher resolutions require more processing power.

base_path = "Dev/Education/Elixir/ml/Datasets/"

# Shuffle every training JPEG, then hold out the first 1000 paths
# (for test/validation) and keep the rest for training.
{test_paths, train_paths} =
  (base_path <> "dogs-vs-cats/train/*.jpg")
  |> Path.wildcard()
  |> Enum.shuffle()
  |> Enum.split(1000)

# Of the 1000 held-out paths: 750 become the test set, 250 validation.
{test_paths, val_paths} = test_paths |> Enum.split(750)

# Image/batch hyperparameters shared by every pipeline below.
batch_size = 128
target_height = 96
target_width = 96

# Build the three data pipelines with shared image/batch settings.
# Only the training pipeline is augmented: augmenting test or
# validation data would make classification needlessly harder for the
# model at evaluation time.
build_pipeline = fn paths, augment? ->
  CatsAndDogs.pipeline(paths, batch_size, target_height, target_width, augment?)
end

train_pipeline = build_pipeline.(train_paths, true)
val_pipeline = build_pipeline.(val_paths, false)
test_pipeline = build_pipeline.(test_paths, false)

# Enum.take(train_pipeline, 1)
# Enum.take(test_pipeline, 1)

MLP as baseline

An MLP baseline is not required; it is built here only so its performance can be compared against the CNN's.

# Baseline MLP: flatten the 96x96x3 image, then run it through two
# ReLU dense layers and a single sigmoid output unit (binary class).
mlp_model =
  [{256, :relu}, {128, :relu}, {1, :sigmoid}]
  |> Enum.reduce(
    Axon.input("images", shape: {nil, target_height, target_width, 3})
    |> Axon.flatten(),
    fn {units, activation}, model ->
      Axon.dense(model, units, activation: activation)
    end
  )

mlp_template = Nx.template({batch_size, target_height, target_width, 3}, :f32)
Axon.Display.as_graph(mlp_model, mlp_template)

# Train for 5 epochs with binary cross-entropy and Adam.
mlp_trained_model_state =
  mlp_model
  |> Axon.Loop.trainer(:binary_cross_entropy, :adam)
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 5, compiler: EXLA)

# Baseline accuracy on the held-out test set.
mlp_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, mlp_trained_model_state, compiler: EXLA)

CNN

This code uses Nx.conv to implement a basic edge detector.

# Load a sample image, move it to channels-first layout
# ({channels, height, width}), then add a leading batch axis so it
# matches the {batch, channels, h, w} input layout Nx.conv expects.
img_path = base_path <> "dogs-vs-cats/train/dog.5.jpg"

img =
  img_path
  |> StbImage.read_file!()
  |> StbImage.to_nx()
  |> Nx.transpose(axes: [:channels, :height, :width])
  |> Nx.new_axis(0)

# 3x3 vertical-edge (Prewitt-style) kernel: negative left column,
# positive right column, so it responds to horizontal intensity changes.
kernel =
  Nx.tensor([
    [-1, 0, 1],
    [-1, 0, 1],
    [-1, 0, 1]
  ])

# Expand to Nx.conv's kernel layout {out_channels, in_channels, h, w},
# replicating the same 3x3 filter across every channel combination.
kernel =
  kernel
  |> Nx.reshape({1, 1, 3, 3})
  |> Nx.broadcast({3, 3, 3, 3})

# Convolve, cast to u8 for display, drop the batch axis, and restore
# the channels-last layout so Kino can render the result.
img
|> Nx.conv(kernel)
|> Nx.as_type({:u, 8})
|> Nx.squeeze(axes: [0])
|> Nx.transpose(axes: [:height, :width, :channels])
|> Kino.Image.new()

The CNN model

# One convolutional block: 3x3 same-padded conv with ReLU, followed by
# 2x2 max pooling that halves each spatial dimension.
conv_block = fn model, filters ->
  model
  |> Axon.conv(filters, kernel_size: {3, 3}, padding: :same, activation: :relu)
  |> Axon.max_pool(kernel_size: {2, 2}, strides: [2, 2])
end

# Two conv/pool feature extractors, then a fully connected head with
# dropout and a single sigmoid output for binary classification.
cnn_model =
  Axon.input("images", shape: {nil, 96, 96, 3})
  |> conv_block.(32)
  |> conv_block.(128)
  |> Axon.flatten()
  |> Axon.dense(128, activation: :relu)
  |> Axon.dropout(rate: 0.5)
  |> Axon.dense(1, activation: :sigmoid)

template = Nx.template({1, 96, 96, 3}, :f32)

Axon.Display.as_graph(cnn_model, template)

Note that `Axon.conv` adds the activation layer for you when you pass the `:activation` option, so no separate activation layer is needed after each convolution.

Training

# Train with binary cross-entropy and Adam (lr 1e-3) for up to 100
# epochs. The validation pass is attached BEFORE the early-stop
# handler so the "validation_loss" metric exists when early stopping
# checks it; training halts once validation loss stops improving
# (mode: :min).
cnn_trained_model_state =
  cnn_model
  |> Axon.Loop.trainer(:binary_cross_entropy, Polaris.Optimizers.adam(learning_rate: 1.0e-3))
  |> Axon.Loop.metric(:accuracy)
  |> Axon.Loop.validate(cnn_model, val_pipeline)
  |> Axon.Loop.early_stop("validation_loss", mode: :min)
  |> Axon.Loop.run(train_pipeline, %{}, epochs: 100, compiler: EXLA)

Run test set eval

# Final accuracy of the trained CNN on the held-out test set.
cnn_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, cnn_trained_model_state, compiler: EXLA)