Workflow Authoring
This Livebook focuses on the authoring surface: workflow DSL structure, normalized workflow specs, dependency joins, input mappings, execution, and graph inspection.
It uses in-memory ETS journal storage so you can run it without a host app.
Mix.install([
  {:squid_mesh, "~> 0.1.0-beta.3"}
])
Runtime Setup
The runtime options below keep all durable facts inside this Livebook session.
storage = {Jido.Storage.ETS, table: :squid_mesh_workflow_authoring_livebook}

defmodule SquidMeshAuthoringLivebook.Repo do
end

Application.put_env(:squid_mesh, :repo, SquidMeshAuthoringLivebook.Repo)
Application.put_env(:squid_mesh, :queue, "workflow-authoring")

opts = [
  runtime: :journal,
  journal_storage: storage,
  queue: "workflow-authoring"
]
defmodule SquidMeshAuthoringLivebook.Output do
  def step(step), do: Map.take(step, [:name, :module, :opts])

  def attempt(attempt) do
    Map.take(attempt, [
      :step,
      :status,
      :attempt_number,
      :visible_at,
      :applied?,
      :wakeup_emitted?
    ])
  end

  def node(node), do: Map.take(node, [:id, :status, :current?])

  def edge(edge) do
    Map.take(edge, [:id, :from, :to, :type, :status, :selected?, :pending?])
  end
end
Define Step Modules
Prefer native SquidMesh.Step modules for application steps. The workflow DSL
stays readable, while each step keeps its own input and output contract.
defmodule SquidMeshAuthoringLivebook.ScoutWestRoad do
  use SquidMesh.Step,
    name: :scout_west_road,
    description: "Scouts the western road",
    input_schema: [
      bearer: [type: :string, required: true],
      west_mark: [type: :string, required: true],
      mountain_mark: [type: :string, required: true]
    ],
    output_schema: [
      west_road: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(
        %{bearer: bearer, west_mark: west_mark, mountain_mark: mountain_mark},
        %SquidMesh.Step.Context{}
      ) do
    {:ok,
     %{
       west_road: %{
         bearer: bearer,
         map_mark: west_mark,
         mountain_mark: mountain_mark,
         distance_leagues: 12,
         danger: "watched"
       }
     }}
  end
end
defmodule SquidMeshAuthoringLivebook.ScoutMountainPass do
  use SquidMesh.Step,
    name: :scout_mountain_pass,
    description: "Scouts the mountain pass",
    input_schema: [
      bearer: [type: :string, required: true],
      mark: [type: :string, required: true]
    ],
    output_schema: [
      mountain_pass: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{bearer: bearer, mark: mark}, %SquidMesh.Step.Context{}) do
    {:ok,
     %{
       mountain_pass: %{
         bearer: bearer,
         map_mark: mark,
         snow_depth: 3,
         danger: "storms"
       }
     }}
  end
end
defmodule SquidMeshAuthoringLivebook.ChooseRoute do
  use SquidMesh.Step,
    name: :choose_route,
    description: "Chooses a route from joined scouting reports",
    input_schema: [
      bearer: [type: :string, required: true],
      west_danger: [type: :string, required: true],
      west_distance_leagues: [type: :integer, required: true],
      mountain_danger: [type: :string, required: true],
      mountain_snow_depth: [type: :integer, required: true]
    ],
    output_schema: [
      route_plan: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(input, %SquidMesh.Step.Context{}) do
    {:ok,
     %{
       route_plan: %{
         bearer: input.bearer,
         route: "moria",
         reason:
           "the western road is #{input.west_danger} and the pass has #{input.mountain_snow_depth} feet of snow",
         compared: %{
           west_distance_leagues: input.west_distance_leagues,
           west_danger: input.west_danger,
           mountain_danger: input.mountain_danger,
           mountain_snow_depth: input.mountain_snow_depth
         }
       }
     }}
  end
end
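To make the idea of a per-step contract concrete, here is a minimal sketch in plain Elixir of what checking a map against a schema like the input_schema declarations above could involve. ContractSketch and its check/2 function are hypothetical names invented for illustration; this is not how SquidMesh validates steps, and it only understands the three types that appear in this notebook.

```elixir
defmodule ContractSketch do
  @moduledoc "Hypothetical helper for illustration; not part of SquidMesh."

  # Returns :ok, or {:error, problems} listing every key that is
  # missing or has the wrong type, in schema order.
  def check(schema, data) when is_list(schema) and is_map(data) do
    problems =
      Enum.flat_map(schema, fn {key, rules} ->
        required? = Keyword.get(rules, :required, false)

        case Map.fetch(data, key) do
          :error when required? ->
            [{key, :missing}]

          :error ->
            []

          {:ok, value} ->
            if type_ok?(rules[:type], value), do: [], else: [{key, :wrong_type}]
        end
      end)

    if problems == [], do: :ok, else: {:error, problems}
  end

  # Only the types used in this notebook are checked; anything else passes.
  defp type_ok?(:string, value), do: is_binary(value)
  defp type_ok?(:integer, value), do: is_integer(value)
  defp type_ok?(:map, value), do: is_map(value)
  defp type_ok?(_other, _value), do: true
end

schema = [
  bearer: [type: :string, required: true],
  mark: [type: :string, required: true]
]

ContractSketch.check(schema, %{bearer: "Frodo", mark: "snow"})
# => :ok

ContractSketch.check(schema, %{bearer: :frodo})
# => {:error, [bearer: :wrong_type, mark: :missing]}
```

Collecting every problem instead of stopping at the first one is a design choice of this sketch; it tends to give more useful feedback when a step receives several bad inputs at once.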
Define A Dependency Workflow
This workflow has one root step, one dependent scout, and one join step:
- :scout_west_road consumes the bearer and nested map marks from the payload
- :scout_mountain_pass waits for the western scout and consumes data from its output
- :choose_route waits for both scouts and maps nested context into the input shape it needs
defmodule SquidMeshAuthoringLivebook.RoutePlanningWorkflow do
  use SquidMesh.Workflow

  workflow do
    trigger :plan_errand do
      manual()

      payload do
        field :bearer, :string, default: "Frodo"

        field :map_marks, :map,
          default: %{west_road: "watched", mountain_pass: "snow"}
      end
    end

    step :scout_west_road, SquidMeshAuthoringLivebook.ScoutWestRoad,
      input: [
        bearer: [:bearer],
        west_mark: [:map_marks, :west_road],
        mountain_mark: [:map_marks, :mountain_pass]
      ]

    step :scout_mountain_pass, SquidMeshAuthoringLivebook.ScoutMountainPass,
      after: [:scout_west_road],
      input: [
        bearer: [:west_road, :bearer],
        mark: [:west_road, :mountain_mark]
      ]

    step :choose_route, SquidMeshAuthoringLivebook.ChooseRoute,
      after: [:scout_west_road, :scout_mountain_pass],
      input: [
        bearer: [:west_road, :bearer],
        west_danger: [:west_road, :danger],
        west_distance_leagues: [:west_road, :distance_leagues],
        mountain_danger: [:mountain_pass, :danger],
        mountain_snow_depth: [:mountain_pass, :snow_depth]
      ]
  end
end
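Each input: entry in the workflow above pairs a step input key with a path of keys. One way to picture how such a path could be resolved is a lookup into a nested context map. The sketch below is plain Elixir and rests on two assumptions: that payload fields and step outputs sit side by side at the top level of the context, and that a missing path should produce an error tuple. MappingSketch, resolve/2, and the :missing_path error shape are hypothetical and do not describe SquidMesh's actual implementation.

```elixir
defmodule MappingSketch do
  @moduledoc "Hypothetical helper for illustration; not part of SquidMesh."

  # Resolves each {input_key, path} pair against a nested context map.
  # Stops at the first path that cannot be followed.
  def resolve(mappings, context) do
    Enum.reduce_while(mappings, {:ok, %{}}, fn {key, path}, {:ok, acc} ->
      case fetch_path(context, path) do
        {:ok, value} -> {:cont, {:ok, Map.put(acc, key, value)}}
        :error -> {:halt, {:error, {:missing_path, key, path}}}
      end
    end)
  end

  defp fetch_path(value, []), do: {:ok, value}

  defp fetch_path(map, [segment | rest]) when is_map(map) do
    case Map.fetch(map, segment) do
      {:ok, value} -> fetch_path(value, rest)
      :error -> :error
    end
  end

  defp fetch_path(_not_a_map, _path), do: :error
end

# Assumed context shape after :scout_west_road has completed.
context = %{
  bearer: "Frodo",
  map_marks: %{west_road: "watched", mountain_pass: "snow"},
  west_road: %{bearer: "Frodo", mountain_mark: "snow", danger: "watched", distance_leagues: 12}
}

MappingSketch.resolve([bearer: [:west_road, :bearer], mark: [:west_road, :mountain_mark]], context)
# => {:ok, %{bearer: "Frodo", mark: "snow"}}

MappingSketch.resolve([mark: [:west_road, :snow_depth]], context)
# => {:error, {:missing_path, :mark, [:west_road, :snow_depth]}}
```

The resolved map contains only the declared keys, which is the property that keeps a step's input narrow even when the surrounding context grows.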
Inspect The Normalized Spec
The DSL compiles into a normalized workflow spec. Tooling can inspect this shape without parsing source code.
{:ok, spec} =
  SquidMesh.Workflow.to_spec(SquidMeshAuthoringLivebook.RoutePlanningWorkflow)

%{
  workflow: spec.workflow,
  triggers: Enum.map(spec.triggers, &Map.take(&1, [:name, :type, :payload])),
  payload: spec.payload,
  entry_steps: spec.entry_steps,
  steps: Enum.map(spec.steps, &SquidMeshAuthoringLivebook.Output.step/1),
  transitions: spec.transitions
}
Notice that dependency workflows do not need success transitions. entry_steps
shows the root step, and later work is declared with after: [...].
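As a rough illustration of why no explicit entry point is needed, the root steps of a dependency workflow can be derived from the after: declarations alone: they are the steps with no prerequisites. The deps map below is a hand-written stand-in for this notebook's workflow, not the normalized spec that SquidMesh produces.

```elixir
# Hypothetical dependency table mirroring the `after:` options in this notebook.
deps = %{
  scout_west_road: [],
  scout_mountain_pass: [:scout_west_road],
  choose_route: [:scout_west_road, :scout_mountain_pass]
}

# The `[]` in the generator pattern keeps only steps without prerequisites.
entry_steps = for {step, []} <- deps, do: step
# entry_steps == [:scout_west_road]
```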
Start A Run
Manual triggers can start through SquidMesh.start_run/3 when the workflow has
one trigger.
{:ok, started} =
  SquidMesh.start_run(
    SquidMeshAuthoringLivebook.RoutePlanningWorkflow,
    %{
      bearer: "Frodo",
      map_marks: %{west_road: "watched", mountain_pass: "snow"}
    },
    opts
  )

%{
  run_id: started.run_id,
  status: started.status,
  reason: started.reason,
  visible_attempts:
    Enum.map(started.visible_attempts, &SquidMeshAuthoringLivebook.Output.attempt/1),
  planned_runnable_keys: started.planned_runnable_keys
}
The first snapshot has visible work for the root step. Later dependency steps become visible only after their prerequisites complete.
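The visibility rule can be sketched as a small readiness check: a step is ready when it has not completed yet and every prerequisite has. The code below is plain Elixir under that assumption. ReadinessSketch and ready/2 are hypothetical names, and the real runtime tracks attempts durably rather than taking a list of completed steps.

```elixir
defmodule ReadinessSketch do
  @moduledoc "Hypothetical helper for illustration; not part of SquidMesh."

  # Returns the steps that are not yet completed and whose
  # prerequisites are all completed, in declaration order.
  def ready(deps, completed) do
    done = MapSet.new(completed)

    deps
    |> Enum.filter(fn {step, prerequisites} ->
      not MapSet.member?(done, step) and
        Enum.all?(prerequisites, &MapSet.member?(done, &1))
    end)
    |> Enum.map(fn {step, _prerequisites} -> step end)
  end
end

# Hand-written stand-in for this notebook's `after:` declarations.
deps = [
  scout_west_road: [],
  scout_mountain_pass: [:scout_west_road],
  choose_route: [:scout_west_road, :scout_mountain_pass]
]

ReadinessSketch.ready(deps, [])
# => [:scout_west_road]

ReadinessSketch.ready(deps, [:scout_west_road])
# => [:scout_mountain_pass]

ReadinessSketch.ready(deps, [:scout_west_road, :scout_mountain_pass])
# => [:choose_route]
```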
Drain Visible Work
Each call to SquidMesh.execute_next/1 claims one visible attempt, executes the
step, records the result, and returns the updated snapshot.
worker_opts = Keyword.put(opts, :owner_id, "authoring-livebook-worker")

{:ok, first_scout} = SquidMesh.execute_next(worker_opts)
{:ok, second_scout} = SquidMesh.execute_next(worker_opts)
{:ok, completed} = SquidMesh.execute_next(worker_opts)
{:ok, :none} = SquidMesh.execute_next(worker_opts)

%{
  first_step: List.last(first_scout.applied_runnable_keys),
  second_step: List.last(second_scout.applied_runnable_keys),
  final_status: completed.status,
  final_reason: completed.reason,
  context: completed.context,
  attempts: Enum.map(completed.attempts, &SquidMeshAuthoringLivebook.Output.attempt/1)
}
The join step receives only the mapped values declared in the workflow. The full run context remains durable and inspectable.
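When the number of steps is not known in advance, the repeated execute_next calls can be folded into a loop that stops at {:ok, :none}. The sketch below takes the "next" function as an argument so it runs without SquidMesh. DrainSketch is a hypothetical helper, the {:error, reason} branch is an assumption about what a failure might look like, and the stub snapshots with :running and :completed statuses are made up for the demo.

```elixir
defmodule DrainSketch do
  @moduledoc "Hypothetical helper for illustration; not part of SquidMesh."

  # Calls `next_fun` until it reports {:ok, :none}, collecting each snapshot.
  # The {:error, reason} clause is an assumed failure shape.
  def drain(next_fun, snapshots \\ []) do
    case next_fun.() do
      {:ok, :none} -> {:ok, Enum.reverse(snapshots)}
      {:ok, snapshot} -> drain(next_fun, [snapshot | snapshots])
      {:error, reason} -> {:error, reason, Enum.reverse(snapshots)}
    end
  end
end

# Stub queue standing in for visible work; the statuses are invented.
{:ok, queue} =
  Agent.start_link(fn ->
    [%{status: :running}, %{status: :running}, %{status: :completed}]
  end)

stub_next = fn ->
  Agent.get_and_update(queue, fn
    [] -> {{:ok, :none}, []}
    [snapshot | rest] -> {{:ok, snapshot}, rest}
  end)
end

{:ok, snapshots} = DrainSketch.drain(stub_next)
length(snapshots)
# => 3
List.last(snapshots).status
# => :completed
```

In this notebook the same helper could presumably be called with fn -> SquidMesh.execute_next(worker_opts) end in place of the stub. A production loop would likely also want an iteration limit so that a workflow which keeps producing work cannot spin forever.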
Inspect And Explain
Inspection answers what durable state exists. Explanation answers why the run is in its current state and which action would make progress.
{:ok, inspected} = SquidMesh.inspect_run(started.run_id, opts)
{:ok, explanation} = SquidMesh.explain_run(started.run_id, opts)

%{
  inspected_status: inspected.status,
  inspected_context: inspected.context,
  explanation_reason: explanation.reason,
  explanation_summary: explanation.summary,
  explanation_evidence: explanation.evidence
}
Inspect The Graph
Graph inspection turns the same durable state into node and edge output for host UIs.
{:ok, graph} = SquidMesh.inspect_run_graph(started.run_id, opts)

payload = SquidMesh.Runs.GraphInspection.to_map(graph)

%{
  status: payload.status,
  current_node_ids: payload.current_node_ids,
  nodes: Enum.map(payload.nodes, &SquidMeshAuthoringLivebook.Output.node/1),
  edges: Enum.map(payload.edges, &SquidMeshAuthoringLivebook.Output.edge/1)
}
For the complete node and edge contract, see Graph inspection contract.
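As one example of what a host UI might do with node and edge output, the sketch below renders a small graph as Mermaid flowchart text. It assumes only that nodes carry :id and :status and that edges carry :from and :to, which are among the keys selected by the Output helper earlier. The sample values, including the atom ids and the :completed status, are invented, and MermaidSketch is a hypothetical module rather than anything SquidMesh provides.

```elixir
defmodule MermaidSketch do
  @moduledoc "Hypothetical renderer for illustration; not part of SquidMesh."

  # Builds Mermaid flowchart source: one labelled node per line,
  # followed by one arrow per edge.
  def render(%{nodes: nodes, edges: edges}) do
    node_lines =
      Enum.map(nodes, fn node -> "  #{node.id}[\"#{node.id} (#{node.status})\"]" end)

    edge_lines =
      Enum.map(edges, fn edge -> "  #{edge.from} --> #{edge.to}" end)

    Enum.join(["flowchart TD" | node_lines ++ edge_lines], "\n")
  end
end

# Invented sample data shaped like the trimmed node and edge maps above.
sample = %{
  nodes: [
    %{id: :scout_west_road, status: :completed},
    %{id: :scout_mountain_pass, status: :completed},
    %{id: :choose_route, status: :completed}
  ],
  edges: [
    %{from: :scout_west_road, to: :scout_mountain_pass},
    %{from: :scout_west_road, to: :choose_route},
    %{from: :scout_mountain_pass, to: :choose_route}
  ]
}

diagram = MermaidSketch.render(sample)
IO.puts(diagram)
```

Real payloads may use different id and status types, so the interpolation here should be adapted to whatever the graph inspection contract specifies.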
Try Changing The Workflow
Useful edits to try:
- add a retry policy to :scout_mountain_pass
- add a second join step that depends on :choose_route
- remove one input mapping path and observe the structured validation failure
- switch to a transition-based workflow when the shape is a straight line
Read next: