Branching DAG Distributed Execution Example

Introduction

This Livebook shows how to build and execute a branching (diamond-shaped) DAG with the Handoff library, with execution distributed across nodes. It follows the branching DAG example from the documentation: a single data source feeds two parallel transformations, whose results are then aggregated.

Setup

Install dependencies:

Mix.install([
  {:handoff, "~> 0.1"}
])

# Handoff requires fully qualified function captures (&Module.function/arity)
# for :code and :extra_args, so the transformations live in a named module.
defmodule Transformations do
  def inc(x), do: x + 1
  def double(x), do: x * 2
  def sum_two_lists(a, b), do: Enum.sum(a) + Enum.sum(b)
end

DAG Construction

Let’s define a simple branching DAG with four functions: one source, two parallel preprocessing steps, and one aggregator.

alias Handoff.Function

dag = Handoff.new()

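# `Function` is aliased to Handoff.Function above, so Elixir's built-in
# Function module has to be referenced by its full name, Elixir.Function.
# With no :args, identity/1 receives only the extra argument (the input list).
source_fn = %Function{
  id: :input_data,
  args: [],
  code: &Elixir.Function.identity/1,
  extra_args: [[10, 20, 30]]
}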

preprocess_a = %Function{
  id: :pre_a,
  args: [:input_data],
  code: &Enum.map/2,
  extra_args: [&Transformations.inc/1],
  cost: %{cpu: 2}
}

preprocess_b = %Function{
  id: :pre_b,
  args: [:input_data],
  code: &Enum.map/2,
  extra_args: [&Transformations.double/1],
  cost: %{cpu: 2}
}

aggregate = %Function{
  id: :agg,
  args: [:pre_a, :pre_b],
  code: &Transformations.sum_two_lists/2,
  cost: %{cpu: 1}
}

dag =
  dag
  |> Handoff.DAG.add_function(source_fn)
  |> Handoff.DAG.add_function(preprocess_a)
  |> Handoff.DAG.add_function(preprocess_b)
  |> Handoff.DAG.add_function(aggregate)

:ok = Handoff.DAG.validate(dag)
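
Before running the DAG, it can help to know what value to expect. The sketch below recomputes each node's output in plain Elixir, without Handoff. It assumes, as the function definitions above suggest, that each function's `:code` is applied to the results of its `:args` followed by its `extra_args`. The anonymous functions stand in for `Transformations.inc/1` and `Transformations.double/1`.

```elixir
# Plain Elixir, no Handoff involved: reproduce each node's output by hand.
input_data = [10, 20, 30]

# :pre_a, equivalent to Enum.map(input_data, &Transformations.inc/1)
pre_a = Enum.map(input_data, fn x -> x + 1 end)

# :pre_b, equivalent to Enum.map(input_data, &Transformations.double/1)
pre_b = Enum.map(input_data, fn x -> x * 2 end)

# :agg, equivalent to Transformations.sum_two_lists(pre_a, pre_b)
agg = Enum.sum(pre_a) + Enum.sum(pre_b)

IO.inspect(pre_a, label: "pre_a") # pre_a: [11, 21, 31]
IO.inspect(pre_b, label: "pre_b") # pre_b: [20, 40, 60]
IO.inspect(agg, label: "agg")     # agg: 183
```

Under that assumption, the value produced for `:agg` should be 183 (63 from the incremented list plus 120 from the doubled list).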

DAG Structure Visualization

graph TD;
  input_data --> pre_a;
  input_data --> pre_b;
  pre_a --> agg;
  pre_b --> agg;
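
The diagram also shows which functions can run at the same time. As a rough illustration, the following sketch groups the nodes into dependency levels from the edge list above. `DagLevels` is a hypothetical helper written for this example and is not part of Handoff; how Handoff actually schedules functions is not covered here.

```elixir
defmodule DagLevels do
  # Hypothetical helper (not part of Handoff): groups nodes into levels so
  # that every node appears after all of the nodes it depends on.
  def levels(edges) do
    nodes = edges |> Enum.flat_map(fn {from, to} -> [from, to] end) |> Enum.uniq()

    # Map each node to the list of nodes it depends on.
    deps = Map.new(nodes, fn n -> {n, for({from, ^n} <- edges, do: from)} end)

    do_levels(deps, MapSet.new(), [])
  end

  defp do_levels(deps, _done, acc) when map_size(deps) == 0, do: Enum.reverse(acc)

  defp do_levels(deps, done, acc) do
    # A node is ready once all of its dependencies are done.
    ready =
      deps
      |> Enum.filter(fn {_node, ds} -> Enum.all?(ds, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {node, _ds} -> node end)
      |> Enum.sort()

    if ready == [], do: raise(ArgumentError, "cycle detected")

    do_levels(Map.drop(deps, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end

# Edges copied from the diagram: {from, to}.
edges = [
  {:input_data, :pre_a},
  {:input_data, :pre_b},
  {:pre_a, :agg},
  {:pre_b, :agg}
]

IO.inspect(DagLevels.levels(edges), label: "levels")
# levels: [[:input_data], [:pre_a, :pre_b], [:agg]]
```

`:pre_a` and `:pre_b` land in the same level because neither depends on the other, which is what makes them candidates for parallel or distributed execution.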

Distributed Execution

Handoff can execute the DAG across a cluster. This demonstration runs everything on the local node, but you can connect multiple nodes for true distributed execution.

# Start Handoff and its dependencies (a no-op if already started)
{:ok, _started} = Application.ensure_all_started(:handoff)

# Register the local node with its capabilities
Handoff.register_local_node(%{cpu: 4, memory: 8000})

# Execute the DAG
distributed_result = Handoff.execute_distributed(dag)

IO.inspect(distributed_result, label: "Distributed Execution Result")
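
To move beyond a single node, the Erlang nodes first have to be connected to each other. The sketch below uses only the standard `Node` module. The peer name `worker1@localhost` is a hypothetical placeholder, and the Handoff-specific steps for registering or discovering remote nodes are not shown; consult the library documentation for those.

```elixir
# Hypothetical peer name; replace it with a node you actually started,
# for example with `iex --sname worker1`.
peer = :"worker1@localhost"

# Node.connect/1 returns true on success, false on failure, and :ignored
# when the local node itself was not started in distributed mode.
case Node.connect(peer) do
  true ->
    IO.puts("Connected to #{peer}; visible nodes: #{inspect(Node.list())}")

  false ->
    IO.puts("Could not reach #{peer}")

  :ignored ->
    IO.puts("Local node is not distributed; start it with --sname or --name")
end
```

Both nodes need the same cookie for the connection to succeed.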

Summary

  • Demonstrated a branching DAG with Handoff
  • Showed how to construct, visualize, and execute the DAG in a distributed fashion
  • For more complex examples, see the distributed image processing Livebook