Parallel Image Processing

Mix.install([
  {:nx, "~> 0.9"},
  {:evision, "~> 0.2"},
  {:flow, "~> 1.2"},
  {:req, "~> 0.5"},
  {:kino, "~> 0.14"},
  {:benchee, "~> 1.3"}
])

Modules

The following two libraries have been added.

  • Flow: Parallel Processing in Elixir
  • Benchee: Benchmarking in Elixir

What to do in this notebook

Process 128 images and compare the speed of sequential processing with that of parallel processing using one, two, four, and eight stages.

Get the image

# Download the image
img_path = "rwakabay.jpg"

Req.get!(
  "https://www.elixirconf.eu/assets/images/ryo-wakabayashi.jpg",
  output: File.stream!(img_path)
)

img_mat = Evision.imread(img_path)

Single image processing

Let’s process only one image first.

In this notebook, I will threshold the image and then draw three rectangles and an ellipse on it.

proc = fn mat ->
  mat
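  # Thresholding: pixels above 127 become 255, all others become 0
  |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
  # threshold returns a tuple; the processed image is its second element
  |> elem(1)
  # Draw three rectangles and an ellipse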
  |> Evision.rectangle({50, 10}, {125, 60}, {255, 0, 0})
  |> Evision.rectangle({250, 60}, {325, 110}, {0, 255, 0}, thickness: -1)
  |> Evision.rectangle({150, 120}, {225, 320}, {0, 0, 255},
    thickness: 5,
    lineType: Evision.Constant.cv_LINE_4()
  )
  |> Evision.ellipse({300, 300}, {100, 200}, 30, 0, 360, {255, 255, 0}, thickness: 3)
end

proc.(img_mat)
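
To make the thresholding step concrete, here is a minimal sketch of binary thresholding on a plain list of pixel values, using only the standard library. `threshold_binary` is a hypothetical helper written for illustration, not an Evision function; it mirrors the rule used by THRESH_BINARY, where a pixel strictly above the threshold becomes the maximum value and every other pixel becomes 0.

```elixir
# Hypothetical helper (not part of Evision): binary thresholding on a list
threshold_binary = fn pixels, thresh, max_val ->
  Enum.map(pixels, fn p ->
    # Strictly greater than the threshold -> max_val, otherwise 0
    if p > thresh, do: max_val, else: 0
  end)
end

# Same threshold (127) and maximum value (255) as in proc
result = threshold_binary.([0, 100, 127, 128, 255], 127, 255)
IO.inspect(result)
# => [0, 0, 0, 255, 255]
```

Note that a pixel equal to the threshold (127 here) maps to 0.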

Copy images

Copy images for parallel processing.

file_ext = Path.extname(img_path)
file_basename = Path.basename(img_path, file_ext)

{file_basename, file_ext}

# Number of copies
copy_count = 128

file_path_list =
  img_mat
  |> List.duplicate(copy_count)
  |> Enum.with_index()
  |> Enum.map(fn {copied_img_mat, index} ->
    filename = "#{file_basename}_p_#{index}#{file_ext}"

    Evision.imwrite(filename, copied_img_mat)

    filename
  end)

# Display the first 6 copied files
file_path_list
|> Enum.slice(0..5)
|> Enum.map(fn filename ->
  img = Evision.imread(filename)

  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)

Sequential Processing

Let’s try sequential processing with Enum.map.

# Lazily list the copied files: count up from 0 until a filename does not exist
stream =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&{&1, "#{file_basename}_p_#{&1}#{file_ext}"})
  |> Stream.take_while(fn {_, filename} -> File.exists?(filename) end)

# Process with Enum.map
enum_proc = fn stream ->
  stream
  |> Enum.map(fn {_, filename} ->
    {
      filename,
      filename |> Evision.imread() |> proc.()
    }
  end)
end

img_tuple_list = enum_proc.(stream)

All images processed.
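
To see how the file-listing stream above stops by itself, here is a minimal sketch using only the standard library. It replaces File.exists? with a lookup in a hypothetical set of names (`existing`), so it runs without touching the disk. The `img_N.jpg` names are made up for illustration.

```elixir
# Hypothetical set of "existing" files; stands in for File.exists?/1
existing = MapSet.new(["img_0.jpg", "img_1.jpg", "img_2.jpg", "img_4.jpg"])

found =
  # Infinite counter: 0, 1, 2, ...
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  # Pair each index with its candidate filename
  |> Stream.map(&{&1, "img_#{&1}.jpg"})
  # Stop at the first missing name (img_3.jpg), so img_4.jpg is never reached
  |> Stream.take_while(fn {_, filename} -> MapSet.member?(existing, filename) end)
  |> Enum.to_list()

IO.inspect(found)
# => [{0, "img_0.jpg"}, {1, "img_1.jpg"}, {2, "img_2.jpg"}]
```

Because the stream is lazy, the existence check runs again every time the stream is enumerated, and a gap in the numbering cuts the list short.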

# Display the first 6 processed images
img_tuple_list
|> Enum.slice(0..5)
|> Enum.map(fn {filename, img} ->
  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)

Parallel Processing

Next, let’s try parallel processing with Flow.map.

Replace Enum.map with Flow.map.

Then insert Flow.from_enumerable before Flow.map.

The stages option sets the number of parallel stages.

max_demand is set to 1 so that each stage requests one image at a time, which spreads the images evenly across the stages.

If max_demand is not set, each stage requests items in batches: the items within a batch are processed sequentially, and the batches are processed in parallel.
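
The effect of max_demand can be observed with a small sketch that records which stage process handled each item. It assumes Flow is already installed by the setup cell at the top of this notebook. `which_stage` is a hypothetical helper written for illustration, and the 10 ms sleep merely simulates work. How items are split with the default demand depends on Flow and GenStage internals, so no particular output is promised here.

```elixir
# Assumption: Flow is available (installed by the notebook's Mix.install cell).
# which_stage is a hypothetical helper, not part of Flow.
which_stage = fn opts ->
  1..8
  |> Flow.from_enumerable(opts)
  |> Flow.map(fn i ->
    # Simulate a little work, then tag the item with the stage's pid
    Process.sleep(10)
    {i, self()}
  end)
  |> Enum.to_list()
  # Group the items by the process that handled them
  |> Enum.group_by(fn {_i, pid} -> pid end, fn {i, _pid} -> i end)
  |> Map.values()
end

# One item per request: the items should be shared between the two stages
one_at_a_time = which_stage.(stages: 2, max_demand: 1)
IO.inspect(one_at_a_time, label: "max_demand: 1")

# Default demand: a single stage may receive the whole list as one batch
batched = which_stage.(stages: 2)
IO.inspect(batched, label: "default max_demand")
```

Each inner list holds the items handled by one stage. With only a handful of items and a large default batch size, most of the work can end up in a single stage, which is why the image example sets max_demand to 1.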

# Process with Flow.map
flow_proc = fn stream, stages ->
  stream
  |> Flow.from_enumerable(stages: stages, max_demand: 1)
  |> Flow.map(fn {_, filename} ->
    {
      filename,
      filename |> Evision.imread() |> proc.()
    }
  end)
  |> Enum.to_list()
end

img_tuple_list = flow_proc.(stream, 4)

# Display the first 6 processed images
img_tuple_list
|> Enum.slice(0..5)
|> Enum.map(fn {filename, img} ->
  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)

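The images are processed the same way as with Enum.map, although Flow does not guarantee that the results come back in the original order.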

Speed Comparison

Compare speeds with Benchee.

  • Sequential processing with Enum.map
  • Parallel processing with Flow.map, 1 stage
  • Parallel processing with Flow.map, 2 stages
  • Parallel processing with Flow.map, 4 stages
  • Parallel processing with Flow.map, 8 stages

Benchee.run(%{
  "enum" => fn -> enum_proc.(stream) end,
  "flow 1" => fn -> flow_proc.(stream, 1) end,
  "flow 2" => fn -> flow_proc.(stream, 2) end,
  "flow 4" => fn -> flow_proc.(stream, 4) end,
  "flow 8" => fn -> flow_proc.(stream, 8) end
})

Benchee reports which job is the fastest and how much slower the others are.

Enum.map is the slowest, and Flow.map with four stages is the fastest.

My machine’s CPU has 4 cores, so using more than 4 stages brings little additional benefit.
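
To check how many cores the Erlang VM can use on a given machine, a minimal sketch with the standard library follows. `stage_counts` is a hypothetical name introduced here; it lists the powers of two up to the scheduler count, which is one reasonable set of stage counts to benchmark.

```elixir
# Number of schedulers the Erlang VM is running, usually one per logical core
cores = System.schedulers_online()

# Candidate stage counts: 1, 2, 4, ... up to the scheduler count
stage_counts =
  1
  |> Stream.iterate(&(&1 * 2))
  |> Enum.take_while(&(&1 <= cores))

IO.inspect({cores, stage_counts})
```

On a machine reporting 4 schedulers this yields the stage counts 1, 2, and 4. According to the Flow documentation, the stages option defaults to this same scheduler count when it is not given.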