Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Distributed Image Processing Main

distributed_main_en.livemd

Distributed Image Processing Main

# Notebook dependencies: tensors (Nx), OpenCV bindings (Evision), parallel
# pipelines (Flow), HTTP client (Req), Livebook widgets (Kino), benchmarks (Benchee).
deps = [
  {:nx, "~> 0.6"},
  {:evision, "~> 0.1"},
  {:flow, "~> 1.2"},
  {:req, "~> 0.3"},
  {:kino, "~> 0.12"},
  {:benchee, "~> 1.1"}
]

Mix.install(deps)

What to do in this notebook

Distribute the processing of 32 images across nodes and compare the speed with 1, 2, and 4 worker nodes.

Other nodes

Get the image

# Fetch the sample picture into a local file (relative path), then load it
# as an Evision matrix.
img_path = "rwakabay.png"
img_url = "https://www.elixirconf.eu/assets/images/ryo-wakabayashi.png"

Req.get!(img_url, output: img_path)

img_mat = Evision.imread(img_path)

How to process an image between nodes

Convert an image to binary.

This is to exchange images between nodes.

# Flatten the image into plain data (element type, dimensions, raw bytes) that
# can be sent to another node and rebuilt there.
img_tensor = Evision.Mat.to_nx(img_mat)

{type, shape, binary} =
  {Nx.type(img_tensor), Nx.shape(img_tensor), Nx.to_binary(img_tensor)}

In this notebook, I will threshold the image and draw the node name.

This shows which node processed the image.

# Rebuild the matrix from the raw data, binarize it (threshold 127 -> 255),
# stamp this node's name on it, and flatten the result back into a binary.
rebuilt_mat =
  binary
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()

{_threshold, binarized_mat} =
  Evision.threshold(rebuilt_mat, 127, 255, Evision.Constant.cv_THRESH_BINARY())

node_label = Atom.to_string(Node.self())

dst_binary =
  binarized_mat
  |> Evision.putText(
    node_label,
    {10, 30},
    Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
    1.0,
    {0, 0, 0},
    thickness: 2
  )
  |> Evision.Mat.to_nx()
  |> Nx.to_binary()

Convert the processing result, returned as a binary, back into a matrix.

The node name of this notebook is drawn on the image.

# Rebuild the returned binary into a displayable matrix; it now carries this
# notebook's node name.
dst_tensor = Nx.from_binary(dst_binary, type)
dst_img_mat = Evision.Mat.from_nx_2d(Nx.reshape(dst_tensor, shape))

Node Connection

Check the connections between notebooks as nodes

# Nodes this notebook currently has a connection to.
Node.list([:connected])

Currently this notebook is only connected to the Livebook process.

Now let’s connect to a worker notebook.

Enter the node name shown in the worker 1 notebook.

# Text box for worker 1's node name (copy it from the worker 1 notebook).
worker_1_input = Kino.Input.text("WORKER_1_NODE_NAME")
# Convert the typed node name into an atom, since the Node functions take atoms.
# Surrounding whitespace is trimmed: a pasted name with a trailing space or
# newline would otherwise become a different atom, and Node.connect/1 would
# simply return false.
worker_1_atom =
  worker_1_input
  |> Kino.Input.read()
  |> String.trim()
  |> String.to_atom()

Node.connect(worker_1_atom)
Node.list(:connected)

Worker 1 node name has been added to the list.

Disconnect worker 1 for now; we will connect to it again later.

# Drop the connection for now; DistributedImageProcessing.distribute/2
# connects to its workers again when it runs.
_ = Node.disconnect(worker_1_atom)
Node.list([:connected])

Worker 1 node name has been removed from the list.

Copy images

Copy images for distributed processing

# Split the downloaded file name into stem and extension; both are used to
# name the copies.
file_ext = Path.extname(img_path)
file_basename = img_path |> Path.basename() |> Path.rootname(file_ext)

{file_basename, file_ext}
# Write `copy_count` identical copies of the image as
# <basename>_d_<index><ext> and collect the written file names in order.
copy_count = 32

file_path_list =
  for index <- 0..(copy_count - 1)//1 do
    filename = "#{file_basename}_d_#{index}#{file_ext}"
    Evision.imwrite(filename, img_mat)
    filename
  end
