Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Untitled notebook

src/segmentation_learn.livemd

Untitled notebook

# Install the notebook's dependencies and configure Nx so that EXLA is both the
# default tensor backend and the default `defn` compiler for the code below.
Mix.install(
  [
    {:kino, "~> 0.10.0"},
    {:evision, "~> 0.1.33"},
    {:axon, "~> 0.6.0"},
    {:table_rex, "~> 3.1.1"},
    {:axon_onnx, "~> 0.4.0"},
    {:nx, "~> 0.6.1"},
    {:exla, "~> 0.6.1"},
    {:stb_image, "~> 0.6.3"},
    {:flow, "~> 1.2.4"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend,
      default_defn_options: [compiler: EXLA]
    ]
  ]
  # NOTE(review): the disabled option below presumably selects a CUDA build of XLA — confirm
  # before enabling.
  # system_env: %{"XLA_TARGET" => "cuda118"}
)

Section

# Resolve the relative dataset paths below against the notebook's own directory.
File.cd!(__DIR__)

# Dataset directories. Earlier bindings pointing at "../Plant segmentation/images/" and
# "../Plant segmentation/masks/" were overwritten immediately and never read, so only the
# effective locations are kept.
img_dir = "./Images/"
mask_dir = "./Masks/"

# Raw directory listings (file names only, no directory prefix).
images = File.ls!(img_dir)
masks = File.ls!(mask_dir)

# Sanity check: both counts are printed so a mismatch is visible before training.
IO.puts("Number of images: #{length(images)}")
IO.puts("Number of masks: #{length(masks)}")
defmodule Split do
  @moduledoc """
  Random train/test splitting for two parallel lists (images and their masks).
  """

  @doc """
  Shuffles `images` and `masks` together and splits them into train and test parts.

  The two lists are zipped before shuffling, so the element at position `i` of
  `images` stays paired with the element at position `i` of `masks`.

  The test part holds `trunc(length(images) * test_ratio)` pairs; the rest goes
  to the train part.

  Returns `{train_images, test_images, train_masks, test_masks}`.
  """
  def split_data(images, masks, test_ratio) do
    test_count = trunc(length(images) * test_ratio)

    # Shuffle as pairs so every image keeps its own mask.
    shuffled_pairs =
      images
      |> Enum.zip(masks)
      |> Enum.shuffle()

    # The leading `test_count` pairs form the test set, the remainder the training set.
    {test_images, test_masks} =
      shuffled_pairs
      |> Enum.take(test_count)
      |> Enum.unzip()

    {train_images, train_masks} =
      shuffled_pairs
      |> Enum.drop(test_count)
      |> Enum.unzip()

    {train_images, test_images, train_masks, test_masks}
  end
end

img_dir = "./Images/"
mask_dir = "./Masks/"

# File.ls!/1 gives no ordering guarantee, and the two directories are listed independently.
# An image and its mask share the same file name (the preview below relies on that), so
# sorting both listings keeps position `i` of `images` paired with position `i` of `masks`.
images = img_dir |> File.ls!() |> Enum.sort()
masks = mask_dir |> File.ls!() |> Enum.sort()

# Split the data: 20% of the pairs go to the test set.
{train_images, test_images, train_masks, test_masks} = Split.split_data(images, masks, 0.2)

# Print the number of samples in each part.
IO.inspect(length(train_images))
IO.inspect(length(train_masks))
IO.inspect(length(test_images))
IO.inspect(length(test_masks))

# Preview the first training sample: the mask is looked up by the image's file name.
file = hd(train_images)
IO.puts("#{img_dir}#{file}")
img = File.read!(Path.join(img_dir, file))
mask = File.read!(Path.join(mask_dir, file))

# Wrap the raw PNG bytes in Kino.Image so they render as pictures, side by side.
Kino.Layout.grid([Kino.Image.new(img, :png), Kino.Image.new(mask, :png)], columns: 2)
# Encoder/decoder segmentation network built with Axon.
#
# cnn1..cnn4 are encoder stages; each one is kept in its own binding because the
# decoder stages (cnn5..model) concatenate their output with the matching encoder
# output further down.
#
# Encoder stage 1: input declared as {nil, 128, 128, 3}; two same-padded convolutions
# with 64 filters, a 2x2 max-pool, then two 1x1 convolutions with 128 filters.
# NOTE(review): no :activation option is passed to the encoder convolutions — confirm
# that a purely linear encoder is intended.
cnn1 =
  Axon.input("input", shape: {nil, 128, 128, 3})
  |> Axon.conv(64, kernel_size: {3, 3}, padding: :same)
  |> Axon.conv(64, kernel_size: {2, 2}, padding: :same)
  |> Axon.max_pool(kernel_size: {2, 2})
  |> Axon.conv(128, kernel_size: {1, 1})
  |> Axon.conv(128, kernel_size: {1, 1})

