分散処理 メイン
Mix.install([
{:nx, "~> 0.9"},
{:evision, "~> 0.2"},
{:flow, "~> 1.2"},
{:req, "~> 0.5"},
{:kino, "~> 0.14"},
{:benchee, "~> 1.3"}
])
他のノード
画像の取得
# 指定したパスにダウンロードする
img_path = "rwakabay.jpg"
Req.get!(
"https://www.elixirconf.eu/assets/images/ryo-wakabayashi.jpg",
output: File.stream!(img_path)
)
img_mat = Evision.imread(img_path)
ノード間で画像処理する方法
ノード間でデータをやり取りするため、バイナリにする
img_tensor = Evision.Mat.to_nx(img_mat)
type = Nx.type(img_tensor)
shape = Nx.shape(img_tensor)
binary = Nx.to_binary(img_tensor)
{type, shape, binary}
二つの画像処理を実行する
- 閾値処理
- ノード名の描画
dst_binary =
binary
# 画像の再構築
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
# 閾値処理
|> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
|> elem(1)
# ノード名を描画
|> Evision.putText(
Node.self() |> Atom.to_string(),
{10, 30},
Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
1.0,
{0, 0, 0},
[{:thickness, 2}]
)
|> Evision.Mat.to_nx()
|> Nx.to_binary()
バイナリで返ってきた処理結果をマトリックスに戻す
dst_img_mat =
dst_binary
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
ノード接続
Node.list(:connected)
worker_1_input = Kino.Input.text("WORKER_1_NODE_NAME")
worker_1_atom =
worker_1_input
|> Kino.Input.read()
|> String.to_atom()
Node.connect(worker_1_atom)
Node.list(:connected)
Node.disconnect(worker_1_atom)
Node.list(:connected)
画像をコピー
file_ext = Path.extname(img_path)
file_basename = Path.basename(img_path, file_ext)
{file_basename, file_ext}
# コピー枚数
copy_count = 32
src_file_paths =
img_mat
|> List.duplicate(copy_count)
|> Enum.with_index()
|> Enum.map(fn {copied_img_mat, index} ->
filename = "#{file_basename}_d_#{index}#{file_ext}"
Evision.imwrite(filename, copied_img_mat)
filename
end)
処理の定義
分散処理するため、処理内容をモジュールとして定義する
defmodule DistributedImageProcessing do
def distribute(workers, images_stream) do
# ワーカーノードに接続する
Enum.each(workers, &Node.connect/1)
# ワーカーストリームを生成する
worker_stream =
Stream.repeatedly(fn -> workers end)
|> Stream.flat_map(& &1)
sender_pid = self()
worker_stream
|> Stream.zip(images_stream)
|> Flow.from_enumerable(stages: 4, max_demand: 1)
|> Flow.map(fn {worker, image} ->
IO.puts("enter spawn_link")
{
Node.spawn_link(worker, fn ->
# ワーカーがメインから画像を受け取る
receive do
{:img, sender_pid, img} ->
# 画像処理を呼び出す
{dst_file, img} = process_image(img)
# 画像をバイナリ、形、型としてメインに送る
binary = Nx.to_binary(img)
shape = Nx.shape(img)
type = Nx.type(img)
send(sender_pid, {dst_file, type, shape, binary})
IO.puts("respond")
end
end),
image
}
end)
|> Flow.map(fn {pid, src_file} ->
IO.puts("enter reader")
img =
src_file
|> Evision.imread()
|> Evision.Mat.to_nx()
# 画像をバイナリ、形、型としてワーカーに送る
binary = Nx.to_binary(img)
shape = Nx.shape(img)
type = Nx.type(img)
send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
end)
|> Enum.to_list()
|> Enum.map(fn _ ->
IO.puts("enter receiver")
receive do
{dst_file, type, shape, binary} ->
save_image({dst_file, type, shape, binary})
end
end)
|> Enum.to_list()
end
def process_image({src_file, type, shape, binary}) do
IO.puts("enter processor #{Node.self()}")
src_file_ext = Path.extname(src_file)
src_file_basename = Path.basename(src_file, src_file_ext)
dst_file = "#{src_file_basename}_processed#{src_file_ext}"
dst_img =
binary
# 画像の再構築
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
# 閾値処理
|> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
|> elem(1)
# ノード名を描画
|> Evision.putText(
Node.self() |> Atom.to_string(),
{10, 30},
Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
1.0,
{0, 0, 0},
[{:thickness, 2}]
)
|> Evision.Mat.to_nx()
{dst_file, dst_img}
end
def save_image({dst_file, type, shape, binary}) do
img =
binary
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
Evision.imwrite(dst_file, img)
dst_file
end
end
自身のノードで1つだけ実行
{img_path, type, shape, binary}
|> DistributedImageProcessing.process_image()
|> then(fn {dst_file, dst_img} ->
{dst_file, Nx.type(dst_img), Nx.shape(dst_img), Nx.to_binary(dst_img)}
end)
|> DistributedImageProcessing.save_image()
|> Evision.imread()
ワーカーノード1つで1枚だけ実行
[worker_1_atom]
|> DistributedImageProcessing.distribute([img_path])
|> Enum.at(0)
|> Evision.imread()
ワーカーノード1つで32枚実行
# 存在するファイルを取得
images_stream =
Stream.unfold(0, fn counter -> {counter, counter + 1} end)
|> Stream.map(&"#{file_basename}_d_#{&1}#{file_ext}")
|> Stream.take_while(fn filename -> File.exists?(filename) end)
[worker_1_atom]
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)
ワーカーノード4つで分散処理
worker_2_input = Kino.Input.text("WORKER_2_NODE_NAME")
worker_3_input = Kino.Input.text("WORKER_3_NODE_NAME")
worker_4_input = Kino.Input.text("WORKER_4_NODE_NAME")
workers =
[worker_1_input, worker_2_input, worker_3_input, worker_4_input]
|> Enum.map(fn input ->
input
|> Kino.Input.read()
|> String.to_atom()
end)
workers
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)
速度比較
distributed = fn worker_input_list ->
worker_input_list
|> Enum.map(fn input ->
input
|> Kino.Input.read()
|> String.to_atom()
end)
|> DistributedImageProcessing.distribute(images_stream)
end
Benchee.run(%{
"1 worker" => fn -> distributed.([worker_1_input]) end,
"2 workers" => fn -> distributed.([worker_1_input, worker_2_input]) end,
"4 workers" => fn ->
distributed.([worker_1_input, worker_2_input, worker_3_input, worker_4_input])
end
})