x + y = _ , with Journey
# Install this notebook's dependencies. `start_applications: false` keeps
# Mix.install from starting them right away; they are started explicitly at
# the end of this cell, after the configuration below has been set.
Mix.install(
[
{:ecto_sql, "~> 3.10"},
{:postgrex, "~> 0.21"},
{:jason, "~> 1.4"},
{:journey, "~> 0.10", organization: "shipworthy"},
{:kino_vega_lite, "~> 0.1.11"},
{:kino, "~> 0.16.1"}
],
start_applications: false
)
# Set Journey's `:log_level` to `:warning`.
Application.put_env(:journey, :log_level, :warning)
# Database connection settings for Journey.Repo.
# NOTE(review): these look like local development credentials and a
# non-default port — confirm they match your Postgres instance.
Application.put_env(:journey, Journey.Repo,
database: "journey_dev",
username: "postgres",
password: "postgres",
hostname: "localhost",
log: false,
port: 5438
)
Application.put_env(:journey, :ecto_repos, [Journey.Repo])
# With the configuration in place, start every currently loaded application
# (the tuples returned by loaded_applications/0 are reduced to the app name).
Application.loaded_applications()
|> Enum.map(fn {app, _, _} -> app end)
|> Enum.each(&Application.ensure_all_started/1)
Getting things done with Journey
This livebook shows using Journey for a ridiculously basic flow: computing the sum of two numbers. It has two input values (x and y) and one computation (sum).
This livebook shows creating a blueprint (graph) for computing the sum, and then executing an instance of the blueprint to perform a computation.
A few things to note:
- every input value (:x, :y), or computation result (:sum) is persisted,
- the :sum computation happens reliably (with a retry policy),
- the :sum computation is as horizontally distributed as your app,
- the :sum computation is proactive: it will be computed when x and y become available,
- the executions of this flow can take as long as needed (milliseconds? months?), and will live through system restarts, crashes, redeployments, page reloads, etc.
Define the Blueprint of the Application
Our application is very simple: given two numbers, it computes the sum. ; )
“Business logic”: f_add(x, y)
# Business logic for the `:sum` node: takes a map holding `:x` and `:y`
# and returns their sum wrapped in an `{:ok, _}` tuple.
f_add = fn %{x: left, y: right} ->
  {:ok, left + right}
end
#Function<42.81571850/1 in :erl_eval.expr/6>
The flow
import Journey.Node
import Journey.Node.Conditions
import Journey.Node.UpstreamDependencies
# Blueprint named "g1", version "v1": two inputs feeding one computation.
graph =
  Journey.new_graph("g1", "v1", [
    input(:x),
    input(:y),
    # `:sum` depends on `:x` and `:y`; `f_add` produces its value.
    compute(:sum, [:x, :y], f_add)
  ])
