Powered by AppSignal & Oban Pro

Ximula

livebooks/ximula.livemd

Ximula

Mix.install([
  {:kino, "~> 0.18.0", only: [:dev]},
  {:ximula, path: Path.join(__DIR__, ".."), env: :dev}
])

Overview

This Livebook demonstrates the Ximula.Sim framework - a composable, pipeline-based simulation library for Elixir. The framework separates simulation logic (pure functions) from execution strategy (parallel/sequential) and provides observability through Telemetry and PubSub.

graph TB
    Supervisor[Supervisor
strategy: :rest_for_one] Supervisor --> Agent[Agent: MyWorld
holds grid data] Supervisor --> GK[Gatekeeper
locked access wrapper] Supervisor --> PubSub[Phoenix.PubSub
event broadcasting] Supervisor --> LoopTaskSup[Task.Supervisor
loop tasks] Supervisor --> RunnerTaskSup[Task.Supervisor
simulation tasks] Supervisor --> Loop[Loop GenServer
queue orchestration] style Agent fill:#90EE90 style GK fill:#90EE90 style Loop fill:#FFB6C1 style PubSub fill:#87CEEB style LoopTaskSup fill:#FFD700 style RunnerTaskSup fill:#FFD700

Architecture

  • Gatekeeper: Holds world state (Grid), provides locked concurrent access
  • Loop: GenServer that orchestrates queues, manages timers and tasks
  • Queue: Defines when and what to execute (interval + filter function)
  • Pipeline: Sequence of stages (e.g., vegetation → population → movement)
  • Stage: Uses an adapter (Single, Grid, Gatekeeper) to process entities
  • Steps: Pure simulation functions that return changes

Three Levels of Execution

Queue / Loop (Timing & Orchestration)

  • Purpose: Schedules when to run, filters what to process
  • Properties: name, interval, func (filter + pipeline execution)
  • Implementation: GenServer manages queue lifecycle
  • Execution: Each queue runs its filter → pipeline in supervised task
  • API: Loop.add_queue(loop, queue), Loop.start_sim(loop)
  • Example: Urgent queue runs every 100ms, processes fields where urgent == true
  • State: Ephemeral (queues can be rebuilt on restart)

Pipeline / Stage (Coordination & Execution Strategy)

  • Purpose: Defines what runs and how (sequential stages)
  • Properties: stage sequence, stage adapter, notification config
  • Execution:
    • Calls TaskRunner for parallel execution
    • Adapters handle data shape (Grid, Single, Gatekeeper)
    • Gatekeeper adapter locks positions, reads/writes through Gatekeeper
  • API: Pipeline.execute(pipeline, state)
  • Example: Stage 1: grow_crops → Stage 2: consume_food
  • State: Static (built once, stored as module functions or config)

Simulation / Step (Pure Logic)

  • Purpose: Implements game/simulation rules
  • Properties: Pure functions, no side effects
  • Signature: def step(%Change{data, changes}, opts)
  • Returns: %Change{changes: %{key: value}}
  • API: Vegetation.grow_crops(change)
  • Example: Calculate crop growth based on water + soil
  • Events: Emit telemetry/pubsub events for significant changes

Task Squence

sequenceDiagram
  autonumber
  participant GS as Loop
GenServer loop every interval activate GS GS->>GS: tick par each queue create participant Q as Queue Task GS->>Q: Queue.execute Q-->>N: queue started activate Q Q->>Q: Pipeline.execute Q-->>N: pipeline started activate Q Q->>Q: execute_stage Q-->>N: stage started par each entity activate Q create participant S as Step Task Q->>S: execute_steps S-->>N: entity started activate S S->>A: get_data activate S S-->>N: step started S->>S: execute_step S-->>N: step completed deactivate S activate S S->>S: execute_step deactivate S activate S S->>S: execute_step deactivate S S->>A: put_data deactivate S S-->>N: entity completed destroy S S-->>Q: {:ok, aggregate} deactivate Q end Q-->>N: stage completed deactivate Q Q-->>N: pipeline completed end deactivate Q Q-->>N: queue completed destroy Q Q-->>GS: handle_info({ref, result}, state) deactivate GS end N-->>GS: handle_telemetry(queue_completed) participant N as PubSub
Telemetry participant A as Gatekeeper
Data Agent

Setup

