Ximula
Mix.install([
{:kino, "~> 0.18.0", only: [:dev]},
{:ximula, path: Path.join(__DIR__, ".."), env: :dev}
])
Overview
This Livebook demonstrates the Ximula.Sim framework - a composable, pipeline-based simulation library for Elixir. The framework separates simulation logic (pure functions) from execution strategy (parallel/sequential) and provides observability through Telemetry and PubSub.
graph TB
Supervisor[Supervisor
strategy: :rest_for_one]
Supervisor --> Agent[Agent: MyWorld
holds grid data]
Supervisor --> GK[Gatekeeper
locked access wrapper]
Supervisor --> PubSub[Phoenix.PubSub
event broadcasting]
Supervisor --> LoopTaskSup[Task.Supervisor
loop tasks]
Supervisor --> RunnerTaskSup[Task.Supervisor
simulation tasks]
Supervisor --> Loop[Loop GenServer
queue orchestration]
style Agent fill:#90EE90
style GK fill:#90EE90
style Loop fill:#FFB6C1
style PubSub fill:#87CEEB
style LoopTaskSup fill:#FFD700
style RunnerTaskSup fill:#FFD700
Architecture
- Gatekeeper: Holds world state (Grid), provides locked concurrent access
- Loop: GenServer that orchestrates queues, manages timers and tasks
- Queue: Defines when and what to execute (interval + filter function)
- Pipeline: Sequence of stages (e.g., vegetation → population → movement)
- Stage: Uses an adapter (Single, Grid, Gatekeeper) to process entities
- Steps: Pure simulation functions that return changes
Three Levels of Execution
Queue / Loop (Timing & Orchestration)
- Purpose: Schedules when to run, filters what to process
- Properties: name, interval, func (filter + pipeline execution)
- Implementation: GenServer manages queue lifecycle
- Execution: Each queue runs its filter → pipeline in supervised task
-
API:
Loop.add_queue(loop, queue),Loop.start_sim(loop) -
Example: Urgent queue runs every 100ms, processes fields where
urgent == true - State: Ephemeral (queues can be rebuilt on restart)
Pipeline / Stage (Coordination & Execution Strategy)
- Purpose: Defines what runs and how (sequential stages)
- Properties: stage sequence, stage adapter, notification config
-
Execution:
-
Calls
TaskRunnerfor parallel execution - Adapters handle data shape (Grid, Single, Gatekeeper)
- Gatekeeper adapter locks positions, reads/writes through Gatekeeper
-
Calls
-
API:
Pipeline.execute(pipeline, state) - Example: Stage 1: grow_crops → Stage 2: consume_food
- State: Static (built once, stored as module functions or config)
Simulation / Step (Pure Logic)
- Purpose: Implements game/simulation rules
- Properties: Pure functions, no side effects
-
Signature:
def step(%Change{data, changes}, opts) -
Returns:
%Change{changes: %{key: value}} -
API:
Vegetation.grow_crops(change) - Example: Calculate crop growth based on water + soil
- Events: Emit telemetry/pubsub events for significant changes
Task Squence
sequenceDiagram
autonumber
participant GS as Loop
GenServer
loop every interval
activate GS
GS->>GS: tick
par each queue
create participant Q as Queue Task
GS->>Q: Queue.execute
Q-->>N: queue started
activate Q
Q->>Q: Pipeline.execute
Q-->>N: pipeline started
activate Q
Q->>Q: execute_stage
Q-->>N: stage started
par each entity
activate Q
create participant S as Step Task
Q->>S: execute_steps
S-->>N: entity started
activate S
S->>A: get_data
activate S
S-->>N: step started
S->>S: execute_step
S-->>N: step completed
deactivate S
activate S
S->>S: execute_step
deactivate S
activate S
S->>S: execute_step
deactivate S
S->>A: put_data
deactivate S
S-->>N: entity completed
destroy S
S-->>Q: {:ok, aggregate}
deactivate Q
end
Q-->>N: stage completed
deactivate Q
Q-->>N: pipeline completed
end
deactivate Q
Q-->>N: queue completed
destroy Q
Q-->>GS: handle_info({ref, result}, state)
deactivate GS
end
N-->>GS: handle_telemetry(queue_completed)
participant N as PubSub
Telemetry
participant A as Gatekeeper
Data Agent
Setup
defmodule MySimulation do
use Ximula.Sim
@moduledoc """
A simple vegetation growth simulation demonstrating Ximula.Sim framework.
The simulation:
- Creates a 2D torus (wrapping grid) world
- Each field has vegetation that grows by 1 each tick
- Uses Gatekeeper for locked concurrent access
- Demonstrates telemetry and PubSub events
"""
alias Phoenix.PubSub
alias Ximula.{Grid, Torus}
alias Ximula.Gatekeeper.Agent, as: Gatekeeper
alias Ximula.Sim.{Change, Loop}
@doc """
Starts the simulation supervision tree.
Uses :rest_for_one strategy - if Gatekeeper crashes, everything restarts.
If Loop crashes, Gatekeeper (data) survives.
"""
def start do
children = [
# Data layer - if this crashes, everything downstream is invalid
Gatekeeper.agent_spec(MyWorld, data: nil, name: :world),
{Ximula.Gatekeeper.Agent, name: :gatekeeper, agent: :world},
# Infrastructure - depends on data layer being healthy
{PubSub, name: :my_pubsub},
{Task.Supervisor, name: :loop_task_supervisor},
{Task.Supervisor, name: :task_runner_supervisor},
# Orchestration - depends on all infrastructure
{Loop, name: :my_sim_loop, supervisor: :loop_task_supervisor}
]
Supervisor.start_link(children, strategy: :rest_for_one, name: :my_simulation)
end
def create_world(size) do
world = Torus.create(size, size, fn _x, _y ->
%{
vegetation: 100
}
end)
Gatekeeper.direct_set(:gatekeeper, fn _ -> world end)
end
def positions(gatekeeper) do
Gatekeeper.get(gatekeeper, &Grid.positions(&1))
end
def get_field(position, gatekeeper) do
field = Gatekeeper.lock(gatekeeper, position, &Grid.get(&1, position))
Map.merge(field, %{position: position})
end
def put_field(%{position: position, vegetation: vegetation}, gatekeeper) do
:ok = Gatekeeper.update(gatekeeper, position, vegetation, &Grid.put(&1, position, %{vegetation: vegetation}))
position
end
def notify_filter(%Change{} = change) do
Change.get(change, :position) == {0,0}
end
def notify_filter(field) do
field.position == {0,0}
end
simulation do
default(gatekeeper: :gatekeeper, pubsub: :my_pubsub)
pipeline(:test) do
notify(:event_metric)
stage :flora_fauna, :gatekeeper do
notify_all(:event_metric)
notify_entity(:event_metric, &MySimulation.notify_filter/1)
read_fun(&MySimulation.get_field/2)
write_fun(&MySimulation.put_field/2)
step(MySimulation, :sim_vegetation, notify: {:event_metric, &MySimulation.notify_filter/1})
#step(MySimulation, :sim_herbivore)
#step(MySimulation, :sim_predator)
end
#stage :movement, :single do
# notify_all(:metric)
# notify_entity(:event_metric, &MySimulation.notify_filter/1)
# step(MySimulation, :sim_movement)
# step(MySimulation, :sim_crash, notify: {:event, &MySimulation.notify_filter/1})
#end
end
queue :normal do
run_pipeline(:test, supervisor: :task_runner_supervisor) do
MySimulation.positions(:gatekeeper)
end
end
end
def sim_vegetation(%Change{data: %{vegetation: _vegetation}} = change) do
Change.change_by(change, :vegetation, 1)
end
end
Start SimServer
MySimulation.start()
MySimulation.create_world(2)
Agent.get(:world, & &1)
queues = MySimulation.build_queues
Ximula.Sim.Loop.add_queues(:my_sim_loop, queues)
Telemetry
import Kino.Shorts
{:ok, data_agent} = Agent.start_link(fn -> [] end)
frame = Kino.Frame.new() |> Kino.render()
metric_table = Kino.DataTable.new([], keys: [:id, :time, :event, :duration])
Kino.Frame.render(frame, metric_table)
defmodule TelemetryTable do
def handle_event(event, measurements, _meta, %{table: metric_table, agent: agent}) do
data = Agent.get(agent, & &1)
id = Map.get(measurements, :monotonic_time)
data = [%{
id: id,
time: DateTime.utc_now(),
event: event,
duration: Map.get(measurements, :duration, 0) |> System.convert_time_unit(:native, :microsecond)
} | data]
Agent.update(agent, fn _ -> data end)
Kino.DataTable.update(metric_table, data)
end
end
:ok =
:telemetry.attach_many(
"ximula-metrics-#{:erlang.unique_integer()}",
[
[:ximula, :sim, :queue, :start],
[:ximula, :sim, :queue, :stop],
[:ximula, :sim, :pipeline, :start],
[:ximula, :sim, :pipeline, :stop],
[:ximula, :sim, :pipeline, :stage, :start],
[:ximula, :sim, :pipeline, :stage, :stop],
[:ximula, :sim, :pipeline, :stage, :entity, :start],
[:ximula, :sim, :pipeline, :stage, :entity, :stop],
[:ximula, :sim, :pipeline, :stage, :step, :start],
[:ximula, :sim, :pipeline, :stage, :step, :stop]
],
&TelemetryTable.handle_event/4,
%{table: metric_table, agent: data_agent}
)
Events
defmodule EventListener do
def start do
Task.start(fn ->
:ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:test")
:ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:stage:flora_fauna")
:ok = Phoenix.PubSub.subscribe(:my_pubsub, "sim:pipeline:stage:flora_fauna:entity")
listen_loop()
end)
end
def listen_loop() do
receive do
msg -> IO.inspect(msg, label: "Received message")
after
1000 -> :ok
end
listen_loop()
end
end
EventListener.start()
Run Simulation
Ximula.Sim.Loop.start_sim(:my_sim_loop)
Process.sleep(5_000)
Ximula.Sim.Loop.stop_sim(:my_sim_loop)
Ximula.Gatekeeper.Agent.get(:gatekeeper, fn world -> Ximula.Grid.get(world, {1, 1}) end)