Parallel Image Processing
Mix.install([
{:nx, "~> 0.9"},
{:evision, "~> 0.2"},
{:flow, "~> 1.2"},
{:req, "~> 0.5"},
{:kino, "~> 0.14"},
{:benchee, "~> 1.3"}
])
Modules
This notebook adds the following two packages.
- Flow: parallel data processing in Elixir
- Benchee: benchmarking in Elixir
What to do in this notebook
Process 128 images and compare the speed of sequential processing with parallel processing using 1, 2, 4, and 8 stages.
Get the image
# Download the image
img_path = "rwakabay.jpg"
# Stream the response body straight into the file
Req.get!(
  "https://www.elixirconf.eu/assets/images/ryo-wakabayashi.jpg",
  into: File.stream!(img_path)
)
img_mat = Evision.imread(img_path)
Single image processing
Let’s process only one image first.
In this notebook, I will threshold the image and draw a few rectangles and an ellipse on it.
proc = fn mat ->
  mat
  # Thresholding
  |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
  |> elem(1)
  # Draw many shapes
  |> Evision.rectangle({50, 10}, {125, 60}, {255, 0, 0})
  |> Evision.rectangle({250, 60}, {325, 110}, {0, 255, 0}, thickness: -1)
  |> Evision.rectangle({150, 120}, {225, 320}, {0, 0, 255},
    thickness: 5,
    lineType: Evision.Constant.cv_LINE_4()
  )
  |> Evision.ellipse({300, 300}, {100, 200}, 30, 0, 360, {255, 255, 0}, thickness: 3)
end
proc.(img_mat)
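To make the thresholding step concrete, here is a minimal sketch of what a binary threshold does to individual pixel values. It uses a plain list instead of an Evision matrix; `pixels` and `binary_threshold` are hypothetical names for illustration only, and the sketch assumes the usual rule that values strictly above the threshold become the maximum value while all others become 0.

```elixir
# Hypothetical sample of pixel values in the range 0..255
pixels = [0, 64, 127, 128, 200, 255]

# Binary threshold: values strictly above `thresh` become `max_val`, the rest become 0
binary_threshold = fn values, thresh, max_val ->
  Enum.map(values, fn v -> if v > thresh, do: max_val, else: 0 end)
end

binary_threshold.(pixels, 127, 255)
# => [0, 0, 0, 255, 255, 255]
```

Note that 127 itself maps to 0 because the comparison is strict.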
Copy images
Copy images for parallel processing.
file_ext = Path.extname(img_path)
file_basename = Path.basename(img_path, file_ext)
{file_basename, file_ext}
# Number of copies
copy_count = 128
file_path_list =
  img_mat
  |> List.duplicate(copy_count)
  |> Enum.with_index()
  |> Enum.map(fn {copied_img_mat, index} ->
    filename = "#{file_basename}_p_#{index}#{file_ext}"
    Evision.imwrite(filename, copied_img_mat)
    filename
  end)
# Display the first 6 copied files
file_path_list
|> Enum.slice(0..5)
|> Enum.map(fn filename ->
  img = Evision.imread(filename)

  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)
Sequential Processing
Let’s try sequential processing with Enum.map first.
# Get the list of existing files
stream =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&{&1, "#{file_basename}_p_#{&1}#{file_ext}"})
  |> Stream.take_while(fn {_, filename} -> File.exists?(filename) end)
# Process with Enum.map
enum_proc = fn stream ->
  stream
  |> Enum.map(fn {_, filename} ->
    {
      filename,
      filename |> Evision.imread() |> proc.()
    }
  end)
end
img_tuple_list = enum_proc.(stream)
All images processed.
# Display the first 6 processed images
img_tuple_list
|> Enum.slice(0..5)
|> Enum.map(fn {filename, img} ->
  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)
Parallel Processing
Next, let’s try parallel processing with Flow.map.
Replace Enum.map with Flow.map, and insert Flow.from_enumerable before it.
The stages option sets the number of stages that run in parallel.
max_demand is set to 1 so that each stage takes one image at a time and the images are spread across all stages.
If max_demand is not set, each stage requests items in large batches (the default is 1000) and processes its batch sequentially, so only the batches run in parallel.
# Process with Flow.map
flow_proc = fn stream, stages ->
  stream
  |> Flow.from_enumerable(stages: stages, max_demand: 1)
  |> Flow.map(fn {_, filename} ->
    {
      filename,
      filename |> Evision.imread() |> proc.()
    }
  end)
  |> Enum.to_list()
end
img_tuple_list = flow_proc.(stream, 4)
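The effect of max_demand can be observed directly by recording which stage process handles each item. The following is a rough sketch, assuming the setup cell of this notebook has already installed Flow; `count_per_stage` is a hypothetical helper that is not part of the notebook, and the exact distribution will vary from run to run.

```elixir
# Assumes :flow is already installed by the notebook's setup cell.
# `count_per_stage` is a hypothetical helper for illustration.
count_per_stage = fn max_demand ->
  1..128
  |> Flow.from_enumerable(stages: 4, max_demand: max_demand)
  # Tag every item with the pid of the stage process that handled it
  |> Flow.map(fn item -> {self(), item} end)
  |> Enum.to_list()
  # Count how many items each stage process received
  |> Enum.frequencies_by(fn {pid, _item} -> pid end)
  |> Map.values()
  |> Enum.sort(:desc)
end

# One item per demand: the work tends to spread across the four stages
count_per_stage.(1)

# Large batches: a single stage may take most or all of the 128 items
count_per_stage.(1000)
```

Each call returns the per-stage item counts, so a list with one large number suggests that the work was not spread evenly.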
# Display the first 6 processed images
img_tuple_list
|> Enum.slice(0..5)
|> Enum.map(fn {filename, img} ->
  [filename, img]
  |> Kino.Layout.grid(columns: 1)
end)
|> Kino.Layout.grid(columns: 3)
The result is the same as with Enum.map, although Flow does not guarantee that the output keeps the input order.
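Because stages finish independently, a Flow may return its results in a different order than the input. If the order matters, one option is to carry the index through the pipeline and sort afterwards. The sketch below uses a small hypothetical list (`unordered`) in place of real Flow output and only the standard library.

```elixir
# Hypothetical results in the arbitrary order a Flow might return them
unordered = [
  {2, "rwakabay_p_2.jpg"},
  {10, "rwakabay_p_10.jpg"},
  {0, "rwakabay_p_0.jpg"},
  {1, "rwakabay_p_1.jpg"}
]

# Sort by the numeric index; sorting by filename would put "_p_10" before "_p_2"
sorted = Enum.sort_by(unordered, fn {index, _filename} -> index end)

Enum.map(sorted, fn {_index, filename} -> filename end)
# => ["rwakabay_p_0.jpg", "rwakabay_p_1.jpg", "rwakabay_p_2.jpg", "rwakabay_p_10.jpg"]
```

Applying this to the notebook would mean returning the index from the Flow.map callback instead of discarding it.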
Speed Comparison
Compare speeds with Benchee.
- Sequential processing with Enum.map
- Parallel processing with Flow.map, 1 stage
- Parallel processing with Flow.map, 2 stages
- Parallel processing with Flow.map, 4 stages
- Parallel processing with Flow.map, 8 stages
Benchee.run(%{
  "enum" => fn -> enum_proc.(stream) end,
  "flow 1" => fn -> flow_proc.(stream, 1) end,
  "flow 2" => fn -> flow_proc.(stream, 2) end,
  "flow 4" => fn -> flow_proc.(stream, 4) end,
  "flow 8" => fn -> flow_proc.(stream, 8) end
})
Benchee reports the fastest scenario and how much slower the others are.
Enum.map is the slowest, and Flow.map with four stages is the fastest.
My machine’s CPU has 4 cores, so raising the number of stages beyond that brings little additional speedup.
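To pick stage counts that fit the machine running the notebook, the number of online schedulers can be queried at runtime. This is only a sketch: `stage_counts` is a hypothetical name, and the assumption is that the scheduler count roughly matches the number of logical cores, which is the usual default for the Erlang VM.

```elixir
# Number of schedulers the VM is currently running, usually one per logical core
schedulers = System.schedulers_online()

# Hypothetical list of stage counts to benchmark: powers of two up to the scheduler count
stage_counts =
  1
  |> Stream.iterate(&(&1 * 2))
  |> Enum.take_while(&(&1 <= schedulers))

{schedulers, stage_counts}
```

Flow also uses `System.schedulers_online()` as its default for the stages option, so omitting stages gives one stage per scheduler.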