Distributed Image Processing Worker 3
Mix.install(
[
{:nx, "~> 0.9"},
{:evision, "~> 0.2"},
{:flow, "~> 1.2"},
{:kino, "~> 0.14"}
]
)
Get the node name
# Evaluates to the name of the local node (an atom).
node()
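Define the processing module
This must be the same definition as on the main node, because the main node spawns functions from this module on the workers.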
# DistributedImageProcessing fans image files out to worker nodes, has each worker
# threshold the image and stamp it with that worker's node name, and writes the
# returned images to disk on the calling node.
#
# NOTE(review): distribute/2 hands an anonymous function defined in this module to
# Node.spawn_link/2, so every node presumably needs this module loaded in identical
# form -- confirm before changing the code on one node only.
defmodule DistributedImageProcessing do
# Processes every entry of `images_stream` on one of `workers`, assigning workers
# round-robin, and saves each result locally.
#
# workers       - list of node names; each is passed to Node.connect/1 and used as
#                 the target of Node.spawn_link/2.
# images_stream - enumerable of image file paths (each is read with Evision.imread/1).
#
# Returns the list of file names returned by save_image/1, one per image, in the
# order in which the replies are taken from the mailbox.
#
# NOTE(review): neither `receive` below has a timeout, so a worker that never
# replies leaves the corresponding process waiting indefinitely.
def distribute(workers, images_stream) do
# Connect to worker nodes (the result of each Node.connect/1 call is ignored)
Enum.each(workers, &Node.connect/1)
# Generate an endless stream that cycles through the worker nodes in order
worker_stream =
Stream.repeatedly(fn -> workers end)
|> Stream.flat_map(& &1)
# The process calling distribute/2; workers send their results back to it
sender_pid = self()
worker_stream
# Pair each image with the next worker; zipping stops when the images run out
|> Stream.zip(images_stream)
# Four stages, each asking for one {worker, image} pair at a time
|> Flow.from_enumerable(stages: 4, max_demand: 1)
# Step 1: start one process per image on its assigned worker node
|> Flow.map(fn {worker, image} ->
IO.puts("enter spawn_link")
{
# Node.spawn_link/2 links the new process to the process evaluating this
# function -- presumably a Flow stage rather than the caller of distribute/2
Node.spawn_link(worker, fn ->
# Receive the image from the main node
receive do
# `sender_pid` is bound here from the message itself; it shadows the
# variable of the same name defined above
{:img, sender_pid, img} ->
# Perform image processing
{dst_file, img} = process_image(img)
# Send the processed image to the main node as raw bytes plus the
# type and shape needed to rebuild the tensor
binary = Nx.to_binary(img)
shape = Nx.shape(img)
type = Nx.type(img)
send(sender_pid, {dst_file, type, shape, binary})
IO.puts("respond")
end
end),
image
}
end)
# Step 2: read each source file and ship its pixels to the spawned process
|> Flow.map(fn {pid, src_file} ->
IO.puts("enter reader")
img =
src_file
|> Evision.imread()
|> Evision.Mat.to_nx()
# Send the image to the worker
binary = Nx.to_binary(img)
shape = Nx.shape(img)
type = Nx.type(img)
# send/2 returns the message, so that is what this step emits; the value is
# discarded by the receiver loop below
send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
end)
# Run the flow to completion: every image has been sent before any result is
# received, and the resulting list has one element per image
|> Enum.to_list()
# Step 3: take one reply from the mailbox per image that was sent
|> Enum.map(fn _ ->
IO.puts("enter receiver")
receive do
# NOTE(review): this pattern matches any four-element tuple in the mailbox,
# not only worker replies
{dst_file, type, shape, binary} ->
save_image({dst_file, type, shape, binary})
end
end)
# Enum.map/2 already returned a list, so this call leaves it unchanged
|> Enum.to_list()
end
# Thresholds one image and draws the current node's name onto it.
#
# Takes {src_file, type, shape, binary}: the source file path and the Nx type,
# shape and raw bytes of the image tensor.
#
# Returns {dst_file, dst_img}, where dst_file is "<basename>_processed<ext>" of
# src_file (Path.basename/2 drops any directory part) and dst_img is the result
# converted back with Evision.Mat.to_nx/1.
def process_image({src_file, type, shape, binary}) do
IO.puts("enter processor #{Node.self()}")
src_file_ext = Path.extname(src_file)
src_file_basename = Path.basename(src_file, src_file_ext)
dst_file = "#{src_file_basename}_processed#{src_file_ext}"
dst_img =
binary
# Rebuild the tensor from the raw bytes and convert it to an Evision matrix
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
# Thresholding with threshold 127 and maximum value 255 (binary mode)
|> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
# The threshold call returns a tuple; its second element is used as the image
|> elem(1)
# Draw the node name at {10, 30}, font scale 1.0, colour {0, 0, 0}, thickness 2
|> Evision.putText(
Node.self() |> Atom.to_string(),
{10, 30},
Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
1.0,
{0, 0, 0},
[{:thickness, 2}]
)
|> Evision.Mat.to_nx()
{dst_file, dst_img}
end
# Rebuilds an image from its raw bytes and writes it to `dst_file`.
#
# Takes {dst_file, type, shape, binary}: the destination path and the Nx type,
# shape and raw bytes of the image tensor.
#
# Returns dst_file. The result of Evision.imwrite/2 is ignored, so a failed write
# is not reported to the caller.
# NOTE(review): names produced by process_image/1 carry no directory, so the file
# is presumably written relative to the current working directory -- confirm.
def save_image({dst_file, type, shape, binary}) do
img =
binary
|> Nx.from_binary(type)
|> Nx.reshape(shape)
|> Evision.Mat.from_nx_2d()
Evision.imwrite(dst_file, img)
dst_file
end
end