Powered by AppSignal & Oban Pro

Workflow Authoring

docs/workflow_authoring.livemd

Workflow Authoring

This Livebook focuses on the authoring surface: workflow DSL structure, normalized workflow specs, dependency joins, input mappings, execution, and graph inspection.

It uses in-memory ETS journal storage so you can run it without a host app.

Mix.install([
  {:squid_mesh, "~> 0.1.0-beta.3"}
])

Runtime Setup

The runtime options below keep all durable facts inside this Livebook session.

storage = {Jido.Storage.ETS, table: :squid_mesh_workflow_authoring_livebook}

defmodule SquidMeshAuthoringLivebook.Repo do
end

Application.put_env(:squid_mesh, :repo, SquidMeshAuthoringLivebook.Repo)
Application.put_env(:squid_mesh, :queue, "workflow-authoring")

opts = [
  runtime: :journal,
  journal_storage: storage,
  queue: "workflow-authoring"
]

defmodule SquidMeshAuthoringLivebook.Output do
  def step(step), do: Map.take(step, [:name, :module, :opts])

  def attempt(attempt) do
    Map.take(attempt, [
      :step,
      :status,
      :attempt_number,
      :visible_at,
      :applied?,
      :wakeup_emitted?
    ])
  end

  def node(node), do: Map.take(node, [:id, :status, :current?])

  def edge(edge) do
    Map.take(edge, [:id, :from, :to, :type, :status, :selected?, :pending?])
  end
end

Define Step Modules

Prefer native SquidMesh.Step modules for application steps. The workflow DSL stays readable, while each step keeps its own input and output contract.

