Distributed Image Processing Example
Introduction
This example demonstrates using Handoff for distributed image processing across multiple Erlang nodes. The pipeline loads images, applies various transformations, and creates a collage of the results. Each operation can be distributed across the available nodes based on resource requirements.
Setup
First, let’s set up the environment:
Mix.install([
{:handoff, "~> 0.1"},
{:image, "~> 0.14"} # For actual image processing
])
Image Transformations Module
This module simulates image processing operations. In a real application, you would use actual image processing libraries.
defmodule ImageTransformations do
@moduledoc """
Image transformation functions for the distributed processing example.
This module provides functions that would be used in a real image processing
pipeline. For simplicity, this example doesn't require actual image libraries
and uses placeholder functions.
In a real application, you would use libraries like:
- https://hexdocs.pm/image/readme.html
- https://github.com/elixir-nx/nx for tensor operations
"""
@doc """
Simulates loading an image from disk.
In a real application, this would load actual image data.
"""
def load_image(path) do
# Simulate loading time
Process.sleep(100)
# Return a simulated image structure
%{
path: path,
filename: Path.basename(path),
width: Enum.random(800..1200),
height: Enum.random(600..900),
data: :crypto.strong_rand_bytes(100), # Simulated image data
format: Enum.random([:jpg, :png, :gif]),
metadata: %{loaded_on: DateTime.utc_now()}
}
end
@doc """
Simulates resizing an image to the specified dimensions.
"""
def resize_image(image, width, height) do
# Simulate CPU-intensive operation
Process.sleep(300)
%{image |
width: width,
height: height,
metadata: Map.put(image.metadata, :resized, true)
}
end
@doc """
Simulates applying a blur filter to an image.
"""
def blur_image(image, radius \\ 5) do
# Simulate processing time based on image size and blur radius
processing_time = div(image.width * image.height, 100_000) * radius
Process.sleep(processing_time)
%{image |
metadata: Map.put(image.metadata, :blur_applied, radius)
}
end
@doc """
Simulates converting an image to grayscale.
"""
def grayscale(image) do
# Simulate processing time
Process.sleep(200)
%{image |
metadata: Map.put(image.metadata, :grayscale, true)
}
end
@doc """
Simulates applying a sepia filter.
"""
def sepia(image) do
# Simulate processing time
Process.sleep(250)
%{image |
metadata: Map.put(image.metadata, :sepia, true)
}
end
@doc """
Simulates edge detection on an image.
"""
def edge_detection(image) do
# Simulate GPU-intensive operation
Process.sleep(400)
%{image |
metadata: Map.put(image.metadata, :edge_detection, true)
}
end
@doc """
Simulates creating a thumbnail of an image.
"""
def create_thumbnail(image, size \\ 150) do
# Simulate processing
Process.sleep(150)
%{image |
width: size,
height: size,
metadata: Map.put(image.metadata, :thumbnail, true)
}
end
@doc """
Simulates creating a collage from multiple images.
"""
def create_collage(images, options \\ []) do
# Simulate complex operation
Process.sleep(500 + length(images) * 50)
%{
type: :collage,
source_images: length(images),
width: Keyword.get(options, :width, 1200),
height: Keyword.get(options, :height, 800),
data: :crypto.strong_rand_bytes(200),
created_at: DateTime.utc_now()
}
end
@doc """
Simulates saving an image to disk.
"""
def save_image(image, output_path) do
# Simulate disk I/O
Process.sleep(200)
# Return path where image was "saved"
%{
original: image,
saved_path: output_path,
timestamp: DateTime.utc_now()
}
end
@doc """
Simulates image quality assessment.
"""
def assess_quality(image) do
# Simulate analysis
Process.sleep(300)
metrics = %{
sharpness: :rand.uniform() * 10,
noise_level: :rand.uniform() * 5,
exposure: :rand.uniform() * 10,
color_balance: :rand.uniform() * 10
}
{image, metrics}
end
end
Node Setup and Discovery
> Note: This section requires running multiple nodes. If you’re running this in Livebook, you can skip to the “Pipeline Implementation” section and run it in single-node mode.
To run as a true distributed system, start multiple nodes in separate terminals:
# Terminal 1
iex --name node1@127.0.0.1 -S mix
# Terminal 2
iex --name node2@127.0.0.1 -S mix
# Terminal 3 (optional)
iex --name node3@127.0.0.1 -S mix
Then connect the nodes from node1:
# Run this in node1
Node.connect(:"node2@127.0.0.1")
Node.connect(:"node3@127.0.0.1")
Node.list()
Pipeline Implementation
defmodule DistributedImageProcessing do
@moduledoc """
Example of distributed image processing using Handoff.
"""
alias Handoff.{Function, DAG, DistributedExecutor}
require Logger
# Task implementation for loading images
def load_images_task(image_paths) do
IO.puts("Loading #{length(image_paths)} images...")
# In a real scenario, ensure paths are accessible by the executing node
image_paths |> Enum.map(&{&1, ImageTransformations.load_image(&1)}) |> Enum.into(%{})
end
# Task implementation for resizing images
def resize_images_task(images_map, width, height) do
IO.puts("Resizing images to #{width}x#{height}...")
images_map |> Map.new(fn {path, img} ->
{path, ImageTransformations.resize_image(img, width, height)}
end)
end
# Task implementation for applying a single image transformation to all images in a map
def apply_image_effect_task(images_map, effect_name, transformation_fun) do
IO.puts("Applying #{effect_name} effect...")
images_map |> Map.new(fn {path, img} -> {path, transformation_fun.(img)} end)
end
def apply_image_effect_task(images_map, effect_name, transformation_fun, extra_fun_arg) do
IO.puts("Applying #{effect_name} effect with arg #{extra_fun_arg}...")
images_map |> Map.new(fn {path, img} -> {path, transformation_fun.(img, extra_fun_arg)} end)
end
# Task implementation for quality assessment
def quality_assessment_task(grayscale_results, sepia_results, blur_results, edge_results) do
IO.puts("Assessing image quality across effects...")
all_effects = %{
grayscale: grayscale_results,
sepia: sepia_results,
blur: blur_results,
edge: edge_results
}
all_effects
|> Enum.flat_map(fn {effect, images} ->
images |> Enum.map(fn {path, img} ->
{"#{effect}_#{Path.basename(path)}", ImageTransformations.assess_quality(img)}
end)
end)
|> Enum.into(%{})
end
# Task implementation for creating a collage
def create_collage_task(grayscale_imgs, sepia_imgs, blur_imgs, edge_imgs, collage_opts) do
IO.puts("Creating collage from all effects...")
all_images =
Map.values(grayscale_imgs)
|> Enum.concat(Map.values(sepia_imgs))
|> Enum.concat(Map.values(blur_imgs))
|> Enum.concat(Map.values(edge_imgs))
ImageTransformations.create_collage(all_images, collage_opts)
end
# Task implementation for saving multiple images
def save_images_task(images_map, output_prefix) do
IO.puts("Saving images with prefix '#{output_prefix}'...")
images_map |> Map.new(fn {path, img} ->
output_path = "output/#{output_prefix}_#{Path.basename(path)}" # Ensure 'output' dir exists or handle errors
{path, ImageTransformations.save_image(img, output_path)}
end)
end
# Task implementation for saving a single image (like a collage)
def save_single_image_task(image_data, base_filename) do
IO.puts("Saving final image as '#{base_filename}'...")
# Ensure 'output' dir exists or handle errors. Appending timestamp to avoid overwrites.
output_path = "output/#{base_filename}_#{DateTime.utc_now() |> DateTime.to_unix()}.jpg"
ImageTransformations.save_image(image_data, output_path)
end
@doc """
Build the image processing pipeline DAG.
"""
def build_pipeline(image_paths) do
dag = DAG.new(:distributed_img_proc_dag) # Give the DAG an ID
# 1. Load images function
load_images_fn = %Function{
id: :load_images,
args: [],
code: &DistributedImageProcessing.load_images_task/1,
extra_args: [image_paths], # Pass image_paths explicitly
cost: %{cpu: 1, memory: 500} # Light CPU, moderate memory
}
# 2. Resize images function
resize_fn = %Function{
id: :resize_images,
args: [:load_images], # Result is images_map
code: &DistributedImageProcessing.resize_images_task/3,
extra_args: [800, 600], # width, height
cost: %{cpu: 2, memory: 1000} # Moderate CPU
}
# 3a. Apply grayscale effect
grayscale_fn = %Function{
id: :grayscale_effect,
args: [:resize_images],
code: &DistributedImageProcessing.apply_image_effect_task/3,
extra_args: [:grayscale, &ImageTransformations.grayscale/1],
cost: %{cpu: 2, memory: 800} # Moderate CPU
}
# 3b. Apply sepia effect (parallel branch)
sepia_fn = %Function{
id: :sepia_effect,
args: [:resize_images],
code: &DistributedImageProcessing.apply_image_effect_task/3,
extra_args: [:sepia, &ImageTransformations.sepia/1],
cost: %{cpu: 2, memory: 800} # Moderate CPU
}
# 3c. Apply blur effect (parallel branch)
blur_fn = %Function{
id: :blur_effect,
args: [:resize_images],
code: &DistributedImageProcessing.apply_image_effect_task/4, # Using 4-arity version for blur radius
extra_args: [:blur, &ImageTransformations.blur_image/2, 5], # effect_name, fun, radius
cost: %{cpu: 3, memory: 1200} # Higher CPU, more memory
}
# 3d. Apply edge detection (GPU intensive)
edge_fn = %Function{
id: :edge_detection,
args: [:resize_images],
code: &DistributedImageProcessing.apply_image_effect_task/3,
extra_args: [:edge_detection, &ImageTransformations.edge_detection/1],
cost: %{cpu: 2, memory: 1500, gpu: 1} # Needs GPU
}
# 4. Create thumbnails
thumbnail_fn = %Function{
id: :create_thumbnails,
args: [:resize_images],
code: &DistributedImageProcessing.apply_image_effect_task/4, # Using 4-arity for thumbnail size
extra_args: [:thumbnail, &ImageTransformations.create_thumbnail/2, 150], # effect_name, fun, size
cost: %{cpu: 1, memory: 500} # Light work
}
# 5. Quality assessment on different effects
quality_assessment_fn = %Function{
id: :quality_report,
args: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
code: &DistributedImageProcessing.quality_assessment_task/4,
# extra_args: [], # No extra_args beyond dependencies
cost: %{cpu: 4, memory: 2000} # CPU intensive analysis
}
# 6. Create collage from effects
collage_fn = %Function{
id: :create_collage,
args: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
code: &DistributedImageProcessing.create_collage_task/5,
extra_args: [[width: 1600, height: 1200]], # collage_opts passed as last arg
cost: %{cpu: 4, memory: 4000} # Heavy CPU and memory
}
# 7. Save thumbnails
save_thumbnails_fn = %Function{
id: :save_thumbnails,
args: [:create_thumbnails], # Result is a map of {path, thumbnail_img_data}
code: &DistributedImageProcessing.save_images_task/2,
extra_args: ["thumb"], # output_prefix
cost: %{cpu: 1, memory: 500} # I/O bound
}
# 8. Save collage
save_collage_fn = %Function{
id: :save_collage,
args: [:create_collage], # Result is the collage image data
code: &DistributedImageProcessing.save_single_image_task/2,
extra_args: ["final_collage"], # base_filename
cost: %{cpu: 1, memory: 500} # I/O bound
}
# Build the DAG
dag
|> DAG.add_function(load_images_fn)
|> DAG.add_function(resize_fn)
|> DAG.add_function(grayscale_fn)
|> DAG.add_function(sepia_fn)
|> DAG.add_function(blur_fn)
|> DAG.add_function(edge_fn)
|> DAG.add_function(thumbnail_fn)
|> DAG.add_function(quality_assessment_fn)
|> DAG.add_function(collage_fn)
|> DAG.add_function(save_thumbnails_fn)
|> DAG.add_function(save_collage_fn)
end
@doc """
Run the distributed image processing pipeline.
"""
def run do
# Start Handoff application if not already running
if GenServer.whereis(DistributedExecutor) == nil do
{:ok, _pid} = Handoff.start_link()
Logger.info("Handoff Application started.")
else
Logger.info("Handoff Application already running.")
end
# Register the local node with capabilities
# In a real multi-node setup, each node would do this.
local_node_caps = %{cpu: 8, memory: 16000, gpu: 1}
DistributedExecutor.register_local_node(local_node_caps)
Logger.info("Registered local node with capabilities: #{inspect(local_node_caps)}")
# Discover other nodes in the cluster (optional, but good for distributed runs)
case DistributedExecutor.discover_nodes() do
{:ok, discovered_nodes} ->
Logger.info("Discovered nodes: #{inspect Map.keys(discovered_nodes)}")
{:error, reason} ->
Logger.error("Node discovery failed: #{inspect(reason)}")
end
# Simulate image paths (would be real paths in a production app)
# For this example, we'll create some dummy files if they don't exist
# to make ImageTransformations.load_image slightly more realistic.
image_dir = "temp_images"
File.mkdir_p(image_dir)
image_paths = Enum.map(1..5, fn i ->
path = Path.join(image_dir, "image_#{i}.jpg")
# Create an empty file if it doesn't exist for placeholder loading
unless File.exists?(path), do: File.write(path, "")
path
end)
IO.puts("Processing #{length(image_paths)} images from '#{image_dir}' directory...")
# Create a DAG for image processing
dag = build_pipeline(image_paths)
# Validate the DAG
case DAG.validate(dag) do
:ok ->
IO.puts("\nExecuting distributed image processing pipeline...\n")
# Execute the DAG across all available nodes
case DistributedExecutor.execute(dag,
allocation_strategy: :cost_optimized,
max_retries: 2
) do
{:ok, exec_results} -> # Renamed to exec_results to avoid conflict
# Show the saved collage path
collage_output = exec_results.results[:save_collage] # Access actual results map
IO.puts("\nPipeline completed successfully!")
IO.puts("Collage saved to: #{collage_output.saved_path}")
# Show quality metrics
quality_metrics = exec_results.results[:quality_report] # Access actual results map
IO.puts("\nQuality metrics:")
Enum.each(quality_metrics, fn {image_id, {_img, report}} ->
# Assuming image_id is descriptive enough, removed function_id reference
IO.puts(" #{image_id}: " <>
"Sharpness: #{Float.round(report.sharpness, 1)}, " <>
"Noise: #{Float.round(report.noise_level, 1)}")
end)
# Show node allocation statistics
IO.puts("\nNode allocation:")
# Access allocations from exec_results, not from dag.functions
node_counts =
exec_results.allocations
|> Enum.reduce(%{}, fn {_k, node}, acc -> Map.update(acc, node, 1, &(&1 + 1)) end)
Enum.each(node_counts, fn {node, count} ->
IO.puts(" #{node}: #{count} functions")
end)
{:ok, exec_results.results} # Return the inner results map for consistency if needed
{:error, reason} ->
IO.puts("Error executing pipeline: #{inspect(reason)}")
{:error, reason}
end
{:error, reason} ->
IO.puts("Invalid DAG: #{inspect(reason)}")
{:error, reason}
end
end
end
Running the Pipeline
You can execute the pipeline with:
# Run in distributed mode (requires multiple connected nodes)
DistributedImageProcessing.run()
# Or just build without executing to inspect the DAG
image_paths = Enum.map(1..5, fn i -> "/path/to/image_#{i}.jpg" end)
dag = DistributedImageProcessing.build_pipeline(image_paths)
Visualizing the DAG
In a full implementation, you might add a visualization of the DAG. Here’s a placeholder for that:
# Pseudocode for DAG visualization (not implemented in this example)
# dag |> Handoff.Visualization.generate_graph() |> Handoff.Visualization.render()
Key Concepts Demonstrated
- Distributed execution across nodes
- Resource-aware function allocation
- Complex DAG with multiple parallel branches
- Resource requirements and allocation
- Load balancing and cost optimization
- Quality assessment and results aggregation