Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

分散処理 メイン

distributed_main.livemd

分散処理 メイン

Mix.install([
  {:nx, "~> 0.9"},
  {:evision, "~> 0.2"},
  {:flow, "~> 1.2"},
  {:req, "~> 0.5"},
  {:kino, "~> 0.14"},
  {:benchee, "~> 1.3"}
])

他のノード

画像の取得

# 指定したパスにダウンロードする
img_path = "rwakabay.jpg"

Req.get!(
  "https://www.elixirconf.eu/assets/images/ryo-wakabayashi.jpg",
  output: File.stream!(img_path)
)

img_mat = Evision.imread(img_path)

ノード間で画像処理する方法

ノード間でデータをやり取りするため、バイナリにする

img_tensor = Evision.Mat.to_nx(img_mat)

type = Nx.type(img_tensor)
shape = Nx.shape(img_tensor)
binary = Nx.to_binary(img_tensor)

{type, shape, binary}

二つの画像処理を実行する

  • 閾値処理
  • ノード名の描画
dst_binary =
  binary
  # 画像の再構築
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()
  # 閾値処理
  |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
  |> elem(1)
  # ノード名を描画
  |> Evision.putText(
    Node.self() |> Atom.to_string(),
    {10, 30},
    Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
    1.0,
    {0, 0, 0},
    [{:thickness, 2}]
  )
  |> Evision.Mat.to_nx()
  |> Nx.to_binary()

バイナリで返ってきた処理結果をマトリックスに戻す

dst_img_mat =
  dst_binary
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()

ノード接続

Node.list(:connected)
worker_1_input = Kino.Input.text("WORKER_1_NODE_NAME")
worker_1_atom =
  worker_1_input
  |> Kino.Input.read()
  |> String.to_atom()
Node.connect(worker_1_atom)
Node.list(:connected)
Node.disconnect(worker_1_atom)
Node.list(:connected)

画像をコピー

file_ext = Path.extname(img_path)
file_basename = Path.basename(img_path, file_ext)

{file_basename, file_ext}
# コピー枚数
copy_count = 32

src_file_paths =
  img_mat
  |> List.duplicate(copy_count)
  |> Enum.with_index()
  |> Enum.map(fn {copied_img_mat, index} ->
    filename = "#{file_basename}_d_#{index}#{file_ext}"

    Evision.imwrite(filename, copied_img_mat)

    filename
  end)

処理の定義

分散処理するため、処理内容をモジュールとして定義する

defmodule DistributedImageProcessing do
  def distribute(workers, images_stream) do
    # ワーカーノードに接続する
    Enum.each(workers, &Node.connect/1)

    # ワーカーストリームを生成する
    worker_stream =
      Stream.repeatedly(fn -> workers end)
      |> Stream.flat_map(& &1)

    sender_pid = self()

    worker_stream
    |> Stream.zip(images_stream)
    |> Flow.from_enumerable(stages: 4, max_demand: 1)
    |> Flow.map(fn {worker, image} ->
      IO.puts("enter spawn_link")

      {
        Node.spawn_link(worker, fn ->
          # ワーカーがメインから画像を受け取る
          receive do
            {:img, sender_pid, img} ->
              # 画像処理を呼び出す
              {dst_file, img} = process_image(img)

              # 画像をバイナリ、形、型としてメインに送る
              binary = Nx.to_binary(img)
              shape = Nx.shape(img)
              type = Nx.type(img)

              send(sender_pid, {dst_file, type, shape, binary})
              IO.puts("respond")
          end
        end),
        image
      }
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")

      img =
        src_file
        |> Evision.imread()
        |> Evision.Mat.to_nx()

      # 画像をバイナリ、形、型としてワーカーに送る
      binary = Nx.to_binary(img)
      shape = Nx.shape(img)
      type = Nx.type(img)

      send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
    end)
    |> Enum.to_list()
    |> Enum.map(fn _ ->
      IO.puts("enter receiver")

      receive do
        {dst_file, type, shape, binary} ->
          save_image({dst_file, type, shape, binary})
      end
    end)
    |> Enum.to_list()
  end

  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor #{Node.self()}")

    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_processed#{src_file_ext}"

    dst_img =
      binary
      # 画像の再構築
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()
      # 閾値処理
      |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
      |> elem(1)
      # ノード名を描画
      |> Evision.putText(
        Node.self() |> Atom.to_string(),
        {10, 30},
        Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
        1.0,
        {0, 0, 0},
        [{:thickness, 2}]
      )
      |> Evision.Mat.to_nx()

    {dst_file, dst_img}
  end

  def save_image({dst_file, type, shape, binary}) do
    img =
      binary
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()

    Evision.imwrite(dst_file, img)

    dst_file
  end
end

自身のノードで1つだけ実行

{img_path, type, shape, binary}
|> DistributedImageProcessing.process_image()
|> then(fn {dst_file, dst_img} ->
  {dst_file, Nx.type(dst_img), Nx.shape(dst_img), Nx.to_binary(dst_img)}
end)
|> DistributedImageProcessing.save_image()
|> Evision.imread()

ワーカーノード1つで1枚だけ実行

[worker_1_atom]
|> DistributedImageProcessing.distribute([img_path])
|> Enum.at(0)
|> Evision.imread()

ワーカーノード1つで32枚実行

# 存在するファイルを取得
images_stream =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&"#{file_basename}_d_#{&1}#{file_ext}")
  |> Stream.take_while(fn filename -> File.exists?(filename) end)
[worker_1_atom]
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)

ワーカーノード4つで分散処理

worker_2_input = Kino.Input.text("WORKER_2_NODE_NAME")
worker_3_input = Kino.Input.text("WORKER_3_NODE_NAME")
worker_4_input = Kino.Input.text("WORKER_4_NODE_NAME")
workers =
  [worker_1_input, worker_2_input, worker_3_input, worker_4_input]
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)
workers
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)

速度比較

distributed = fn worker_input_list ->
  worker_input_list
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)
  |> DistributedImageProcessing.distribute(images_stream)
end
Benchee.run(%{
  "1 worker" => fn -> distributed.([worker_1_input]) end,
  "2 workers" => fn -> distributed.([worker_1_input, worker_2_input]) end,
  "4 workers" => fn ->
    distributed.([worker_1_input, worker_2_input, worker_3_input, worker_4_input])
  end
})