x + y = _ , with Journey

lib/examples/basic.livemd

Mix.install(
  [
    {:ecto_sql, "~> 3.10"},
    {:postgrex, "~> 0.21"},
    {:jason, "~> 1.4"},
    {:journey, "~> 0.10", organization: "shipworthy"},
    {:kino_vega_lite, "~> 0.1.11"},
    {:kino, "~> 0.16.1"}
  ],
  start_applications: false
)

Application.put_env(:journey, :log_level, :warning)

Application.put_env(:journey, Journey.Repo,
  database: "journey_dev",
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  log: false,
  port: 5438
)

Application.put_env(:journey, :ecto_repos, [Journey.Repo])

Application.loaded_applications()
|> Enum.map(fn {app, _, _} -> app end)
|> Enum.each(&Application.ensure_all_started/1)

Getting things done with Journey

This livebook uses Journey for a ridiculously basic flow: computing the sum of two numbers. The flow has two input values (x and y) and one computation (sum).

It first creates a blueprint (graph) for computing the sum, and then executes an instance of that blueprint to perform the computation.

A few things to note:

  • every input value (:x, :y) and every computation result (:sum) is persisted,
  • the :sum computation happens reliably (with a retry policy),
  • the :sum computation is as horizontally distributed as your app,
  • the :sum computation is proactive: it will be computed when x and y become available,
  • the executions of this flow can take as long as needed (milliseconds? months?), and will live through system restarts, crashes, redeployments, page reloads, etc.

Define the Blueprint of the Application

Our application is very simple: given two numbers, it computes their sum. ; )

“Business logic”: f_add(x, y)

# The function for adding two numbers. Duh. ; )
f_add = fn %{x: x, y: y} -> {:ok, x + y} end
#Function<42.81571850/1 in :erl_eval.expr/6>
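
Since f_add is an ordinary anonymous function, it can be sanity-checked on its own before it is wired into a graph. The sketch below is plain Elixir and makes no Journey calls; the function is redefined so the cell stands alone, and the `other:` key is a made-up extra used only to show how the map pattern behaves.

```elixir
# Same definition as above, repeated so this cell is self-contained.
f_add = fn %{x: x, y: y} -> {:ok, x + y} end

# The function takes a map of values and returns an {:ok, result} tuple.
{:ok, 14} = f_add.(%{x: 12, y: 2})

# Elixir map patterns match on a subset of keys, so extra keys are ignored.
{:ok, 135} = f_add.(%{x: 133, y: 2, other: :ignored})
```

Each line asserts by pattern match, so the cell raises a MatchError if the function ever returns something else.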

The flow

import Journey.Node
import Journey.Node.Conditions
import Journey.Node.UpstreamDependencies

graph = Journey.new_graph(
  # graph name.
  "g1",
  # graph version.
  "v1",
  # graph nodes.
  [
    input(:x),
    input(:y),
    # the `:sum` computation is waiting on :x and :y.   
    compute(:sum, [:x, :y], f_add)
  ]
)
%Journey.Graph{
  name: "g1",
  version: "v1",
  nodes: [
    %Journey.Graph.Input{name: :execution_id, type: :input},
    %Journey.Graph.Input{name: :last_updated_at, type: :input},
    %Journey.Graph.Input{name: :x, type: :input},
    %Journey.Graph.Input{name: :y, type: :input},
    %Journey.Graph.Step{
      name: :sum,
      gated_by: [:x, :y],
      f_compute: #Function<42.81571850/1 in :erl_eval.expr/6>,
      f_on_save: nil,
      type: :compute,
      mutates: nil,
      max_retries: 3,
      abandon_after_seconds: 60
    }
  ]
}

Flow, visualized

graph
|> Journey.Tools.generate_mermaid_graph()
|> Kino.Mermaid.new()
graph TD
    %% Graph
    subgraph Graph["🧩 'g1', version v1"]
        execution_id[execution_id]
        last_updated_at[last_updated_at]
        x[x]
        y[y]
        sum["sum
(anonymous fn)"]

        x --> sum
        y --> sum
    end

    %% Styling
    classDef inputNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000000
    classDef computeNode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000000
    classDef scheduleNode fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000000
    classDef mutateNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#000000

    %% Apply styles to actual nodes
    class y,x,last_updated_at,execution_id inputNode
    class sum computeNode

Executing instances of the blueprint

Starting a new execution

execution = Journey.start_execution(graph)

# Take a note of the id of the execution, so we can reload it in case the data center reboots.
execution_id = execution.id
"EXECZ5J17YRXL83G0Y9HVBEH"
# No values are set, except for system-provided values.
Journey.values_all(execution)
%{
  sum: :not_set,
  y: :not_set,
  x: :not_set,
  last_updated_at: {:set, 1755151863},
  execution_id: {:set, "EXECZ5J17YRXL83G0Y9HVBEH"}
}
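
In the output above, each value is either `:not_set` or a `{:set, value}` tuple. As a small illustration, here is one way to pick out only the values that have been set. It is plain Elixir over a literal map copied from that output, not a Journey call.

```elixir
# A literal map shaped like the Journey.values_all/1 output shown above.
values = %{
  sum: :not_set,
  y: :not_set,
  x: :not_set,
  last_updated_at: {:set, 1755151863},
  execution_id: {:set, "EXECZ5J17YRXL83G0Y9HVBEH"}
}

# The pattern in the generator skips every entry that is not a {:set, value} tuple.
set_values =
  for {name, {:set, value}} <- values, into: %{} do
    {name, value}
  end
```

Here `set_values` holds only `:last_updated_at` and `:execution_id`, the two system-provided values.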

Once we have :x and :y, :sum gets computed

execution = Journey.set_value(execution, :x, 12); :ok
:ok

Btw, if the world crashed (or got redeployed), no worries. As long as we took a note of the ID of the execution, we can load the execution as soon as things are back up, and proceed as if nothing happened.

reloaded_execution = Journey.load(execution_id)
%Journey.Persistence.Schema.Execution{
  __meta__: #Ecto.Schema.Metadata<:loaded, "executions">,
  id: "EXECZ5J17YRXL83G0Y9HVBEH",
  graph_name: "g1",
  graph_version: "v1",
  archived_at: nil,
  values: [
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALBMDGBLBGBG6T403Z37YJ",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :last_updated_at,
      node_type: :input,
      node_value: 1755151863,
      set_time: 1755151863,
      ex_revision: 1,
      inserted_at: 1755151863,
      updated_at: 1755151863
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALD4A75R8E7XJ92RHVMJVE",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :x,
      node_type: :input,
      node_value: 12,
      set_time: 1755151863,
      ex_revision: 1,
      inserted_at: 1755151863,
      updated_at: 1755151863
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALE18VJVA3GX0VB454LJ63",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :execution_id,
      node_type: :input,
      node_value: "EXECZ5J17YRXL83G0Y9HVBEH",
      set_time: 1755151863,
      ex_revision: 0,
      inserted_at: 1755151863,
      updated_at: 1755151863
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALJATV0Y26YY8TAE4L5RRH",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :y,
      node_type: :input,
      node_value: nil,
      set_time: nil,
      ex_revision: 0,
      inserted_at: 1755151863,
      updated_at: 1755151863
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALH3YV7MBZMY6Y1HG7GVHV",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :sum,
      node_type: :compute,
      node_value: nil,
      set_time: nil,
      ex_revision: 0,
      inserted_at: 1755151863,
      updated_at: 1755151863
    }
  ],
  computations: [
    %Journey.Persistence.Schema.Execution.Computation{
      __meta__: #Ecto.Schema.Metadata<:loaded, "computations">,
      id: "CMPB8LZ37J6972AYH6Z0ZVM",
      execution_id: "EXECZ5J17YRXL83G0Y9HVBEH",
      execution: #Ecto.Association.NotLoaded,
      node_name: :sum,
      computation_type: :compute,
      state: :not_set,
      ex_revision_at_start: nil,
      ex_revision_at_completion: nil,
      scheduled_time: nil,
      start_time: nil,
      completion_time: nil,
      deadline: nil,
      error_details: nil,
      computed_with: nil,
      inserted_at: 1755151863,
      updated_at: 1755151863
    }
  ],
  revision: 1,
  inserted_at: 1755151863,
  updated_at: 1755151863
}
reloaded_execution = Journey.set_value(reloaded_execution, :y, 2); :ok
:ok
Journey.values_all(reloaded_execution)
%{
  sum: :not_set,
  y: {:set, 2},
  x: {:set, 12},
  last_updated_at: {:set, 1755151863},
  execution_id: {:set, "EXECZ5J17YRXL83G0Y9HVBEH"}
}

At this point, the customer has provided both :x and :y, so we can get the computed :sum:

Journey.get_value(reloaded_execution, :sum, wait_any: true)
{:ok, 14}

“oh, wait, :x was actually something else!”

reloaded_execution = Journey.set_value(reloaded_execution, :x, 133); :ok
:ok

“no worries, here is the updated sum”

Journey.get_value(reloaded_execution, :sum, wait_new: true)
{:ok, 135}

This basic computation happened with persistence, resiliency, and scalability.

Search through computation records

“find me all the records where the sum is greater than 20, and whose x is less than 20!”

Journey.list_executions(
  graph_name: graph.name,
  graph_version: graph.version,
  order_by_execution_fields: [:inserted_at],
  value_filters: [{:sum, :gt, 20}, {:x, :lt, 20}]
)
|> Enum.map(fn e ->
  Journey.values(e)
end)
[
  %{y: 9, x: 2, last_updated_at: 1755145434, execution_id: "EXECTYHV1RM721EHH3V89A42"},
  %{y: 9, x: 2, last_updated_at: 1755145483, execution_id: "EXECY4RE369XYJJ0VJX48ZH7"}
]

Flow Analytics: executions stats

Journey.Insights.Status.status()
%{
  status: :healthy,
  graphs: [
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 4, failed: 0, not_set: 2, computing: 0},
          most_recently_created: "2025-08-12T18:43:19Z",
          most_recently_updated: "2025-08-12T18:43:19Z"
        },
        executions: %{
          active: 3,
          archived: 0,
          most_recently_created: "2025-08-12T18:40:47Z",
          most_recently_updated: "2025-08-12T18:43:19Z"
        }
      },
      graph_name: "basic graph, greetings  1",
      graph_version: "1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 4, failed: 0, not_set: 0, computing: 0},
          most_recently_created: "2025-08-12T17:16:04Z",
          most_recently_updated: "2025-08-12T17:16:04Z"
        },
        executions: %{
          active: 3,
          archived: 0,
          most_recently_created: "2025-08-12T17:08:11Z",
          most_recently_updated: "2025-08-12T17:16:04Z"
        }
      },
      graph_name: "basic graph, greetings  123",
      graph_version: "1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 1, not_set: 1, computing: 0},
          most_recently_created: "2025-08-04T17:15:37Z",
          most_recently_updated: "2025-08-04T17:15:37Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:15:37Z",
          most_recently_updated: "2025-08-04T17:15:37Z"
        }
      },
      graph_name: "test failure graph 698",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:11:24Z",
          most_recently_updated: "2025-08-04T17:11:31Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:11:24Z",
          most_recently_updated: "2025-08-04T17:11:31Z"
        }
      },
      graph_name: "test failure graph 869",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 0, not_set: 1, computing: 0},
          most_recently_created: "2025-08-04T17:09:51Z",
          most_recently_updated: "2025-08-04T17:09:51Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:09:51Z",
          most_recently_updated: "2025-08-04T17:09:51Z"
        }
      },
      graph_name: "test",
      graph_version: "v1"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 1, failed: 0, not_set: 0, computing: 0},
          most_recently_created: "2025-08-12T14:34:43Z",
          most_recently_updated: "2025-08-12T14:34:51Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-12T14:34:43Z",
          most_recently_updated: "2025-08-12T14:34:51Z"
        }
      },
      graph_name: "workflow with history",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:10:26Z",
          most_recently_updated: "2025-08-04T17:10:27Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:10:25Z",
          most_recently_updated: "2025-08-04T17:10:27Z"
        }
      },
      graph_name: "test failure graph",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:14:51Z",
          most_recently_updated: "2025-08-04T17:15:00Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:14:51Z",
          most_recently_updated: "2025-08-04T17:15:00Z"
        }
      },
      graph_name: "test failure graph 12",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 4, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:27:48Z",
          most_recently_updated: "2025-08-04T17:27:55Z"
        },
        executions: %{
          active: 2,
          archived: 0,
          most_recently_created: "2025-08-04T17:27:48Z",
          most_recently_updated: "2025-08-04T17:27:55Z"
        }
      },
      graph_name: "astrological sign workflow, failure compute ",
      graph_version: "v2.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:15:03Z",
          most_recently_updated: "2025-08-04T17:15:12Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:15:03Z",
          most_recently_updated: "2025-08-04T17:15:12Z"
        }
      },
      graph_name: "test failure graph 872",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:13:59Z",
          most_recently_updated: "2025-08-04T17:14:05Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:13:59Z",
          most_recently_updated: "2025-08-04T17:14:05Z"
        }
      },
      graph_name: "test failure graph 798",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{by_state: %{}, most_recently_created: nil, most_recently_updated: nil},
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-13T17:11:44Z",
          most_recently_updated: "2025-08-13T17:11:44Z"
        }
      },
      graph_name: "graphtest2025-08-13",
      graph_version: "v2"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:15:25Z",
          most_recently_updated: "2025-08-04T17:15:34Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:15:25Z",
          most_recently_updated: "2025-08-04T17:15:34Z"
        }
      },
      graph_name: "test failure graph 588",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{
            abandoned: 1023,
            cancelled: 0,
            success: 61390,
            failed: 0,
            not_set: 151685,
            computing: 33
          },
          most_recently_created: "2025-08-14T05:31:04Z",
          most_recently_updated: "2025-08-14T05:31:05Z"
        },
        executions: %{
          active: 12652,
          archived: 5107,
          most_recently_created: "2025-08-14T05:29:39Z",
          most_recently_updated: "2025-08-14T05:31:05Z"
        }
      },
      graph_name: "Credit Card Application flow graph",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{by_state: %{}, most_recently_created: nil, most_recently_updated: nil},
        executions: %{
          active: 2,
          archived: 0,
          most_recently_created: "2025-08-13T17:11:40Z",
          most_recently_updated: "2025-08-13T17:11:40Z"
        }
      },
      graph_name: "graphtest2025-08-13",
      graph_version: "v1"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:15:15Z",
          most_recently_updated: "2025-08-04T17:15:24Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:15:14Z",
          most_recently_updated: "2025-08-04T17:15:24Z"
        }
      },
      graph_name: "test failure graph 925",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{
            abandoned: 0,
            cancelled: 0,
            success: 1696,
            failed: 0,
            not_set: 2250,
            computing: 54
          },
          most_recently_created: "2025-08-01T22:05:06Z",
          most_recently_updated: "2025-08-01T22:05:09Z"
        },
        executions: %{
          active: 900,
          archived: 100,
          most_recently_created: "2025-08-01T22:05:06Z",
          most_recently_updated: "2025-08-01T22:05:09Z"
        }
      },
      graph_name: "flow_analytics_perf_test",
      graph_version: "1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:11:13Z",
          most_recently_updated: "2025-08-04T17:11:14Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:11:13Z",
          most_recently_updated: "2025-08-04T17:11:14Z"
        }
      },
      graph_name: "test failure graph 821",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 2, cancelled: 0, success: 32, failed: 7, not_set: 4, computing: 0},
          most_recently_created: "2025-08-14T06:11:03Z",
          most_recently_updated: "2025-08-14T06:11:03Z"
        },
        executions: %{
          active: 22,
          archived: 0,
          most_recently_created: "2025-08-14T06:11:03Z",
          most_recently_updated: "2025-08-14T06:11:03Z"
        }
      },
      graph_name: "g1",
      graph_version: "v1"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 2, not_set: 0, computing: 0},
          most_recently_created: "2025-08-04T17:14:40Z",
          most_recently_updated: "2025-08-04T17:14:50Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-04T17:14:40Z",
          most_recently_updated: "2025-08-04T17:14:50Z"
        }
      },
      graph_name: "test failure graph 279",
      graph_version: "v1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 1, failed: 0, not_set: 0, computing: 0},
          most_recently_created: "2025-08-11T06:04:56Z",
          most_recently_updated: "2025-08-11T06:04:56Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-08-11T06:04:56Z",
          most_recently_updated: "2025-08-11T06:04:56Z"
        }
      },
      graph_name: "basic graph, greetings TestDebug test_debug",
      graph_version: "1.0.0"
    },
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 0, failed: 0, not_set: 3, computing: 0},
          most_recently_created: "2025-07-30T04:45:26Z",
          most_recently_updated: "2025-07-30T04:45:26Z"
        },
        executions: %{
          active: 1,
          archived: 0,
          most_recently_created: "2025-07-30T04:45:26Z",
          most_recently_updated: "2025-07-30T04:59:16Z"
        }
      },
      graph_name: "horoscope workflow, success Elixir.Journey.Examples.Horoscope-64eAJ",
      graph_version: "v1.0.0"
    }
  ],
  database_connected: true
}
# get some analytics for the executions flowing through the system: how many, what does the funnel look like
Journey.Insights.FlowAnalytics.flow_analytics(graph.name, graph.version)
%{
  graph_name: "g1",
  graph_version: "v1",
  node_stats: %{
    nodes: [
      %{
        node_type: :input,
        node_name: :x,
        flow_ends_here_count: 0,
        reached_count: 18,
        reached_percentage: 81.8,
        average_time_to_reach: 56,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      },
      %{
        node_type: :input,
        node_name: :y,
        flow_ends_here_count: 0,
        reached_count: 18,
        reached_percentage: 81.8,
        average_time_to_reach: 646,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      },
      %{
        node_type: :compute,
        node_name: :sum,
        flow_ends_here_count: 0,
        reached_count: 16,
        reached_percentage: 72.7,
        average_time_to_reach: 84,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      }
    ]
  },
  analyzed_at: "2025-08-14T06:11:04.181883Z",
  executions: %{
    count: 22,
    duration_median_seconds_to_last_update: 1,
    duration_avg_seconds_to_last_update: 556
  }
}
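
The percentages in this output look like each node's reached_count divided by the total number of executions (22). That reading is inferred from the numbers shown here, not taken from Journey's documentation. A quick check in plain Elixir:

```elixir
# Assumption: reached_percentage = reached_count / executions.count * 100,
# rounded to one decimal place.
total_executions = 22

reached_percentage = fn reached_count ->
  Float.round(reached_count / total_executions * 100, 1)
end

x_and_y = reached_percentage.(18)
# => 81.8

sum = reached_percentage.(16)
# => 72.7
```

Both results match the reached_percentage values reported for :x, :y, and :sum above.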

In summary

This showed:

  • an application defined as a graph + business logic,
  • an execution of the flow taking place, step by step,
  • an execution of the flow being interrupted and resumed, as if nothing happened,
  • analytics describing the “funnel” of executions of your graph.

Behind the scenes (not visible in this simple example):

  • computations were subject to a retry policy, and would have been retried had they failed,
  • computations scale seamlessly: they run on any replica of your application.
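
To make "retry policy" concrete, here is a rough sketch of retrying a function until it succeeds or runs out of attempts. This is not Journey's implementation: `RetrySketch` and `flaky_add` are hypothetical names invented for this illustration, and the limit of 3 attempts only echoes the `max_retries: 3` shown in the graph output earlier. Whether Journey counts attempts the same way is not something this example establishes.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of Journey.
  # Calls `fun` up to `max_attempts` times, stopping at the first {:ok, _}.
  def run(fun, max_attempts) when max_attempts > 0 do
    case fun.() do
      {:ok, result} ->
        {:ok, result}

      {:error, _reason} when max_attempts > 1 ->
        run(fun, max_attempts - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# A deliberately flaky function: fails on the first two calls, then succeeds.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky_add = fn ->
  attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if attempt < 3, do: {:error, :transient}, else: {:ok, 12 + 2}
end

result = RetrySketch.run(flaky_add, 3)
# => {:ok, 14}
```

With only two attempts allowed, the same flaky function would exhaust them and return `{:error, :transient}`.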

This all happened without application data getting shipped to a third party, and without requiring a SaaS dependency.

See Journey documentation for examples of more complex applications (e.g. a Horoscope app, or a Credit Card Application flow, which includes Mutations, and one-time and recurring Scheduled events).