Distributed Image Processing Main
Mix.install(
  [
    {:nx, "~> 0.9"},
    {:evision, "~> 0.2"},
    {:flow, "~> 1.2"},
    {:req, "~> 0.5"},
    {:kino, "~> 0.14"},
    {:benchee, "~> 1.3"}
  ]
)
What to do in this notebook
Process 32 images with distributed processing and compare the speeds on 1, 2, and 4 worker nodes.
Other nodes
This notebook is the main node; separate worker notebooks (worker 1 to worker 4) provide the node names entered below.
Get the image
# Download the image (with Req 0.5, the body is streamed to disk via the :into option)
img_path = "rwakabay.jpg"

Req.get!(
  "https://www.elixirconf.eu/assets/images/ryo-wakabayashi.jpg",
  into: File.stream!(img_path)
)
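The download can be confirmed with a quick, optional sanity check that the saved file is non-empty (this check is not part of the original flow):

# Optional: the saved file should have a non-zero size
File.stat!(img_path).size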
img_mat = Evision.imread(img_path)
How to process an image between nodes
Convert an image to a binary.
This is so that images can be exchanged between nodes.
img_tensor = Evision.Mat.to_nx(img_mat)
type = Nx.type(img_tensor)
shape = Nx.shape(img_tensor)
binary = Nx.to_binary(img_tensor)
{type, shape, binary}
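As an optional sanity check (not required for the rest of the notebook), the binary together with its type and shape round-trips back to a tensor equal to the original:

# Rebuild the tensor from the binary and compare it with the original
restored =
  binary
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)

# Returns 1 when every element matches
Nx.equal(restored, img_tensor) |> Nx.all() |> Nx.to_number()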
In this notebook, I will threshold the image and draw the node name on it.
This makes it clear which node processed the image.
dst_binary =
  binary
  # Reconstruction of an image
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()
  # Thresholding
  |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
  |> elem(1)
  # Draw the node name
  |> Evision.putText(
    Node.self() |> Atom.to_string(),
    {10, 30},
    Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
    1.0,
    {0, 0, 0},
    [{:thickness, 2}]
  )
  |> Evision.Mat.to_nx()
  |> Nx.to_binary()
Convert the processing result, returned as a binary, back to a matrix.
The node name of this notebook is drawn in the image.
dst_img_mat =
  dst_binary
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()
Node Connection
Check the connections between the notebooks, which run as separate nodes.
Node.list(:connected)
Currently this notebook is only connected to the Livebook process.
Now let’s connect to a worker notebook.
Enter the worker 1 node name, as shown in the worker 1 notebook.
worker_1_input = Kino.Input.text("WORKER_1_NODE_NAME")
worker_1_atom =
  worker_1_input
  |> Kino.Input.read()
  |> String.to_atom()
Node.connect(worker_1_atom)
Node.list(:connected)
Worker 1 node name has been added to the list.
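Node.ping/1 is another quick way to confirm that a worker is reachable; note that a successful ping also establishes the connection (optional check, not in the original flow):

# :pong means the worker node is reachable, :pang means it is not
Node.ping(worker_1_atom)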
Disconnect from worker 1 for now; the distribution module will reconnect to it later.
Node.disconnect(worker_1_atom)
Node.list(:connected)
Worker 1 node name has been removed from the list.
Copy images
Copy images for distributed processing
file_ext = Path.extname(img_path)
file_basename = Path.basename(img_path, file_ext)
{file_basename, file_ext}
# Number of copies
copy_count = 32
file_path_list =
  img_mat
  |> List.duplicate(copy_count)
  |> Enum.with_index()
  |> Enum.map(fn {copied_img_mat, index} ->
    filename = "#{file_basename}_d_#{index}#{file_ext}"
    Evision.imwrite(filename, copied_img_mat)
    filename
  end)
# Display the first 6 copied files
file_path_list
|> Enum.slice(0..5)
|> Enum.map(fn filename ->
  img = Evision.imread(filename)

  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)
Define the processing module
Define image processing as a module for distributed processing.
Send images as binaries from the main node (this notebook) to each worker.
Workers convert the binary back to an image, process it, and return the result as a binary to the main node.
The main node saves each returned binary as an image file.
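Before looking at the full module, here is a minimal sketch of the spawn/send/receive round trip it relies on, run against this node so no worker is needed (illustrative only):

# Spawn a process (here on our own node) that waits for a message and replies
parent = self()

pid =
  Node.spawn_link(Node.self(), fn ->
    receive do
      {:ping, from} -> send(from, {:pong, Node.self()})
    end
  end)

# Send a message to the spawned process and wait for its reply
send(pid, {:ping, parent})

receive do
  {:pong, node} -> node
end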
defmodule DistributedImageProcessing do
  def distribute(workers, images_stream) do
    # Connect to worker nodes
    Enum.each(workers, &Node.connect/1)

    # Generate a stream of worker nodes
    worker_stream =
      Stream.repeatedly(fn -> workers end)
      |> Stream.flat_map(& &1)

    sender_pid = self()

    worker_stream
    |> Stream.zip(images_stream)
    |> Flow.from_enumerable(stages: 4, max_demand: 1)
    |> Flow.map(fn {worker, image} ->
      IO.puts("enter spawn_link")

      {
        Node.spawn_link(worker, fn ->
          # Receive the image from the main
          receive do
            {:img, sender_pid, img} ->
              # Perform image processing
              {dst_file, img} = process_image(img)

              # Send the processed image to the main
              binary = Nx.to_binary(img)
              shape = Nx.shape(img)
              type = Nx.type(img)
              send(sender_pid, {dst_file, type, shape, binary})
              IO.puts("respond")
          end
        end),
        image
      }
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")

      img =
        src_file
        |> Evision.imread()
        |> Evision.Mat.to_nx()

      # Send the image to the worker
      binary = Nx.to_binary(img)
      shape = Nx.shape(img)
      type = Nx.type(img)
      send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
    end)
    |> Enum.to_list()
    |> Enum.map(fn _ ->
      IO.puts("enter receiver")

      receive do
        {dst_file, type, shape, binary} ->
          save_image({dst_file, type, shape, binary})
      end
    end)
  end

  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor #{Node.self()}")

    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_processed#{src_file_ext}"

    dst_img =
      binary
      # Reconstruction of an image
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()
      # Thresholding
      |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
      |> elem(1)
      # Draw the node name
      |> Evision.putText(
        Node.self() |> Atom.to_string(),
        {10, 30},
        Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
        1.0,
        {0, 0, 0},
        [{:thickness, 2}]
      )
      |> Evision.Mat.to_nx()

    {dst_file, dst_img}
  end

  def save_image({dst_file, type, shape, binary}) do
    img =
      binary
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()

    Evision.imwrite(dst_file, img)
    dst_file
  end
end
Process one image on the main node
{img_path, type, shape, binary}
|> DistributedImageProcessing.process_image()
|> then(fn {dst_file, dst_img} ->
  {dst_file, Nx.type(dst_img), Nx.shape(dst_img), Nx.to_binary(dst_img)}
end)
|> DistributedImageProcessing.save_image()
|> Evision.imread()
Process one image on one worker node
[worker_1_atom]
|> DistributedImageProcessing.distribute([img_path])
|> Enum.at(0)
|> Evision.imread()
The worker 1 node name is drawn in the image.
Process 32 images on one worker node
# Get files that exist
images_stream =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&"#{file_basename}_d_#{&1}#{file_ext}")
  |> Stream.take_while(fn filename -> File.exists?(filename) end)
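As an optional check, counting the stream should give the 32 copies created above:

# Enumerates the stream once just to count the existing copies
Enum.count(images_stream)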
[worker_1_atom]
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)
The worker 1 node name is drawn in all of the images.
Process 32 images on 4 worker nodes
worker_2_input = Kino.Input.text("WORKER_2_NODE_NAME")
worker_3_input = Kino.Input.text("WORKER_3_NODE_NAME")
worker_4_input = Kino.Input.text("WORKER_4_NODE_NAME")
workers =
  [worker_1_input, worker_2_input, worker_3_input, worker_4_input]
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)
workers
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.map(&Evision.imread(&1))
|> Kino.Layout.grid(columns: 4)
The node names drawn in the images differ, indicating that the images were processed across multiple nodes.
Speed Comparison
Prepare a function to process a given list of workers.
distributed = fn worker_input_list ->
  worker_input_list
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)
  |> DistributedImageProcessing.distribute(images_stream)
end
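Before the full benchmark, a single run timed with :timer.tc/1 gives a rough baseline (optional; the result is converted from microseconds to seconds):

# One-off measurement of a single distributed run on worker 1
{elapsed_us, _files} = :timer.tc(fn -> distributed.([worker_1_input]) end)
elapsed_us / 1_000_000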
Compare speeds with different numbers of workers.
Benchee.run(%{
  "1 worker" => fn -> distributed.([worker_1_input]) end,
  "2 workers" => fn -> distributed.([worker_1_input, worker_2_input]) end,
  "4 workers" => fn ->
    distributed.([worker_1_input, worker_2_input, worker_3_input, worker_4_input])
  end
})
In this run, the distributed processing was not effective because all of the nodes were running on a single machine.
In the future, I would like to actually run distributed processing on multiple machines.