Building a resilient application with Journey

lib/examples/basic.livemd

# [Optional] Setting Build Key, see https://gojourney.dev/your_keys
# (Using "Journey Livebook Demo" build key)
System.put_env("JOURNEY_BUILD_KEY", "B27AXHMERm2Z6ehZhL49v")

Mix.install(
  [
    {:ecto_sql, "~> 3.10"},
    {:postgrex, "~> 0.21"},
    {:jason, "~> 1.4"},
    {:journey, "~> 0.10"},
    # {:journey, path: Path.join([__DIR__, "../.."])},
    {:kino_vega_lite, "~> 0.1.11"},
    {:kino, "~> 0.16.1"}
  ],
  start_applications: false
)

Application.put_env(:journey, :log_level, :warning)

# Update this configuration to point to your database server
# (to create the database, run `mix ecto.create` from the root of the repo).
Application.put_env(:journey, Journey.Repo,
  database: "journey_dev",
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  log: false,
  port: 5432
)

Application.put_env(:journey, :ecto_repos, [Journey.Repo])

Application.loaded_applications()
|> Enum.map(fn {app, _, _} -> app end)
|> Enum.each(&Application.ensure_all_started/1)

Getting things done with Journey

This livebook uses Journey for a deliberately basic flow: computing the sum of two numbers, and determining whether the sum exceeds a threshold. The flow has two input values (x and y) and two computations (sum and large_value_alert).

We first create a blueprint (graph) for computing sum and large_value_alert, and then execute an instance of that blueprint to perform the computations for a particular set of inputs.

A few things to note:

  • every input value (:x, :y) and every computation result (:sum, :large_value_alert) is persisted,
  • the two computations
    • happen reliably (their functions are executed with a retry policy),
    • are as horizontally distributed as your app is (the functions will run wherever your app runs),
    • are proactive (:sum will be computed when :x and :y become available, and :large_value_alert will be computed when :sum is available).
  • executions of this flow can be as long-running as needed (milliseconds? months?), and will live through system restarts, crashes, redeployments, page reloads, etc.

These attributes – reliability, scalability, and persistence – come without the need to subscribe to an online service, or ship your application’s data to a third party, or to deploy any additional infrastructure. Just your application, using a package, storing data in your database, and running as it normally would.
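
The "retry policy" mentioned above can be pictured with a small sketch. This is an illustration only, not Journey's implementation: the module name RetrySketch, the way attempts are counted, and the {:error, reason} shape for failures are all assumptions made for this example.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of Journey: call `f` with `arg` until it
  # returns {:ok, _} or the allowed number of attempts is used up.
  def run(f, arg, attempts_left) when attempts_left > 0 do
    case f.(arg) do
      {:ok, value} ->
        {:ok, value}

      {:error, _reason} when attempts_left > 1 ->
        run(f, arg, attempts_left - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# A function that fails twice before succeeding, to exercise the retries.
calls = :counters.new(1, [])

flaky_add = fn %{x: x, y: y} ->
  :counters.add(calls, 1, 1)
  if :counters.get(calls, 1) < 3, do: {:error, :transient}, else: {:ok, x + y}
end

result = RetrySketch.run(flaky_add, %{x: 12, y: 2}, 3)
IO.inspect(result)
```

With three attempts allowed, the third call succeeds and the caller only ever sees the final result.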

Define the Blueprint of the Application

Our application is very simple: given two numbers, it computes their sum, and sets an alert if the sum is “too large”.

“Business logic”: f_add(x, y)

# The function for adding two numbers. Part of the "business logic" of this application.
f_add = fn %{x: x, y: y} -> {:ok, x + y} end
#Function<42.81571850/1 in :erl_eval.expr/6>
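
Because f_add is a plain anonymous function, it can be tried out on its own before it is attached to a graph. A minimal sketch, assuming only the shape used above (a map of values in, an {:ok, result} tuple out):

```elixir
# Same shape as the function above: takes a map of upstream values,
# returns {:ok, result}.
f_add = fn %{x: x, y: y} -> {:ok, x + y} end

# Call it directly with a map that has :x and :y.
{:ok, sum} = f_add.(%{x: 12, y: 2})
IO.inspect(sum)

# The map pattern only requires :x and :y; extra keys are ignored.
{:ok, other_sum} = f_add.(%{x: 1, y: 2, note: "ignored"})
IO.inspect(other_sum)
```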

The flow

A Journey graph is the blueprint for this application. It defines which inputs and computed values exist and how they depend on each other, and it attaches functions to the self-computing values.

import Journey.Node
import Journey.Node.Conditions
import Journey.Node.UpstreamDependencies

graph = Journey.new_graph(
  # graph name.
  "g1",
  # graph version.
  "v1",
  # graph nodes.
  [
    input(:x),
    input(:y),
    # the `:sum` computation is waiting on :x and :y.   
    compute(:sum, [:x, :y], f_add),
    compute(
      :large_value_alert, 
      unblocked_when(
        :sum, 
        fn sum_node -> sum_node.set_time != nil and sum_node.node_value > 40 end
      ),
      fn _ -> {:ok, "🚨"} end
    ),
  ]
)
%Journey.Graph{
  name: "g1",
  version: "v1",
  nodes: [
    %Journey.Graph.Input{name: :execution_id, type: :input},
    %Journey.Graph.Input{name: :last_updated_at, type: :input},
    %Journey.Graph.Input{name: :x, type: :input},
    %Journey.Graph.Input{name: :y, type: :input},
    %Journey.Graph.Step{
      name: :sum,
      gated_by: [:x, :y],
      f_compute: #Function<42.81571850/1 in :erl_eval.expr/6>,
      f_on_save: nil,
      type: :compute,
      mutates: nil,
      max_retries: 3,
      abandon_after_seconds: 60
    },
    %Journey.Graph.Step{
      name: :large_value_alert,
      gated_by: {:sum, #Function<42.81571850/1 in :erl_eval.expr/6>},
      f_compute: #Function<42.81571850/1 in :erl_eval.expr/6>,
      f_on_save: nil,
      type: :compute,
      mutates: nil,
      max_retries: 3,
      abandon_after_seconds: 60
    }
  ]
}
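
The condition that gates :large_value_alert is an ordinary function, so it can be explored in isolation. The sketch below feeds it plain maps that stand in for the persisted :sum value (an assumption: only the two fields the predicate reads are included, and the name sum_is_large is made up for this example).

```elixir
# The same predicate as in the graph definition above.
sum_is_large = fn sum_node ->
  sum_node.set_time != nil and sum_node.node_value > 40
end

# Plain maps standing in for the :sum value record at different moments.
not_computed = %{set_time: nil, node_value: nil}
small_sum = %{set_time: 1_755_833_255, node_value: 14}
large_sum = %{set_time: 1_755_833_255, node_value: 135}

IO.inspect(sum_is_large.(not_computed)) # false
IO.inspect(sum_is_large.(small_sum))    # false
IO.inspect(sum_is_large.(large_sum))    # true

# Why the set_time check matters: in Elixir's term ordering every atom
# (including nil) compares greater than every number.
unset_value = nil
IO.inspect(unset_value > 40)            # true
```

Without the `set_time != nil` check, a :sum that has not been computed yet would pass the `> 40` comparison.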

Flow, visualized

Here is the visual – Mermaid – representation of the graph that we have just defined.

You can see the two input values (:x, :y), the two computations (:sum, :large_value_alert), and their dependencies.

It also shows two system values, :execution_id and :last_updated_at, which are maintained by the runtime.

graph
|> Journey.Tools.generate_mermaid_graph()
|> Kino.Mermaid.new()
graph TD
    %% Graph
    subgraph Graph["🧩 'g1', version v1"]
        execution_id[execution_id]
        last_updated_at[last_updated_at]
        x[x]
        y[y]
        sum["sum
(anonymous fn)"]
        large_value_alert["large_value_alert
(anonymous fn)"]

        x --> sum
        y --> sum
        sum --> large_value_alert
    end

    %% Styling
    classDef inputNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000000
    classDef computeNode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000000
    classDef scheduleNode fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000000
    classDef mutateNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#000000

    %% Apply styles to actual nodes
    class y,x,last_updated_at,execution_id inputNode
    class large_value_alert,sum computeNode

Executing instances of the blueprint

Now that we have the blueprint of the application, we can run its executions.

Starting a new execution

Here is an example of starting a new execution of the graph. If the application handles a user’s visit to your website, this might happen when the user lands on the web page, and, perhaps, starts engaging with it.

We’ll take a note of the id of the execution, just in case everything crashes (or if the user reloads the page, or leaves and comes back in a month) and we need to reload it later.

execution = Journey.start_execution(graph)

# Take a note of the id of the execution, so we can reload it in case the data center reboots.
execution_id = execution.id
"EXECLG0R2MD2JBARMVRE380L"

The new execution doesn’t have much in it at this point: nothing has been set or computed, except for the two system fields.

# No values are set, except for system-provided values.
Journey.values_all(execution)
%{
  sum: :not_set,
  y: :not_set,
  x: :not_set,
  last_updated_at: {:set, 1755833244},
  execution_id: {:set, "EXECLG0R2MD2JBARMVRE380L"},
  large_value_alert: :not_set
}
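
The map above tags each node as either :not_set or {:set, value}. A small sketch of working with that shape in plain Elixir, using the output above copied as a literal (the variable names are made up for this example):

```elixir
# The map returned by Journey.values_all/1 above, copied as a literal.
all_values = %{
  sum: :not_set,
  y: :not_set,
  x: :not_set,
  last_updated_at: {:set, 1_755_833_244},
  execution_id: {:set, "EXECLG0R2MD2JBARMVRE380L"},
  large_value_alert: :not_set
}

# Keep only the nodes that are set, unwrapping the {:set, value} tuples.
set_values = for {name, {:set, value}} <- all_values, into: %{}, do: {name, value}

# Names of the nodes that are still waiting for a value.
pending = for {name, :not_set} <- all_values, do: name

IO.inspect(set_values)
IO.inspect(Enum.sort(pending))
```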

Once :x and :y are provided, :sum gets computed

The user might start supplying the data:

execution = Journey.set_value(execution, :x, 12); :ok
:ok

By the way, if the world crashes (or gets redeployed, or the user leaves), no worries.

Since we took a note of the ID of the execution, we can load the execution as soon as things are back up (or when the user comes back), and proceed as if nothing happened.

reloaded_execution = Journey.load(execution_id)
%Journey.Persistence.Schema.Execution{
  __meta__: #Ecto.Schema.Metadata<:loaded, "executions">,
  id: "EXECLG0R2MD2JBARMVRE380L",
  graph_name: "g1",
  graph_version: "v1",
  archived_at: nil,
  values: [
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALGLMD0YZDTBAD40LTX7G3",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :last_updated_at,
      node_type: :input,
      node_value: 1755833251,
      set_time: 1755833251,
      ex_revision: 1,
      inserted_at: 1755833244,
      updated_at: 1755833251
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALXH5TGBT2EZ64LR26AEYX",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :x,
      node_type: :input,
      node_value: 12,
      set_time: 1755833251,
      ex_revision: 1,
      inserted_at: 1755833244,
      updated_at: 1755833251
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VAL5GZB3YHBG48RX2G00AYL",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :execution_id,
      node_type: :input,
      node_value: "EXECLG0R2MD2JBARMVRE380L",
      set_time: 1755833244,
      ex_revision: 0,
      inserted_at: 1755833244,
      updated_at: 1755833244
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VAL3HT5MMBTHX7TXD93D2A0",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :y,
      node_type: :input,
      node_value: nil,
      set_time: nil,
      ex_revision: 0,
      inserted_at: 1755833244,
      updated_at: 1755833244
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALM10Z0R5VHJDA7VJL68G9",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :sum,
      node_type: :compute,
      node_value: nil,
      set_time: nil,
      ex_revision: 0,
      inserted_at: 1755833244,
      updated_at: 1755833244
    },
    %Journey.Persistence.Schema.Execution.Value{
      __meta__: #Ecto.Schema.Metadata<:loaded, "values">,
      id: "VALGJGJA9A860BB3JH59YY9",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :large_value_alert,
      node_type: :compute,
      node_value: nil,
      set_time: nil,
      ex_revision: 0,
      inserted_at: 1755833244,
      updated_at: 1755833244
    }
  ],
  computations: [
    %Journey.Persistence.Schema.Execution.Computation{
      __meta__: #Ecto.Schema.Metadata<:loaded, "computations">,
      id: "CMPR0Y39G6ARJYG37T4G9G3",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :sum,
      computation_type: :compute,
      state: :not_set,
      ex_revision_at_start: nil,
      ex_revision_at_completion: nil,
      scheduled_time: nil,
      start_time: nil,
      completion_time: nil,
      deadline: nil,
      error_details: nil,
      computed_with: nil,
      inserted_at: 1755833244,
      updated_at: 1755833244
    },
    %Journey.Persistence.Schema.Execution.Computation{
      __meta__: #Ecto.Schema.Metadata<:loaded, "computations">,
      id: "CMPZ1T53HMV6LYRA3GY7G5R",
      execution_id: "EXECLG0R2MD2JBARMVRE380L",
      execution: #Ecto.Association.NotLoaded,
      node_name: :large_value_alert,
      computation_type: :compute,
      state: :not_set,
      ex_revision_at_start: nil,
      ex_revision_at_completion: nil,
      scheduled_time: nil,
      start_time: nil,
      completion_time: nil,
      deadline: nil,
      error_details: nil,
      computed_with: nil,
      inserted_at: 1755833244,
      updated_at: 1755833244
    }
  ],
  revision: 1,
  inserted_at: 1755833244,
  updated_at: 1755833251
}

The user is supplying the other input:

reloaded_execution = Journey.set_value(reloaded_execution, :y, 2); :ok
:ok

Now that both :x and :y have been supplied, :sum gets computed. Here is the state attached to the execution:

(Note: since :sum is “small”, :large_value_alert does not get set, thanks to the condition we defined in the graph for this node.)

Journey.values_all(reloaded_execution)
%{
  sum: {:set, 14},
  y: {:set, 2},
  x: {:set, 12},
  last_updated_at: {:set, 1755833255},
  execution_id: {:set, "EXECLG0R2MD2JBARMVRE380L"},
  large_value_alert: :not_set
}

You can also get specific values:

Journey.get_value(reloaded_execution, :sum, wait_any: true)
{:ok, 14}
Journey.get_value(reloaded_execution, :large_value_alert)
{:error, :not_set}
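
The two result shapes seen above, {:ok, value} and {:error, :not_set}, lend themselves to pattern matching. A minimal sketch, assuming a hypothetical helper describe_alert that is not part of Journey; the catch-all clause is there because other error reasons may exist:

```elixir
# Hypothetical helper (not part of Journey): turn a get_value-style result
# into a message for the user.
describe_alert = fn
  {:ok, alert} -> "alert raised: #{alert}"
  {:error, :not_set} -> "no alert"
  {:error, reason} -> "alert unavailable: #{inspect(reason)}"
end

IO.puts(describe_alert.({:error, :not_set}))
IO.puts(describe_alert.({:ok, "🚨"}))
```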

Bigger :x -> bigger :sum -> :large_value_alert 🚨!!

If an input value changes, the downstream nodes get re-evaluated.

reloaded_execution = Journey.set_value(reloaded_execution, :x, 133); :ok
:ok

“no worries, here is the updated sum”

Journey.get_value(reloaded_execution, :sum, wait_new: true)
{:ok, 135}

The updated :x pushes :sum over the threshold that triggers :large_value_alert:

Journey.get_value(reloaded_execution, :large_value_alert)
{:ok, "🚨"}

Setting :x back to a small value brings :sum back under the threshold, and :large_value_alert goes back to not being set:

reloaded_execution = Journey.set_value(reloaded_execution, :x, 1); :ok
:ok
Journey.get_value(reloaded_execution, :sum, wait_new: true)
{:ok, 3}
Journey.get_value(reloaded_execution, :large_value_alert)
{:error, :not_set}

This basic computation happened with persistence, resiliency, and scalability.

Searching through executions

You can search the database for execution records, querying by specific values, with sorting, limits and pagination:

Journey.list_executions(
  graph_name: graph.name,
  graph_version: graph.version,
  order_by_execution_fields: [:inserted_at],
  filter_by: [{:sum, :gt, 2}, {:x, :lt, 10}],
  offset: 0,
  limit: 10
)
|> Enum.map(fn e ->
  Journey.values(e)
end)
[
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755826416, execution_id: "EXECV9Z60T26H0AY5BT50M6G"},
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755827033, execution_id: "EXECYGV60454G29YD84R2X14"},
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755828575, execution_id: "EXECXDGXZ01MT5915MJD984L"},
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755830344, execution_id: "EXECAE6861D337HRL9AY4J5Z"},
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755831312, execution_id: "EXEC03G5YMTV1TDVT75DTE9M"},
  %{sum: 3, y: 2, x: 1, last_updated_at: 1755833269, execution_id: "EXECLG0R2MD2JBARMVRE380L"}
]
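
To make the filter_by tuples concrete, here is an in-memory stand-in written in plain Elixir. It is not how Journey runs the query (the search above goes to the database), and it assumes that multiple filters are combined with AND, which is consistent with the results shown. The name matches_all and the sample records are made up for this example.

```elixir
# Hypothetical in-memory stand-in for filter_by: each filter is
# {node_name, operator, value}, and a record must satisfy all of them.
matches_all = fn values, filters ->
  Enum.all?(filters, fn
    {name, :gt, threshold} -> Map.fetch!(values, name) > threshold
    {name, :lt, threshold} -> Map.fetch!(values, name) < threshold
  end)
end

records = [
  %{sum: 3, y: 2, x: 1},
  %{sum: 135, y: 2, x: 133},
  %{sum: 1, y: 0, x: 1}
]

filters = [{:sum, :gt, 2}, {:x, :lt, 10}]

# Only the first record has both sum > 2 and x < 10.
matching = Enum.filter(records, fn values -> matches_all.(values, filters) end)
IO.inspect(matching)
```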

System Status: health, stats

You can get the stats of the system: which graphs have executions, how many executions there are, and what is happening, generally.

This is a general “high level stats and health check”.

System stats, as human-friendly text:

Journey.Insights.Status.status() |> Journey.Insights.Status.to_text() |> IO.puts()
System Status: HEALTHY
Database: Connected
================================================================================

GRAPHS (1 total):
----------
Name: 'g1'
Version: 'v1'
Executions:
- active: 6
- archived: 0
First activity: 2025-08-22T03:27:24Z
Last activity: 2025-08-22T03:27:49Z
Computations:
✓ success: 25
◯ not_set: 7
:ok

System stats, as a code-friendly data structure:

Journey.Insights.Status.status()
%{
  status: :healthy,
  graphs: [
    %{
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 25, failed: 0, not_set: 7, computing: 0},
          most_recently_created: "2025-08-22T03:27:49Z",
          most_recently_updated: "2025-08-22T03:27:49Z"
        },
        executions: %{
          active: 6,
          archived: 0,
          most_recently_created: "2025-08-22T03:27:24Z",
          most_recently_updated: "2025-08-22T03:27:49Z"
        }
      },
      graph_name: "g1",
      graph_version: "v1"
    }
  ],
  database_connected: true
}
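
The data-structure form is convenient for automated checks. The sketch below works on a trimmed copy of the map above and applies a made-up rule (healthy, connected, and no failed or abandoned computations); the rule and the variable names are assumptions for illustration.

```elixir
# A trimmed copy of the status map shown above.
status = %{
  status: :healthy,
  database_connected: true,
  graphs: [
    %{
      graph_name: "g1",
      graph_version: "v1",
      stats: %{
        computations: %{
          by_state: %{abandoned: 0, cancelled: 0, success: 25, failed: 0, not_set: 7, computing: 0}
        },
        executions: %{active: 6, archived: 0}
      }
    }
  ]
}

# Graphs that have failed or abandoned computations, with their counts.
troubled =
  for %{
        graph_name: name,
        graph_version: version,
        stats: %{computations: %{by_state: %{failed: failed, abandoned: abandoned}}}
      } <- status.graphs,
      failed + abandoned > 0,
      do: {name, version, failed + abandoned}

# Hypothetical overall rule combining the top-level flags with the per-graph check.
all_good = status.status == :healthy and status.database_connected and troubled == []
IO.inspect({all_good, troubled})
```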

Flow Analytics: see user’s progression

Since every data-setting operation touches the execution, Journey can provide stats on what is happening in the system: what percentage of users reached :x, what percentage reached :y, and so on.

In a way, this is not unlike funnel analytics for the application defined by the graph.

“Flow Analytics” as human-friendly text:

Journey.Insights.FlowAnalytics.flow_analytics(graph.name, graph.version) 
|> Journey.Insights.FlowAnalytics.to_text()
|> IO.puts()
Graph: 'g1'
Version: 'v1'
Analyzed at: 2025-08-22T03:28:01.343599Z

EXECUTION STATS:
----------
Total executions: 6
Average duration: 8 seconds
Median duration: 1 second

NODE STATS (3 nodes):
----------
Node Name: 'sum'
Type: compute
Reached by: 6 executions (100.0%)
Average time to reach: 8 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)

Node Name: 'x'
Type: input
Reached by: 6 executions (100.0%)
Average time to reach: 8 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)

Node Name: 'y'
Type: input
Reached by: 6 executions (100.0%)
Average time to reach: 3 seconds
Flow ends here: 0 executions (0.0% of all, 0.0% of reached)
:ok

“Flow Analytics” as a code-friendly data structure:

# get some analytics for the executions flowing through the system: how many, what does the funnel look like
Journey.Insights.FlowAnalytics.flow_analytics(graph.name, graph.version)
%{
  graph_name: "g1",
  graph_version: "v1",
  node_stats: %{
    nodes: [
      %{
        node_type: :compute,
        node_name: :sum,
        reached_count: 6,
        reached_percentage: 100.0,
        average_time_to_reach: 8,
        flow_ends_here_count: 0,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      },
      %{
        node_type: :input,
        node_name: :x,
        reached_count: 6,
        reached_percentage: 100.0,
        average_time_to_reach: 8,
        flow_ends_here_count: 0,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      },
      %{
        node_type: :input,
        node_name: :y,
        reached_count: 6,
        reached_percentage: 100.0,
        average_time_to_reach: 3,
        flow_ends_here_count: 0,
        flow_ends_here_percentage_of_all: 0.0,
        flow_ends_here_percentage_of_reached: 0.0
      }
    ]
  },
  analyzed_at: "2025-08-22T03:28:04.202091Z",
  executions: %{
    count: 6,
    duration_median_seconds_to_last_update: 1,
    duration_avg_seconds_to_last_update: 8
  }
}
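
As a sketch of how the data-structure form might be consumed, the snippet below takes the node stats from the map above (trimmed to the fields it uses) and prints one funnel line per node, ordered by how quickly executions reach it. The formatting and variable names are made up for this example.

```elixir
# Node stats copied from the analytics map above (trimmed to the fields used here).
nodes = [
  %{node_name: :sum, node_type: :compute, reached_percentage: 100.0, average_time_to_reach: 8},
  %{node_name: :x, node_type: :input, reached_percentage: 100.0, average_time_to_reach: 8},
  %{node_name: :y, node_type: :input, reached_percentage: 100.0, average_time_to_reach: 3}
]

# Order nodes by time to reach, and build one summary line per node.
lines =
  nodes
  |> Enum.sort_by(fn n -> n.average_time_to_reach end)
  |> Enum.map(fn n ->
    "#{n.node_name} (#{n.node_type}): #{n.reached_percentage}% after ~#{n.average_time_to_reach}s"
  end)

Enum.each(lines, &IO.puts/1)
```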

In summary

This livebook showed:

  • an application defined as a graph + business logic (the functions attached to compute nodes),
  • an execution of the flow taking place, step by step,
  • an execution of the flow being interrupted and resumed, as if nothing happened,
  • analytics describing the “funnel” of executions of your graph.

Behind the scenes (not visible in this simple example):

  • computations were subject to a retry policy, and would have been retried in case of failures,
  • computations scale seamlessly: they run on any replica of your application.

This all happened without application data getting shipped to a third party, and without requiring a SaaS dependency.

See Journey documentation for examples of more complex applications (e.g. a Horoscope app, or a Credit Card Application flow, which includes Mutations, and one-time and recurring Scheduled events).