Engine System Essentials Tutorial

Mix.install([
  {:engine_system, path: "."}
])

Introduction

Welcome to the Engine System tutorial! This interactive notebook will guide you through the essentials of the Engine System library - a powerful implementation of the Engine Model in Elixir.

The Engine System promotes mailboxes to first-class actors and provides a clean separation between processing engines and their mailboxes, following formal specifications. Let’s explore the key concepts step by step.

What is an Engine?

An engine is an actor-like entity that enables:

  • Type-safe message passing
  • Effectful actions through guarded actions
  • State and environment management
  • Interface-driven communication

The system implements two types of engines:

  • Processing Engines: Handle business logic and computation
  • Mailbox Engines: Act as first-class actors for message reception and validation

You will typically find yourself writing more processing engines than mailbox engines.

Important: The Engine System uses asynchronous message passing. When you send a message, it returns :ok if the message was queued successfully, but the actual processing happens asynchronously.

Getting Started

Start the Engine System

use EngineSystem
EngineSystem.start()

Check system information

EngineSystem.get_system_info()

Your First Engine

Let’s start with a simple echo engine to understand the basic DSL:

defengine EchoEngine do
  @moduledoc "A simple echo engine for learning"

  version("1.0.0")
  mode(:process)  # This is a processing engine

  interface do
    message(:echo, content: :any)
    message(:ping)
  end

  behaviour do
    on_message :echo, msg_payload, _config, _env, sender do
      content = msg_payload[:content] || "Hello from Echo!"
      {:ok, [{:send, sender, {:echo_reply, content}}]}
    end

    on_message :ping, _msg_payload, _config, _env, sender do
      {:ok, [{:send, sender, :pong}]}
    end
  end
end

Spawn an instance of our echo engine

{:ok, echo_address} = spawn_engine(EchoEngine)

Send a message to our echo engine

send_message(echo_address, {:echo, %{content: "Hello Engine System!"}})

Send a ping

result = send_message(echo_address, {:ping, %{}})

Messaging

The Engine System is designed for asynchronous, distributed computing. When you send a message:

  1. The message is validated against the target engine’s interface
  2. If valid, it’s queued for processing (returns :ok)
  3. The engine processes the message asynchronously
  4. Any response effects are executed (like sending replies)

This design enables:

  • High concurrency and performance
  • Distributed systems with message routing
  • Fault tolerance and supervision
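
The four steps above can be sketched with plain Elixir processes. This is only an illustration of the flow, not how the Engine System is implemented: `MiniEngine` and all of its functions are hypothetical names introduced for this example.

```elixir
defmodule MiniEngine do
  # Hypothetical stand-in for an engine: NOT the EngineSystem implementation.
  @interface [:echo, :ping]

  # Start a process that handles messages one at a time.
  def start, do: spawn(fn -> loop() end)

  # Steps 1 and 2: validate the tag, then queue the message.
  # Returns as soon as the message is queued, before it is processed.
  def send_message(pid, {tag, payload}, reply_to) do
    if tag in @interface do
      send(pid, {tag, payload, reply_to})
      :ok
    else
      {:error, {:unknown_message, tag}}
    end
  end

  # Steps 3 and 4: process the message, then execute the resulting effects.
  defp loop do
    receive do
      {tag, payload, reply_to} ->
        tag |> handle(payload, reply_to) |> Enum.each(&run_effect/1)
        loop()
    end
  end

  defp handle(:echo, payload, reply_to), do: [{:send, reply_to, {:echo_reply, payload[:content]}}]
  defp handle(:ping, _payload, reply_to), do: [{:send, reply_to, :pong}]

  defp run_effect({:send, to, msg}), do: send(to, msg)
end

engine = MiniEngine.start()
queued = MiniEngine.send_message(engine, {:ping, %{}}, self())
rejected = MiniEngine.send_message(engine, {:bogus, %{}}, self())

# The reply arrives later as an ordinary message.
reply =
  receive do
    msg -> msg
  after
    1_000 -> :timeout
  end

IO.inspect({queued, rejected, reply})
```

Note that `queued` is bound before the engine has done any work; the reply only shows up once the caller explicitly waits for it.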

Verify our echo engine is working by checking system information

EngineSystem.get_system_info()

List all running instances to see our echo engine

EngineSystem.list_instances()

Check what messages our echo engine supports

EngineSystem.get_instance_message_tags(echo_address)

Engine Anatomy

Every engine definition consists of several key parts:

1. Metadata

  • version/1: Engine version for compatibility
  • mode/1: Either :process (business logic) or :mailbox (message handling)

2. Interface

  • message/1 or message/2: Define what messages the engine can receive
  • Type specifications for message fields

3. Configuration & Environment

  • config/1: Static configuration (optional)
  • env/1: Mutable environment state (optional)

4. Behaviour

  • on_message/5: Message handlers that define what happens when messages arrive

Let’s create a more sophisticated example:

defengine CounterEngine do
  @moduledoc "A stateful counter engine"
  mode(:process)

  interface do
    message(:increment)
    message(:decrement)
    message(:get_count)
    message(:add, value: :integer)
    message(:reset)
  end

  config do
    %{
      max_count: 100,
      notifications: true
    }
  end

  env do
    %{
      counter: 0,
      history: []
    }
  end

  behaviour do
    on_message :increment, _payload, config, env, sender do
      if env.counter < config.max_count do
        new_counter = env.counter + 1
        new_env = %{env | counter: new_counter, history: [env.counter | env.history]}

        response =
          if config.notifications do
            {:count_updated, new_counter}
          else
            {:ok, new_counter}
          end

        {:ok, [{:update_environment, new_env}, {:send, sender, response}]}
      else
        {:ok, [{:send, sender, {:error, :max_count_reached}}]}
      end
    end

    on_message :decrement, _payload, _config, env, sender do
      new_counter = max(0, env.counter - 1)
      new_env = %{env | counter: new_counter, history: [env.counter | env.history]}
      {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, new_counter}}]}
    end

    on_message :get_count, _payload, _config, env, sender do
      {:ok, [{:send, sender, {:count, env.counter}}]}
    end

    on_message :add, %{value: value}, config, env, sender do
      new_counter = env.counter + value

      if new_counter <= config.max_count do
        new_env = %{env | counter: new_counter}
        {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, new_counter}}]}
      else
        {:ok, [{:send, sender, {:error, :would_exceed_max}}]}
      end
    end

    on_message :reset, _payload, _config, _env, sender do
      new_env = %{counter: 0, history: []}
      {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :reset}}]}
    end
  end
end

Spawn a counter engine

{:ok, counter_address} = spawn_engine(CounterEngine)

Test the counter functionality - all asynchronous

IO.puts("Sending get_count message...")
send_message(counter_address, {:get_count, %{}})
IO.puts("Sending increment message...")
send_message(counter_address, {:increment, %{}})
IO.puts("Sending add message...")
send_message(counter_address, {:add, %{value: 5}})
IO.puts("Sending another get_count message...")
send_message(counter_address, {:get_count, %{}})

Message Validation

The Engine System provides comprehensive message validation:

Validate a message before sending - this works synchronously

validation_result = EngineSystem.validate_message(counter_address, {:increment, %{}})

This should fail validation - unknown message type

validation_result = EngineSystem.validate_message(counter_address, {:invalid_message, %{}})

Check what messages are supported

supported_messages = EngineSystem.get_instance_message_tags(counter_address)
IO.puts("Supported messages: #{inspect(supported_messages)}")
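
To make the idea of interface validation concrete, here is a minimal sketch in plain Elixir. It does not reproduce `EngineSystem.validate_message/2` or its return values; `InterfaceCheck`, the shape of the interface map, and the error tuples are all assumptions made for illustration.

```elixir
defmodule InterfaceCheck do
  # Hypothetical validator: illustrates the idea only,
  # not EngineSystem.validate_message/2.

  # `interface` maps each message tag to a keyword list of field types.
  def validate(interface, {tag, payload}) when is_map(payload) do
    case Map.fetch(interface, tag) do
      :error -> {:error, {:unknown_message, tag}}
      {:ok, fields} -> check_fields(fields, payload)
    end
  end

  def validate(_interface, _other), do: {:error, :malformed_message}

  # Collect the names of all fields whose value does not match its type.
  defp check_fields(fields, payload) do
    bad = for {name, type} <- fields, not valid?(type, Map.get(payload, name)), do: name
    if bad == [], do: :ok, else: {:error, {:invalid_fields, bad}}
  end

  defp valid?(:any, _value), do: true
  defp valid?(:integer, value), do: is_integer(value)
  defp valid?(:string, value), do: is_binary(value)
  defp valid?(:atom, value), do: is_atom(value)
  defp valid?(_unknown_type, _value), do: false
end

counter_interface = %{increment: [], get_count: [], add: [value: :integer]}

results = [
  InterfaceCheck.validate(counter_interface, {:increment, %{}}),
  InterfaceCheck.validate(counter_interface, {:add, %{value: "five"}}),
  InterfaceCheck.validate(counter_interface, {:invalid_message, %{}})
]

IO.inspect(results)
```

The sketch checks the tag first and the field types second, so an unknown tag is reported before any payload inspection takes place.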

Message Effects and Actions

Engine behaviors return effects that describe what should happen. Common effects include:

  • {:send, address, message}: Send a message to another engine
  • {:update_environment, new_env}: Update the engine’s environment
  • {:spawn, engine_name, engconf, engenv}: Spawn a new engine with the given configuration and environment
  • {:terminate, reason}: Terminate the engine

The following NotifierEngine combines several of these effects in a single handler:

defengine NotifierEngine do
  interface do
    message(:notify, message: :string, target: :string)
    message(:broadcast, message: :string)
  end

  env do
    %{
      subscribers: [],
      message_count: 0
    }
  end

  behaviour do
    on_message :notify, %{message: msg, target: target}, _config, env, sender do
      new_env = %{env | message_count: env.message_count + 1}

      effects = [
        {:update_environment, new_env},
        {:send, sender, {:notification_sent, target}},
        # In a real system, you'd look up the target address
        {:send, sender, {:debug, "Would notify #{target}: #{msg}"}}
      ]

      {:ok, effects}
    end

    on_message :broadcast, %{message: msg}, _config, env, sender do
      new_env = %{env | message_count: env.message_count + 1}

      # Create effects for each subscriber
      subscriber_effects =
        Enum.map(env.subscribers, fn sub ->
          {:send, sub, {:broadcast_message, msg}}
        end)

      all_effects = [
        {:update_environment, new_env},
        {:send, sender, {:broadcast_sent, length(env.subscribers)}}
        | subscriber_effects
      ]

      {:ok, all_effects}
    end
  end
end
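
Because handlers only describe effects, something else has to carry them out. The sketch below shows one way a list of effects could be applied in order. It is an assumption-laden illustration, not the Engine System's internals: `EffectRunner` is a hypothetical module, it collects outgoing messages in a list instead of delivering them, and it handles only three of the effects listed above.

```elixir
defmodule EffectRunner do
  # Hypothetical interpreter: shows how a list of effects could be applied
  # in order. It is not how EngineSystem executes effects internally.

  # Returns {final_env, outbox, status}; outbox holds {address, message}
  # pairs in the order they were emitted.
  def run(effects, env) do
    {env, outbox, status} = Enum.reduce_while(effects, {env, [], :running}, &apply_effect/2)
    {env, Enum.reverse(outbox), status}
  end

  defp apply_effect({:update_environment, new_env}, {_env, outbox, status}),
    do: {:cont, {new_env, outbox, status}}

  defp apply_effect({:send, address, message}, {env, outbox, status}),
    do: {:cont, {env, [{address, message} | outbox], status}}

  # Stop interpreting once the engine is asked to terminate.
  defp apply_effect({:terminate, reason}, {env, outbox, _status}),
    do: {:halt, {env, outbox, {:terminated, reason}}}
end

env = %{subscribers: [:sub_a, :sub_b], message_count: 0}

# The same shape of effect list that a :broadcast handler would build.
effects = [
  {:update_environment, %{env | message_count: env.message_count + 1}},
  {:send, :caller, {:broadcast_sent, length(env.subscribers)}}
  | Enum.map(env.subscribers, fn sub -> {:send, sub, {:broadcast_message, "hello"}} end)
]

{final_env, outbox, status} = EffectRunner.run(effects, env)
IO.inspect({final_env, outbox, status})
```

Keeping handlers free of side effects in this way means their logic can be checked by comparing the returned effect list, without spawning any engine.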

System Registry and Discovery

The Engine System provides powerful registry and discovery features:

# List all registered engine specifications
EngineSystem.list_specs()
# List all running engine instances
EngineSystem.list_instances()
# Look up information about a specific instance
EngineSystem.lookup_instance(counter_address)

Named Engines and Communication

You can spawn engines with names for easier reference:

# Spawn a named engine
{:ok, named_counter} = spawn_engine(CounterEngine, %{}, %{}, :JonaCounter)
# Look up by name
EngineSystem.lookup_address_by_name(:JonaCounter)
# Send message to named engine
send_message(named_counter, {:increment, %{}})
send_message(named_counter, {:get_count, %{}})

Sending a message that does not match the engine's interface should fail: in principle, the contract checker rejects it. Uncomment the line below to try it.

# send_message(named_counter, {:incrxxxement, %{}})

Custom Mailbox Engines

Advanced feature: You can create custom mailbox engines for specialized message handling:

defengine PriorityMailbox do
  @moduledoc "A mailbox that prioritizes certain messages"

  version("1.0.0")
  mode(:mailbox)  # This is a mailbox engine

  interface do
    message(:high_priority, content: :any)
    message(:normal_priority, content: :any)
    message(:low_priority, content: :any)
  end

  env do
    %{
      high_queue: :queue.new(),
      normal_queue: :queue.new(),
      low_queue: :queue.new()
    }
  end

  behaviour do
    # Mailbox engines handle message queuing differently:
    # each handler stores the payload in the queue for its priority.
    on_message :high_priority, payload, _config, env, _sender do
      new_high_queue = :queue.in(payload, env.high_queue)
      new_env = %{env | high_queue: new_high_queue}
      {:ok, [{:update_environment, new_env}]}
    end

    on_message :normal_priority, payload, _config, env, _sender do
      new_normal_queue = :queue.in(payload, env.normal_queue)
      new_env = %{env | normal_queue: new_normal_queue}
      {:ok, [{:update_environment, new_env}]}
    end

    # The interface declares :low_priority, so it needs a handler as well.
    on_message :low_priority, payload, _config, env, _sender do
      new_low_queue = :queue.in(payload, env.low_queue)
      new_env = %{env | low_queue: new_low_queue}
      {:ok, [{:update_environment, new_env}]}
    end
  end
end
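
The mailbox above only enqueues messages. As a rough sketch of the other half, the helper below picks the next message from an environment of the same shape, trying the high, normal, and low queues in that order. `PriorityDequeue` is a hypothetical module written for this example; how a mailbox engine actually hands messages to its processing engine is not shown in this tutorial.

```elixir
defmodule PriorityDequeue do
  # Hypothetical helper: picks the next message from an env shaped like
  # PriorityMailbox's, trying high, then normal, then low priority.
  @order [:high_queue, :normal_queue, :low_queue]

  def next(env), do: next(env, @order)

  defp next(_env, []), do: :empty

  defp next(env, [key | rest]) do
    case :queue.out(Map.fetch!(env, key)) do
      {{:value, item}, remaining} -> {:ok, item, Map.put(env, key, remaining)}
      {:empty, _queue} -> next(env, rest)
    end
  end
end

env = %{
  high_queue: :queue.from_list([%{content: "urgent"}]),
  normal_queue: :queue.from_list([%{content: "routine"}]),
  low_queue: :queue.new()
}

{:ok, first, env} = PriorityDequeue.next(env)
{:ok, second, env} = PriorityDequeue.next(env)
third = PriorityDequeue.next(env)

IO.inspect({first, second, third})
```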

Guidelines

1. Message Interface Design

  • Keep interfaces focused and cohesive
  • Use descriptive message names
  • Specify field types for validation

2. State Management

  • Use environment for mutable state
  • Use configuration for immutable settings
  • Keep state minimal and well-structured

3. Error Handling

  • Always validate inputs in message handlers
  • Return appropriate error messages
  • Use guard clauses for preconditions
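
As an illustration of the guard-clause advice, here is the `:add` logic from CounterEngine written as a plain function. Whether `on_message` accepts guards is not shown in this tutorial, so the sketch deliberately stays outside the DSL; `AddHandler`, `handle_add/4`, and the `:invalid_value` error are hypothetical.

```elixir
defmodule AddHandler do
  # Hypothetical plain-function version of a handler, so the guard logic can
  # be read and tested without the on_message DSL.

  # Precondition expressed as a guard: value must be a non-negative integer.
  def handle_add(%{value: value}, config, env, sender) when is_integer(value) and value >= 0 do
    new_counter = env.counter + value

    if new_counter <= config.max_count do
      new_env = %{env | counter: new_counter}
      {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, new_counter}}]}
    else
      {:ok, [{:send, sender, {:error, :would_exceed_max}}]}
    end
  end

  # Fallback clause: anything that fails the guard gets an explicit error reply.
  def handle_add(_payload, _config, _env, sender),
    do: {:ok, [{:send, sender, {:error, :invalid_value}}]}
end

config = %{max_count: 100}
env = %{counter: 10, history: []}

accepted = AddHandler.handle_add(%{value: 5}, config, env, :caller)
rejected = AddHandler.handle_add(%{value: -3}, config, env, :caller)

IO.inspect({accepted, rejected})
```

The fallback clause means a bad payload produces an error reply instead of a crash, which matches the "return appropriate error messages" guideline.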

4. Asynchronous Patterns

  • Design for asynchronous processing
  • Don’t expect immediate responses
  • Use message effects for complex workflows

Advanced Features

Engine Lifecycle Management

Get system status

EngineSystem.get_system_info()

Clean up terminated engines

EngineSystem.clean_terminated_engines()

Terminate a specific engine

EngineSystem.terminate_engine(named_counter)

Another Example from the book - Key-Value Store

Let’s build a practical key-value store engine:

defengine KVStore do
  @moduledoc "A key-value store engine"

  version("1.0.0")
  mode(:process)

  interface do
    message(:put, key: :atom, value: :any)
    message(:get, key: :atom)
    message(:delete, key: :atom)
    message(:list_keys)
    message(:clear)
  end

  config do
    %{
      max_entries: 1000,
      ttl_enabled: false
    }
  end

  env do
    %{
      store: %{},
      timestamps: %{},
      entry_count: 0
    }
  end

  behaviour do
    on_message :put, %{key: key, value: value}, config, env, sender do
      if env.entry_count >= config.max_entries and not Map.has_key?(env.store, key) do
        {:ok, [{:send, sender, {:error, :store_full}}]}
      else
        new_store = Map.put(env.store, key, value)
        new_timestamps = Map.put(env.timestamps, key, System.system_time(:second))
        new_count = if Map.has_key?(env.store, key), do: env.entry_count, else: env.entry_count + 1

        new_env = %{
          env
          | store: new_store,
            timestamps: new_timestamps,
            entry_count: new_count
        }

        {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :stored}}]}
      end
    end

    on_message :get, %{key: key}, _config, env, sender do
      case Map.get(env.store, key) do
        nil -> {:ok, [{:send, sender, {:error, :not_found}}]}
        value -> {:ok, [{:send, sender, {:ok, value}}]}
      end
    end

    on_message :delete, %{key: key}, _config, env, sender do
      if Map.has_key?(env.store, key) do
        new_store = Map.delete(env.store, key)
        new_timestamps = Map.delete(env.timestamps, key)

        new_env = %{
          env
          | store: new_store,
            timestamps: new_timestamps,
            entry_count: env.entry_count - 1
        }

        {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :deleted}}]}
      else
        {:ok, [{:send, sender, {:error, :not_found}}]}
      end
    end

    on_message :list_keys, _payload, _config, env, sender do
      keys = Map.keys(env.store)
      {:ok, [{:send, sender, {:keys, keys}}]}
    end

    on_message :clear, _payload, _config, _env, sender do
      new_env = %{store: %{}, timestamps: %{}, entry_count: 0}
      {:ok, [{:update_environment, new_env}, {:send, sender, {:ok, :cleared}}]}
    end
  end
end
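
KVStore records a timestamp for every key and carries a `ttl_enabled` flag, but none of its handlers read them. As a sketch of how they might be used, the helper below drops entries older than a given age. This is purely an assumption about intent: `KVExpiry`, `purge/3`, and the `ttl_seconds` argument are hypothetical and do not appear in the original engine.

```elixir
defmodule KVExpiry do
  # Hypothetical helper: KVStore records timestamps but never reads them.
  # ttl_seconds is an assumed setting that does not exist in the original config.

  # Drops every entry older than ttl_seconds, given the current time in seconds.
  def purge(env, ttl_seconds, now) do
    expired = for {key, stored_at} <- env.timestamps, now - stored_at > ttl_seconds, do: key
    store = Map.drop(env.store, expired)

    %{
      env
      | store: store,
        timestamps: Map.drop(env.timestamps, expired),
        entry_count: map_size(store)
    }
  end
end

env = %{
  store: %{name: "Engine System", version: "1.0.0"},
  timestamps: %{name: 1_000, version: 1_090},
  entry_count: 2
}

# With now = 1_100 and a 60-second TTL, only :name (age 100 s) has expired.
purged = KVExpiry.purge(env, 60, 1_100)
IO.inspect(purged)
```

Passing `now` as an argument, instead of calling `System.system_time/1` inside the function, keeps the helper deterministic and easy to test.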

Test the KV Store

{:ok, kv_addr} = spawn_engine(KVStore)

Store some values - all asynchronous

send_message(kv_addr, {:put, %{key: :name, value: "Engine System"}})
send_message(kv_addr, {:put, %{key: :version, value: "1.0.0"}})
send_message(kv_addr, {:put, %{key: :language, value: "Elixir"}})

Retrieve values - asynchronous

send_message(kv_addr, {:get, %{key: :name}})
send_message(kv_addr, {:get, %{key: :version}})

List all keys - asynchronous

send_message(kv_addr, {:list_keys, %{}})

Verifying Engine Processing

An important question: when you send a message and get :ok back, how do you know the engine actually processed it? The Engine System is asynchronous - :ok means “message queued”, not “message processed”. Here’s how to verify engines are actually working:

Process Reductions - The Smoking Gun

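A practical way to check that an engine is doing work is to monitor process reductions, the BEAM's unit of computational work. A rising count shows that the engine process ran, although on its own it does not identify which message caused the work: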

Spawn an engine

{:ok, echo_addr} = spawn_engine(EchoEngine)
{:ok, instance} = EngineSystem.lookup_instance(echo_addr)

Capture computational work before sending message

before_reductions = Process.info(instance.engine_pid)[:reductions]

Send message

result = send_message(echo_addr, {:echo, %{content: "test"}})

Check computational work after processing

Process.sleep(100)  # Allow processing
after_reductions = Process.info(instance.engine_pid)[:reductions]
work_done = after_reductions - before_reductions
if work_done > 0 do
  IO.puts("Yey, PROOF: Engine processed your message!")
else
  IO.puts("No processing detected")
end
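
The same measurement can be tried without the Engine System at all. In the sketch below a plain spawned process stands in for an engine, and the caller waits for the reply instead of sleeping, so the second reading is taken only after the message has been handled. The `worker` process and its message shapes are made up for this example.

```elixir
# A plain process stands in for an engine here; no EngineSystem calls are used.
worker =
  spawn(fn ->
    receive do
      {:echo, content, reply_to} ->
        # Do a little visible work before replying.
        send(reply_to, {:echo_reply, String.upcase(content)})
    end

    # Stay alive so the process can still be inspected afterwards.
    receive do
      :stop -> :ok
    end
  end)

{:reductions, before_reductions} = Process.info(worker, :reductions)

send(worker, {:echo, "test", self()})

# Waiting for the reply replaces Process.sleep/1: once it arrives,
# the message has definitely been handled.
reply =
  receive do
    {:echo_reply, text} -> text
  after
    1_000 -> :timeout
  end

{:reductions, after_reductions} = Process.info(worker, :reductions)
IO.inspect({reply, after_reductions - before_reductions})

send(worker, :stop)
```

Where an engine can reply to the caller, waiting on that reply is usually a stronger signal than a fixed sleep followed by a reduction count.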

Message Queue Monitoring

Check if messages are consumed from queues:

Check queue before sending

queue_before = Process.info(instance.engine_pid)[:message_queue_len]
send_message(echo_addr, {:ping, %{}})
Process.sleep(50)
queue_after = Process.info(instance.engine_pid)[:message_queue_len]
if queue_before > queue_after or queue_after == 0 do
  IO.puts("Message consumed from queue!")
end
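
Queue length is easier to interpret when messages are known to be waiting. The following sketch, again using a plain process in place of an engine, parks two messages in a mailbox with a selective receive, reads the queue length, then lets the process drain them and reads it again. The `worker` process and the `:job`, `:go`, and `:stop` messages are hypothetical.

```elixir
worker =
  spawn(fn ->
    # Selective receive: :job messages stay queued until :go arrives.
    receive do
      {:go, reply_to} ->
        for _ <- 1..2 do
          receive do
            :job -> :ok
          end
        end

        send(reply_to, :done)
    end

    # Stay alive so the mailbox can be inspected after draining.
    receive do
      :stop -> :ok
    end
  end)

send(worker, :job)
send(worker, :job)
{:message_queue_len, queued} = Process.info(worker, :message_queue_len)

send(worker, {:go, self()})

receive do
  :done -> :ok
after
  1_000 -> raise "worker did not finish"
end

{:message_queue_len, drained} = Process.info(worker, :message_queue_len)
IO.inspect({queued, drained})

send(worker, :stop)
```

A queue length of zero by itself is ambiguous, since an empty mailbox looks the same whether a message was consumed or never arrived; comparing a known non-zero reading with a later one avoids that.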

State Changes for Stateful Engines

For engines that maintain state, verify state evolution:

For counter engines, state changes prove processing

{:ok, counter_addr} = spawn_engine(CounterEngine)
send_message(counter_addr, {:increment, %{}})
send_message(counter_addr, {:increment, %{}})
{:ok, counter_instance} = EngineSystem.lookup_instance(counter_addr)
reductions = Process.info(counter_instance.engine_pid)[:reductions]
IO.puts("Counter did #{reductions} computational units of work")

System Health Monitoring

Verify overall system health:

Check system state

system_info = EngineSystem.get_system_info()
instances = EngineSystem.list_instances()

IO.puts("Running instances: #{system_info.running_instances}")

Check each instance health

Enum.each(instances, fn inst ->
  engine_alive = Process.alive?(inst.engine_pid)
  mailbox_alive = Process.alive?(inst.mailbox_pid)

  status = if engine_alive and mailbox_alive, do: "HEALTHY", else: "DEAD"
  IO.puts("#{status} #{inspect(inst.address)}")

  if engine_alive do
    info = Process.info(inst.engine_pid)
    IO.puts("  Work done: #{info[:reductions]} reductions")
    IO.puts("  Queue: #{info[:message_queue_len]} messages")
  end
end)
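
Polling `Process.alive?/1` gives a snapshot. For continuous health checks, a process monitor may be a better fit, because the BEAM then delivers a `:DOWN` message when the watched process exits. The sketch below uses a plain spawned process as a stand-in for an engine or mailbox pid; it does not use any Engine System API.

```elixir
# A plain process stands in for an engine or mailbox pid.
pid =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Unlike polling Process.alive?/1, a monitor delivers a :DOWN message
# when the process exits, along with the exit reason.
ref = Process.monitor(pid)
alive_before = Process.alive?(pid)

# Simulate a crash.
Process.exit(pid, :kill)

down =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
  after
    1_000 -> :still_alive
  end

alive_after = Process.alive?(pid)
IO.inspect({alive_before, down, alive_after})
```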