Getting Started With Squid Mesh
This Livebook introduces the core Squid Mesh model through a small workflow you can run without creating a Phoenix app or a Postgres database.
The examples use ETS-backed journal storage to keep setup light. Real host apps should use the Ecto/Postgres journal storage described in the host app integration guide.
Install
Mix.install([
{:squid_mesh, "~> 0.1.0-beta.3"}
])
Runtime Setup
Squid Mesh normally reads runtime configuration from the host application. In this notebook, we configure it directly and use an ETS table for durable facts inside the current Livebook session.
storage = {Jido.Storage.ETS, table: :squid_mesh_getting_started_livebook}
# This placeholder satisfies the library config contract for the notebook.
# A real host app should configure its actual Ecto repo instead.
defmodule SquidMeshLivebook.Repo do
end
Application.put_env(:squid_mesh, :repo, SquidMeshLivebook.Repo)
Application.put_env(:squid_mesh, :runtime, :journal)
Application.put_env(:squid_mesh, :read_model, :read_model)
Application.put_env(:squid_mesh, :journal_storage, storage)
Application.put_env(:squid_mesh, :queue, "livebook")
opts = [
runtime: :journal,
journal_storage: storage,
queue: "livebook"
]
defmodule SquidMeshLivebook.Output do
def attempt(attempt) do
Map.take(attempt, [
:step,
:status,
:attempt_number,
:visible_at,
:wakeup_emitted?,
:applied?
])
end
def runnable(runnable) do
Map.take(runnable, [:runnable_key, :key, :step, :status, :visible_at])
end
def node(node), do: Map.take(node, [:id, :status, :current?])
def edge(edge) do
Map.take(edge, [:id, :from, :to, :type, :status, :selected?, :pending?])
end
end
Define Steps
Workflow steps do the domain work. The common authoring path is
use SquidMesh.Step; raw Jido.Action modules are only needed for explicit
interop.
defmodule SquidMeshLivebook.PackLembas do
use SquidMesh.Step,
name: :pack_lembas,
description: "Packs provisions for the errand",
input_schema: [
ring_id: [type: :string, required: true]
],
output_schema: [
provisions: [type: :map, required: true]
]
@impl SquidMesh.Step
def run(%{ring_id: ring_id}, %SquidMesh.Step.Context{}) do
{:ok,
%{
provisions: %{
ring_id: ring_id,
lembas_count: 11,
packed_by: "Sam"
}
}}
end
end
defmodule SquidMeshLivebook.CrossMoria do
use SquidMesh.Step,
name: :cross_moria,
description: "Crosses Moria with the packed provisions",
input_schema: [
provisions: [type: :map, required: true]
],
output_schema: [
moria: [type: :map, required: true]
]
@impl SquidMesh.Step
def run(%{provisions: provisions}, %SquidMesh.Step.Context{run_id: run_id}) do
{:ok,
%{
moria: %{
run_id: run_id,
ring_id: provisions.ring_id,
status: "crossed",
lembas_left: provisions.lembas_count - 3
}
}}
end
end
Define A Workflow
A workflow declares the trigger, payload, steps, and transitions. The workflow definition says what should happen; the journal records what did happen.
defmodule SquidMeshLivebook.RingErrandWorkflow do
use SquidMesh.Workflow
workflow do
trigger :leave_shire do
manual()
payload do
field :ring_id, :string
end
end
step :pack_lembas, SquidMeshLivebook.PackLembas
step :cross_moria, SquidMeshLivebook.CrossMoria
transition :pack_lembas, on: :ok, to: :cross_moria
transition :cross_moria, on: :ok, to: :complete
end
end
Inspect The Workflow Spec
The DSL compiles into a normalized workflow spec. This is the data shape that tooling can inspect without parsing the module source. It is also the contract visual editors and runtime-authored workflows will build on.
{:ok, spec} = SquidMesh.Workflow.to_spec(SquidMeshLivebook.RingErrandWorkflow)
%{
workflow: spec.workflow,
triggers: Enum.map(spec.triggers, &Map.take(&1, [:name, :type, :payload])),
payload: spec.payload,
steps: Enum.map(spec.steps, &Map.take(&1, [:name, :module, :opts])),
transitions: spec.transitions,
entry_steps: spec.entry_steps
}
triggers describe how a run can start. payload is the external input
contract. steps are executable workflow nodes. transitions describe durable
progression from one step outcome to the next node or to :complete.
For deeper authoring rules, see Workflow Authoring.
Start A Run
Manual triggers start through the public API. This workflow has one trigger, so
start_run/3 uses it as the default trigger.
{:ok, run} =
SquidMesh.start_run(
SquidMeshLivebook.RingErrandWorkflow,
%{ring_id: "one-ring"},
opts
)
%{
run_id: run.run_id,
status: run.status,
reason: run.reason,
planned_runnables: run.planned_runnables,
visible_attempts: Enum.map(run.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
scheduled_attempts: run.scheduled_attempts,
next_visible_at: run.next_visible_at
}
The first snapshot already has one visible attempt: pack_lembas. Visible
attempts are work the host can claim now. Scheduled attempts are work that
exists but should not be claimed until next_visible_at.
Execute Visible Work
Workers provide capacity by calling SquidMesh.execute_next/1. Each call claims
one visible journal attempt, runs the step, records the result, and makes the
next attempt visible when the workflow should continue.
A run is the whole workflow instance. An attempt is one executable unit of work
inside that run, such as the visible :pack_lembas or :cross_moria step.
worker_opts = Keyword.put(opts, :owner_id, "livebook-worker")
{:ok, first_step} = SquidMesh.execute_next(worker_opts)
{:ok, completed_run} = SquidMesh.execute_next(worker_opts)
{:ok, no_more_work} = SquidMesh.execute_next(worker_opts)
%{
first_step_status: first_step.status,
first_step_reason: first_step.reason,
visible_after_first_step: Enum.map(first_step.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
applied_after_first_step: first_step.applied_runnable_keys,
completed_status: completed_run.status,
completed_reason: completed_run.reason,
completed_attempts: Enum.map(completed_run.attempts, &SquidMeshLivebook.Output.attempt/1),
no_more_work: no_more_work
}
After the first call, the pack_lembas attempt has been applied and the
cross_moria attempt becomes visible. After the second call, the run is
terminal. The third call returns :none because this queue has no visible work
left.
Inspect The Run
Inspection reads durable journal facts. Use it for dashboards, support tools, and operator-facing detail pages.
{:ok, inspected} =
SquidMesh.inspect_run(
run.run_id,
Keyword.merge(opts, include_history: true)
)
%{
status: inspected.status,
reason: inspected.reason,
context: inspected.context,
planned_runnables: Enum.map(inspected.planned_runnables, &SquidMeshLivebook.Output.runnable/1),
visible_attempts: Enum.map(inspected.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
scheduled_attempts: Enum.map(inspected.scheduled_attempts, &SquidMeshLivebook.Output.attempt/1),
attempts: Enum.map(inspected.attempts, &SquidMeshLivebook.Output.attempt/1),
next_visible_at: inspected.next_visible_at
}
context is the durable run context assembled from completed step outputs.
attempts is historical evidence. visible_attempts and scheduled_attempts
explain what can run now versus what needs a later wakeup.
Inspect The Graph
Graph inspection gives UI builders a node and edge view of the same run.
{:ok, graph} = SquidMesh.inspect_run_graph(run.run_id, opts)
graph_payload = SquidMesh.Runs.GraphInspection.to_map(graph)
%{
source: graph.source,
current_node_id: graph.current_node_id,
nodes: Enum.map(graph_payload.nodes, &SquidMeshLivebook.Output.node/1),
edges: Enum.map(graph_payload.edges, &SquidMeshLivebook.Output.edge/1)
}
The graph contract is the shape a host UI can serialize after applying its own authorization and redaction policy. See the Graph Inspection Contract for the full node and edge shape.
Explain The State
Explanation condenses the run into a reason and diagnostics that are easier to show to operators.
{:ok, explanation} = SquidMesh.explain_run(run.run_id, opts)
%{
status: explanation.status,
reason: explanation.reason,
summary: explanation.summary,
next_actions: explanation.next_actions,
evidence: explanation.evidence
}
Use explanation output when a support or operator surface needs to answer “what is the runtime waiting for?” without exposing raw journal entries.
See A Scheduled Wakeup
Wait steps turn workflow-scale delays into future-visible attempts. They are useful when the workflow should continue later, while the journal remains the source of truth.
defmodule SquidMeshLivebook.RecordGandalfArrival do
use SquidMesh.Step,
name: :record_gandalf_arrival,
description: "Records that the delayed rendezvous became visible",
input_schema: [
ring_id: [type: :string, required: true]
],
output_schema: [
rendezvous: [type: :map, required: true]
]
@impl SquidMesh.Step
def run(%{ring_id: ring_id}, %SquidMesh.Step.Context{}) do
{:ok, %{rendezvous: %{ring_id: ring_id, status: "wizard arrived"}}}
end
end
defmodule SquidMeshLivebook.GandalfRendezvousWorkflow do
use SquidMesh.Workflow
workflow do
trigger :wait_for_wizard do
manual()
payload do
field :ring_id, :string
end
end
step :wait_for_gandalf, :wait, duration: 1_000
step :record_gandalf_arrival, SquidMeshLivebook.RecordGandalfArrival
transition :wait_for_gandalf, on: :ok, to: :record_gandalf_arrival
transition :record_gandalf_arrival, on: :ok, to: :complete
end
end
wakeup_time = DateTime.utc_now()
wakeup_opts = Keyword.merge(opts, queue: "livebook-wakeup", now: wakeup_time)
wakeup_worker_opts = Keyword.put(wakeup_opts, :owner_id, "livebook-worker")
{:ok, wakeup_run} =
SquidMesh.start_run(
SquidMeshLivebook.GandalfRendezvousWorkflow,
%{ring_id: "one-ring"},
wakeup_opts
)
{:ok, scheduled_run} = SquidMesh.execute_next(wakeup_worker_opts)
%{
run_id: wakeup_run.run_id,
reason: scheduled_run.reason,
visible_attempts: scheduled_run.visible_attempts,
scheduled_attempts: Enum.map(scheduled_run.scheduled_attempts, &SquidMeshLivebook.Output.attempt/1),
next_visible_at: scheduled_run.next_visible_at
}
The wait step completed, then Squid Mesh scheduled record_gandalf_arrival for later.
Until next_visible_at, workers should see no visible work for that queue.
{:ok, :none} = SquidMesh.execute_next(wakeup_worker_opts)
{:ok, completed_rendezvous} =
SquidMesh.execute_next(Keyword.put(wakeup_worker_opts, :now, scheduled_run.next_visible_at))
%{
status: completed_rendezvous.status,
reason: completed_rendezvous.reason,
attempts: Enum.map(completed_rendezvous.attempts, &SquidMeshLivebook.Output.attempt/1)
}
Add A Human Approval Boundary
Approval steps pause the workflow until an operator approves or rejects the run. This is durable workflow state, not a transient process wait.
defmodule SquidMeshLivebook.RecordCouncilApproval do
use SquidMesh.Step,
name: :record_council_approval,
description: "Records an approved errand",
input_schema: [
ring_id: [type: :string, required: true],
approval: [type: :map, required: true]
],
output_schema: [
recorded: [type: :map, required: true]
]
@impl SquidMesh.Step
def run(%{ring_id: ring_id, approval: approval}, %SquidMesh.Step.Context{}) do
{:ok, %{recorded: %{ring_id: ring_id, decision: approval.decision}}}
end
end
defmodule SquidMeshLivebook.RecordCouncilRejection do
use SquidMesh.Step,
name: :record_council_rejection,
description: "Records a rejected errand",
input_schema: [
ring_id: [type: :string, required: true],
approval: [type: :map, required: true]
],
output_schema: [
recorded: [type: :map, required: true]
]
@impl SquidMesh.Step
def run(%{ring_id: ring_id, approval: approval}, %SquidMesh.Step.Context{}) do
{:ok, %{recorded: %{ring_id: ring_id, decision: approval.decision}}}
end
end
defmodule SquidMeshLivebook.CouncilApprovalWorkflow do
use SquidMesh.Workflow
workflow do
trigger :council_review do
manual()
payload do
field :ring_id, :string
end
end
approval_step :wait_for_council, output: :approval
step :record_council_approval, SquidMeshLivebook.RecordCouncilApproval
step :record_council_rejection, SquidMeshLivebook.RecordCouncilRejection
transition :wait_for_council, on: :ok, to: :record_council_approval
transition :wait_for_council, on: :error, to: :record_council_rejection
transition :record_council_approval, on: :ok, to: :complete
transition :record_council_rejection, on: :ok, to: :complete
end
end
approval_opts = Keyword.put(opts, :queue, "livebook-approval")
approval_worker_opts = Keyword.put(approval_opts, :owner_id, "livebook-worker")
{:ok, approval_run} =
SquidMesh.start_run(
SquidMeshLivebook.CouncilApprovalWorkflow,
%{ring_id: "one-ring"},
approval_opts
)
{:ok, paused_run} = SquidMesh.execute_next(approval_worker_opts)
{:ok, paused_explanation} = SquidMesh.explain_run(approval_run.run_id, approval_opts)
{:ok, resumed_run} =
SquidMesh.approve_run(
approval_run.run_id,
%{actor: "ops_123", comment: "looks good"},
approval_opts
)
{:ok, approved_run} = SquidMesh.execute_next(approval_worker_opts)
%{
paused_status: paused_run.status,
paused_reason: paused_run.reason,
manual_state: paused_run.manual_state,
paused_summary: paused_explanation.summary,
paused_next_actions: paused_explanation.next_actions,
resumed_status: resumed_run.status,
visible_after_approval: Enum.map(resumed_run.visible_attempts, &SquidMeshLivebook.Output.attempt/1),
completed_status: approved_run.status,
manual_state_after_resume: resumed_run.manual_state
}
The paused snapshot exposes manual_state, so a host UI can render the
operator decision boundary. approve_run/3 records the decision and makes the
approval path visible. The next worker execution runs record_council_approval
and finishes the workflow.