Engines in Elixir
> NOTE: This README is a Livebook, so you can run it! Watch the terminal for the output while the cells execute.
git clone https://github.com/anoma/engine.git
cd engine
mix deps.get
make livebook
Mix.install([
{:engine_system, path: "."}
])
Introduction
This library is a work-in-progress implementation of the Engine Model in Elixir, following the formal specification described in "Dynamic Effective Timed Communication Systems" and "Mailbox-as-Actors" (under review).
An engine is an actor-like entity that communicates through type-safe message passing and performs effects through guarded actions. These actions can modify both the engine’s state and its environment.
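To make the idea of a guarded action concrete, here is a minimal sketch in plain Elixir. It does not use the library: the module `GuardedActionSketch` and its functions are hypothetical names chosen for illustration, and the formal model is richer than this. Each guarded action pairs a guard (does this message apply in the current state?) with an action that returns a new state and a list of outgoing messages.

```elixir
defmodule GuardedActionSketch do
  # Hypothetical model, not the library's API: a guarded action pairs a
  # guard (does this message apply?) with an action (which effects to emit).
  def guarded(guard, action) when is_function(guard, 2) and is_function(action, 2) do
    %{guard: guard, action: action}
  end

  # Run the first guarded action whose guard accepts the message.
  # Returns {new_state, outgoing_messages}; the state is unchanged and
  # nothing is sent when no guard matches.
  def step(actions, state, message) do
    case Enum.find(actions, fn ga -> ga.guard.(state, message) end) do
      nil -> {state, []}
      ga -> ga.action.(state, message)
    end
  end
end

actions = [
  GuardedActionSketch.guarded(
    fn _state, msg -> match?({:add, n} when is_integer(n), msg) end,
    fn state, {:add, n} -> {state + n, [{:total, state + n}]} end
  ),
  GuardedActionSketch.guarded(
    fn _state, msg -> msg == :reset end,
    fn _state, :reset -> {0, [:reset_done]} end
  )
]

IO.inspect(GuardedActionSketch.step(actions, 0, {:add, 5}))
IO.inspect(GuardedActionSketch.step(actions, 3, :unknown))
```

Keeping the guard separate from the action means a message that matches no guard leaves the state untouched, which is the property the sketch is meant to show.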
Among other things, this library implements the Engine Model with a DSL and runtime system. The DSL lets you easily define engines that follow the formal specification, including configuration, state, message handling, and behaviours. The runtime system manages engine lifecycles, message passing, monitoring, and introspection.
Prerequisites & Installation
Prerequisites
- Elixir 1.18.0 or higher
- Erlang/OTP 28 or higher
# Check your Elixir version
System.version()
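The prerequisites can also be checked programmatically. The following is a small sketch using only the standard library; `EnvCheck` is a hypothetical helper and not part of engine_system, and the minimum versions are simply copied from the list above.

```elixir
defmodule EnvCheck do
  # Hypothetical helper, not part of engine_system.
  # The minimum versions are taken from the prerequisites list.
  @min_elixir ">= 1.18.0"
  @min_otp 28

  # True when the given Elixir version string satisfies the requirement.
  def elixir_ok?(version \\ System.version()) do
    Version.match?(version, @min_elixir)
  end

  # True when the given OTP release (a string such as "28") is recent enough.
  def otp_ok?(release \\ System.otp_release()) do
    String.to_integer(release) >= @min_otp
  end
end

IO.puts("Elixir #{System.version()} ok? #{EnvCheck.elixir_ok?()}")
IO.puts("OTP #{System.otp_release()} ok? #{EnvCheck.otp_ok?()}")
```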
Installation
The package can be installed by adding engine_system to your list of dependencies in mix.exs:
def deps do
  [
    {:engine_system, "~> 0.1.0"}
  ]
end
The Engine Model
The library implements the actor model with an explicit separation between mailboxes and processing engines (mailbox-as-actors), based on the formal specifications described in the research paper. The core innovation is promoting mailboxes to first-class engines: mailbox engines receive incoming messages, and message writes are verified using the linked processing engines.
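As a much-simplified picture of this separation, the sketch below uses two plain BEAM processes: one acting as a mailbox and one as the linked processor. It is an illustration under stated assumptions and does not use or reproduce the EngineSystem runtime. The module name, the message shapes (`:incoming`, `:deliver`, `:processed`), and the integer-only filter are all hypothetical.

```elixir
defmodule MailboxSketch do
  # Hypothetical illustration using plain BEAM processes; it does not use
  # or reproduce the EngineSystem runtime.

  # Processing side: doubles each number handed over by its mailbox
  # and reports the result to `report_to`.
  def start_processor(report_to) do
    spawn(fn -> processor_loop(report_to) end)
  end

  defp processor_loop(report_to) do
    receive do
      {:deliver, n} ->
        send(report_to, {:processed, n * 2})
        processor_loop(report_to)
    end
  end

  # Mailbox side: a separate process that accepts incoming messages,
  # drops anything that is not an integer, and forwards the rest.
  def start_mailbox(processor) do
    spawn(fn -> mailbox_loop(processor) end)
  end

  defp mailbox_loop(processor) do
    receive do
      {:incoming, n} when is_integer(n) ->
        send(processor, {:deliver, n})
        mailbox_loop(processor)

      {:incoming, _other} ->
        mailbox_loop(processor)
    end
  end
end

processor = MailboxSketch.start_processor(self())
mailbox = MailboxSketch.start_mailbox(processor)

send(mailbox, {:incoming, 21})
send(mailbox, {:incoming, "not a number"})

result =
  receive do
    {:processed, value} -> value
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

Senders only ever talk to the mailbox process, so queuing and filtering policy can change without touching the processing logic.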
DSL for Engine Definition
The DSL offers compile-time validation, a compact syntax, and a single entry point (use EngineSystem). Let’s start the system and explore some examples:
Import the EngineSystem DSL and utilities
use EngineSystem
Start the EngineSystem application
{:ok, _} = EngineSystem.start()
Simple Echo Engine
Let’s start with a basic echo engine:
defengine SimpleEcho do
version "1.0.0"
mode :process
interface do
message :echo, text: :string
message :ping
end
behaviour do
on_message :echo, msg, _config, _env, sender do
IO.puts(IO.ANSI.blue() <> "HoTT is better than Cold" <> IO.ANSI.reset())
{:ok, [{:send, sender, {:echo, msg}}]}
end
on_message :ping, _msg, _config, _env, sender do
{:ok, [{:send, sender, :pong}]}
end
end
end
SimpleEcho.__engine_spec__()
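Handlers in this README return `{:ok, effects}`, where `effects` is a list of tuples. One way to picture what a runtime might do with such a list is the sketch below. `EffectSketch` is a hypothetical interpreter written for illustration; it covers only the two effect shapes that appear in this README, and the actual runtime may support more effects and process them differently.

```elixir
defmodule EffectSketch do
  # Hypothetical interpreter for the effect lists returned by handlers.
  # It only mirrors the two effect shapes that appear in this README
  # ({:send, target, message} and {:update_environment, env}).
  def apply_effects({:ok, effects}, env) do
    Enum.reduce(effects, {env, []}, fn
      # Replace the environment, keep the outbox as it is.
      {:update_environment, new_env}, {_env, outbox} ->
        {new_env, outbox}

      # Record the outgoing message in order, keep the environment.
      {:send, target, message}, {env, outbox} ->
        {env, outbox ++ [{target, message}]}
    end)
  end
end

# A handler result shaped like the :ping clause of the echo engine.
result = {:ok, [{:send, :caller, :pong}]}
{env, outbox} = EffectSketch.apply_effects(result, %{})
IO.inspect({env, outbox})
```

Because the handler only describes effects and does not perform them, it stays a pure function that is easy to test in isolation.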
Now let’s spawn an instance and test it:
Spawn the echo engine
{:ok, echo_address} = EngineSystem.spawn_engine(SimpleEcho)
IO.puts("Echo engine spawned at: #{inspect(echo_address)}")
echo_address
send_message(echo_address, {:echo, %{text: "Hello Engine System!"}})
If you started Livebook from a terminal, the output and trace of the message passing appear there.
send_message(echo_address, {:ping, %{}})
EngineSystem.get_system_info()
Stateless Calculator Example
Here’s an example of a stateless processing engine:
defengine StatelessCalculator do
version "1.0.0"
mode :process
interface do
message :add, a: :number, b: :number
message :multiply, a: :number, b: :number
message :divide, a: :number, b: :number
message :result, value: :number
end
behaviour do
on_message :add, msg, _config, _env, sender do
{a, b} = {msg[:a], msg[:b]}
{:ok, [{:send, sender, {:result, a + b}}]}
end
on_message :multiply, msg, _config, _env, sender do
{a, b} = {msg[:a], msg[:b]}
{:ok, [{:send, sender, {:result, a * b}}]}
end
on_message :divide, msg, _config, _env, sender do
{a, b} = {msg[:a], msg[:b]}
if b != 0 do
{:ok, [{:send, sender, {:result, a / b}}]}
else
{:ok, [{:send, sender, {:error, :division_by_zero}}]}
end
end
end
end
Let’s test the calculator. Spawn the calculator:
{:ok, calc_address} = spawn_engine(StatelessCalculator)
send_message(calc_address, {:add, %{a: 10, b: 5}})
send_message(calc_address, {:multiply, %{a: 7, b: 8}})
send_message(calc_address, {:divide, %{a: 15, b: 3}})
send_message(calc_address, {:divide, %{a: 10, b: 0}}) # This should return an error
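One detail worth keeping in mind when reading the results: in Elixir, `/` always returns a float, even for integer operands, so the `:divide` request above yields `5.0` and not `5`. The sketch below shows this with a hypothetical pure helper, `DivideSketch.safe_divide/2`, which mirrors the handler's zero check without spawning an engine.

```elixir
defmodule DivideSketch do
  # Hypothetical helper mirroring the :divide handler's zero check.
  def safe_divide(_a, b) when b == 0, do: {:error, :division_by_zero}
  # `/` always produces a float in Elixir.
  def safe_divide(a, b), do: {:result, a / b}
end

IO.inspect(DivideSketch.safe_divide(15, 3))
IO.inspect(DivideSketch.safe_divide(10, 0))

# Integer division and remainder are separate functions.
IO.inspect({div(15, 3), rem(15, 4)})
```

If an integer result is required, `div/2` is the usual choice, at the cost of truncating the quotient.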
Stateful Counter Engine
Now let’s create an engine with configuration and environment (state):
defengine SimpleCounter do
version "1.0.0"
mode :process
config do
%{max_count: 100, step: 1}
end
env do
%{count: 0, total_operations: 0}
end
interface do
message(:increment)
message(:decrement)
message(:get_count)
message(:reset)
message(:count_response, value: :integer)
end
behaviour do
on_message :increment, _msg, config, env, sender do
new_count = min(env.count + config.step, config.max_count)
new_env = %{env | count: new_count, total_operations: env.total_operations + 1}
{:ok, [
{:update_environment, new_env},
{:send, sender, {:count_response, new_count}}
]}
end
on_message :decrement, _msg, config, env, sender do
new_count = max(env.count - config.step, 0)
new_env = %{env | count: new_count, total_operations: env.total_operations + 1}
{:ok, [
{:update_environment, new_env},
{:send, sender, {:count_response, new_count}}
]}
end
on_message :get_count, _msg, _config, env, sender do
{:ok, [{:send, sender, {:count_response, env.count}}]}
end
on_message :reset, _msg, _config, _env, sender do
new_env = %{count: 0, total_operations: 0}
{:ok, [
{:update_environment, new_env},
{:send, sender, {:count_response, 0}}
]}
end
end
end
Let’s test the counter. Spawn the counter with default config:
{:ok, counter_address} = spawn_engine(SimpleCounter)
Test counter operations
send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:decrement, %{}})
send_message(counter_address, {:get_count, %{}})
Inspect the counter’s internal state
{:ok, counter_instance} = EngineSystem.lookup_instance(counter_address)
state = EngineSystem.Engine.Instance.get_state(counter_instance.engine_pid)
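The `:increment` and `:decrement` handlers clamp the count between 0 and `max_count`. The sketch below reproduces just that arithmetic as pure functions and replays a short sequence of operations. `CounterSketch` is a hypothetical module for illustration, and the config values are chosen to make the clamping visible.

```elixir
defmodule CounterSketch do
  # Hypothetical pure functions copying the arithmetic of the :increment
  # and :decrement handlers; no engine is spawned here.
  def increment(count, %{step: step, max_count: limit}), do: min(count + step, limit)
  def decrement(count, %{step: step}), do: max(count - step, 0)
end

# Small limits so that both bounds are reached quickly.
config = %{max_count: 3, step: 2}

counts =
  [:increment, :increment, :decrement, :decrement, :decrement]
  |> Enum.scan(0, fn
    :increment, count -> CounterSketch.increment(count, config)
    :decrement, count -> CounterSketch.decrement(count, config)
  end)

# One entry per operation, starting from a count of 0.
IO.inspect(counts)
```

With these values the second increment saturates at 3 and the later decrements stop at 0, so the count never leaves the configured range.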
Advanced Key-Value Store Example
Here’s a more complex example with a configurable size limit and per-key access tracking:
defengine MyKVStore do
version "2.0.0"
mode :process
interface do
message :get, key: :atom
message :put, key: :atom, value: :any
message :delete, key: :atom
message :list_keys
message :clear
message :size
message :result, value: {:option, :any}
end
config do
%{
access_mode: :read_write,
max_size: 1000,
timeout: 30.5,
retries_enabled: true
}
end
env do
%{
store: %{},
access_counts: %{},
last_accessed: nil,
active_connections: 0
}
end
behaviour do
on_message :get, msg, _config, env, sender do
key = msg[:key]
value = Map.get(env.store, key, :not_found)
new_counts = Map.update(env.access_counts, key, 1, &(&1 + 1))
new_env = %{env | access_counts: new_counts, last_accessed: key}
{:ok, [
{:update_environment, new_env},
{:send, sender, {:result, value}}
]}
end
on_message :put, msg, config, env, sender do
{key, value} = {msg[:key], msg[:value]}
if map_size(env.store) >= config.max_size and not Map.has_key?(env.store, key) do
{:ok, [{:send, sender, {:error, :store_full}}]}
else
new_store = Map.put(env.store, key, value)
new_env = %{env | store: new_store, last_accessed: key}
{:ok, [
{:update_environment, new_env},
{:send, sender, :ack}
]}
end
end
on_message :delete, msg, _config, env, sender do
key = msg[:key]
new_store = Map.delete(env.store, key)
new_counts = Map.delete(env.access_counts, key)
new_env = %{env | store: new_store, access_counts: new_counts}
{:ok, [
{:update_environment, new_env},
{:send, sender, :ack}
]}
end
on_message :list_keys, _msg, _config, env, sender do
keys = Map.keys(env.store)
{:ok, [{:send, sender, {:result, keys}}]}
end
on_message :clear, _msg, _config, _env, sender do
new_env = %{store: %{}, access_counts: %{}, last_accessed: nil, active_connections: 0}
{:ok, [
{:update_environment, new_env},
{:send, sender, :ack}
]}
end
on_message :size, _msg, _config, env, sender do
size = map_size(env.store)
{:ok, [{:send, sender, {:result, size}}]}
end
end
end
Let’s test the KV store. Spawn with custom configuration:
custom_config = %{access_mode: :read_write, max_size: 5}
{:ok, kv_address} = spawn_engine(MyKVStore, custom_config)
send_message(kv_address, {:put, %{key: :name, value: "Engine System"}})
send_message(kv_address, {:put, %{key: :version, value: "2.0.0"}})
send_message(kv_address, {:put, %{key: :language, value: "Elixir"}})
send_message(kv_address, {:get, %{key: :name}})
send_message(kv_address, {:list_keys, %{}})
send_message(kv_address, {:size, %{}})
IO.puts("KV Store tests sent!")
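The `:put` handler rejects new keys once the store holds `max_size` entries, but still allows existing keys to be overwritten. A minimal sketch of that rule as a pure function follows. `KVPutSketch` is a hypothetical name, and the return shapes `{:ok, store}` and `{:error, :store_full}` are chosen for this illustration only.

```elixir
defmodule KVPutSketch do
  # Hypothetical pure version of the :put handler's capacity check.
  def put(store, key, value, max_size) do
    if map_size(store) >= max_size and not Map.has_key?(store, key) do
      {:error, :store_full}
    else
      {:ok, Map.put(store, key, value)}
    end
  end
end

# A store that is already at its limit of two entries.
full = %{a: 1, b: 2}

# Overwriting an existing key succeeds even when the store is full.
IO.inspect(KVPutSketch.put(full, :a, 10, 2))

# Adding a new key to a full store is rejected.
IO.inspect(KVPutSketch.put(full, :c, 3, 2))
```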
System Management
Let’s explore the basic system management capabilities:
List all running instances
instances = EngineSystem.list_instances()
for instance <- instances do
{node_id, engine_id} = instance.address
{engine_name, eng_version} = instance.spec_key
IO.puts(" node_id=#{inspect(node_id)}, engine_id=#{inspect(engine_id)} - #{engine_name}-#{eng_version}")
end
Get comprehensive system information
system_info = EngineSystem.get_system_info()
IO.puts("System Information:")
IO.puts(" Running instances: #{system_info.running_instances}")
IO.puts(" Registered specs: #{system_info.total_specs}")
Advanced System Management
Look up engine by address
case EngineSystem.lookup_instance(kv_address) do
{:ok, info} ->
IO.puts("Engine found:")
{engine_name, eng_version} = info.spec_key
IO.puts(" Name: #{engine_name}")
IO.puts(" Version: #{eng_version}")
IO.puts(" Status: #{info.status}")
IO.puts(" Address: #{inspect(info.address)}")
error ->
IO.puts("Error looking up engine: #{inspect(error)}")
end
Spawn with a name for easy lookup
{:ok, named_address} = spawn_engine(SimpleCounter, %{max_count: 50}, %{}, :my_counter)
Look up by name
case EngineSystem.lookup_address_by_name(:my_counter) do
{:ok, address} ->
IO.puts("Found named engine at: #{inspect(address)}")
send_message(address, {:increment, %{}})
send_message(address, {:get_count, %{}})
error ->
IO.puts("Error finding named engine: #{inspect(error)}")
end
Interface Utilities
The system provides utilities for introspecting engine interfaces:
Check if engine supports a message
if EngineSystem.has_message?(:MyKVStore, "2.0.0", :get) do
IO.puts("MyKVStore supports :get message")
end
Get message fields for a specific message
case EngineSystem.get_message_fields(:MyKVStore, "2.0.0", :put) do
{:ok, fields} ->
IO.puts("Fields for :put message: #{inspect(fields)}")
error ->
IO.puts("Error getting fields: #{inspect(error)}")
end
Get all supported message tags
tags = EngineSystem.get_message_tags(:MyKVStore, "2.0.0")
IO.puts("MyKVStore supports messages: #{inspect(tags)}")
instance_tags = EngineSystem.get_instance_message_tags(kv_address)
IO.puts("Running instance supports: #{inspect(instance_tags)}")
Message Validation
The system can validate a message for a running engine before it is sent:
Validate a message before sending
case EngineSystem.validate_message(kv_address, {:get, %{key: :test}}) do
:ok ->
IO.puts("Message is valid!")
send_message(kv_address, {:get, %{key: :test}})
{:error, reason} ->
IO.puts("Message validation failed: #{inspect(reason)}")
end
Try an invalid message
case EngineSystem.validate_message(kv_address, {:invalid_message, %{}}) do
:ok ->
IO.puts("Message is valid!")
{:error, reason} ->
IO.puts("Expected validation failure: #{inspect(reason)}")
end
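To give a rough idea of what checking a message against an interface can involve, here is a sketch in plain Elixir. It is an assumption-laden illustration, not how `EngineSystem.validate_message/2` works: the `ValidationSketch` module, the interface map, and the error tuples are all hypothetical, and the library may apply different or stricter rules such as field type checks.

```elixir
defmodule ValidationSketch do
  # Hypothetical validator; EngineSystem.validate_message/2 may apply
  # different or stricter rules (for example, type checks on fields).
  # `interface` maps each message tag to the list of field names it expects.
  def validate(interface, {tag, payload}) when is_map(payload) do
    case Map.fetch(interface, tag) do
      :error ->
        {:error, {:unknown_message, tag}}

      {:ok, fields} ->
        # Collect the expected fields that the payload does not contain.
        case Enum.reject(fields, &Map.has_key?(payload, &1)) do
          [] -> :ok
          missing -> {:error, {:missing_fields, missing}}
        end
    end
  end

  # Anything that is not a {tag, map} pair is rejected outright.
  def validate(_interface, _other), do: {:error, :malformed_message}
end

interface = %{get: [:key], put: [:key, :value], size: []}

IO.inspect(ValidationSketch.validate(interface, {:get, %{key: :test}}))
IO.inspect(ValidationSketch.validate(interface, {:invalid_message, %{}}))
IO.inspect(ValidationSketch.validate(interface, {:put, %{key: :a}}))
```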
Custom Mailbox Engines
You can create custom mailbox engines for specialized message handling:
defengine PriorityMailbox do
version "1.0.0"
mode :mailbox # This is a mailbox engine
interface do
message :high_priority, content: :any
message :normal_priority, content: :any
message :low_priority, content: :any
end
env do
%{
high_queue: :queue.new(),
normal_queue: :queue.new(),
low_queue: :queue.new()
}
end
behaviour do
# Mailbox engines handle message queuing differently
on_message :high_priority, payload, _config, env, _sender do
new_high_queue = :queue.in(payload, env.high_queue)
new_env = %{env | high_queue: new_high_queue}
{:ok, [{:update_environment, new_env}]}
end
on_message :normal_priority, payload, _config, env, _sender do
new_normal_queue = :queue.in(payload, env.normal_queue)
new_env = %{env | normal_queue: new_normal_queue}
{:ok, [{:update_environment, new_env}]}
end
on_message :low_priority, payload, _config, env, _sender do
new_low_queue = :queue.in(payload, env.low_queue)
new_env = %{env | low_queue: new_low_queue}
{:ok, [{:update_environment, new_env}]}
end
end
end
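The PriorityMailbox above only enqueues messages; how they are taken out again is not shown here. One plausible policy is to always serve the highest non-empty queue first. The sketch below implements that policy with Erlang's `:queue` module. `PriorityDequeueSketch` and its `dequeue/1` function are hypothetical and make no claim about how the runtime actually drains a mailbox engine.

```elixir
defmodule PriorityDequeueSketch do
  # Hypothetical helper: one possible way to drain the three queues kept
  # in the PriorityMailbox environment, highest priority first.
  @order [:high_queue, :normal_queue, :low_queue]

  # Returns {{:value, item}, new_env} for the first non-empty queue in
  # priority order, or {:empty, env} when all queues are empty.
  def dequeue(env) do
    Enum.find_value(@order, {:empty, env}, fn key ->
      case :queue.out(Map.fetch!(env, key)) do
        {{:value, item}, rest} -> {{:value, item}, Map.put(env, key, rest)}
        {:empty, _queue} -> nil
      end
    end)
  end
end

env = %{
  high_queue: :queue.new(),
  normal_queue: :queue.in(:routine, :queue.new()),
  low_queue: :queue.in(:background, :queue.new())
}

# The high-priority item arrives last but is served first.
env = %{env | high_queue: :queue.in(:urgent, env.high_queue)}

{{:value, first}, env} = PriorityDequeueSketch.dequeue(env)
{{:value, second}, env} = PriorityDequeueSketch.dequeue(env)
{{:value, third}, env} = PriorityDequeueSketch.dequeue(env)
{:empty, _env} = PriorityDequeueSketch.dequeue(env)

IO.inspect([first, second, third])
```

Strict priority like this can starve the low queue under sustained high-priority load, so a real mailbox might prefer a weighted or aging scheme.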