Untitled notebook
Engine System Essentials Tutorial
Mix.install([
{:engine_system, path: "."}
])
Introduction
Welcome to the Engine System tutorial! This interactive notebook will guide you through the essentials of the Engine System library - a powerful implementation of the Engine Model in Elixir.
The Engine System promotes mailboxes to first-class actors and provides a clean separation between processing engines and their mailboxes, following formal specifications. Let’s explore the key concepts step by step.
What is an Engine?
An engine is an actor-like entity that enables:
- Type-safe message passing
- Effectful actions through guarded actions
- State and environment management
- Interface-driven communication
The system implements two types of engines:
- Processing Engines: Handle business logic and computation
- Mailbox Engines: Act as first-class actors for message reception and validation
You find yourself writing more processing engines than mailbox engines.
Important: The Engine System uses asynchronous message passing. When you
send a message, it returns :ok
if the message was queued successfully, but the
actual processing happens asynchronously.
Getting Started
Start the Engine System
use EngineSystem
EngineSystem.start()
Check system information
EngineSystem.get_system_info()
Your First Engine
Let’s start with a simple echo engine to understand the basic DSL:
defengine EchoEngine do
@moduledoc "A simple echo engine for learning"
version("1.0.0")
mode(:process) # This is a processing engine
interface do
message(:echo, content: :any)
message(:ping)
end
behaviour do
on_message :echo, msg_payload, _config, _env, sender do
content = msg_payload[:content] || "Hello from Echo!"
{:ok, [{:send, sender, {:echo_reply, content}}]}
end
on_message :ping, _msg_payload, _config, _env, sender do
{:ok, [{:send, sender, :pong}]}
end
end
end
Spawn an instance of our echo engine
{:ok, echo_address} = spawn_engine(EchoEngine)
Send a message to our echo engine
send_message(echo_address, {:echo, %{content: "Hello Engine System!"}})
Send a ping
result = send_message(echo_address, {:ping, %{}})
Messaging
The Engine System is designed for asynchronous, distributed computing. When you send a message:
- The message is validated against the target engine’s interface
-
If valid, it’s queued for processing (returns
:ok
) - The engine processes the message asynchronously
- Any response effects are executed (like sending replies)
This design enables:
- High concurrency and performance
- Distributed systems with message routing
- Fault tolerance and supervision
Verify our echo engine is working by checking system information
EngineSystem.get_system_info()
List all running instances to see our echo engine
EngineSystem.list_instances()
Check what messages our echo engine supports
EngineSystem.get_instance_message_tags(echo_address)
Engine Anatomy
Every engine definition consists of several key parts:
1. Metadata
-
version/1
: Engine version for compatibility -
mode/1
: Either:process
(business logic) or:mailbox
(message handling)
2. Interface
-
message/1
ormessage/2
: Define what messages the engine can receive - Type specifications for message fields
3. Configuration & Environment
-
config/1
: Static configuration (optional) -
env/1
: Mutable environment state (optional)
4. Behaviour
-
on_message/5
: Message handlers that define what happens when messages arrive
Let’s create a more sophisticated example:
defengine CounterEngine do
@moduledoc "A stateful counter engine"
mode(:process)
interface do
message(:increment)
message(:decrement)
message(:get_count)
message(:add, value: :integer)
message(:reset)
end
config do
%{
max_count: 100,
notifications: true
}
end
env do
%{
counter: 0,
history: []
}
end
behaviour do
on_message :increment, _payload, config, env, sender do
if env.counter < config.max_count do
new_counter = env.counter + 1
new_env = %{env | counter: new_counter, history: [env.counter | env.history]}
response = if config.notifications do
{:count_updated, new_counter}
else
{:ok, new_counter}
end
{:ok, [{:update_environment, new_env}, {:send, sender, response}]}
else
{:ok, [{:send, sender, {:error, :max_count_reached}}]}
end
end
on_message :decrement, _payload, _config, env, sender do
new_counter = max(0, env.counter - 1)
new_env = %{env | counter: new_counter, history: [env.counter | env.history]}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, new_counter}}]}
end
on_message :get_count, _payload, _config, env, sender do
{:ok, [{:send, sender, {:count, env.counter}}]}
end
on_message :add, %{value: value}, config, env, sender do
new_counter = env.counter + value
if new_counter <= config.max_count do
new_env = %{env | counter: new_counter}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, new_counter}}]}
else
{:ok, [{:send, sender, {:error, :would_exceed_max}}]}
end
end
on_message :reset, _payload, _config, _env, sender do
new_env = %{counter: 0, history: []}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :reset}}]}
end
end
end
Spawn a counter engine
{:ok, counter_address} = spawn_engine(CounterEngine)
Test the counter functionality - all asynchronous
IO.puts("Sending get_count message...")
send_message(counter_address, {:get_count, %{}})
IO.puts("Sending increment message...")
send_message(counter_address, {:increment, %{}})
IO.puts("Sending add message...")
send_message(counter_address, {:add, %{value: 5}})
IO.puts("Sending another get_count message...")
send_message(counter_address, {:get_count, %{}})
Message Validation
The Engine System provides comprehensive message validation:
Validate a message before sending - this works synchronously
validation_result = EngineSystem.validate_message(counter_address, {:increment, %{}})
This should fail validation - unknown message type
validation_result = EngineSystem.validate_message(counter_address, {:invalid_message, %{}})
Check what messages are supported
supported_messages = EngineSystem.get_instance_message_tags(counter_address)
IO.puts("Supported messages: #{inspect(supported_messages)}")
Message Effects and Actions
Engine behaviors return effects that describe what should happen. Common effects include:
-
{:send, address, message}
: Send a message to another engine -
{:update_environment, new_env}
: Update the engine’s environment -
{:spawn, engine_name, engconf, engenv}
: Spawn a new engine with that config and environment. -
{:terminate, reason}
: Terminate the engine
defengine NotifierEngine do
interface do
message(:notify, message: :string, target: :string)
message(:broadcast, message: :string)
end
env do
%{
subscribers: [],
message_count: 0
}
end
behaviour do
on_message :notify, %{message: msg, target: target}, _config, env, sender do
new_env = %{env | message_count: env.message_count + 1}
effects = [
{:update_environment, new_env},
{:send, sender, {:notification_sent, target}},
# In a real system, you'd look up the target address
{:send, sender, {:debug, "Would notify #{target}: #{msg}"}}
]
{:ok, effects}
end
on_message :broadcast, %{message: msg}, _config, env, sender do
new_env = %{env | message_count: env.message_count + 1}
# Create effects for each subscriber
subscriber_effects = Enum.map(env.subscribers, fn sub ->
{:send, sub, {:broadcast_message, msg}}
end)
all_effects = [
{:update_environment, new_env},
{:send, sender, {:broadcast_sent, length(env.subscribers)}}
| subscriber_effects
]
{:ok, all_effects}
end
end
end
System Registry and Discovery
The Engine System provides powerful registry and discovery features:
# List all registered engine specifications
EngineSystem.list_specs()
# List all running engine instances
EngineSystem.list_instances()
# Look up information about a specific instance
EngineSystem.lookup_instance(counter_address)
Part 7: Named Engines and Communication
You can spawn engines with names for easier reference:
# Spawn a named engine
{:ok, named_counter} = spawn_engine(CounterEngine, %{}, %{}, :JonaCounter)
# Look up by name
EngineSystem.lookup_address_by_name(:JonaCounter)
# Send message to named engine
send_message(named_counter, {:increment, %{}})
send_message(named_counter, {:get_count, %{}})
Sending a message with the wrong format should fail. The contract checker should in principle fails.
# send_message(named_counter, {:incrxxxement, %{}})
Custom Mailbox Engines
Advanced feature: You can create custom mailbox engines for specialized message handling:
defengine PriorityMailbox do
@moduledoc "A mailbox that prioritizes certain messages"
version("1.0.0")
mode(:mailbox) # This is a mailbox engine
interface do
message(:high_priority, content: :any)
message(:normal_priority, content: :any)
message(:low_priority, content: :any)
end
env do
%{
high_queue: :queue.new(),
normal_queue: :queue.new(),
low_queue: :queue.new()
}
end
behaviour do
# Mailbox engines handle message queuing differently
on_message :high_priority, payload, _config, env, _sender do
new_high_queue = :queue.in(payload, env.high_queue)
new_env = %{env | high_queue: new_high_queue}
{:ok, [{:update_environment, new_env}]}
end
on_message :normal_priority, payload, _config, env, _sender do
new_normal_queue = :queue.in(payload, env.normal_queue)
new_env = %{env | normal_queue: new_normal_queue}
{:ok, [{:update_environment, new_env}]}
end
end
end
Guidelines
1. Message Interface Design
- Keep interfaces focused and cohesive
- Use descriptive message names
- Specify field types for validation
2. State Management
- Use environment for mutable state
- Use configuration for immutable settings
- Keep state minimal and well-structured
3. Error Handling
- Always validate inputs in message handlers
- Return appropriate error messages
- Use guard clauses for preconditions
4. Asynchronous Patterns
- Design for asynchronous processing
- Don’t expect immediate responses
- Use message effects for complex workflows
Advanced Features
Engine Lifecycle Management
Get system status
EngineSystem.get_system_info()
Clean up terminated engines
EngineSystem.clean_terminated_engines()
Terminate a specific engine
EngineSystem.terminate_engine(named_counter)
Another Example from the book - Key-Value Store
Let’s build a practical key-value store engine:
defengine KVStore do
@moduledoc "A key-value store engine"
version("1.0.0")
mode(:process)
interface do
message(:put, key: :atom, value: :any)
message(:get, key: :atom)
message(:delete, key: :atom)
message(:list_keys)
message(:clear)
end
config do
%{
max_entries: 1000,
ttl_enabled: false
}
end
env do
%{
store: %{},
timestamps: %{},
entry_count: 0
}
end
behaviour do
on_message :put, %{key: key, value: value}, config, env, sender do
if env.entry_count >= config.max_entries and not Map.has_key?(env.store, key) do
{:ok, [{:send, sender, {:error, :store_full}}]}
else
new_store = Map.put(env.store, key, value)
new_timestamps = Map.put(env.timestamps, key, System.system_time(:second))
new_count = if Map.has_key?(env.store, key), do: env.entry_count, else: env.entry_count + 1
new_env = %{
env |
store: new_store,
timestamps: new_timestamps,
entry_count: new_count
}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :stored}}]}
end
end
on_message :get, %{key: key}, _config, env, sender do
case Map.get(env.store, key) do
nil -> {:ok, [{:send, sender, {:error, :not_found}}]}
value -> {:ok, [{:send, sender, {:ok, value}}]}
end
end
on_message :delete, %{key: key}, _config, env, sender do
if Map.has_key?(env.store, key) do
new_store = Map.delete(env.store, key)
new_timestamps = Map.delete(env.timestamps, key)
new_env = %{
env |
store: new_store,
timestamps: new_timestamps,
entry_count: env.entry_count - 1
}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :deleted}}]}
else
{:ok, [{:send, sender, {:error, :not_found}}]}
end
end
on_message :list_keys, _payload, _config, env, sender do
keys = Map.keys(env.store)
{:ok, [{:send, sender, {:keys, keys}}]}
end
on_message :clear, _payload, _config, _env, sender do
new_env = %{store: %{}, timestamps: %{}, entry_count: 0}
{:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :cleared}}]}
end
end
end
Test the KV Store
{:ok, kv_addr} = spawn_engine(KVStore)
Store some values - all asynchronous
send_message(kv_addr, {:put, %{key: :name, value: "Engine System"}})
send_message(kv_addr, {:put, %{key: :version, value: "1.0.0"}})
send_message(kv_addr, {:put, %{key: :language, value: "Elixir"}})
Retrieve values - asynchronous
send_message(kv_addr, {:get, %{key: :name}})
send_message(kv_addr, {:get, %{key: :version}})
List all keys - asynchronous
send_message(kv_addr, {:list_keys, %{}})
Verifying Engine Processing
An important question: when you send a message and get :ok
back, how do you
know the engine actually processed it? The Engine System is asynchronous - :ok
means “message queued”, not “message processed”. Here’s how to verify engines
are actually working:
Process Reductions - The Smoking Gun
The most reliable way to prove engines are processing messages is monitoring process reductions - Erlang’s unit of computational work:
Spawn an engine
{:ok, echo_addr} = spawn_engine(EchoEngine)
{:ok, instance} = EngineSystem.lookup_instance(echo_addr)
Capture computational work before sending message
before_reductions = Process.info(instance.engine_pid)[:reductions]
Send message
result = send_message(echo_addr, {:echo, %{content: "test"}})
Check computational work after processing
Process.sleep(100) # Allow processing
after_reductions = Process.info(instance.engine_pid)[:reductions]
work_done = after_reductions - before_reductions
if work_done > 0 do
IO.puts("Yey, PROOF: Engine processed your message!")
else
IO.puts("No processing detected")
end
Message Queue Monitoring
Check if messages are consumed from queues:
Check queue before sending
queue_before = Process.info(instance.engine_pid)[:message_queue_len]
send_message(echo_addr, {:ping, %{}})
Process.sleep(50)
queue_after = Process.info(instance.engine_pid)[:message_queue_len]
if queue_before > queue_after or queue_after == 0 do
IO.puts("Message consumed from queue!")
end
State Changes for Stateful Engines
For engines that maintain state, verify state evolution:
For counter engines, state changes prove processing
{:ok, counter_addr} = spawn_engine(CounterEngine)
send_message(counter_addr, {:increment, %{}})
send_message(counter_addr, {:increment, %{}})
{:ok, counter_instance} = EngineSystem.lookup_instance(counter_addr)
reductions = Process.info(counter_instance.engine_pid)[:reductions]
IO.puts("Counter did #{reductions} computational units of work")
{:ok, counter_instance} = EngineSystem.lookup_instance(counter_addr)
System Health Monitoring
Verify overall system health:
Check system state
system_info = EngineSystem.get_system_info()
instances = EngineSystem.list_instances()
IO.puts("Running instances: #{system_info.running_instances}")
Check each instance health
Enum.each(instances, fn inst ->
engine_alive = Process.alive?(inst.engine_pid)
mailbox_alive = Process.alive?(inst.mailbox_pid)
status = if engine_alive and mailbox_alive, do: "HEALTHY", else: "DEAD"
IO.puts("#{status} #{inspect(inst.address)}")
if engine_alive do
info = Process.info(inst.engine_pid)
IO.puts(" Work done: #{info[:reductions]} reductions")
IO.puts(" Queue: #{info[:message_queue_len]} messages")
end
end)