# Preview the first six copies: each cell stacks the file name above the
# image, laid out in a three-column grid.
previews =
  for filename <- Enum.take(file_path_list, 6) do
    Kino.Layout.grid([filename, Evision.imread(filename)], columns: 1)
  end

Kino.Layout.grid(previews, columns: 3)

Define the processing module

Define image processing as a module for distributed processing.

Send images as binaries from the main node (this notebook) to each worker.

Each worker converts the binary back into an image, processes it, and returns the result to the main node as a binary.

The main node saves each returned binary as an image file.

defmodule DistributedImageProcessing do
  @moduledoc """
  Thresholds images on remote worker nodes and saves the results locally.

  Images travel between nodes as `{file, type, shape, binary}` tuples built
  with `Nx.type/1`, `Nx.shape/1` and `Nx.to_binary/1`, and are rebuilt into
  Evision matrices on the receiving side.

  NOTE: `distribute/2` runs anonymous functions defined in this module on the
  workers, so the same version of this module must be loaded on every worker node.
  """

  @doc """
  Processes every file name produced by `images_stream` on `workers`.

  Images are assigned to workers round-robin, and each result is saved on this
  node. Returns the saved file names in the order the replies arrived, which is
  not necessarily the input order.

  Raises `ArgumentError` when `workers` is empty.
  """
  def distribute(workers, images_stream) do
    # With no workers, the round-robin stream below would search forever for a
    # first element and the call would hang.
    if Enum.empty?(workers) do
      raise ArgumentError, "distribute/2 requires at least one worker node"
    end

    # Connect to worker nodes
    Enum.each(workers, &Node.connect/1)

    main_pid = self()
    # Unique per call. Replies carry it, so the receive loop below only consumes
    # this run's results, never stale replies from an interrupted earlier run
    # or unrelated messages in the mailbox.
    ref = make_ref()

    workers
    |> Stream.cycle()
    |> Stream.zip(images_stream)
    |> Flow.from_enumerable(stages: 4, max_demand: 1)
    |> Flow.map(fn {worker, src_file} ->
      {spawn_processor(worker, main_pid, ref), src_file}
    end)
    |> Flow.map(fn {pid, src_file} ->
      # Send the image to the worker
      send(pid, {:img, read_image(src_file)})
    end)
    |> Enum.to_list()
    # Exactly one reply is expected per image sent.
    |> Enum.map(fn _ -> await_result(ref) end)
  end

  @doc """
  Rebuilds an image from `{src_file, type, shape, binary}`, binarizes it
  (threshold 127, max value 255), and draws the current node name at `{10, 30}`.

  Returns `{dst_file, dst_tensor}`. `dst_file` is the source base name, without
  its directory, with a `_processed` suffix before the extension. `dst_tensor`
  is an Nx tensor.
  """
  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor #{Node.self()}")

    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_processed#{src_file_ext}"

    dst_img =
      binary
      # Reconstruction of an image
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()
      # Thresholding
      |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
      |> elem(1)
      # Draw the node name
      |> Evision.putText(
        Node.self() |> Atom.to_string(),
        {10, 30},
        Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
        1.0,
        {0, 0, 0},
        [{:thickness, 2}]
      )
      |> Evision.Mat.to_nx()

    {dst_file, dst_img}
  end

  @doc """
  Rebuilds an image from `{dst_file, type, shape, binary}`, writes it to
  `dst_file`, and returns `dst_file`.
  """
  def save_image({dst_file, type, shape, binary}) do
    img =
      binary
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()

    Evision.imwrite(dst_file, img)

    dst_file
  end

  # Starts a process on `worker`, linked to the calling process. It waits for
  # one `{:img, payload}` message, processes the image, and replies to
  # `main_pid` with a message tagged by `ref`.
  defp spawn_processor(worker, main_pid, ref) do
    Node.spawn_link(worker, fn ->
      receive do
        {:img, img} ->
          {dst_file, dst_img} = process_image(img)

          # Send the processed image back to the main node as plain data
          type = Nx.type(dst_img)
          shape = Nx.shape(dst_img)
          binary = Nx.to_binary(dst_img)

          send(main_pid, {:processed, ref, dst_file, type, shape, binary})
      end
    end)
  end

  # Loads `src_file` and flattens it into the tuple process_image/1 expects.
  defp read_image(src_file) do
    img =
      src_file
      |> Evision.imread()
      |> Evision.Mat.to_nx()

    {src_file, Nx.type(img), Nx.shape(img), Nx.to_binary(img)}
  end

  # Blocks until one reply tagged with `ref` arrives, then saves it locally.
  defp await_result(ref) do
    receive do
      {:processed, ^ref, dst_file, type, shape, binary} ->
        save_image({dst_file, type, shape, binary})
    end
  end
