Powered by AppSignal & Oban Pro

Traverse & Mapping

livebooks/09_traverse_and_mapping.livemd

Traverse & Mapping

Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

MapNode implements the traverse composition constructor — it takes a collection from context and runs a single node against every element. The node: field accepts any Node struct (ActionNode, FanOutNode, AgentNode, HumanNode) or a bare action module (auto-wrapped in ActionNode). Unlike FanOutNode (which runs N different branches known at definition time), MapNode runs one node N times over a collection discovered at runtime.

Aspect FanOutNode MapNode
Branch count Fixed at definition time Determined at runtime
Branch identity Each branch is different All branches are the same
Result shape Named map of results Ordered list of results
Use case Heterogeneous parallel tasks Homogeneous data processing

Part 1: Basic MapNode

First, define a simple action that doubles a numeric value and a generator action that produces a list of items.

defmodule Demo.GenerateAction do
  use Jido.Action,
    name: "generate",
    description: "Generates a list of numeric items",
    schema: [
      count: [type: :integer, required: true, doc: "Number of items to generate"]
    ]

  def run(%{count: count}, _ctx) do
    items = Enum.map(1..count, fn i -> %{value: i} end)
    {:ok, %{items: items}}
  end
end

defmodule Demo.DoubleValueAction do
  use Jido.Action,
    name: "double_value",
    description: "Doubles a numeric value",
    schema: [
      value: [type: :integer, required: true, doc: "Value to double"]
    ]

  def run(%{value: v}, _ctx), do: {:ok, %{value: v * 2}}
end

defmodule Demo.AddTenAction do
  use Jido.Action,
    name: "add_ten",
    description: "Adds ten to a value",
    schema: [value: [type: :integer, required: true]]

  def run(%{value: v}, _ctx), do: {:ok, %{result: v + 10}}
end

# Suppress doctests from Jido.Agent
defmodule Demo.Helpers do
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end

IO.puts("Actions defined.")

Top-level key with over: :items

alias Jido.Composer.Node.MapNode

{:ok, map_node} = MapNode.new(
  name: :double,
  over: :items,
  node: Demo.DoubleValueAction
)

# Run directly with a flat context
{:ok, result} = MapNode.run(map_node, %{items: [%{value: 1}, %{value: 2}, %{value: 3}]})

IO.puts("=== MapNode with over: :items ===\n")
IO.puts("Input:  [%{value: 1}, %{value: 2}, %{value: 3}]")
IO.puts("Output: #{inspect(result[:results])}")

Nested path with over: [:generate, :items]

alias Jido.Composer.Node.MapNode

{:ok, map_node} = MapNode.new(
  name: :double,
  over: [:generate, :items],
  node: Demo.DoubleValueAction
)

# Simulate context from a prior "generate" step
context = %{generate: %{items: [%{value: 10}, %{value: 20}]}}
{:ok, result} = MapNode.run(map_node, context)

IO.puts("=== MapNode with over: [:generate, :items] ===\n")
IO.puts("Context path: ctx[:generate][:items]")
IO.puts("Output:       #{inspect(result[:results])}")

MapNode in a Workflow

alias Jido.Composer.Node.MapNode

{:ok, double_node} = MapNode.new(
  name: :double,
  over: [:generate, :items],
  node: Demo.DoubleValueAction
)