defmodule SquidMeshAuthoringLivebook.ScoutWestRoad do
  use SquidMesh.Step,
    name: :scout_west_road,
    description: "Scouts the western road",
    input_schema: [
      bearer: [type: :string, required: true],
      west_mark: [type: :string, required: true],
      mountain_mark: [type: :string, required: true]
    ],
    output_schema: [
      west_road: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(
        %{bearer: bearer, west_mark: west_mark, mountain_mark: mountain_mark},
        %SquidMesh.Step.Context{}
      ) do
    {:ok,
     %{
       west_road: %{
         bearer: bearer,
         map_mark: west_mark,
         mountain_mark: mountain_mark,
         distance_leagues: 12,
         danger: "watched"
       }
     }}
  end
end

defmodule SquidMeshAuthoringLivebook.ScoutMountainPass do
  use SquidMesh.Step,
    name: :scout_mountain_pass,
    description: "Scouts the mountain pass",
    input_schema: [
      bearer: [type: :string, required: true],
      mark: [type: :string, required: true]
    ],
    output_schema: [
      mountain_pass: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{bearer: bearer, mark: mark}, %SquidMesh.Step.Context{}) do
    {:ok,
     %{
       mountain_pass: %{
         bearer: bearer,
         map_mark: mark,
         snow_depth: 3,
         danger: "storms"
       }
     }}
  end
end

defmodule SquidMeshAuthoringLivebook.ChooseRoute do
  use SquidMesh.Step,
    name: :choose_route,
    description: "Chooses a route from joined scouting reports",
    input_schema: [
      bearer: [type: :string, required: true],
      west_danger: [type: :string, required: true],
      west_distance_leagues: [type: :integer, required: true],
      mountain_danger: [type: :string, required: true],
      mountain_snow_depth: [type: :integer, required: true]
    ],
    output_schema: [
      route_plan: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(input, %SquidMesh.Step.Context{}) do
    {:ok,
     %{
       route_plan: %{
         bearer: input.bearer,
         route: "moria",
         reason:
           "the western road is #{input.west_danger} and the pass has #{input.mountain_snow_depth} feet of snow",
         compared: %{
           west_distance_leagues: input.west_distance_leagues,
           west_danger: input.west_danger,
           mountain_danger: input.mountain_danger,
           mountain_snow_depth: input.mountain_snow_depth
         }
       }
     }}
  end
end

Define A Dependency Workflow

This workflow has one root step, one dependent scout, and one join step:

  • :scout_west_road consumes the bearer and nested map marks from the payload
  • :scout_mountain_pass waits for the western scout and consumes data from its output
  • :choose_route waits for both scouts and maps nested context into the input shape it needs
defmodule SquidMeshAuthoringLivebook.RoutePlanningWorkflow do
  use SquidMesh.Workflow

  workflow do
    trigger :plan_errand do
      manual()

      payload do
        field :bearer, :string, default: "Frodo"
        field :map_marks, :map,
          default: %{west_road: "watched", mountain_pass: "snow"}
      end
    end

    step :scout_west_road, SquidMeshAuthoringLivebook.ScoutWestRoad,
      input: [
        bearer: [:bearer],
        west_mark: [:map_marks, :west_road],
        mountain_mark: [:map_marks, :mountain_pass]
      ]

    step :scout_mountain_pass, SquidMeshAuthoringLivebook.ScoutMountainPass,
      after: [:scout_west_road],
      input: [
        bearer: [:west_road, :bearer],
        mark: [:west_road, :mountain_mark]
      ]

    step :choose_route, SquidMeshAuthoringLivebook.ChooseRoute,
      after: [:scout_west_road, :scout_mountain_pass],
      input: [
        bearer: [:west_road, :bearer],
        west_danger: [:west_road, :danger],
        west_distance_leagues: [:west_road, :distance_leagues],
        mountain_danger: [:mountain_pass, :danger],
        mountain_snow_depth: [:mountain_pass, :snow_depth]
      ]
  end
end

Inspect The Normalized Spec

The DSL compiles into a normalized workflow spec. Tooling can inspect this shape without parsing source code.

{:ok, spec} =
  SquidMesh.Workflow.to_spec(SquidMeshAuthoringLivebook.RoutePlanningWorkflow)

%{
  workflow: spec.workflow,
  triggers: Enum.map(spec.triggers, &Map.take(&1, [:name, :type, :payload])),
  payload: spec.payload,
  entry_steps: spec.entry_steps,
  steps: Enum.map(spec.steps, &SquidMeshAuthoringLivebook.Output.step/1),
  transitions: spec.transitions
}

Notice that dependency workflows do not need success transitions. entry_steps shows the root step, and later work is declared with after: [...].

Start A Run

Manual triggers can start through SquidMesh.start_run/3 when the workflow has one trigger.

{:ok, started} =
  SquidMesh.start_run(
    SquidMeshAuthoringLivebook.RoutePlanningWorkflow,
    %{
      bearer: "Frodo",
      map_marks: %{west_road: "watched", mountain_pass: "snow"}
    },
    opts
  )

%{
  run_id: started.run_id,
  status: started.status,
  reason: started.reason,
  visible_attempts: Enum.map(started.visible_attempts, &SquidMeshAuthoringLivebook.Output.attempt/1),
  planned_runnable_keys: started.planned_runnable_keys
}

The first snapshot has visible work for the root step. Later dependency steps become visible only after their prerequisites complete.

Drain Visible Work

Each call to SquidMesh.execute_next/1 claims one visible attempt, executes the step, records the result, and returns the updated snapshot.

worker_opts = Keyword.put(opts, :owner_id, "authoring-livebook-worker")

{:ok, first_scout} = SquidMesh.execute_next(worker_opts)
{:ok, second_scout} = SquidMesh.execute_next(worker_opts)
{:ok, completed} = SquidMesh.execute_next(worker_opts)
{:ok, :none} = SquidMesh.execute_next(worker_opts)

%{
  first_step: List.last(first_scout.applied_runnable_keys),
  second_step: List.last(second_scout.applied_runnable_keys),
  final_status: completed.status,
  final_reason: completed.reason,
  context: completed.context,
  attempts: Enum.map(completed.attempts, &SquidMeshAuthoringLivebook.Output.attempt/1)
}

The join step receives only the mapped values declared in the workflow. The full run context remains durable and inspectable.

Inspect And Explain

Inspection answers what durable state exists. Explanation answers why the run is in its current state and which action would make progress.

{:ok, inspected} = SquidMesh.inspect_run(started.run_id, opts)
{:ok, explanation} = SquidMesh.explain_run(started.run_id, opts)

%{
  inspected_status: inspected.status,
  inspected_context: inspected.context,
  explanation_reason: explanation.reason,
  explanation_summary: explanation.summary,
  explanation_evidence: explanation.evidence
}

Inspect The Graph

Graph inspection turns the same durable state into node and edge output for host UIs.

{:ok, graph} = SquidMesh.inspect_run_graph(started.run_id, opts)
payload = SquidMesh.Runs.GraphInspection.to_map(graph)

%{
  status: payload.status,
  current_node_ids: payload.current_node_ids,
  nodes: Enum.map(payload.nodes, &SquidMeshAuthoringLivebook.Output.node/1),
  edges: Enum.map(payload.edges, &SquidMeshAuthoringLivebook.Output.edge/1)
}

For the complete node and edge contract, see Graph inspection contract.

Try Changing The Workflow

Useful edits to try:

  • add a retry policy to :scout_mountain_pass
  • add a second join step that depends on :choose_route
  • remove one input mapping path and observe the structured validation failure
  • switch to a transition-based workflow when the shape is a straight line

Read next: