
Distributed Image Processing Example

distributed_image_processing.livemd

Introduction

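This example demonstrates using Handoff for distributed image processing across multiple Erlang nodes. The pipeline loads images, resizes them, applies four effects (grayscale, sepia, blur, and edge detection) in parallel, creates thumbnails, produces a quality report, and assembles a collage from the effect outputs. The image operations themselves are simulated. Each operation declares its resource requirements (CPU, memory, and optionally GPU) and can be distributed across the available nodes based on those requirements.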

Setup

First, let’s set up the environment:

Mix.install([
  {:handoff, "~> 0.1"},
  {:image, "~> 0.14"} # Optional: only needed if you replace the simulated functions with real ones
])

Image Transformations Module

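This module simulates image processing operations: each function sleeps briefly to mimic processing cost and returns a map describing the result. In a real application, you would replace these functions with calls to an actual image processing library.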

defmodule ImageTransformations do
  @moduledoc """
  Image transformation functions for the distributed processing example.

  This module provides functions that would be used in a real image processing
  pipeline. For simplicity, this example doesn't require actual image libraries
  and uses placeholder functions.

  In a real application, you would use libraries like:
  - Image (https://hexdocs.pm/image/readme.html) for image operations
  - Nx (https://github.com/elixir-nx/nx) for tensor operations
  """

  @doc """
  Simulates loading an image from disk.
  In a real application, this would load actual image data.
  """
  def load_image(path) do
    # Simulate loading time
    Process.sleep(100)

    # Return a simulated image structure
    %{
      path: path,
      filename: Path.basename(path),
      width: Enum.random(800..1200),
      height: Enum.random(600..900),
      data: :crypto.strong_rand_bytes(100),  # Simulated image data
      format: Enum.random([:jpg, :png, :gif]),
      metadata: %{loaded_on: DateTime.utc_now()}
    }
  end

  @doc """
  Simulates resizing an image to the specified dimensions.
  """
  def resize_image(image, width, height) do
    # Simulate CPU-intensive operation
    Process.sleep(300)

    %{image |
      width: width,
      height: height,
      metadata: Map.put(image.metadata, :resized, true)
    }
  end

  @doc """
  Simulates applying a blur filter to an image.
  """
  def blur_image(image, radius \\ 5) do
    # Simulate processing time based on image size and blur radius
    processing_time = div(image.width * image.height, 100_000) * radius
    Process.sleep(processing_time)

    %{image |
      metadata: Map.put(image.metadata, :blur_applied, radius)
    }
  end

  @doc """
  Simulates converting an image to grayscale.
  """
  def grayscale(image) do
    # Simulate processing time
    Process.sleep(200)

    %{image |
      metadata: Map.put(image.metadata, :grayscale, true)
    }
  end

  @doc """
  Simulates applying a sepia filter.
  """
  def sepia(image) do
    # Simulate processing time
    Process.sleep(250)

    %{image |
      metadata: Map.put(image.metadata, :sepia, true)
    }
  end

  @doc """
  Simulates edge detection on an image.
  """
  def edge_detection(image) do
    # Simulate GPU-intensive operation
    Process.sleep(400)

    %{image |
      metadata: Map.put(image.metadata, :edge_detection, true)
    }
  end

  @doc """
  Simulates creating a thumbnail of an image.
  """
  def create_thumbnail(image, size \\ 150) do
    # Simulate processing
    Process.sleep(150)

    %{image |
      width: size,
      height: size,
      metadata: Map.put(image.metadata, :thumbnail, true)
    }
  end

  @doc """
  Simulates creating a collage from multiple images.
  """
  def create_collage(images, options \\ []) do
    # Simulate complex operation
    Process.sleep(500 + length(images) * 50)

    %{
      type: :collage,
      source_images: length(images),
      width: Keyword.get(options, :width, 1200),
      height: Keyword.get(options, :height, 800),
      data: :crypto.strong_rand_bytes(200),
      created_at: DateTime.utc_now()
    }
  end

  @doc """
  Simulates saving an image to disk.
  """
  def save_image(image, output_path) do
    # Simulate disk I/O
    Process.sleep(200)

    # Return path where image was "saved"
    %{
      original: image,
      saved_path: output_path,
      timestamp: DateTime.utc_now()
    }
  end

  @doc """
  Simulates image quality assessment.
  """
  def assess_quality(image) do
    # Simulate analysis
    Process.sleep(300)

    metrics = %{
      sharpness: :rand.uniform() * 10,
      noise_level: :rand.uniform() * 5,
      exposure: :rand.uniform() * 10,
      color_balance: :rand.uniform() * 10
    }

    {image, metrics}
  end
end
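The grayscale, sepia, blur, and edge detection steps each depend only on the resized images, which is why the pipeline below can run them as parallel branches. The following is a minimal single-node sketch of that idea, not Handoff code. The hypothetical `ParallelBranchSketch` module runs four sleep-only stand-ins sequentially and then concurrently with `Task.async_stream/3`, returning the elapsed milliseconds alongside the results:

```elixir
defmodule ParallelBranchSketch do
  # Hypothetical stand-ins for the four effect branches; like the functions
  # in ImageTransformations, each one only sleeps and tags the image map.
  @effects [grayscale: 100, sepia: 100, blur: 100, edge_detection: 100]

  # Runs every effect one after another; returns {elapsed_ms, results}.
  def sequential(image) do
    timed(fn -> Enum.map(@effects, &apply_effect(image, &1)) end)
  end

  # Runs the independent effects concurrently on the local node.
  # max_concurrency is explicit so all four branches overlap even on
  # a machine with few schedulers.
  def concurrent(image) do
    timed(fn ->
      @effects
      |> Task.async_stream(&apply_effect(image, &1),
        max_concurrency: length(@effects),
        timeout: 5_000
      )
      |> Enum.map(fn {:ok, result} -> result end)
    end)
  end

  defp apply_effect(image, {effect, ms}) do
    Process.sleep(ms)
    {effect, Map.put(image, effect, true)}
  end

  # Measures wall-clock time of fun in milliseconds.
  defp timed(fun) do
    {micros, result} = :timer.tc(fun)
    {div(micros, 1000), result}
  end
end
```

With four 100 ms stand-ins, the sequential run takes at least 400 ms, while the concurrent run should take roughly 100 ms. `max_concurrency` defaults to `System.schedulers_online/0`, which is why the sketch sets it explicitly.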

Node Setup and Discovery

> Note: This section requires running multiple nodes. If you’re running this in Livebook, you can skip to the “Pipeline Implementation” section and run it in single-node mode.

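To run as a true distributed system, start multiple nodes in separate terminals from the root of a Mix project that depends on Handoff and contains the ImageTransformations and DistributedImageProcessing modules. Every node needs this code loaded, because a function allocated to a node runs there and calls these modules locally: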

# Terminal 1
iex --name node1@127.0.0.1 -S mix

# Terminal 2
iex --name node2@127.0.0.1 -S mix

# Terminal 3 (optional)
iex --name node3@127.0.0.1 -S mix

Then connect the nodes from node1:

# Run this in node1
Node.connect(:"node2@127.0.0.1")
Node.connect(:"node3@127.0.0.1")
Node.list()
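`Node.connect/1` returns `true` on success, `false` on failure, and `:ignored` when the local node was not started in distributed mode (with `--name` or `--sname`). The following hypothetical helper, `ClusterSketch`, is not part of Handoff. It connects to several peers and reports which ones did not connect:

```elixir
defmodule ClusterSketch do
  @moduledoc """
  Hypothetical helper: connects the current node to each peer and
  records the outcome of Node.connect/1 per peer.
  """

  # Node.connect/1 returns true, false, or :ignored (local node not alive).
  def connect_all(peers) do
    Map.new(peers, fn peer -> {peer, Node.connect(peer)} end)
  end

  # Peers whose connection attempt did not succeed.
  def failed(results) do
    for {peer, result} <- results, result != true, do: peer
  end
end
```

For example, `ClusterSketch.connect_all([:"node2@127.0.0.1", :"node3@127.0.0.1"]) |> ClusterSketch.failed()` returns the peers that still need attention before running the pipeline.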

Pipeline Implementation

defmodule DistributedImageProcessing do
  @moduledoc """
  Example of distributed image processing using Handoff.
  """

  alias Handoff.{Function, DAG, DistributedExecutor}
  require Logger

  # Task implementation for loading images
  def load_images_task(image_paths) do
    IO.puts("Loading #{length(image_paths)} images...")
    # In a real scenario, ensure paths are accessible by the executing node
    image_paths |> Enum.map(&{&1, ImageTransformations.load_image(&1)}) |> Enum.into(%{})
  end

  # Task implementation for resizing images
  def resize_images_task(images_map, width, height) do
    IO.puts("Resizing images to #{width}x#{height}...")
    images_map |> Map.new(fn {path, img} ->
      {path, ImageTransformations.resize_image(img, width, height)}
    end)
  end

  # Task implementation for applying a single image transformation to all images in a map
  def apply_image_effect_task(images_map, effect_name, transformation_fun) do
    IO.puts("Applying #{effect_name} effect...")
    images_map |> Map.new(fn {path, img} -> {path, transformation_fun.(img)} end)
  end

  def apply_image_effect_task(images_map, effect_name, transformation_fun, extra_fun_arg) do
    IO.puts("Applying #{effect_name} effect with arg #{extra_fun_arg}...")
    images_map |> Map.new(fn {path, img} -> {path, transformation_fun.(img, extra_fun_arg)} end)
  end

  # Task implementation for quality assessment
  def quality_assessment_task(grayscale_results, sepia_results, blur_results, edge_results) do
    IO.puts("Assessing image quality across effects...")
    all_effects = %{
      grayscale: grayscale_results,
      sepia: sepia_results,
      blur: blur_results,
      edge: edge_results
    }
    all_effects
    |> Enum.flat_map(fn {effect, images} ->
      images |> Enum.map(fn {path, img} ->
        {"#{effect}_#{Path.basename(path)}", ImageTransformations.assess_quality(img)}
      end)
    end)
    |> Enum.into(%{})
  end

  # Task implementation for creating a collage
  def create_collage_task(grayscale_imgs, sepia_imgs, blur_imgs, edge_imgs, collage_opts) do
    IO.puts("Creating collage from all effects...")
    all_images =
      Map.values(grayscale_imgs)
      |> Enum.concat(Map.values(sepia_imgs))
      |> Enum.concat(Map.values(blur_imgs))
      |> Enum.concat(Map.values(edge_imgs))
    ImageTransformations.create_collage(all_images, collage_opts)
  end

  # Task implementation for saving multiple images
  def save_images_task(images_map, output_prefix) do
    IO.puts("Saving images with prefix '#{output_prefix}'...")
    images_map |> Map.new(fn {path, img} ->
      # save_image/2 is simulated; real file output would require an existing 'output' directory
      output_path = "output/#{output_prefix}_#{Path.basename(path)}"
      {path, ImageTransformations.save_image(img, output_path)}
    end)
  end

  # Task implementation for saving a single image (like a collage)
  def save_single_image_task(image_data, base_filename) do
    IO.puts("Saving final image as '#{base_filename}'...")
    # save_image/2 is simulated; real file output would require an existing 'output' directory.
    # A Unix timestamp is appended so earlier collages are not overwritten.
    output_path = "output/#{base_filename}_#{DateTime.utc_now() |> DateTime.to_unix()}.jpg"
    ImageTransformations.save_image(image_data, output_path)
  end

  @doc """
  Build the image processing pipeline DAG.
  """
  def build_pipeline(image_paths) do
    dag = DAG.new(:distributed_img_proc_dag) # Give the DAG an ID

    # 1. Load images function
    load_images_fn = %Function{
      id: :load_images,
      args: [],
      code: &DistributedImageProcessing.load_images_task/1,
      extra_args: [image_paths], # Pass image_paths explicitly
      cost: %{cpu: 1, memory: 500}  # Light CPU, moderate memory
    }

    # 2. Resize images function
    resize_fn = %Function{
      id: :resize_images,
      args: [:load_images], # Result is images_map
      code: &DistributedImageProcessing.resize_images_task/3,
      extra_args: [800, 600], # width, height
      cost: %{cpu: 2, memory: 1000}  # Moderate CPU
    }

    # 3a. Apply grayscale effect
    grayscale_fn = %Function{
      id: :grayscale_effect,
      args: [:resize_images],
      code: &DistributedImageProcessing.apply_image_effect_task/3,
      extra_args: [:grayscale, &ImageTransformations.grayscale/1],
      cost: %{cpu: 2, memory: 800}  # Moderate CPU
    }

    # 3b. Apply sepia effect (parallel branch)
    sepia_fn = %Function{
      id: :sepia_effect,
      args: [:resize_images],
      code: &DistributedImageProcessing.apply_image_effect_task/3,
      extra_args: [:sepia, &ImageTransformations.sepia/1],
      cost: %{cpu: 2, memory: 800}  # Moderate CPU
    }

    # 3c. Apply blur effect (parallel branch)
    blur_fn = %Function{
      id: :blur_effect,
      args: [:resize_images],
      code: &DistributedImageProcessing.apply_image_effect_task/4, # Using 4-arity version for blur radius
      extra_args: [:blur, &ImageTransformations.blur_image/2, 5], # effect_name, fun, radius
      cost: %{cpu: 3, memory: 1200}  # Higher CPU, more memory
    }

    # 3d. Apply edge detection (GPU intensive)
    edge_fn = %Function{
      id: :edge_detection,
      args: [:resize_images],
      code: &DistributedImageProcessing.apply_image_effect_task/3,
      extra_args: [:edge_detection, &ImageTransformations.edge_detection/1],
      cost: %{cpu: 2, memory: 1500, gpu: 1}  # Needs GPU
    }

    # 4. Create thumbnails
    thumbnail_fn = %Function{
      id: :create_thumbnails,
      args: [:resize_images],
      code: &DistributedImageProcessing.apply_image_effect_task/4, # Using 4-arity for thumbnail size
      extra_args: [:thumbnail, &ImageTransformations.create_thumbnail/2, 150], # effect_name, fun, size
      cost: %{cpu: 1, memory: 500}  # Light work
    }

    # 5. Quality assessment on different effects
    quality_assessment_fn = %Function{
      id: :quality_report,
      args: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
      code: &DistributedImageProcessing.quality_assessment_task/4,
      extra_args: [], # Dependency results are the only arguments
      cost: %{cpu: 4, memory: 2000}  # CPU intensive analysis
    }

    # 6. Create collage from effects
    collage_fn = %Function{
      id: :create_collage,
      args: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
      code: &DistributedImageProcessing.create_collage_task/5,
      extra_args: [[width: 1600, height: 1200]], # collage_opts passed as last arg
      cost: %{cpu: 4, memory: 4000}  # Heavy CPU and memory
    }

    # 7. Save thumbnails
    save_thumbnails_fn = %Function{
      id: :save_thumbnails,
      args: [:create_thumbnails], # Result is a map of {path, thumbnail_img_data}
      code: &DistributedImageProcessing.save_images_task/2,
      extra_args: ["thumb"], # output_prefix
      cost: %{cpu: 1, memory: 500}  # I/O bound
    }

    # 8. Save collage
    save_collage_fn = %Function{
      id: :save_collage,
      args: [:create_collage], # Result is the collage image data
      code: &DistributedImageProcessing.save_single_image_task/2,
      extra_args: ["final_collage"], # base_filename
      cost: %{cpu: 1, memory: 500}  # I/O bound
    }

    # Build the DAG
    dag
    |> DAG.add_function(load_images_fn)
    |> DAG.add_function(resize_fn)
    |> DAG.add_function(grayscale_fn)
    |> DAG.add_function(sepia_fn)
    |> DAG.add_function(blur_fn)
    |> DAG.add_function(edge_fn)
    |> DAG.add_function(thumbnail_fn)
    |> DAG.add_function(quality_assessment_fn)
    |> DAG.add_function(collage_fn)
    |> DAG.add_function(save_thumbnails_fn)
    |> DAG.add_function(save_collage_fn)
  end

  @doc """
  Run the distributed image processing pipeline.
  """
  def run do
    # Start Handoff application if not already running
    if GenServer.whereis(DistributedExecutor) == nil do
      {:ok, _pid} = Handoff.start_link()
      Logger.info("Handoff Application started.")
    else
      Logger.info("Handoff Application already running.")
    end

    # Register the local node with capabilities
    # In a real multi-node setup, each node would do this.
    local_node_caps = %{cpu: 8, memory: 16000, gpu: 1}
    DistributedExecutor.register_local_node(local_node_caps)
    Logger.info("Registered local node with capabilities: #{inspect(local_node_caps)}")

    # Discover other nodes in the cluster (optional, but good for distributed runs)
    case DistributedExecutor.discover_nodes() do
      {:ok, discovered_nodes} ->
        Logger.info("Discovered nodes: #{inspect Map.keys(discovered_nodes)}")
      {:error, reason} ->
        Logger.error("Node discovery failed: #{inspect(reason)}")
    end

    # Simulate image paths (these would be real paths in a production app).
    # Create empty placeholder files so the paths exist; the simulated
    # ImageTransformations.load_image/1 does not actually read them.
    image_dir = "temp_images"
    File.mkdir_p!(image_dir)
    image_paths = Enum.map(1..5, fn i ->
      path = Path.join(image_dir, "image_#{i}.jpg")
      # Create an empty file if it doesn't exist for placeholder loading
      unless File.exists?(path), do: File.write(path, "")
      path
    end)
    IO.puts("Processing #{length(image_paths)} images from '#{image_dir}' directory...")

    # Create a DAG for image processing
    dag = build_pipeline(image_paths)

    # Validate the DAG
    case DAG.validate(dag) do
      :ok ->
        IO.puts("\nExecuting distributed image processing pipeline...\n")

        # Execute the DAG across all available nodes
        case DistributedExecutor.execute(dag,
          allocation_strategy: :cost_optimized,
          max_retries: 2
        ) do
          {:ok, exec_results} ->
            # Per-function results are keyed by function id
            collage_output = exec_results.results[:save_collage]
            IO.puts("\nPipeline completed successfully!")
            IO.puts("Collage saved to: #{collage_output.saved_path}")

            # Show quality metrics; keys look like "grayscale_image_1.jpg"
            quality_metrics = exec_results.results[:quality_report]
            IO.puts("\nQuality metrics:")
            Enum.each(quality_metrics, fn {image_id, {_img, report}} ->
              IO.puts("  #{image_id}: " <>
                "Sharpness: #{Float.round(report.sharpness, 1)}, " <>
                "Noise: #{Float.round(report.noise_level, 1)}")
            end)

            # Count how many functions were allocated to each node
            IO.puts("\nNode allocation:")
            node_counts =
              exec_results.allocations
              |> Enum.reduce(%{}, fn {_function_id, node}, acc ->
                Map.update(acc, node, 1, &(&1 + 1))
              end)

            Enum.each(node_counts, fn {node, count} ->
              IO.puts("  #{node}: #{count} functions")
            end)

            # Return only the per-function results map
            {:ok, exec_results.results}

          {:error, reason} ->
            IO.puts("Error executing pipeline: #{inspect(reason)}")
            {:error, reason}
        end

      {:error, reason} ->
        IO.puts("Invalid DAG: #{inspect(reason)}")
        {:error, reason}
    end
  end
end

Running the Pipeline

You can execute the pipeline with:

# Run the pipeline: works on a single node (see the note above), and spreads
# work across any connected nodes
DistributedImageProcessing.run()

# Or just build without executing to inspect the DAG
image_paths = Enum.map(1..5, fn i -> "/path/to/image_#{i}.jpg" end)
dag = DistributedImageProcessing.build_pipeline(image_paths)
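Building the DAG does not run anything. Judging by the arities used in build_pipeline/1, each function's `code` is called with the results of its `args` dependencies first, in order, followed by its `extra_args`. The following single-process sketch illustrates that convention and the dependency ordering. It uses plain maps rather than Handoff's structs, and the `LocalDagSketch` module and its map shape are hypothetical, not Handoff's executor:

```elixir
defmodule LocalDagSketch do
  @moduledoc """
  Hypothetical sequential executor for a DAG described as
  %{id => %{args: [dep_id], code: fun, extra_args: list}}.
  Not Handoff's executor; it only illustrates the calling convention.
  """

  # Returns {:ok, %{id => result}} or {:error, {:cycle_or_missing, ids}}.
  def execute(specs), do: run(specs, %{})

  defp run(specs, results) when map_size(specs) == 0, do: {:ok, results}

  defp run(specs, results) do
    # A function is ready once every dependency has produced a result.
    ready =
      for {id, spec} <- specs,
          Enum.all?(spec.args, &Map.has_key?(results, &1)),
          do: id

    case ready do
      [] ->
        # Nothing can run: a dependency is missing or the graph has a cycle.
        {:error, {:cycle_or_missing, Map.keys(specs)}}

      ids ->
        new_results =
          Map.new(ids, fn id ->
            spec = specs[id]
            # Dependency results first (in :args order), then extra_args.
            dep_values = Enum.map(spec.args, &Map.fetch!(results, &1))
            {id, apply(spec.code, dep_values ++ spec.extra_args)}
          end)

        run(Map.drop(specs, ids), Map.merge(results, new_results))
    end
  end
end

specs = %{
  load: %{args: [], code: fn paths -> length(paths) end, extra_args: [["a.jpg", "b.jpg"]]},
  double: %{args: [:load], code: fn n, factor -> n * factor end, extra_args: [2]}
}

LocalDagSketch.execute(specs)
# => {:ok, %{double: 4, load: 2}}
```

Each recursive pass runs one wave of ready functions. Applied to the dependencies declared in build_pipeline/1, the waves would be: load_images; resize_images; the four effects plus create_thumbnails; quality_report, create_collage, and save_thumbnails; and finally save_collage. Functions within a wave are independent of each other, which is what allows a distributed executor to place them on different nodes.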

Visualizing the DAG

In a full implementation, you might add a visualization of the DAG. Here’s a placeholder for that:

# Pseudocode for DAG visualization (not implemented in this example)
# dag |> Handoff.Visualization.generate_graph() |> Handoff.Visualization.render()
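As a concrete alternative to the pseudocode above, the dependency lists from build_pipeline/1 can be rendered as Graphviz DOT text with a few lines of plain Elixir. `DagDotSketch` is a hypothetical helper. The `deps` map is copied by hand from the `args` of each `%Function{}`. It is not read from Handoff's DAG struct:

```elixir
defmodule DagDotSketch do
  @moduledoc """
  Hypothetical helper that renders %{id => [dependency_id]} as Graphviz DOT.
  """

  def to_dot(deps, name \\ "pipeline") do
    sorted = Enum.sort(deps)

    # Declare every node so functions without edges still appear.
    nodes = for {id, _parents} <- sorted, do: ~s(  "#{id}";)

    # One edge per dependency, pointing from producer to consumer.
    edges =
      for {id, parents} <- sorted, parent <- parents do
        ~s(  "#{parent}" -> "#{id}";)
      end

    Enum.join(["digraph #{name} {"] ++ nodes ++ edges ++ ["}"], "\n")
  end
end

deps = %{
  load_images: [],
  resize_images: [:load_images],
  grayscale_effect: [:resize_images],
  sepia_effect: [:resize_images],
  blur_effect: [:resize_images],
  edge_detection: [:resize_images],
  create_thumbnails: [:resize_images],
  quality_report: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
  create_collage: [:grayscale_effect, :sepia_effect, :blur_effect, :edge_detection],
  save_thumbnails: [:create_thumbnails],
  save_collage: [:create_collage]
}

deps |> DagDotSketch.to_dot() |> IO.puts()
```

Saving the printed text to `pipeline.dot` and running `dot -Tpng pipeline.dot -o pipeline.png` produces an image of the graph.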

Key Concepts Demonstrated

  • Distributed execution across connected nodes
  • Resource-aware function allocation based on per-function cost maps (CPU, memory, GPU)
  • A complex DAG with multiple parallel branches
  • Load balancing and cost optimization
  • Quality assessment and results aggregation
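This example does not show how Handoff's `:cost_optimized` strategy chooses nodes. The following hypothetical greedy allocator, `AllocationSketch`, is not Handoff's implementation, but it makes resource-aware allocation and load balancing concrete. A node is eligible for a function only if its capabilities cover every entry in the function's cost map. Among eligible nodes, the allocator picks the one with the fewest functions assigned so far:

```elixir
defmodule AllocationSketch do
  @moduledoc """
  Hypothetical greedy allocator; not Handoff's :cost_optimized strategy.
  """

  # A node can run a function if it has at least the requested amount of
  # every resource in the cost map; resources a node lacks count as 0.
  def fits?(caps, cost) do
    Enum.all?(cost, fn {resource, amount} -> Map.get(caps, resource, 0) >= amount end)
  end

  # functions: [{id, cost_map}], nodes: %{node => caps_map}
  # Returns {:ok, %{id => node}} or {:error, {:no_capable_node, id}}.
  def allocate(functions, nodes) do
    Enum.reduce_while(functions, {:ok, %{}}, fn {id, cost}, {:ok, acc} ->
      candidates = for {node, caps} <- nodes, fits?(caps, cost), do: node

      case candidates do
        [] ->
          {:halt, {:error, {:no_capable_node, id}}}

        _ ->
          # Load balancing: prefer the capable node with the fewest
          # functions assigned so far; break ties by node name.
          load = Enum.frequencies(Map.values(acc))
          node = Enum.min_by(candidates, &{Map.get(load, &1, 0), &1})
          {:cont, {:ok, Map.put(acc, id, node)}}
      end
    end)
  end
end

nodes = %{
  :"node1@127.0.0.1" => %{cpu: 8, memory: 16000, gpu: 1},
  :"node2@127.0.0.1" => %{cpu: 4, memory: 8000}
}

functions = [
  {:resize_images, %{cpu: 2, memory: 1000}},
  {:grayscale_effect, %{cpu: 2, memory: 800}},
  {:edge_detection, %{cpu: 2, memory: 1500, gpu: 1}}
]

AllocationSketch.allocate(functions, nodes)
# => {:ok, %{edge_detection: :"node1@127.0.0.1", grayscale_effect: :"node2@127.0.0.1", resize_images: :"node1@127.0.0.1"}}
```

Edge detection can only go to the GPU node, while the other two functions are spread across both nodes. Unlike a real scheduler, this sketch treats each node's capabilities as fixed per-function limits and does not subtract resources used by functions running at the same time.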