defmodule MySimulation do
  use Ximula.Sim

  @moduledoc """
  A simple vegetation growth simulation demonstrating Ximula.Sim framework.
  
  The simulation:
  - Creates a 2D torus (wrapping grid) world
  - Each field has vegetation that grows by 1 each tick
  - Uses Gatekeeper for locked concurrent access
  - Demonstrates telemetry and PubSub events
  """
  alias Phoenix.PubSub
  
  alias Ximula.{Grid, Torus}
  alias Ximula.Gatekeeper.Agent, as: Gatekeeper
  alias Ximula.Sim.{Change, Loop}
  
  @doc """
  Starts the simulation supervision tree.
  
  Uses :rest_for_one strategy - if Gatekeeper crashes, everything restarts.
  If Loop crashes, Gatekeeper (data) survives.
  """
  def start do
    children = [
      # Data layer - if this crashes, everything downstream is invalid
      Gatekeeper.agent_spec(MyWorld, data: nil, name: :world),
      {Ximula.Gatekeeper.Agent, name: :gatekeeper, agent: :world},

      # Infrastructure - depends on data layer being healthy
      {PubSub, name: :my_pubsub},
      {Task.Supervisor, name: :loop_task_supervisor},
      {Task.Supervisor, name: :task_runner_supervisor},

      # Orchestration - depends on all infrastructure
      {Loop, name: :my_sim_loop, supervisor: :loop_task_supervisor}
    ]
    Supervisor.start_link(children, strategy: :rest_for_one, name: :my_simulation)
  end
  
  def create_world(size) do
    world = Torus.create(size, size, fn _x, _y ->
      %{
        vegetation: 100
      }
    end)
    Gatekeeper.direct_set(:gatekeeper, fn _ -> world end)
  end

  def positions(gatekeeper) do
    Gatekeeper.get(gatekeeper, &Grid.positions(&1))
  end

  def get_field(position, gatekeeper) do
    field = Gatekeeper.lock(gatekeeper, position, &Grid.get(&1, position))
    Map.merge(field, %{position: position})
  end

  def put_field(%{position: position, vegetation: vegetation}, gatekeeper) do
    :ok = Gatekeeper.update(gatekeeper, position, vegetation, &Grid.put(&1, position, %{vegetation: vegetation}))
    position
  end

  def notify_filter(%Change{} = change) do
    Change.get(change, :position) == {0,0}
  end

  def notify_filter(field) do
    field.position == {0,0}
  end

  simulation do
    default(gatekeeper: :gatekeeper, pubsub: :my_pubsub)
    
    pipeline(:test) do
      notify(:event_metric)

      stage :flora_fauna, :gatekeeper do
        notify_all(:event_metric)
        notify_entity(:event_metric, &MySimulation.notify_filter/1)
        read_fun(&MySimulation.get_field/2)
        write_fun(&MySimulation.put_field/2)
        step(MySimulation, :sim_vegetation, notify: {:event_metric, &MySimulation.notify_filter/1})
        #step(MySimulation, :sim_herbivore)
        #step(MySimulation, :sim_predator)
      end

      #stage :movement, :single do
      #  notify_all(:metric)
      #  notify_entity(:event_metric, &MySimulation.notify_filter/1)
      #  step(MySimulation, :sim_movement)
      #  step(MySimulation, :sim_crash, notify: {:event, &MySimulation.notify_filter/1})
      #end
    end

    queue :normal do
      run_pipeline(:test, supervisor: :task_runner_supervisor) do
        MySimulation.positions(:gatekeeper)
      end
    end

  end

  def sim_vegetation(%Change{data: %{vegetation: _vegetation}} = change) do
    Change.change_by(change, :vegetation, 1)
  end
  
end

Start SimServer

MySimulation.start()
MySimulation.create_world(2)
Agent.get(:world, & &1)
queues = MySimulation.build_queues
Ximula.Sim.Loop.add_queues(:my_sim_loop, queues)

Telemetry

import Kino.Shorts

{:ok, data_agent} = Agent.start_link(fn -> [] end)
frame = Kino.Frame.new() |> Kino.render()
metric_table = Kino.DataTable.new([], keys: [:id, :time, :event, :duration])
Kino.Frame.render(frame, metric_table)

defmodule TelemetryTable do
  def handle_event(event, measurements, _meta, %{table: metric_table, agent: agent}) do
    data = Agent.get(agent, & &1)
    id = Map.get(measurements, :monotonic_time)
    data = [%{
      id: id, 
      time: DateTime.utc_now(),
      event: event, 
      duration: Map.get(measurements, :duration, 0) |> System.convert_time_unit(:native, :microsecond)
    } | data]
    Agent.update(agent, fn _ -> data end)
    Kino.DataTable.update(metric_table, data)
  end
end

:ok =
  :telemetry.attach_many(
    "ximula-metrics-#{:erlang.unique_integer()}",
    [
      [:ximula, :sim, :queue, :start],
      [:ximula, :sim, :queue, :stop],
      [:ximula, :sim, :pipeline, :start],
      [:ximula, :sim, :pipeline, :stop],
      [:ximula, :sim, :pipeline, :stage, :start],
      [:ximula, :sim, :pipeline, :stage, :stop],
      [:ximula, :sim, :pipeline, :stage, :entity, :start],
      [:ximula, :sim, :pipeline, :stage, :entity, :stop],
      [:ximula, :sim, :pipeline, :stage, :step, :start],
      [:ximula, :sim, :pipeline, :stage, :step, :stop]
    ],
    &TelemetryTable.handle_event/4,
    %{table: metric_table, agent: data_agent}
  )

      

Events

defmodule EventListener do

  def start do
    Task.start(fn ->
      :ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:test")
      :ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:stage:flora_fauna")
      :ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:stage:flora_fauna:entity")
      listen_loop()
    end)
  end

  def listen_loop() do
    receive do
      msg -> IO.inspect(msg, label: "Received message")
    after
      1000 -> :ok
    end
    listen_loop()
  end

end
EventListener.start()

Run Simulation

Ximula.Sim.Loop.start_sim(:my_sim_loop)
Process.sleep(5_000)
Ximula.Sim.Loop.stop_sim(:my_sim_loop)
Ximula.Gatekeeper.Agent.get(:gatekeeper, fn world -> Ximula.Grid.get(world, {1, 1}) end)