defmodule Demo.GenerateDoubleWorkflow do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "generate_and_double",
    description: "Generate items then double each value",
    nodes: %{
      generate: Demo.GenerateAction,
      double: double_node
    },
    transitions: %{
      {:generate, :ok} => :double,
      {:double, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :generate,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

agent = Demo.GenerateDoubleWorkflow.new()
{:ok, ctx} = Demo.GenerateDoubleWorkflow.run_sync(agent, %{count: 4})

IO.puts("=== Workflow: Generate → Double ===\n")
IO.puts("Generated items: #{inspect(ctx[:generate][:items])}")
IO.puts("Doubled results: #{inspect(ctx[:double][:results])}")

Part 2: Input Preparation

MapNode handles two types of collection elements differently:

  • Map elements are merged into the node params (the action sees the map’s keys directly)
  • Non-map elements (integers, strings, etc.) are wrapped as %{item: element}
alias Jido.Composer.Node.MapNode

defmodule Demo.InspectParamsAction do
  use Jido.Action,
    name: "inspect_params",
    description: "Returns the params it receives for inspection",
    schema: []

  def run(params, _ctx) do
    # Return only the interesting keys (filter out upstream context noise)
    relevant =
      params
      |> Map.drop([:count, :generate])
      |> Map.reject(fn {_k, v} -> is_map(v) and map_size(v) > 3 end)

    {:ok, relevant}
  end
end

# Map elements — keys merge directly
{:ok, map_node} = MapNode.new(name: :inspect, over: :items, node: Demo.InspectParamsAction)
{:ok, result_maps} = MapNode.run(map_node, %{items: [%{name: "Alice"}, %{name: "Bob"}]})

IO.puts("=== Map Elements ===")
IO.puts("Each element's keys are merged into params:")

for r <- result_maps[:results] do
  IO.puts("  #{inspect(r)}")
end

# Non-map elements — wrapped as %{item: element}
{:ok, result_scalars} = MapNode.run(map_node, %{items: [42, "hello", :world]})

IO.puts("\n=== Non-Map Elements ===")
IO.puts("Each element is wrapped as %{item: element}:")

for r <- result_scalars[:results] do
  IO.puts("  #{inspect(r)}")
end

Part 3: Error Handling

MapNode supports two error policies:

  • :fail_fast (default) — If any element fails, the entire MapNode fails immediately
  • :collect_partial — Continue processing; failed elements appear as {:error, reason} in results
alias Jido.Composer.Node.MapNode

defmodule Demo.MaybeFailAction do
  use Jido.Action,
    name: "maybe_fail",
    description: "Fails on negative values, succeeds otherwise",
    schema: [
      value: [type: :integer, required: true]
    ]

  def run(%{value: v}, _ctx) when v < 0, do: {:error, "negative value: #{v}"}
  def run(%{value: v}, _ctx), do: {:ok, %{squared: v * v}}
end

items = [%{value: 1}, %{value: -2}, %{value: 3}]

# Fail-fast: stops at first error
{:ok, ff_node} = MapNode.new(name: :ff, over: :items, node: Demo.MaybeFailAction, on_error: :fail_fast)
ff_result = MapNode.run(ff_node, %{items: items})

IO.puts("=== Fail-Fast ===")
IO.puts("Result: #{inspect(ff_result)}")

# Collect-partial: continues past errors
{:ok, cp_node} = MapNode.new(name: :cp, over: :items, node: Demo.MaybeFailAction, on_error: :collect_partial)
{:ok, cp_result} = MapNode.run(cp_node, %{items: items})

IO.puts("\n=== Collect-Partial ===")
IO.puts("Results (errors inline):")

for {r, i} <- Enum.with_index(cp_result[:results]) do
  IO.puts("  [#{i}] #{inspect(r)}")
end

Part 4: Backpressure with max_concurrency

By default, MapNode processes all elements concurrently. Use max_concurrency to limit parallel execution — useful for rate-limited APIs or resource-constrained systems.

alias Jido.Composer.Node.MapNode

defmodule Demo.SlowAction do
  use Jido.Action,
    name: "slow_action",
    description: "Simulates a slow operation",
    schema: [
      value: [type: :integer, required: true]
    ]

  def run(%{value: v}, _ctx) do
    Process.sleep(100)
    {:ok, %{processed: v}}
  end
end

items = Enum.map(1..6, fn i -> %{value: i} end)

# Sequential (max_concurrency: 1)
{:ok, seq_node} = MapNode.new(name: :seq, over: :items, node: Demo.SlowAction, max_concurrency: 1)
{seq_us, {:ok, _}} = :timer.tc(fn -> MapNode.run(seq_node, %{items: items}) end)

# Parallel (max_concurrency: 6)
{:ok, par_node} = MapNode.new(name: :par, over: :items, node: Demo.SlowAction, max_concurrency: 6)
{par_us, {:ok, _}} = :timer.tc(fn -> MapNode.run(par_node, %{items: items}) end)

IO.puts("=== Backpressure Comparison (6 items × 100ms each) ===\n")
IO.puts("Sequential (max_concurrency: 1): #{div(seq_us, 1000)}ms")
IO.puts("Parallel   (max_concurrency: 6): #{div(par_us, 1000)}ms")
IO.puts("Speedup: #{Float.round(seq_us / par_us, 1)}x")

Part 5: Node-over-Node Composition

MapNode’s node: field accepts any Node struct, not just actions. This enables powerful composition: map a FanOutNode over a collection to run multiple branches per element.

alias Jido.Composer.Node.{ActionNode, FanOutNode, MapNode}

# Build a FanOutNode that computes double AND add-ten for a single element
{:ok, double_node} = ActionNode.new(Demo.DoubleValueAction)
{:ok, add_ten_node} = ActionNode.new(Demo.AddTenAction)

{:ok, fan_out} = FanOutNode.new(
  name: "compute_both",
  branches: [double: double_node, add_ten: add_ten_node]
)

# Map that FanOutNode over a collection — each element gets BOTH computations
{:ok, map_node} = MapNode.new(
  name: :process,
  over: :items,
  node: fan_out
)

{:ok, result} = MapNode.run(map_node, %{items: [%{value: 5}, %{value: 10}, %{value: 15}]})

IO.puts("=== Node-over-Node: FanOutNode mapped over collection ===\n")

for {r, i} <- Enum.with_index(result[:results]) do
  IO.puts("  Item #{i}: double=#{inspect(r[:double])}, add_ten=#{inspect(r[:add_ten])}")
end

IO.puts("\nEach element got its own FanOutNode with two branches.")
IO.puts("Result shape: ordered list of maps, each map keyed by branch name.")

Part 6: MapNode vs FanOutNode

Here’s the same problem solved both ways to illustrate when each is appropriate.

Problem: Given a value, compute both its double and its square.

alias Jido.Composer.Node.{ActionNode, FanOutNode, MapNode}

defmodule Demo.SquareAction do
  use Jido.Action,
    name: "square",
    description: "Squares a value",
    schema: [value: [type: :integer, required: true]]

  def run(%{value: v}, _ctx), do: {:ok, %{result: v * v}}
end

# FanOutNode: two DIFFERENT actions, fixed branches
{:ok, double_node} = ActionNode.new(Demo.DoubleValueAction)
{:ok, square_node} = ActionNode.new(Demo.SquareAction)

{:ok, fan_out} = FanOutNode.new(
  name: "compute",
  branches: [double: double_node, square: square_node]
)

fan_result = FanOutNode.run(fan_out, %{value: 5}, [])

IO.puts("=== FanOutNode: Two different actions ===")
IO.puts("Double branch: #{inspect(elem(fan_result, 1)[:double])}")
IO.puts("Square branch: #{inspect(elem(fan_result, 1)[:square])}")

# MapNode: ONE node applied to MANY items
{:ok, map_node} = MapNode.new(
  name: :process,
  over: :items,
  node: Demo.DoubleValueAction
)

{:ok, map_result} = MapNode.run(map_node, %{items: [%{value: 1}, %{value: 2}, %{value: 3}]})

IO.puts("\n=== MapNode: One node, many items ===")
IO.puts("Results: #{inspect(map_result[:results])}")

IO.puts("\n=== When to use which ===")
IO.puts("FanOutNode: fixed set of DIFFERENT branches (heterogeneous)")
IO.puts("MapNode:    variable set of SAME operation (homogeneous)")

Next Steps

You’ve learned how to:

  • Map any node over a runtime-determined collection with MapNode
  • Use top-level keys and nested paths with over:
  • Handle map vs non-map element preparation
  • Choose between fail-fast and collect-partial error policies
  • Control concurrency with max_concurrency
  • Compose nodes: map a FanOutNode over a collection (node-over-node)
  • Decide between MapNode and FanOutNode

Next guides:

  • Composition Patterns (livebook 10) — Combining constructors, compile-time vs runtime
  • LLM Orchestrator (livebook 04) — Adaptive tool composition with LLM decisions