Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapt8 notebook

chapt8.livemd

Chapt8 notebook

Mix.install([
  # {:benchee, "~> 1.3"},
  # {:explorer, "~> 0.9.1"},
  {:axon_onnx, git: "https://github.com/mortont/axon_onnx.git", branch: "master"},
  {:axon, "~> 0.5"},
  {:nx, "~> 0.5"},
  # {:exla, "~> 0.5"},
  {:stb_image, "~> 0.6"},
  {:kino, "~> 0.8"},
  {:torchx, "~> 0.7"}
  # {:bumblebee, "~> 0.5.3"},
  # {:kino_vega_lite, "~> 0.1.13"},
  # {:scholar, "~> 0.3.1"},
  # {:scidata, "~> 0.1.11"},
  # {:table_rex, "~> 3.1"},
  # {:tucan, "~> 0.3.0"},
  # {:vega_lite, "~> 0.1.9"},
] ,
   config: [
    nx: [
      default_backend: {Torchx.Backend, device: :cuda}
    ]
  ],
  system_env: %{"LIBTORCH_TARGET" => "cu121", "LIBTORCH_VERSION" => "2.4.1"}
) 

Pipeline

defmodule CatsAndDogs do
  def pipeline(paths, batch_size, target_height, target_width, opts \\ []) do
    pipeline = paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn 
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
        end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)

    if Keyword.get(opts, :test_image, false) do
      test_image = paths
      |> Enum.shuffle()
      |> Enum.take_random(1)
      |> hd()
      |> parse_image()
      |> case do
        {img, label} -> to_tensors({:ok, {img, label}}, target_height, target_width)
        :error -> {:error, :error_converting_image}
      end

      {pipeline, test_image}
    else
      pipeline
    end
  end

  def pipeline_with_aug(paths, batch_size, target_height, target_width) do
    paths
    |> Enum.shuffle()
    |> Task.async_stream(&parse_image/1)
    |> Stream.filter(fn 
      {:ok, {%StbImage{}, _}} -> true
      _ -> false
        end)
    |> Stream.map(&to_tensors(&1, target_height, target_width))
    |> Stream.map(&random_flip(&1, :height))
    |> Stream.map(&random_flip(&1, :width))
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn chunks ->
      {img_chunk, label_chunk} = Enum.unzip(chunks)
      {Nx.stack(img_chunk), Nx.stack(label_chunk)}
    end)
  end

  defp parse_image(path) do
    label = if String.contains?(path, "cat"), do: 0, else: 1

    case StbImage.read_file(path) do
      {:ok, img} -> {img, label}
      _err -> :error
    end
  end

  defp to_tensors({:ok, {img, label}}, target_height, target_width) do
    img_tensor = img
    |> StbImage.resize(target_height, target_width)
    |> StbImage.to_nx()
    |> Nx.divide(255)
    |> Nx.transpose(axes: [:channels, :height, :width])

    label_tensor = Nx.tensor([label])
    
    {img_tensor, label_tensor}
  end

  defp random_flip({image, label}, axis) do
    if :rand.uniform() < 0.5 do
      {Nx.reverse(image, axes: [axis]), label}
    else
      {image, label}
    end
  end
end
{:ok, cwd} = File.cwd()
train_path = "#{cwd}/files/train/*.jpg"
{test_paths, train_paths} = train_path
  |> Path.wildcard()
  |> Enum.shuffle()
  |> Enum.split(1000)
{test_paths, val_paths} = Enum.split(test_paths, 750)

batch_size = 32
target_height = target_width = 160
train_pipeline = CatsAndDogs.pipeline_with_aug(train_paths, batch_size, target_height, target_width)
val_pipeline = CatsAndDogs.pipeline(val_paths, batch_size, target_height, target_width)
{test_pipeline, test_image} = CatsAndDogs.pipeline(test_paths, batch_size, target_height, target_width, test_image: true)

Enum.take(train_pipeline, 1)
  file_path = Kino.FS.file_path("mobilenetv2-7.onnx")
{cnn_base, cnn_base_params} = AxonOnnx.import(file_path, batch_size: batch_size)
input_template = Nx.template({1, 3, target_height, target_width}, :f32)
Axon.Display.as_graph(cnn_base, input_template)
{_, cnn_base} = Axon.pop_node(cnn_base)
{_, cnn_base} = Axon.pop_node(cnn_base)
Axon.Display.as_graph(cnn_base, input_template)
cnn_base = Axon.namespace(cnn_base, "feature_extractor")
cnn_base = Axon.freeze(cnn_base)
model = cnn_base
|> Axon.global_avg_pool(channels: :first)
|> Axon.dropout(rate: 0.2)
|> Axon.dense(1)
loss = &amp;Axon.Losses.binary_cross_entropy(&amp;1, &amp;2, reduction: :mean, from_logits: true)

optimizer = Polaris.Optimizers.adam(learning_rate: 1.0e-4)

trained_model_state = model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.validate(model, val_pipeline)
|> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
|> Axon.Loop.run(
  train_pipeline,
  %{"feature_extractor" => cnn_base_params},
  epochs: 100
)
eval_model = Axon.sigmoid(model)

eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state)
tuner_model = Axon.unfreeze(model, up: 50)
loss = &amp;Axon.Losses.binary_cross_entropy(&amp;1, &amp;2, reduction: :mean, from_logits: true)

optimizer = Polaris.Optimizers.rmsprop(learning_rate: 1.0e-5)

tuner_trained_model_state = tuner_model
|> Axon.Loop.trainer(loss, optimizer)
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.validate(tuner_model, val_pipeline)
|> Axon.Loop.early_stop("validation_loss", mode: :min, patience: 5)
|> Axon.Loop.run(
  train_pipeline,
  # trained_model_state,
  %{"feature_extractor" => cnn_base_params},
  epochs: 100
)
eval_model = Axon.sigmoid(model)

eval_model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:accuracy)
|> Axon.Loop.run(test_pipeline, trained_model_state, compiler: EXLA)
{_, predict_fn} = Axon.build(eval_model, compiler: EXLA)
      test_image = test_paths
      |> Enum.shuffle()
      |> Enum.take_random(1)
      |> hd()
      |> parse_image()
      |> case do
        {img, label} -> to_tensors({:ok, {img, label}}, target_height, target_width)
        :error -> {:error, :error_converting_image}
      end

test_image
# |> Nx.new_axis(0)
|> then(&amp;predict_fn.(trained_model_state, &amp;1))
|> Nx.argmax()