Getting Started With Squid Mesh

This Livebook introduces the core Squid Mesh model through a small workflow you can run without creating a Phoenix app or a Postgres database.

The examples use ETS-backed journal storage to keep setup light. Real host apps should use the Ecto/Postgres journal storage described in the host app integration guide.
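To see why ETS suits a notebook but not a production host, the sketch below uses only plain :ets calls, with no Squid Mesh API involved. It shows that a table, and every fact stored in it, disappears when the owning process exits. The table name :demo_facts and the stored tuple are made up for illustration.

```elixir
# Plain ETS, independent of Squid Mesh.
# :demo_facts and the {:run_1, :completed} tuple are hypothetical.
parent = self()

owner =
  spawn(fn ->
    table = :ets.new(:demo_facts, [:set, :public])
    :ets.insert(table, {:run_1, :completed})
    send(parent, {:table, table})

    # Keep the owner alive until it is told to stop.
    receive do
      :stop -> :ok
    end
  end)

table =
  receive do
    {:table, table} -> table
  end

# While the owner process lives, the fact is readable.
alive_lookup = :ets.lookup(table, :run_1)

# Stop the owner and wait until it has actually exited.
ref = Process.monitor(owner)
send(owner, :stop)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# The table went away together with its owner.
table_after_exit = :ets.info(table)
```

In a Livebook session the table is owned by a process inside the notebook runtime, so restarting the runtime discards the journal. That is acceptable for a tutorial and is the reason host apps are pointed at Ecto/Postgres storage.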

Install

Mix.install([
  {:squid_mesh, "~> 0.1.0-beta.3"}
])

Runtime Setup

Squid Mesh normally reads runtime configuration from the host application. In this notebook, we configure it directly and keep journal facts in an ETS table, which lasts only for the current Livebook session.

storage = {Jido.Storage.ETS, table: :squid_mesh_getting_started_livebook}

# This placeholder satisfies the library config contract for the notebook.
# A real host app should configure its actual Ecto repo instead.
defmodule SquidMeshLivebook.Repo do
end

Application.put_env(:squid_mesh, :repo, SquidMeshLivebook.Repo)
Application.put_env(:squid_mesh, :runtime, :journal)
Application.put_env(:squid_mesh, :read_model, :read_model)
Application.put_env(:squid_mesh, :journal_storage, storage)
Application.put_env(:squid_mesh, :queue, "livebook")

opts = [
  runtime: :journal,
  journal_storage: storage,
  queue: "livebook"
]

defmodule SquidMeshLivebook.Output do
  def attempt(attempt) do
    Map.take(attempt, [
      :step,
      :status,
      :attempt_number,
      :visible_at,
      :wakeup_emitted?,
      :applied?
    ])
  end

  def runnable(runnable) do
    Map.take(runnable, [:runnable_key, :key, :step, :status, :visible_at])
  end

  def node(node), do: Map.take(node, [:id, :status, :current?])

  def edge(edge) do
    Map.take(edge, [:id, :from, :to, :type, :status, :selected?, :pending?])
  end
end
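The Output helpers above only trim snapshots for display. A minimal sketch of the Map.take/2 behavior they rely on, using a made-up attempt map rather than a real Squid Mesh snapshot:

```elixir
# Hypothetical attempt data; real snapshot fields come from Squid Mesh.
attempt = %{
  step: :pack_lembas,
  status: :completed,
  attempt_number: 1,
  internal_ref: "abc"
}

# Map.take/2 keeps only the listed keys and silently skips keys
# that are not present, such as :visible_at here.
trimmed = Map.take(attempt, [:step, :status, :attempt_number, :visible_at])

IO.inspect(trimmed)
```

Because missing keys are skipped instead of raising, the same helper can be applied to snapshots that carry different subsets of fields.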

Define Steps

Workflow steps do the domain work. Most steps are written with use SquidMesh.Step; raw Jido.Action modules are only needed for explicit interop.

defmodule SquidMeshLivebook.PackLembas do
  use SquidMesh.Step,
    name: :pack_lembas,
    description: "Packs provisions for the errand",
    input_schema: [
      ring_id: [type: :string, required: true]
    ],
    output_schema: [
      provisions: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{ring_id: ring_id}, %SquidMesh.Step.Context{}) do
    {:ok,
     %{
       provisions: %{
         ring_id: ring_id,
         lembas_count: 11,
         packed_by: "Sam"
       }
     }}
  end
end

defmodule SquidMeshLivebook.CrossMoria do
  use SquidMesh.Step,
    name: :cross_moria,
    description: "Crosses Moria with the packed provisions",
    input_schema: [
      provisions: [type: :map, required: true]
    ],
    output_schema: [
      moria: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{provisions: provisions}, %SquidMesh.Step.Context{run_id: run_id}) do
    {:ok,
     %{
       moria: %{
         run_id: run_id,
         ring_id: provisions.ring_id,
         status: "crossed",
         lembas_left: provisions.lembas_count - 3
       }
     }}
  end
end

Define A Workflow

A workflow declares the trigger, payload, steps, and transitions. The workflow definition says what should happen; the journal records what did happen.

defmodule SquidMeshLivebook.RingErrandWorkflow do
  use SquidMesh.Workflow

  workflow do
    trigger :leave_shire do
      manual()

      payload do
        field :ring_id, :string
      end
    end

    step :pack_lembas, SquidMeshLivebook.PackLembas
    step :cross_moria, SquidMeshLivebook.CrossMoria

    transition :pack_lembas, on: :ok, to: :cross_moria
    transition :cross_moria, on: :ok, to: :complete
  end
end

Inspect The Workflow Spec

The DSL compiles into a normalized workflow spec. This is the data shape that tooling can inspect without parsing the module source. It is also the contract visual editors and runtime-authored workflows will build on.

{:ok, spec} = SquidMesh.Workflow.to_spec(SquidMeshLivebook.RingErrandWorkflow)

%{
  workflow: spec.workflow,
  triggers: Enum.map(spec.triggers, &Map.take(&1, [:name, :type, :payload])),
  payload: spec.payload,
  steps: Enum.map(spec.steps, &Map.take(&1, [:name, :module, :opts])),
  transitions: spec.transitions,
  entry_steps: spec.entry_steps
}

triggers describe how a run can start. payload is the external input contract. steps are executable workflow nodes. transitions describe durable progression from one step outcome to the next node or to :complete.
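Since transitions are plain data, tooling can walk them without running anything. The sketch below assumes each transition is a map with from, on, and to keys; the actual shape of spec.transitions may differ, and TransitionSketch is a hypothetical helper, not part of Squid Mesh.

```elixir
defmodule TransitionSketch do
  # Returns the node that follows `step` for the given outcome,
  # or nil when no matching transition is declared.
  def next(transitions, step, outcome) do
    Enum.find_value(transitions, fn
      %{from: ^step, on: ^outcome, to: to} -> to
      _other -> nil
    end)
  end
end

# Assumed shape, modeled on the transitions declared in RingErrandWorkflow.
transitions = [
  %{from: :pack_lembas, on: :ok, to: :cross_moria},
  %{from: :cross_moria, on: :ok, to: :complete}
]

after_packing = TransitionSketch.next(transitions, :pack_lembas, :ok)
after_moria = TransitionSketch.next(transitions, :cross_moria, :ok)
undeclared = TransitionSketch.next(transitions, :cross_moria, :error)
```

A visual editor could use the same lookup to draw outgoing edges for a node, or to flag outcomes that have no declared transition.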

For deeper authoring rules, see Workflow Authoring.

Start A Run

Manual triggers start through the public API. This workflow has one trigger, so start_run/3 uses it as the default trigger.

{:ok, run} =
  SquidMesh.start_run(
    SquidMeshLivebook.RingErrandWorkflow,
    %{ring_id: "one-ring"},
    opts
  )

%{
  run_id: run.run_id,
  status: run.status,
  reason: run.reason,
  planned_runnables: run.planned_runnables,
  visible_attempts: Enum.map(run.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
  scheduled_attempts: run.scheduled_attempts,
  next_visible_at: run.next_visible_at
}

The first snapshot already has one visible attempt: pack_lembas. Visible attempts are work the host can claim now. Scheduled attempts are work that exists but should not be claimed until next_visible_at.
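The visible versus scheduled distinction can be modeled as a comparison between each attempt's visible_at and the current time. This is a sketch over hypothetical attempt maps, not the library's internal representation:

```elixir
# Hypothetical attempts; only :step and :visible_at matter for this sketch.
now = ~U[2024-01-01 12:00:00Z]

attempts = [
  %{step: :pack_lembas, visible_at: ~U[2024-01-01 11:59:00Z]},
  %{step: :record_gandalf_arrival, visible_at: ~U[2024-01-01 12:05:00Z]}
]

# An attempt is claimable once its visible_at is at or before `now`.
{visible, scheduled} =
  Enum.split_with(attempts, fn attempt ->
    DateTime.compare(attempt.visible_at, now) != :gt
  end)

# The earliest future timestamp plays the role of next_visible_at;
# nil means nothing is waiting for a later wakeup.
next_visible_at =
  scheduled
  |> Enum.map(& &1.visible_at)
  |> Enum.min(DateTime, fn -> nil end)
```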

Execute Visible Work

Workers provide capacity by calling SquidMesh.execute_next/1. Each call claims one visible journal attempt, runs the step, records the result, and makes the next attempt visible when the workflow should continue.

A run is the whole workflow instance. An attempt is one executable unit of work inside that run, such as the visible :pack_lembas or :cross_moria step.

worker_opts = Keyword.put(opts, :owner_id, "livebook-worker")

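# Runs pack_lembas and makes cross_moria visible.
{:ok, first_step} = SquidMesh.execute_next(worker_opts)
# Runs cross_moria, which brings the run to a terminal state.
{:ok, completed_run} = SquidMesh.execute_next(worker_opts)
# Nothing is visible on this queue any more, so this binds :none.
{:ok, no_more_work} = SquidMesh.execute_next(worker_opts)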

%{
  first_step_status: first_step.status,
  first_step_reason: first_step.reason,
  visible_after_first_step: Enum.map(first_step.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
  applied_after_first_step: first_step.applied_runnable_keys,
  completed_status: completed_run.status,
  completed_reason: completed_run.reason,
  completed_attempts: Enum.map(completed_run.attempts, &SquidMeshLivebook.Output.attempt/1),
  no_more_work: no_more_work
}

After the first call, the pack_lembas attempt has been applied and the cross_moria attempt is visible. After the second call, the run is terminal. The third call returns {:ok, :none} because this queue has no visible work left.
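A worker usually does not know in advance how many calls a queue needs. One way to structure that is a loop that keeps polling until it sees {:ok, :none}. The sketch below is deliberately decoupled from Squid Mesh: DrainSketch is a hypothetical helper, and the Agent-backed stub stands in for a call such as fn -> SquidMesh.execute_next(worker_opts) end.

```elixir
defmodule DrainSketch do
  # Calls `fetch` until it reports {:ok, :none}, collecting each snapshot.
  # Any other return value (for example an error tuple) raises a
  # CaseClauseError; a real worker would handle errors explicitly.
  def drain(fetch, acc \\ []) do
    case fetch.() do
      {:ok, :none} -> Enum.reverse(acc)
      {:ok, snapshot} -> drain(fetch, [snapshot | acc])
    end
  end
end

# Stub queue so the sketch runs without Squid Mesh.
{:ok, queue} = Agent.start_link(fn -> [:pack_lembas, :cross_moria] end)

fetch = fn ->
  Agent.get_and_update(queue, fn
    [] -> {{:ok, :none}, []}
    [step | rest] -> {{:ok, %{step: step}}, rest}
  end)
end

snapshots = DrainSketch.drain(fetch)
```

Passing the fetch function in keeps the loop testable; a production worker would also add backoff between empty polls rather than returning immediately.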

Inspect The Run

Inspection reads durable journal facts. Use it for dashboards, support tools, and operator-facing detail pages.

{:ok, inspected} =
  SquidMesh.inspect_run(
    run.run_id,
    Keyword.merge(opts, include_history: true)
  )

%{
  status: inspected.status,
  reason: inspected.reason,
  context: inspected.context,
  planned_runnables: Enum.map(inspected.planned_runnables, &SquidMeshLivebook.Output.runnable/1),
  visible_attempts: Enum.map(inspected.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
  scheduled_attempts: Enum.map(inspected.scheduled_attempts, &SquidMeshLivebook.Output.attempt/1),
  attempts: Enum.map(inspected.attempts, &SquidMeshLivebook.Output.attempt/1),
  next_visible_at: inspected.next_visible_at
}

context is the durable run context assembled from completed step outputs. attempts is historical evidence. visible_attempts and scheduled_attempts explain what can run now versus what needs a later wakeup.
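As a mental model for how context accumulates, the sketch below folds step outputs into a single map. It assumes the context starts from the trigger payload and that later outputs override earlier keys; both are assumptions for illustration, and the outputs are trimmed versions of what PackLembas and CrossMoria return.

```elixir
# Assumed starting point: the trigger payload.
payload = %{ring_id: "one-ring"}

# Hypothetical completed outputs, in execution order.
step_outputs = [
  %{provisions: %{ring_id: "one-ring", lembas_count: 11, packed_by: "Sam"}},
  %{moria: %{ring_id: "one-ring", status: "crossed", lembas_left: 8}}
]

# Fold each output into the accumulated context.
context =
  Enum.reduce(step_outputs, payload, fn output, acc ->
    Map.merge(acc, output)
  end)
```

Under this model, each step's input_schema selects keys that earlier steps (or the payload) have already contributed, which is why cross_moria can require provisions.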

Inspect The Graph

Graph inspection gives UI builders a node and edge view of the same run.

{:ok, graph} = SquidMesh.inspect_run_graph(run.run_id, opts)
graph_payload = SquidMesh.Runs.GraphInspection.to_map(graph)

%{
  source: graph.source,
  current_node_id: graph.current_node_id,
  nodes: Enum.map(graph_payload.nodes, &SquidMeshLivebook.Output.node/1),
  edges: Enum.map(graph_payload.edges, &SquidMeshLivebook.Output.edge/1)
}

The graph contract is the shape a host UI can serialize after applying its own authorization and redaction policy. See the Graph Inspection Contract for the full node and edge shape.

Explain The State

Explanation condenses the run into a reason and diagnostics that are easier to show to operators.

{:ok, explanation} = SquidMesh.explain_run(run.run_id, opts)

%{
  status: explanation.status,
  reason: explanation.reason,
  summary: explanation.summary,
  next_actions: explanation.next_actions,
  evidence: explanation.evidence
}

Use explanation output when a support or operator surface needs to answer “what is the runtime waiting for?” without exposing raw journal entries.

See A Scheduled Wakeup

Wait steps turn workflow-scale delays into future-visible attempts. They are useful when the workflow should continue later, while the journal remains the source of truth.

defmodule SquidMeshLivebook.RecordGandalfArrival do
  use SquidMesh.Step,
    name: :record_gandalf_arrival,
    description: "Records that the delayed rendezvous became visible",
    input_schema: [
      ring_id: [type: :string, required: true]
    ],
    output_schema: [
      rendezvous: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{ring_id: ring_id}, %SquidMesh.Step.Context{}) do
    {:ok, %{rendezvous: %{ring_id: ring_id, status: "wizard arrived"}}}
  end
end

defmodule SquidMeshLivebook.GandalfRendezvousWorkflow do
  use SquidMesh.Workflow

  workflow do
    trigger :wait_for_wizard do
      manual()

      payload do
        field :ring_id, :string
      end
    end

    step :wait_for_gandalf, :wait, duration: 1_000
    step :record_gandalf_arrival, SquidMeshLivebook.RecordGandalfArrival

    transition :wait_for_gandalf, on: :ok, to: :record_gandalf_arrival
    transition :record_gandalf_arrival, on: :ok, to: :complete
  end
end

wakeup_time = DateTime.utc_now()
wakeup_opts = Keyword.merge(opts, queue: "livebook-wakeup", now: wakeup_time)
wakeup_worker_opts = Keyword.put(wakeup_opts, :owner_id, "livebook-worker")

{:ok, wakeup_run} =
  SquidMesh.start_run(
    SquidMeshLivebook.GandalfRendezvousWorkflow,
    %{ring_id: "one-ring"},
    wakeup_opts
  )

{:ok, scheduled_run} = SquidMesh.execute_next(wakeup_worker_opts)

%{
  run_id: wakeup_run.run_id,
  reason: scheduled_run.reason,
  visible_attempts: scheduled_run.visible_attempts,
  scheduled_attempts: Enum.map(scheduled_run.scheduled_attempts, &SquidMeshLivebook.Output.attempt/1),
  next_visible_at: scheduled_run.next_visible_at
}

The wait step completed, and Squid Mesh scheduled record_gandalf_arrival for later. Until next_visible_at, workers see no visible work on that queue. The next cell confirms this, then passes next_visible_at as :now to simulate the wakeup without sleeping.

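# The clock is still at wakeup_time, so nothing is visible yet.
{:ok, :none} = SquidMesh.execute_next(wakeup_worker_opts)

# Move the clock to next_visible_at to claim the scheduled attempt.
{:ok, completed_rendezvous} =
  SquidMesh.execute_next(Keyword.put(wakeup_worker_opts, :now, scheduled_run.next_visible_at))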

%{
  status: completed_rendezvous.status,
  reason: completed_rendezvous.reason,
  attempts: Enum.map(completed_rendezvous.attempts, &SquidMeshLivebook.Output.attempt/1)
}
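The cells above advance time by passing :now rather than sleeping. The same idea in isolation: treat the clock as an argument so that both sides of the wakeup boundary can be checked deterministically. ClockSketch is a hypothetical helper, and the sketch assumes duration: 1_000 means milliseconds, which the tutorial does not state.

```elixir
defmodule ClockSketch do
  # Returns :none before the wake time and {:run, step} once `now` reaches it.
  def poll(%{step: step, visible_at: visible_at}, now) do
    case DateTime.compare(now, visible_at) do
      :lt -> :none
      _eq_or_gt -> {:run, step}
    end
  end
end

start = ~U[2024-01-01 12:00:00.000Z]

# Assumption: the wait duration is expressed in milliseconds.
scheduled = %{
  step: :record_gandalf_arrival,
  visible_at: DateTime.add(start, 1_000, :millisecond)
}

before_wakeup = ClockSketch.poll(scheduled, start)
at_wakeup = ClockSketch.poll(scheduled, scheduled.visible_at)
```

Injecting the clock this way keeps tests fast and repeatable, since no cell has to wait for real time to pass.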

Add A Human Approval Boundary

Approval steps pause the workflow until an operator approves or rejects the run. This is durable workflow state, not a transient process wait.

defmodule SquidMeshLivebook.RecordCouncilApproval do
  use SquidMesh.Step,
    name: :record_council_approval,
    description: "Records an approved errand",
    input_schema: [
      ring_id: [type: :string, required: true],
      approval: [type: :map, required: true]
    ],
    output_schema: [
      recorded: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{ring_id: ring_id, approval: approval}, %SquidMesh.Step.Context{}) do
    {:ok, %{recorded: %{ring_id: ring_id, decision: approval.decision}}}
  end
end

defmodule SquidMeshLivebook.RecordCouncilRejection do
  use SquidMesh.Step,
    name: :record_council_rejection,
    description: "Records a rejected errand",
    input_schema: [
      ring_id: [type: :string, required: true],
      approval: [type: :map, required: true]
    ],
    output_schema: [
      recorded: [type: :map, required: true]
    ]

  @impl SquidMesh.Step
  def run(%{ring_id: ring_id, approval: approval}, %SquidMesh.Step.Context{}) do
    {:ok, %{recorded: %{ring_id: ring_id, decision: approval.decision}}}
  end
end

defmodule SquidMeshLivebook.CouncilApprovalWorkflow do
  use SquidMesh.Workflow

  workflow do
    trigger :council_review do
      manual()

      payload do
        field :ring_id, :string
      end
    end

    approval_step :wait_for_council, output: :approval
    step :record_council_approval, SquidMeshLivebook.RecordCouncilApproval
    step :record_council_rejection, SquidMeshLivebook.RecordCouncilRejection

    transition :wait_for_council, on: :ok, to: :record_council_approval
    transition :wait_for_council, on: :error, to: :record_council_rejection
    transition :record_council_approval, on: :ok, to: :complete
    transition :record_council_rejection, on: :ok, to: :complete
  end
end

approval_opts = Keyword.put(opts, :queue, "livebook-approval")
approval_worker_opts = Keyword.put(approval_opts, :owner_id, "livebook-worker")

{:ok, approval_run} =
  SquidMesh.start_run(
    SquidMeshLivebook.CouncilApprovalWorkflow,
    %{ring_id: "one-ring"},
    approval_opts
  )

{:ok, paused_run} = SquidMesh.execute_next(approval_worker_opts)

{:ok, paused_explanation} = SquidMesh.explain_run(approval_run.run_id, approval_opts)

{:ok, resumed_run} =
  SquidMesh.approve_run(
    approval_run.run_id,
    %{actor: "ops_123", comment: "looks good"},
    approval_opts
  )

{:ok, approved_run} = SquidMesh.execute_next(approval_worker_opts)

%{
  paused_status: paused_run.status,
  paused_reason: paused_run.reason,
  manual_state: paused_run.manual_state,
  paused_summary: paused_explanation.summary,
  paused_next_actions: paused_explanation.next_actions,
  resumed_status: resumed_run.status,
  visible_after_approval: Enum.map(resumed_run.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
  completed_status: approved_run.status,
  manual_state_after_resume: resumed_run.manual_state
}

The paused snapshot exposes manual_state, so a host UI can render the operator decision boundary. approve_run/3 records the decision and makes the approval path visible. The next worker execution runs record_council_approval and finishes the workflow.
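The two outgoing transitions of wait_for_council can be read as a small routing table. The sketch below mirrors the workflow definition and assumes that approving resolves the approval step with :ok and rejecting resolves it with :error. The decision atoms :approved and :rejected and the ApprovalRoutingSketch module are hypothetical.

```elixir
defmodule ApprovalRoutingSketch do
  # Mirrors the transitions declared in CouncilApprovalWorkflow:
  # :ok leads to the approval step, :error to the rejection step.
  @transitions %{
    ok: :record_council_approval,
    error: :record_council_rejection
  }

  # Assumed mapping from an operator decision to a step outcome.
  def outcome(:approved), do: :ok
  def outcome(:rejected), do: :error

  # Looks up the step that becomes visible after the decision.
  def next_step(decision), do: Map.fetch!(@transitions, outcome(decision))
end

approved_path = ApprovalRoutingSketch.next_step(:approved)
rejected_path = ApprovalRoutingSketch.next_step(:rejected)
```

Modeling rejection as an ordinary outcome with its own transition means a rejected run still finishes through a recorded step instead of simply stopping.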

What To Read Next