end

Process one image on own node

# Run the processing step on this node (no worker involved), save the result,
# and display it.
{local_dst_file, local_dst_img} =
  DistributedImageProcessing.process_image({img_path, type, shape, binary})

{local_dst_file, Nx.type(local_dst_img), Nx.shape(local_dst_img), Nx.to_binary(local_dst_img)}
|> DistributedImageProcessing.save_image()
|> Evision.imread()

Process one image on one worker node

# Send just the downloaded image to worker 1 and show the processed result.
DistributedImageProcessing.distribute([worker_1_atom], [img_path])
|> List.first()
|> Evision.imread()

The worker 1 node name is drawn on the image.

Process 32 images on one worker node

# Lazily enumerate <basename>_d_0<ext>, _d_1, ... and stop at the first index
# whose file does not exist. Being a stream, it is re-evaluated on every use.
images_stream =
  0
  |> Stream.iterate(&(&1 + 1))
  |> Stream.map(fn index -> "#{file_basename}_d_#{index}#{file_ext}" end)
  |> Stream.take_while(&File.exists?/1)
# Process every copy on worker 1 alone and show the results in a 4-column grid.
processed_files = DistributedImageProcessing.distribute([worker_1_atom], images_stream)

processed_files
|> Enum.map(&Evision.imread/1)
|> Kino.Layout.grid(columns: 4)

The worker 1 node name is drawn on every image.

Process 32 images on 4 worker nodes

# Text boxes for the node names of workers 2-4 (worker 1's input is reused below).
worker_2_input = Kino.Input.text("WORKER_2_NODE_NAME")
worker_3_input = Kino.Input.text("WORKER_3_NODE_NAME")
worker_4_input = Kino.Input.text("WORKER_4_NODE_NAME")
# Read all four worker node names and convert them to atoms. Whitespace is
# trimmed so that a pasted name with a stray space or newline still matches
# the node instead of silently producing a different atom.
workers =
  [worker_1_input, worker_2_input, worker_3_input, worker_4_input]
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.trim()
    |> String.to_atom()
  end)
# Spread the copies over all four workers; the node name drawn on each image
# shows which worker processed it.
processed_images =
  workers
  |> DistributedImageProcessing.distribute(images_stream)
  |> Enum.map(&Evision.imread/1)

Kino.Layout.grid(processed_images, columns: 4)

The node names drawn on the images differ, showing that the images were processed on different nodes.

Speed Comparison

Prepare a function to process a given list of workers.

# Resolves a list of Kino node-name inputs to atoms (trimming stray whitespace
# from pasted names) and runs the distributed processing over all image copies.
# NOTE: the inputs are read on every call, so that small overhead is part of
# any timing taken around this function.
distributed = fn worker_input_list ->
  worker_input_list
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.trim()
    |> String.to_atom()
  end)
  |> DistributedImageProcessing.distribute(images_stream)
end

Compare speeds with different numbers of workers.

# Benchmark the same job with the first 1, 2 and 4 worker inputs.
worker_inputs = [worker_1_input, worker_2_input, worker_3_input, worker_4_input]

[{"1 worker", 1}, {"2 workers", 2}, {"4 workers", 4}]
|> Map.new(fn {label, count} ->
  inputs = Enum.take(worker_inputs, count)
  {label, fn -> distributed.(inputs) end}
end)
|> Benchee.run()

In practice, distributing the work did not make it faster here, because all of the nodes were running on a single machine.

In the future, I would like to actually run distributed processing on multiple machines.