Traverse & Mapping
Mix.install(
[
{:jido_composer, ">= 0.0.0"},
{:kino, "~> 0.14"}
],
config: [
jido_action: [default_timeout: :timer.minutes(5)]
]
)
Introduction
MapNode implements the traverse composition constructor — it takes a collection
from context and runs a single node against every element. The node: field accepts
any Node struct (ActionNode, FanOutNode, AgentNode, HumanNode) or a bare action module
(auto-wrapped in ActionNode). Unlike FanOutNode (which runs N different branches known
at definition time), MapNode runs one node N times over a collection discovered at runtime.
| Aspect | FanOutNode | MapNode |
|---|---|---|
| Branch count | Fixed at definition time | Determined at runtime |
| Branch identity | Each branch is different | All branches are the same |
| Result shape | Named map of results | Ordered list of results |
| Use case | Heterogeneous parallel tasks | Homogeneous data processing |
Part 1: Basic MapNode
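First, define the actions used throughout this guide: a generator action that produces a list of items, an action that doubles a numeric value, and an action that adds ten to a value (plus a small helper macro used later by the workflow module).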
defmodule Demo.GenerateAction do
  # Generator action for the demos: turns a `count` into a list of maps of the
  # form `%{value: i}` and returns that list under the `:items` key.
  use Jido.Action,
    name: "generate",
    description: "Generates a list of numeric items",
    schema: [
      count: [type: :integer, required: true, doc: "Number of items to generate"]
    ]

  # Returns `{:ok, %{items: [%{value: 1}, ..., %{value: count}]}}`.
  #
  # The range carries an explicit `//1` step. A bare `1..count` counts DOWN when
  # `count` is below 1 (`1..0` enumerates as `[1, 0]`), which would fabricate
  # items for `count: 0`. With the step, zero or negative counts yield `[]`.
  def run(%{count: count}, _ctx) do
    items = for i <- 1..count//1, do: %{value: i}
    {:ok, %{items: items}}
  end
end
defmodule Demo.DoubleValueAction do
  # Demo action: receives an integer under `:value` and hands back twice that
  # amount, again under `:value`.
  use Jido.Action,
    name: "double_value",
    description: "Doubles a numeric value",
    schema: [
      value: [type: :integer, required: true, doc: "Value to double"]
    ]

  def run(%{value: number}, _context) do
    {:ok, %{value: number * 2}}
  end
end
defmodule Demo.AddTenAction do
  # Demo action: adds 10 to the integer under `:value` and returns the sum
  # under `:result` (note: a different key than the input).
  use Jido.Action,
    name: "add_ten",
    description: "Adds ten to a value",
    schema: [
      value: [type: :integer, required: true]
    ]

  def run(%{value: number}, _context) do
    {:ok, %{result: number + 10}}
  end
end
# Suppress doctests from Jido.Agent
# Helper module exposing one macro, used by Demo.GenerateDoubleWorkflow further
# down (it `require`s this module and then invokes the macro in its body).
defmodule Demo.Helpers do
# Expands, in the calling module, to three zero-arity function definitions
# (`plugins/0`, `capabilities/0`, `signal_types/0`). Each is tagged `@doc false`
# and simply delegates to `super()`, so behaviour is unchanged and only the
# attached documentation is hidden.
# `super()` only compiles where the caller already has overridable definitions
# of these functions — presumably injected by `use Jido.Composer.Workflow`;
# TODO confirm against the library.
defmacro suppress_agent_doctests do
quote do
@doc false
def plugins, do: super()
@doc false
def capabilities, do: super()
@doc false
def signal_types, do: super()
end
end
end
# Printed once the cell has evaluated, as a visible marker that all the modules
# above compiled.
IO.puts("Actions defined.")
Top-level key with over: :items
alias Jido.Composer.Node.MapNode

# A MapNode that applies DoubleValueAction to each entry found under the
# top-level :items key of the context.
{:ok, map_node} =
  MapNode.new(
    name: :double,
    over: :items,
    node: Demo.DoubleValueAction
  )

# Invoke the node directly, handing it a flat context (no workflow involved).
{:ok, result} =
  MapNode.run(map_node, %{items: [%{value: 1}, %{value: 2}, %{value: 3}]})

IO.puts("=== MapNode with over: :items ===\n")
IO.puts("Input: [%{value: 1}, %{value: 2}, %{value: 3}]")
IO.puts("Output: " <> inspect(result[:results]))
Nested path with over: [:generate, :items]
alias Jido.Composer.Node.MapNode

# Here `over:` is a key path: the collection is read from
# context[:generate][:items] instead of a top-level key.
{:ok, map_node} =
  MapNode.new(
    name: :double,
    over: [:generate, :items],
    node: Demo.DoubleValueAction
  )

# Hand-built context shaped like the output of an earlier "generate" step.
context = %{
  generate: %{
    items: [%{value: 10}, %{value: 20}]
  }
}

{:ok, result} = MapNode.run(map_node, context)

IO.puts("=== MapNode with over: [:generate, :items] ===\n")
IO.puts("Context path: ctx[:generate][:items]")
IO.puts("Output: " <> inspect(result[:results]))
MapNode in a Workflow
alias Jido.Composer.Node.MapNode

# MapNode used as the :double state of the workflow defined next; it iterates
# over the list stored at context[:generate][:items].
{:ok, double_node} =
  MapNode.new(
    name: :double,
    over: [:generate, :items],
    node: Demo.DoubleValueAction
  )
# Two-step workflow for the demo: a :generate state backed by
# Demo.GenerateAction followed by a :double state backed by the MapNode bound
# to `double_node` just before this module.
defmodule Demo.GenerateDoubleWorkflow do
@moduledoc false
use Jido.Composer.Workflow,
name: "generate_and_double",
description: "Generate items then double each value",
# State name => node. `double_node` is a variable from the enclosing cell;
# a module body can read variables bound outside `defmodule`.
nodes: %{
generate: Demo.GenerateAction,
double: double_node
},
# NOTE(review): keys look like {state, outcome} pairs mapping to the next
# state, with :_ acting as a catch-all state — confirm against the
# Jido.Composer.Workflow documentation.
transitions: %{
{:generate, :ok} => :double,
{:double, :ok} => :done,
{:_, :error} => :failed
},
initial: :generate,
terminal_states: [:done, :failed],
success_states: [:done]
# `require` is needed because suppress_agent_doctests/0 is a macro; it expands
# here into @doc false overrides of plugins/0, capabilities/0, signal_types/0.
require Demo.Helpers
Demo.Helpers.suppress_agent_doctests()
end
# Instantiate the workflow agent, then run it synchronously with count: 4.
agent = Demo.GenerateDoubleWorkflow.new()
{:ok, ctx} = Demo.GenerateDoubleWorkflow.run_sync(agent, %{count: 4})

IO.puts("=== Workflow: Generate → Double ===\n")
IO.puts("Generated items: " <> inspect(get_in(ctx, [:generate, :items])))
IO.puts("Doubled results: " <> inspect(get_in(ctx, [:double, :results])))
Part 2: Input Preparation
MapNode handles two types of collection elements differently:
- Map elements are merged into the node params (the action sees the map’s keys directly)
- Non-map elements (integers, strings, etc.) are wrapped as `%{item: element}`
alias Jido.Composer.Node.MapNode
defmodule Demo.InspectParamsAction do
  # Demo action that echoes back the params it was called with, minus noise,
  # so the notebook can show what a mapped node actually receives.
  use Jido.Action,
    name: "inspect_params",
    description: "Returns the params it receives for inspection",
    schema: []

  # Strips the :count and :generate keys, then discards every remaining entry
  # whose value is a map with more than three keys (upstream context noise).
  def run(params, _context) do
    trimmed = Map.drop(params, [:count, :generate])
    {:ok, Map.filter(trimmed, fn {_key, val} -> not bulky_map?(val) end)}
  end

  # True for map values holding more than three entries.
  defp bulky_map?(val), do: is_map(val) and map_size(val) > 3
end
# Case 1: the collection holds maps — each element's keys land directly in params.
{:ok, map_node} =
  MapNode.new(name: :inspect, over: :items, node: Demo.InspectParamsAction)

{:ok, result_maps} =
  MapNode.run(map_node, %{items: [%{name: "Alice"}, %{name: "Bob"}]})

IO.puts("=== Map Elements ===")
IO.puts("Each element's keys are merged into params:")

Enum.each(result_maps[:results], fn entry ->
  IO.puts(" " <> inspect(entry))
end)

# Case 2: the collection holds scalars — each one arrives wrapped as %{item: element}.
{:ok, result_scalars} = MapNode.run(map_node, %{items: [42, "hello", :world]})

IO.puts("\n=== Non-Map Elements ===")
IO.puts("Each element is wrapped as %{item: element}:")

# Kept as a comprehension: it is the cell's last expression, so its value
# (a list of :ok) is what the cell evaluates to.
for entry <- result_scalars[:results], do: IO.puts(" " <> inspect(entry))
Part 3: Error Handling
MapNode supports two error policies:
- `:fail_fast` (default) — If any element fails, the entire MapNode fails immediately
- `:collect_partial` — Continue processing; failed elements appear as `{:error, reason}` in results
alias Jido.Composer.Node.MapNode
defmodule Demo.MaybeFailAction do
  # Demo action with a deliberate failure mode, used to show MapNode's error
  # policies: negative inputs produce an error, everything else is squared.
  use Jido.Action,
    name: "maybe_fail",
    description: "Fails on negative values, succeeds otherwise",
    schema: [
      value: [type: :integer, required: true]
    ]

  # Returns `{:error, message}` for values below zero, otherwise
  # `{:ok, %{squared: value * value}}`.
  def run(%{value: number}, _context) do
    if number < 0 do
      {:error, "negative value: #{number}"}
    else
      {:ok, %{squared: number * number}}
    end
  end
end
# One negative value in the middle so both policies have something to react to.
items = [%{value: 1}, %{value: -2}, %{value: 3}]

# Policy 1 — :fail_fast. The result is deliberately not pattern-matched,
# since the run is expected to report a failure.
{:ok, ff_node} =
  MapNode.new(
    name: :ff,
    over: :items,
    node: Demo.MaybeFailAction,
    on_error: :fail_fast
  )

ff_result = MapNode.run(ff_node, %{items: items})

IO.puts("=== Fail-Fast ===")
IO.puts("Result: " <> inspect(ff_result))

# Policy 2 — :collect_partial. The run itself succeeds and failures show up
# inside the results list.
{:ok, cp_node} =
  MapNode.new(
    name: :cp,
    over: :items,
    node: Demo.MaybeFailAction,
    on_error: :collect_partial
  )

{:ok, cp_result} = MapNode.run(cp_node, %{items: items})

IO.puts("\n=== Collect-Partial ===")
IO.puts("Results (errors inline):")

# Last expression of the cell: yields one :ok per printed line.
cp_result[:results]
|> Enum.with_index()
|> Enum.map(fn {outcome, idx} -> IO.puts(" [#{idx}] #{inspect(outcome)}") end)
Part 4: Backpressure with max_concurrency
By default, MapNode processes all elements concurrently. Use max_concurrency to
limit parallel execution — useful for rate-limited APIs or resource-constrained systems.
alias Jido.Composer.Node.MapNode
defmodule Demo.SlowAction do
  # Demo action that blocks for 100 ms before answering, giving the
  # max_concurrency comparison something measurable to time.
  use Jido.Action,
    name: "slow_action",
    description: "Simulates a slow operation",
    schema: [
      value: [type: :integer, required: true]
    ]

  # Sleeps, then echoes the input back under `:processed`.
  def run(%{value: number}, _context) do
    Process.sleep(100)

    {:ok, %{processed: number}}
  end
end
# Six work items, each of which costs SlowAction about 100 ms.
items = for i <- 1..6, do: %{value: i}

# One at a time.
{:ok, seq_node} =
  MapNode.new(name: :seq, over: :items, node: Demo.SlowAction, max_concurrency: 1)

{seq_us, {:ok, _}} =
  :timer.tc(fn ->
    MapNode.run(seq_node, %{items: items})
  end)

# Up to six at once.
{:ok, par_node} =
  MapNode.new(name: :par, over: :items, node: Demo.SlowAction, max_concurrency: 6)

{par_us, {:ok, _}} =
  :timer.tc(fn ->
    MapNode.run(par_node, %{items: items})
  end)

# :timer.tc reports microseconds; convert to whole milliseconds for display.
IO.puts("=== Backpressure Comparison (6 items × 100ms each) ===\n")
IO.puts("Sequential (max_concurrency: 1): " <> Integer.to_string(div(seq_us, 1000)) <> "ms")
IO.puts("Parallel (max_concurrency: 6): " <> Integer.to_string(div(par_us, 1000)) <> "ms")
IO.puts("Speedup: #{Float.round(seq_us / par_us, 1)}x")
Part 5: Node-over-Node Composition
MapNode’s node: field accepts any Node struct, not just actions. This enables
powerful composition: map a FanOutNode over a collection to run multiple branches
per element.
alias Jido.Composer.Node.{ActionNode, FanOutNode, MapNode}

# Inner node: a FanOutNode with two branches, so a single element is both
# doubled and increased by ten.
{:ok, double_node} = ActionNode.new(Demo.DoubleValueAction)
{:ok, add_ten_node} = ActionNode.new(Demo.AddTenAction)

{:ok, fan_out} =
  FanOutNode.new(
    name: "compute_both",
    branches: [
      double: double_node,
      add_ten: add_ten_node
    ]
  )

# Outer node: a MapNode that runs the FanOutNode once per collection element.
{:ok, map_node} = MapNode.new(name: :process, over: :items, node: fan_out)

{:ok, result} =
  MapNode.run(map_node, %{items: [%{value: 5}, %{value: 10}, %{value: 15}]})

IO.puts("=== Node-over-Node: FanOutNode mapped over collection ===\n")

result[:results]
|> Enum.with_index()
|> Enum.each(fn {per_item, idx} ->
  IO.puts(
    " Item #{idx}: double=#{inspect(per_item[:double])}, add_ten=#{inspect(per_item[:add_ten])}"
  )
end)

IO.puts("\nEach element got its own FanOutNode with two branches.")
IO.puts("Result shape: ordered list of maps, each map keyed by branch name.")
Part 6: MapNode vs FanOutNode
Here’s the same problem solved both ways to illustrate when each is appropriate.
Problem: Given a value, compute both its double and its square.
alias Jido.Composer.Node.{ActionNode, FanOutNode, MapNode}
defmodule Demo.SquareAction do
  # Demo action: multiplies the integer under `:value` by itself and returns
  # the product under `:result`.
  use Jido.Action,
    name: "square",
    description: "Squares a value",
    schema: [
      value: [type: :integer, required: true]
    ]

  def run(%{value: number}, _context) do
    {:ok, %{result: number * number}}
  end
end
# Approach A — FanOutNode: a fixed pair of DIFFERENT actions applied to one value.
{:ok, double_node} = ActionNode.new(Demo.DoubleValueAction)
{:ok, square_node} = ActionNode.new(Demo.SquareAction)

{:ok, fan_out} =
  FanOutNode.new(
    name: "compute",
    branches: [
      double: double_node,
      square: square_node
    ]
  )

# The return tuple is kept whole; its second element is read below.
fan_result = FanOutNode.run(fan_out, %{value: 5}, [])

IO.puts("=== FanOutNode: Two different actions ===")
IO.puts("Double branch: " <> (fan_result |> elem(1) |> Access.get(:double) |> inspect()))
IO.puts("Square branch: " <> (fan_result |> elem(1) |> Access.get(:square) |> inspect()))

# Approach B — MapNode: ONE node applied to each of MANY items.
{:ok, map_node} =
  MapNode.new(
    name: :process,
    over: :items,
    node: Demo.DoubleValueAction
  )

{:ok, map_result} =
  MapNode.run(map_node, %{items: [%{value: 1}, %{value: 2}, %{value: 3}]})

IO.puts("\n=== MapNode: One node, many items ===")
IO.puts("Results: " <> inspect(map_result[:results]))

IO.puts("\n=== When to use which ===")
IO.puts("FanOutNode: fixed set of DIFFERENT branches (heterogeneous)")
IO.puts("MapNode: variable set of SAME operation (homogeneous)")
Next Steps
You’ve learned how to:
- Map any node over a runtime-determined collection with MapNode
- Use top-level keys and nested paths with `over:`
- Handle map vs non-map element preparation
- Choose between fail-fast and collect-partial error policies
- Control concurrency with `max_concurrency`
- Compose nodes: map a FanOutNode over a collection (node-over-node)
- Decide between MapNode and FanOutNode
Next guides:
- Composition Patterns (livebook 10) — Combining constructors, compile-time vs runtime
- LLM Orchestrator (livebook 04) — Adaptive tool composition with LLM decisions