%Journey.Graph{
name: "g1",
version: "v1",
nodes: [
%Journey.Graph.Input{name: :execution_id, type: :input},
%Journey.Graph.Input{name: :last_updated_at, type: :input},
%Journey.Graph.Input{name: :x, type: :input},
%Journey.Graph.Input{name: :y, type: :input},
%Journey.Graph.Step{
name: :sum,
gated_by: [:x, :y],
f_compute: #Function<42.81571850/1 in :erl_eval.expr/6>,
f_on_save: nil,
type: :compute,
mutates: nil,
max_retries: 3,
abandon_after_seconds: 60
}
]
}
Flow, visualized
# Generate the Mermaid definition of the graph and hand it to Kino for rendering.
Kino.Mermaid.new(Journey.Tools.generate_mermaid_graph(graph))
graph TD
%% Graph
subgraph Graph["🧩 'g1', version v1"]
execution_id[execution_id]
last_updated_at[last_updated_at]
x[x]
y[y]
sum["sum
(anonymous fn)"]
x --> sum
y --> sum
end
%% Styling
classDef inputNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000000
classDef computeNode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000000
classDef scheduleNode fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000000
classDef mutateNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#000000
%% Apply styles to actual nodes
class y,x,last_updated_at,execution_id inputNode
class sum computeNode
Executing instances of the blueprint
Starting a new execution
execution = Journey.start_execution(graph)
# Take a note of the id of the execution, so we can reload it in case the data center reboots.
execution_id = execution.id
"EXECZ5J17YRXL83G0Y9HVBEH"
# No values are set, except for system-provided values.
Journey.values_all(execution)
%{
sum: :not_set,
y: :not_set,
x: :not_set,
last_updated_at: {:set, 1755151863},
execution_id: {:set, "EXECZ5J17YRXL83G0Y9HVBEH"}
}
Once we have :x and :y, :sum gets computed
# Provide the :x input and keep the execution returned by set_value.
# The trailing `; :ok` makes the cell evaluate to :ok instead of printing
# the whole execution struct.
execution = Journey.set_value(execution, :x, 12); :ok
:ok
Btw, if the world crashed (or got redeployed), no worries. As long as we took a note of the ID of the execution, we can load the execution as soon as things are back up, and proceed as if nothing happened.
reloaded_execution = Journey.load(execution_id)
%Journey.Persistence.Schema.Execution{
__meta__: #Ecto.Schema.Metadata<:loaded, "executions">,
id: "EXECZ5J17YRXL83G0Y9HVBEH",
graph_name: "g1",
graph_version: "v1",
archived_at: nil,
values: [
%Journey.Persistence.Schema.Execution.Value{
__meta__: #Ecto.Schema.Metadata<:loaded, "values">,
id: "VALBMDGBLBGBG6T403Z37YJ",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :last_updated_at,
node_type: :input,
node_value: 1755151863,
set_time: 1755151863,
ex_revision: 1,
inserted_at: 1755151863,
updated_at: 1755151863
},
%Journey.Persistence.Schema.Execution.Value{
__meta__: #Ecto.Schema.Metadata<:loaded, "values">,
id: "VALD4A75R8E7XJ92RHVMJVE",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :x,
node_type: :input,
node_value: 12,
set_time: 1755151863,
ex_revision: 1,
inserted_at: 1755151863,
updated_at: 1755151863
},
%Journey.Persistence.Schema.Execution.Value{
__meta__: #Ecto.Schema.Metadata<:loaded, "values">,
id: "VALE18VJVA3GX0VB454LJ63",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :execution_id,
node_type: :input,
node_value: "EXECZ5J17YRXL83G0Y9HVBEH",
set_time: 1755151863,
ex_revision: 0,
inserted_at: 1755151863,
updated_at: 1755151863
},
%Journey.Persistence.Schema.Execution.Value{
__meta__: #Ecto.Schema.Metadata<:loaded, "values">,
id: "VALJATV0Y26YY8TAE4L5RRH",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :y,
node_type: :input,
node_value: nil,
set_time: nil,
ex_revision: 0,
inserted_at: 1755151863,
updated_at: 1755151863
},
%Journey.Persistence.Schema.Execution.Value{
__meta__: #Ecto.Schema.Metadata<:loaded, "values">,
id: "VALH3YV7MBZMY6Y1HG7GVHV",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :sum,
node_type: :compute,
node_value: nil,
set_time: nil,
ex_revision: 0,
inserted_at: 1755151863,
updated_at: 1755151863
}
],
computations: [
%Journey.Persistence.Schema.Execution.Computation{
__meta__: #Ecto.Schema.Metadata<:loaded, "computations">,
id: "CMPB8LZ37J6972AYH6Z0ZVM",
execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
execution: #Ecto.Association.NotLoaded,
node_name: :sum,
computation_type: :compute,
state: :not_set,
ex_revision_at_start: nil,
ex_revision_at_completion: nil,
scheduled_time: nil,
start_time: nil,
completion_time: nil,
deadline: nil,
error_details: nil,
computed_with: nil,
inserted_at: 1755151863,
updated_at: 1755151863
}
],
revision: 1,
inserted_at: 1755151863,
updated_at: 1755151863
}
reloaded_execution = Journey.set_value(reloaded_execution, :y, 2); :ok
:ok
Journey.values_all(reloaded_execution)
%{
sum: :not_set,
y: {:set, 2},
x: {:set, 12},
last_updated_at: {:set, 1755151863},
execution_id: {:set, "EXECZ5J17YRXL83G0Y9HVBEH"}
}
At this point, the customer has provided :x and :y, and we can get the computed :sum:
Journey.get_value(reloaded_execution, :sum, wait_any: true)
{:ok, 14}
“oh, wait, :x was actually something else!”
reloaded_execution = Journey.set_value(reloaded_execution, :x, 133); :ok
:ok
“no worries, here is the updated sum”
Journey.get_value(reloaded_execution, :sum, wait_new: true)
{:ok, 135}
This basic computation happened with persistence, resiliency, and scalability.
Search through computation records
“find me all the records where the sum is greater than 20, and whose x is less than 20!”
# Find executions of this graph (same name and version), ordered by
# :inserted_at, whose :sum is greater than 20 and whose :x is less than 20,
# then reduce each execution to its values.
Journey.list_executions(
  graph_name: graph.name,
  graph_version: graph.version,
  order_by_execution_fields: [:inserted_at],
  value_filters: [{:sum, :gt, 20}, {:x, :lt, 20}]
)
|> Enum.map(&Journey.values/1)
[
%{y: 9, x: 2, last_updated_at: 1755145434, execution_id: "EXECTYHV1RM721EHH3V89A42"},
%{y: 9, x: 2, last_updated_at: 1755145483, execution_id: "EXECY4RE369XYJJ0VJX48ZH7"}
]
Flow Analytics: executions stats
Journey.Insights.Status.status()
%{
status: :healthy,
graphs: [
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 4, failed: 0, not_set: 2, computing: 0},
most_recently_created: "2025-08-12T18:43:19Z",
most_recently_updated: "2025-08-12T18:43:19Z"
},
executions: %{
active: 3,
archived: 0,
most_recently_created: "2025-08-12T18:40:47Z",
most_recently_updated: "2025-08-12T18:43:19Z"
}
},
graph_name: "basic graph, greetings 1",
graph_version: "1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 4, failed: 0, not_set: 0, computing: 0},
most_recently_created: "2025-08-12T17:16:04Z",
most_recently_updated: "2025-08-12T17:16:04Z"
},
executions: %{
active: 3,
archived: 0,
most_recently_created: "2025-08-12T17:08:11Z",
most_recently_updated: "2025-08-12T17:16:04Z"
}
},
graph_name: "basic graph, greetings 123",
graph_version: "1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 1, not_set: 1, computing: 0},
most_recently_created: "2025-08-04T17:15:37Z",
most_recently_updated: "2025-08-04T17:15:37Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:15:37Z",
most_recently_updated: "2025-08-04T17:15:37Z"
}
},
graph_name: "test failure graph 698",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:11:24Z",
most_recently_updated: "2025-08-04T17:11:31Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:11:24Z",
most_recently_updated: "2025-08-04T17:11:31Z"
}
},
graph_name: "test failure graph 869",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 0, not_set: 1, computing: 0},
most_recently_created: "2025-08-04T17:09:51Z",
most_recently_updated: "2025-08-04T17:09:51Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:09:51Z",
most_recently_updated: "2025-08-04T17:09:51Z"
}
},
graph_name: "test",
graph_version: "v1"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 1, failed: 0, not_set: 0, computing: 0},
most_recently_created: "2025-08-12T14:34:43Z",
most_recently_updated: "2025-08-12T14:34:51Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-12T14:34:43Z",
most_recently_updated: "2025-08-12T14:34:51Z"
}
},
graph_name: "workflow with history",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:10:26Z",
most_recently_updated: "2025-08-04T17:10:27Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:10:25Z",
most_recently_updated: "2025-08-04T17:10:27Z"
}
},
graph_name: "test failure graph",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:14:51Z",
most_recently_updated: "2025-08-04T17:15:00Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:14:51Z",
most_recently_updated: "2025-08-04T17:15:00Z"
}
},
graph_name: "test failure graph 12",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 4, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:27:48Z",
most_recently_updated: "2025-08-04T17:27:55Z"
},
executions: %{
active: 2,
archived: 0,
most_recently_created: "2025-08-04T17:27:48Z",
most_recently_updated: "2025-08-04T17:27:55Z"
}
},
graph_name: "astrological sign workflow, failure compute ",
graph_version: "v2.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:15:03Z",
most_recently_updated: "2025-08-04T17:15:12Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:15:03Z",
most_recently_updated: "2025-08-04T17:15:12Z"
}
},
graph_name: "test failure graph 872",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:13:59Z",
most_recently_updated: "2025-08-04T17:14:05Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:13:59Z",
most_recently_updated: "2025-08-04T17:14:05Z"
}
},
graph_name: "test failure graph 798",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{by_state: %{}, most_recently_created: nil, most_recently_updated: nil},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-13T17:11:44Z",
most_recently_updated: "2025-08-13T17:11:44Z"
}
},
graph_name: "graphtest2025-08-13",
graph_version: "v2"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:15:25Z",
most_recently_updated: "2025-08-04T17:15:34Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:15:25Z",
most_recently_updated: "2025-08-04T17:15:34Z"
}
},
graph_name: "test failure graph 588",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{
abandoned: 1023,
cancelled: 0,
success: 61390,
failed: 0,
not_set: 151685,
computing: 33
},
most_recently_created: "2025-08-14T05:31:04Z",
most_recently_updated: "2025-08-14T05:31:05Z"
},
executions: %{
active: 12652,
archived: 5107,
most_recently_created: "2025-08-14T05:29:39Z",
most_recently_updated: "2025-08-14T05:31:05Z"
}
},
graph_name: "Credit Card Application flow graph",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{by_state: %{}, most_recently_created: nil, most_recently_updated: nil},
executions: %{
active: 2,
archived: 0,
most_recently_created: "2025-08-13T17:11:40Z",
most_recently_updated: "2025-08-13T17:11:40Z"
}
},
graph_name: "graphtest2025-08-13",
graph_version: "v1"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:15:15Z",
most_recently_updated: "2025-08-04T17:15:24Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:15:14Z",
most_recently_updated: "2025-08-04T17:15:24Z"
}
},
graph_name: "test failure graph 925",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{
abandoned: 0,
cancelled: 0,
success: 1696,
failed: 0,
not_set: 2250,
computing: 54
},
most_recently_created: "2025-08-01T22:05:06Z",
most_recently_updated: "2025-08-01T22:05:09Z"
},
executions: %{
active: 900,
archived: 100,
most_recently_created: "2025-08-01T22:05:06Z",
most_recently_updated: "2025-08-01T22:05:09Z"
}
},
graph_name: "flow_analytics_perf_test",
graph_version: "1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:11:13Z",
most_recently_updated: "2025-08-04T17:11:14Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:11:13Z",
most_recently_updated: "2025-08-04T17:11:14Z"
}
},
graph_name: "test failure graph 821",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 2, cancelled: 0, success: 32, failed: 7, not_set: 4, computing: 0},
most_recently_created: "2025-08-14T06:11:03Z",
most_recently_updated: "2025-08-14T06:11:03Z"
},
executions: %{
active: 22,
archived: 0,
most_recently_created: "2025-08-14T06:11:03Z",
most_recently_updated: "2025-08-14T06:11:03Z"
}
},
graph_name: "g1",
graph_version: "v1"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
most_recently_created: "2025-08-04T17:14:40Z",
most_recently_updated: "2025-08-04T17:14:50Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-04T17:14:40Z",
most_recently_updated: "2025-08-04T17:14:50Z"
}
},
graph_name: "test failure graph 279",
graph_version: "v1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 1, failed: 0, not_set: 0, computing: 0},
most_recently_created: "2025-08-11T06:04:56Z",
most_recently_updated: "2025-08-11T06:04:56Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-08-11T06:04:56Z",
most_recently_updated: "2025-08-11T06:04:56Z"
}
},
graph_name: "basic graph, greetings TestDebug test_debug",
graph_version: "1.0.0"
},
%{
stats: %{
computations: %{
by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 0, not_set: 3, computing: 0},
most_recently_created: "2025-07-30T04:45:26Z",
most_recently_updated: "2025-07-30T04:45:26Z"
},
executions: %{
active: 1,
archived: 0,
most_recently_created: "2025-07-30T04:45:26Z",
most_recently_updated: "2025-07-30T04:59:16Z"
}
},
graph_name: "horoscope workflow, success Elixir.Journey.Examples.Horoscope-64eAJ",
graph_version: "v1.0.0"
}
],
database_connected: true
}
# get some analytics for the executions flowing through the system: how many, what does the funnel look like
Journey.Insights.FlowAnalytics.flow_analytics(graph.name, graph.version)
%{
graph_name: "g1",
graph_version: "v1",
node_stats: %{
nodes: [
%{
node_type: :input,
node_name: :x,
flow_ends_here_count: 0,
reached_count: 18,
reached_percentage: 81.8,
average_time_to_reach: 56,
flow_ends_here_percentage_of_all: 0.0,
flow_ends_here_percentage_of_reached: 0.0
},
%{
node_type: :input,
node_name: :y,
flow_ends_here_count: 0,
reached_count: 18,
reached_percentage: 81.8,
average_time_to_reach: 646,
flow_ends_here_percentage_of_all: 0.0,
flow_ends_here_percentage_of_reached: 0.0
},
%{
node_type: :compute,
node_name: :sum,
flow_ends_here_count: 0,
reached_count: 16,
reached_percentage: 72.7,
average_time_to_reach: 84,
flow_ends_here_percentage_of_all: 0.0,
flow_ends_here_percentage_of_reached: 0.0
}
]
},
analyzed_at: "2025-08-14T06:11:04.181883Z",
executions: %{
count: 22,
duration_median_seconds_to_last_update: 1,
duration_avg_seconds_to_last_update: 556
}
}
In summary
This showed:
- an application defined as a graph + business logic,
- an execution of the flow taking place, step by step,
- an execution of the flow being interrupted and resumed, as if nothing happened,
- analytics describing the “funnel” of executions of your graph.
Behind the scenes (not visible in this simple example):
- computations were subject to a retry policy and retries in case of failures,
- computations scale seamlessly: they run on any replica of your application.
This all happened without application data getting shipped to a third party, or requiring a SAAS dependency.
See Journey documentation for examples of more complex applications (e.g. a Horoscope app, or a Credit Card Application flow, which includes Mutations, and one-time and recurring Scheduled events).