Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Image Classification Client

Image_classification_client.livemd

Image Classification Client

# Install the notebook's dependencies. The `config:` option sets application
# configuration; here it makes EXLA.Backend the default Nx backend.
Mix.install(
  [
    {:nx, "~> 0.7"},
    {:exla, "~> 0.7"},
    # NOTE(review): axon_onnx is fetched from a Git repository and is not
    # referenced by any cell visible here — presumably only needed elsewhere; confirm.
    {:axon_onnx, "~> 0.4", git: "https://github.com/mortont/axon_onnx/"},
    {:evision, "~> 0.2"},
    {:flow, "~> 1.2"},
    {:req, "~> 0.5"},
    {:kino, "~> 0.13"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Distributed processing

distributed

Load data

Tellus Satellite Data Traveler API を使用する

API 仕様:

提供:だいち(ALOS) AVNIR-2 データ(JAXA)

だいち(ALOS) AVNIR-2 の仕様はこちら

https://www.eorc.jaxa.jp/ALOS/jp/alos/sensor/avnir2_j.htm

defmodule TellusTraveler do
  @moduledoc """
  Small client for the Tellus Satellite Data Traveler API.

  It lists the files that belong to a piece of data in a dataset and downloads
  them into a local directory, skipping files that are already present.
  """

  @base_path "https://www.tellusxdp.com/api/traveler/v1"
  @data_path "#{@base_path}/datasets"

  # Request headers carrying the bearer token.
  defp get_headers(token) do
    %{
      "Authorization" => "Bearer #{token}",
      "Content-Type" => "application/json"
    }
  end

  @doc """
  Returns the `"results"` entry of the file listing for `data_id` in `dataset_id`.

  Raises if the HTTP request itself fails (`Req.get!/2`).
  """
  def get_data_files(token, dataset_id, data_id) do
    url = "#{@data_path}/#{dataset_id}/data/#{data_id}/files/"
    headers = get_headers(token)

    url
    |> Req.get!(headers: headers)
    |> then(& &1.body["results"])
  end

  # Asks the API for the download URL of a single file (the `"download_url"`
  # entry of the response body).
  defp get_data_file_download_url(token, dataset_id, data_id, file_id) do
    url = "#{@data_path}/#{dataset_id}/data/#{data_id}/files/#{file_id}/download-url/"
    headers = get_headers(token)

    url
    |> Req.post!(headers: headers)
    |> then(& &1.body["download_url"])
  end

  # Streams `url` into a temporary `.part` file and renames it to `file_path`
  # only after a 200 response, so an interrupted or failed download is never
  # mistaken for a complete, cached file on the next run.
  defp download_file(url, file_path) do
    tmp_path = file_path <> ".part"

    case Req.get!(url, into: File.stream!(tmp_path)) do
      %{status: 200} ->
        File.rename!(tmp_path, file_path)

      %{status: status} ->
        # Best effort: the temporary file may not have been created at all.
        File.rm(tmp_path)
        raise "download failed with HTTP status #{status}: #{file_path}"
    end
  end

  @doc """
  Downloads every file of `scene_id` into `dist/scene_id` and returns the list
  of local file paths.

  Files that already exist locally are not downloaded again. Raises if the
  target directory cannot be created or a download does not succeed.
  """
  def download(token, dataset_id, scene_id, dist \\ "/tmp/") do
    scene_dir = Path.join([dist, scene_id])
    # Fail early with a clear error instead of ignoring the result.
    File.mkdir_p!(scene_dir)

    token
    |> get_data_files(dataset_id, scene_id)
    |> Enum.map(fn file ->
      file_path = Path.join(scene_dir, file["name"])

      if not File.exists?(file_path) do
        token
        |> get_data_file_download_url(dataset_id, scene_id, file["id"])
        |> download_file(file_path)
      end

      file_path
    end)
  end
end
# Identifiers of the dataset and scene passed to TellusTraveler.download/3.
dataset_id = "ea71ef6e-9569-49fc-be16-ba98d876fb73"
scene_id = "bbf4a49f-2440-4b0f-a964-b64b7c7deacc"

# Enter your Tellus token in this input.
token_input = Kino.Input.password("Token")

# Read the token from the input and download the scene's files; the result is
# the list of local file paths.
file_path_list =
  TellusTraveler.download(Kino.Input.read(token_input), dataset_id, scene_id)
# Loads the image whose file name starts with `prefix` as a grayscale image and
# returns it as an Nx tensor on EXLA.Backend.
#
# Raises ArgumentError when no path in `file_path_list` matches, instead of
# handing `nil` to Evision.imread/2.
get_band_tensor = fn file_path_list, prefix ->
  band_path =
    Enum.find(file_path_list, fn file ->
      file
      |> Path.basename()
      |> String.starts_with?(prefix)
    end)

  if is_nil(band_path) do
    raise ArgumentError, "no file whose name starts with #{inspect(prefix)} was found"
  end

  band_path
  |> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
  |> Evision.Mat.to_nx(EXLA.Backend)
end
# Load the three band images; the names below reflect how they are stacked
# (IMG-01 as blue, IMG-02 as green, IMG-03 as red).
[blue_tensor, green_tensor, red_tensor] =
  Enum.map(["IMG-01", "IMG-02", "IMG-03"], &get_band_tensor.(file_path_list, &1))

# Stack the bands along a new axis 2 in red, green, blue order.
rgb_tensor = Nx.stack([red_tensor, green_tensor, blue_tensor], axis: 2)

# Preview: convert to an Evision matrix, shrink to 640x640 and swap the channel
# order from RGB to BGR.
rgb_tensor
|> Evision.Mat.from_nx_2d()
|> Evision.resize({640, 640})
|> Evision.cvtColor(Evision.Constant.cv_COLOR_RGB2BGR())
# Cut the image into tiles and resize every tile to 224x224.
# Only indices 0..7199 along axis 1 are kept before tiling.
tiled_tensors =
  rgb_tensor[[0..-1//1, 0..7199//1, 0..-1//1]]
  # Split along axis 0 into strips of 500.
  |> Nx.to_batched(500)
  |> Enum.flat_map(fn strip ->
    strip
    # Swap the first two axes so that batching splits along the other direction.
    |> Nx.transpose(axes: [1, 0, 2])
    |> Nx.to_batched(450)
    |> Enum.map(fn tile ->
      tile
      # Swap the axes back to the original orientation.
      |> Nx.transpose(axes: [1, 0, 2])
      |> Evision.Mat.from_nx_2d()
      |> Evision.resize({224, 224})
      |> Evision.Mat.to_nx(EXLA.Backend)
    end)
  end)
# Show every tile as a 40x40 thumbnail in a 16-column grid.
tiled_tensors
|> Enum.map(fn tile ->
  thumbnail =
    tile
    |> Evision.Mat.from_nx_2d()
    |> Evision.resize({40, 40})

  thumbnail
  |> Evision.Mat.to_nx()
  |> Kino.Image.new()
end)
|> Kino.Layout.grid(columns: 16)

Connect to server

# One pair of text inputs (node name and cookie) per server, keyed by "A" / "B".
server_node_inputs =
  Map.new(["A", "B"], fn node_id ->
    inputs = %{
      node: Kino.Input.text("SERVER_#{node_id}_NODE_NAME"),
      cookie: Kino.Input.text("SERVER_#{node_id}_COOKIE")
    }

    {node_id, inputs}
  end)

# Lay the inputs out two per row: node name next to its cookie.
server_node_inputs
|> Enum.flat_map(fn {_node_id, inputs} -> [inputs.node, inputs.cookie] end)
|> Kino.Layout.grid(columns: 2)
# Set the cookie for each server node and connect to it. The result is the list
# of Node.connect/1 return values, one per server.
for {_node_id, inputs} <- server_node_inputs do
  # Node names and cookies are atoms, hence the conversion from the input text.
  node_name = String.to_atom(Kino.Input.read(inputs.node))
  cookie = String.to_atom(Kino.Input.read(inputs.cookie))

  Node.set_cookie(node_name, cookie)
  Node.connect(node_name)
end

Batch prediction with server

batch_size = 4

# Classify the first 16 tiles, one batch at a time, and tag each prediction
# with the index of the tile it belongs to.
predictions =
  tiled_tensors
  |> Enum.take(16)
  |> Enum.with_index()
  |> Enum.chunk_every(batch_size)
  |> Enum.flat_map(fn batch ->
    {tensor_list, index_list} = Enum.unzip(batch)

    ImageClassification
    |> Nx.Serving.batched_run(tensor_list)
    |> Enum.zip(index_list)
    |> Enum.map(fn {prediction, index} -> Map.put_new(prediction, :index, index) end)
  end)
# Show each predicted class above a 100x100 thumbnail of its tile, eight per row.
# Pairing stops at the shorter of the two lists.
tiled_tensors
|> Enum.zip_with(predictions, fn tile, prediction ->
  label = Kino.Markdown.new(prediction.predicted_class)

  image =
    tile
    |> Evision.Mat.from_nx_2d()
    |> Evision.resize({100, 100})
    |> Evision.Mat.to_nx()
    |> Kino.Image.new()

  Kino.Layout.grid([label, image])
end)
|> Kino.Layout.grid(columns: 8)
# Classify every tile, running the batches through Flow stages in parallel.
# Batches may finish in any order, so the predictions are sorted by tile index
# at the end.
predictions =
  tiled_tensors
  |> Enum.with_index()
  |> Enum.chunk_every(batch_size)
  |> Flow.from_enumerable(stages: 4, max_demand: 4)
  |> Flow.flat_map(fn batch ->
    {tensor_list, index_list} = Enum.unzip(batch)

    # Print the first tile index of the batch as a progress indicator.
    index_list
    |> Enum.at(0)
    |> IO.inspect()

    ImageClassification
    |> Nx.Serving.batched_run(tensor_list)
    |> Enum.zip(index_list)
    |> Enum.map(fn {prediction, index} ->
      Map.put_new(prediction, :index, index)
    end)
  end)
  |> Enum.sort_by(& &1.index)
# Color (three u8 channel values) used to paint each predicted class.
color_map = %{
  "clear" => [0, 0, 255],
  "cloudy" => [255, 255, 255]
}

# Sample swatch: a 40x40 image filled with the "clear" color.
color_map["clear"]
|> Nx.tensor(type: :u8)
|> Nx.broadcast({40, 40, 3})
|> Kino.Image.new()
# Side by side: the scene resized to 640x640, and a 16-column grid with one
# 40x40 swatch per prediction, colored by predicted class.
Kino.Layout.grid(
  [
    rgb_tensor
    |> Evision.Mat.from_nx_2d()
    |> Evision.resize({640, 640})
    |> Evision.Mat.to_nx()
    |> Kino.Image.new(),
    predictions
    |> Enum.map(fn prediction ->
      color_map[prediction.predicted_class]
      |> Nx.tensor(type: :u8)
      |> Nx.broadcast({40, 40, 3})
      |> Kino.Image.new()
    end)
    |> Kino.Layout.grid(columns: 16)
  ],
  columns: 2
)