Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Distributed Image Processing Worker 3

distributed_worker_3_en.livemd

Distributed Image Processing Worker 3

Mix.install(
  [
    {:nx, "~> 0.9"},
    {:evision, "~> 0.2"},
    {:flow, "~> 1.2"},
    {:kino, "~> 0.14"}
  ]
)

Get the node name

Node.self()

Define the processing module

Same as the main node

defmodule DistributedImageProcessing do
  def distribute(workers, images_stream) do
    # Connect to worker nodes
    Enum.each(workers, &Node.connect/1)

    # Generate a stream of worker nodes
    worker_stream =
      Stream.repeatedly(fn -> workers end)
      |> Stream.flat_map(& &1)

    sender_pid = self()

    worker_stream
    |> Stream.zip(images_stream)
    |> Flow.from_enumerable(stages: 4, max_demand: 1)
    |> Flow.map(fn {worker, image} ->
      IO.puts("enter spawn_link")

      {
        Node.spawn_link(worker, fn ->
          # Receive the image from the main
          receive do
            {:img, sender_pid, img} ->
              # Perform image processing
              {dst_file, img} = process_image(img)

              # Send the processed image to the main
              binary = Nx.to_binary(img)
              shape = Nx.shape(img)
              type = Nx.type(img)

              send(sender_pid, {dst_file, type, shape, binary})
              IO.puts("respond")
          end
        end),
        image
      }
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")

      img =
        src_file
        |> Evision.imread()
        |> Evision.Mat.to_nx()

      # Send the image to the worker
      binary = Nx.to_binary(img)
      shape = Nx.shape(img)
      type = Nx.type(img)

      send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
    end)
    |> Enum.to_list()
    |> Enum.map(fn _ ->
      IO.puts("enter receiver")

      receive do
        {dst_file, type, shape, binary} ->
          save_image({dst_file, type, shape, binary})
      end
    end)
    |> Enum.to_list()
  end

  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor #{Node.self()}")

    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_processed#{src_file_ext}"

    dst_img =
      binary
      # Reconstruction of an image
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()
      # Thresholding
      |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
      |> elem(1)
      # Draw the node name
      |> Evision.putText(
        Node.self() |> Atom.to_string(),
        {10, 30},
        Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
        1.0,
        {0, 0, 0},
        [{:thickness, 2}]
      )
      |> Evision.Mat.to_nx()

    {dst_file, dst_img}
  end

  def save_image({dst_file, type, shape, binary}) do
    img =
      binary
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()

    Evision.imwrite(dst_file, img)

    dst_file
  end
end