# Encoder stage 2: 2x2 max-pool followed by three 1x1 convolutions with 256 filters.
cnn2 =
  cnn1
  |> Axon.max_pool(kernel_size: {2, 2})
  |> Axon.conv(256, kernel_size: {1, 1})
  |> Axon.conv(256, kernel_size: {1, 1})
  |> Axon.conv(256, kernel_size: {1, 1})

# Encoder stage 3: 2x2 max-pool followed by three 1x1 convolutions with 512 filters.
cnn3 =
  cnn2
  |> Axon.max_pool(kernel_size: {2, 2})
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.conv(512, kernel_size: {1, 1})

# Encoder stage 4: same layout as stage 3.
cnn4 =
  cnn3
  |> Axon.max_pool(kernel_size: {2, 2})
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.conv(512, kernel_size: {1, 1})

# Bottleneck: one more 2x2 max-pool, two conv -> batch_norm -> relu groups, then a
# stride-2 transposed convolution.
# NOTE(review): the transposed convolution presumably brings the feature map back to
# cnn4's spatial size so the concatenate in cnn6 lines up — confirm against the table
# printed at the end of this block.
cnn5 =
  cnn4
  |> Axon.max_pool(kernel_size: {2, 2})
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(512, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv_transpose(512, kernel_size: {1, 1}, strides: 2)

# Decoder stage: concatenate with encoder stage 4, two conv -> batch_norm -> relu groups
# (256 filters), then a stride-2 transposed convolution.
cnn6 =
  cnn5
  |> Axon.concatenate(cnn4)
  |> Axon.conv(256, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(256, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv_transpose(256, kernel_size: {1, 1}, strides: 2)

# Decoder stage: concatenate with encoder stage 3, same layout with 128 filters.
cnn7 =
  cnn6
  |> Axon.concatenate(cnn3)
  |> Axon.conv(128, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(128, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv_transpose(128, kernel_size: {1, 1}, strides: 2)

# Decoder stage: concatenate with encoder stage 2, same layout with 64 filters.
cnn8 =
  cnn7
  |> Axon.concatenate(cnn2)
  |> Axon.conv(64, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(64, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv_transpose(64, kernel_size: {1, 1}, strides: 2)

# Output head: concatenate with encoder stage 1, two conv -> batch_norm -> relu groups
# (32 filters), one more stride-2 transposed convolution, two conv -> batch_norm -> relu
# groups (16 filters), and a final 1x1 convolution producing 3 channels through a sigmoid.
model =
  cnn8
  |> Axon.concatenate(cnn1)
  |> Axon.conv(32, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(32, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv_transpose(64, kernel_size: {1, 1}, strides: 2)
  |> Axon.conv(16, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(16, kernel_size: {1, 1})
  |> Axon.batch_norm()
  |> Axon.relu()
  |> Axon.conv(3, kernel_size: {1, 1}, activation: :sigmoid)

# Print a per-layer summary of the model for a template input of shape {4, 128, 128, 3}.
# NOTE(review): the template type is :u8, while Image.to_nx/2 divides its tensors by 255 —
# confirm whether a float template would describe the real inputs better.
Axon.Display.as_table(model, Nx.template({4, 128, 128, 3}, :u8)) |> IO.puts()
# Axon.Display.as_graph(model, Nx.template({4, 128, 128, 3}, :u8))
defmodule Image do
  @moduledoc """
  Loads PNG files from disk into a single normalised Nx tensor.
  """

  @doc """
  Decodes the PNG files named in `img_names` (relative to `path`) into one tensor.

  At most the first 5000 names are considered, and names whose extension is not
  `.png` are skipped. Every remaining file is read and decoded with `StbImage`,
  the pixel values are divided by 255, and the result is reshaped to
  `{count, 128, 128, 3}`, where `count` is the number of decoded files.

  The output rows follow the order of `img_names`. This matters because images
  and masks are loaded by separate calls and later zipped position by position;
  decoding is therefore done with an ordered `Task.async_stream/3`, not with
  `Flow`, which does not preserve the order of its input.

  Raises if a file cannot be read or decoded, or if the decoded data does not
  fit the `{count, 128, 128, 3}` shape.
  """
  def to_nx(img_names, path) do
    img_names =
      img_names
      |> Enum.take(5000)
      |> Enum.map(&Path.join(path, &1))
      |> Enum.filter(&(Path.extname(&1) == ".png"))

    img_names
    # Decode concurrently; `ordered: true` keeps results in input order.
    |> Task.async_stream(&decode_png/1, ordered: true, timeout: :infinity)
    |> Enum.map(fn {:ok, tensor} -> tensor end)
    |> Nx.concatenate()
    |> Nx.as_type(:u8)
    |> Nx.divide(255)
    |> Nx.reshape({length(img_names), 128, 128, 3})
  end

  # Reads one PNG file and decodes it into an Nx tensor.
  defp decode_png(file) do
    file
    |> File.read!()
    |> StbImage.read_binary!()
    |> StbImage.to_nx()
  end
end
# Replace the file-name lists with decoded tensors (images and masks, train and test).
train_images = Image.to_nx(train_images, "./Images")
train_masks = Image.to_nx(train_masks, "./Masks")
test_images = Image.to_nx(test_images, "./Images")
test_masks = Image.to_nx(test_masks, "./Masks")

# Cut every tensor into batches of `batch_size` samples.
batch_size = 8

batched_train_images = Nx.to_batched(train_images, batch_size)
batched_train_masks = Nx.to_batched(train_masks, batch_size)
batched_test_images = Nx.to_batched(test_images, batch_size)
batched_test_masks = Nx.to_batched(test_masks, batch_size)

# Pair each image batch with the mask batch at the same position.
data = Stream.zip(batched_train_images, batched_train_masks)
test_data = Stream.zip(batched_test_images, batched_test_masks)

# Train for 5 epochs with binary cross-entropy loss and Adam (learning rate 0.001),
# compiling the training step with EXLA.
loop = Axon.Loop.trainer(model, :binary_cross_entropy, Axon.Optimizers.adam(0.001))
params = Axon.Loop.run(loop, data, %{}, epochs: 5, compiler: EXLA)

# Serialize the model together with the trained parameters next to the notebook.
File.cd!(__DIR__)
path_to_axon = "./cpu_batch8_epochs5.axon"
model_bytes = Axon.serialize(model, params)
File.write!(path_to_axon, model_bytes)
defmodule Change do
  @moduledoc """
  Helpers for converting nested numeric lists.
  """

  @doc """
  Truncates every number in a three-level nested list to an integer.

  The nesting structure of `list` is preserved; each innermost value is passed
  through `trunc/1`.
  """
  def to_integer(list) do
    for plane <- list do
      for row <- plane do
        for value <- row, do: trunc(value)
      end
    end
  end
end
# Pick a random sample from the test set. :rand.uniform/1 returns a value in 1..n while
# tensor indices start at 0, so draw from the actual number of test samples and shift by
# one. A fixed upper bound of 100 could point past the end of the tensor and could never
# select index 0.
num = :rand.uniform(Nx.axis_size(test_images, 0)) - 1

# The input image, scaled back from [0, 1] to 0..255 for display. Casting to :u8 drops
# the fractional part, which is what the previous list-based trunc/1 round-trip did.
test_img1 =
  test_images[num]
  |> Nx.multiply(255)
  |> Nx.as_type(:u8)
  |> StbImage.from_nx()

# The same sample with a leading batch axis of size 1, as the model expects batched input.
test_img =
  test_images[num]
  |> Nx.new_axis(0)

# Run the model, drop the batch axis again and convert the sigmoid output to 0..255.
pre_img =
  Axon.predict(model, params, test_img)
  |> Nx.squeeze(axes: [0])
  |> Nx.multiply(255)
  |> Nx.as_type(:u8)
  |> StbImage.from_nx()

# Show the input and the predicted mask side by side.
Kino.Layout.grid([test_img1, pre_img], columns: 2)