Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Engines in Elixir

README.livemd

Engines in Elixir

Section

CI

> NOTE This README is a livebook, so you can run it! > watch out for the terminal output!

git clone https://github.com/anoma/engine.git
cd engine
mix deps.get
make livebook
Mix.install([
  {:engine_system, path: "."}
])

Introduction

This library is a work-in-progress implementation of the Engine Model in Elixir, following the formal specification described in Dynamic Effective Timed Communication Systems and Mailbox-as-Actors (under review).

An engine is an actor-like entity that enables type-safe message passing and effectful actions through guarded actions. These actions can modify both the engine’s state and its environment.

Among other things, this library implements the Engine Model with a DSL and runtime system. The DSL lets you easily define engines that follow the formal specification, including configuration, state, message handling, and behaviours. The runtime system manages engine lifecycles, message passing, monitoring, and introspection.

Prerequisites & Installation

Prerequisites

  • Elixir 1.18.0 or higher
  • Erlang/OTP 28 or higher
# Check your Elixir version
System.version()
def deps do
  [
    {:engine_system, "~> 0.1.0"}
  ]
end

Installation

The package can be installed by adding engine_system to your list of dependencies in mix.exs:

The Engine Model

A complete implementation of the actor model with explicit mailbox-as-actors separation, based on the formal specifications described in the research paper. This system implements the core innovation of promoting mailboxes to first-class processing engines that receive messages but verify message writing using linked processing engines.

DSL for Engine Definition

The DSL features compile-time validation, clean simplified syntax, and unified import. Let’s start the system and explore some examples:

Import the EngineSystem DSL and utilities

use EngineSystem

Start the EngineSystem application

{:ok, _} = EngineSystem.start()

Simple Echo Engine

Let’s start with a basic echo engine:

defengine SimpleEcho do
  version "1.0.0"
  mode :process

  interface do
    message :echo, text: :string
    message :ping
  end

  behaviour do
    on_message :echo, msg, _config, _env, sender do
      IO.puts(IO.ANSI.blue() <> "HoTT is better than Cold" <> IO.ANSI.reset())
      {:ok, [{:send, sender, {:echo, msg}}]}
    end

    on_message :ping, _msg, _config, _env, sender do
      {:ok, [{:send, sender, :pong}]}
    end
  end
end
SimpleEcho.__engine_spec_

Now let’s spawn an instance and test it:

Spawn the echo engine

{:ok, echo_address} = EngineSystem.spawn_engine(SimpleEcho)
IO.puts("Echo engine spawned at: #{inspect(echo_address)}")
echo_address
send_message(echo_address, {:echo, %{text: "Hello Engine System!"}})

And if you opened the livebook in a terminal, you can see the output/trace of the message passing.

send_message(echo_address, {:ping, %{}})
EngineSystem.get_system_info()

Stateless Calculator Example

Here’s an example of a stateless processing engine:

defengine StatelessCalculator do
  version "1.0.0"
  mode :process

  interface do
    message :add, a: :number, b: :number
    message :multiply, a: :number, b: :number
    message :divide, a: :number, b: :number
    message :result, value: :number
  end

  behaviour do
    on_message :add, msg, _config, _env, sender do
      {a, b} = {msg[:a], msg[:b]}
      {:ok, [{:send, sender, {:result, a + b}}]}
    end

    on_message :multiply, msg, _config, _env, sender do
      {a, b} = {msg[:a], msg[:b]}
      {:ok, [{:send, sender, {:result, a * b}}]}
    end

    on_message :divide, msg, _config, _env, sender do
      {a, b} = {msg[:a], msg[:b]}
      if b != 0 do
        {:ok, [{:send, sender, {:result, a / b}}]}
      else
        {:ok, [{:send, sender, {:error, :division_by_zero}}]}
      end
    end
  end
end

Let’s test the calculator. Spawn the calculator:

{:ok, calc_address} = spawn_engine(StatelessCalculator)
send_message(calc_address, {:add, %{a: 10, b: 5}})
send_message(calc_address, {:multiply, %{a: 7, b: 8}})
send_message(calc_address, {:divide, %{a: 15, b: 3}})
send_message(calc_address, {:divide, %{a: 10, b: 0}})  # This should return an error

Stateful Counter Engine

Now let’s create an engine with configuration and environment (state):

defengine SimpleCounter do
  version "1.0.0"
  mode :process

  config do
    %{max_count: 100, step: 1}
  end

  env do
    %{count: 0, total_operations: 0}
  end

  interface do
    message(:increment)
    message(:decrement)
    message(:get_count)
    message(:reset)
    message(:count_response, value: :integer)
  end

  behaviour do
    on_message :increment, _msg, config, env, sender do
      new_count = min(env.count + config.step, config.max_count)
      new_env = %{env | count: new_count, total_operations: env.total_operations + 1}

      {:ok, [
        {:update_environment, new_env},
        {:send, sender, {:count_response, new_count}}
      ]}
    end

    on_message :decrement, _msg, config, env, sender do
      new_count = max(env.count - config.step, 0)
      new_env = %{env | count: new_count, total_operations: env.total_operations + 1}

      {:ok, [
        {:update_environment, new_env},
        {:send, sender, {:count_response, new_count}}
      ]}
    end

    on_message :get_count, _msg, _config, env, sender do
      {:ok, [{:send, sender, {:count_response, env.count}}]}
    end

    on_message :reset, _msg, _config, _env, sender do
      new_env = %{count: 0, total_operations: 0}
      {:ok, [
        {:update_environment, new_env},
        {:send, sender, {:count_response, 0}}
      ]}
    end
  end
end

Let’s test the counter. Spawn the counter with default config:

{:ok, counter_address} = spawn_engine(SimpleCounter)

Test counter operations

send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:decrement, %{}})
send_message(counter_address, {:get_count, %{}})
{:ok, counter_instance} = EngineSystem.lookup_instance({1,2})
state = EngineSystem.Engine.Instance.get_state(counter_instance.engine_pid)

Advanced Key-Value Store Example

Here’s a more complex example with configuration and sophisticated state management:

defengine MyKVStore do
  version "2.0.0"
  mode :process

  interface do
    message :get, key: :atom
    message :put, key: :atom, value: :any
    message :delete, key: :atom
    message :list_keys
    message :clear
    message :size
    message :result, value: {:option, :any}
  end

  config do
    %{
      access_mode: :read_write,
      max_size: 1000,
      timeout: 30.5,
      retries_enabled: true
    }
  end

  env do
    %{
      store: %{},
      access_counts: %{},
      last_accessed: nil,
      active_connections: 0
    }
  end

  behaviour do
    on_message :get, msg, _config, env, sender do
      key = msg[:key]
      value = Map.get(env.store, key, :not_found)
      new_counts = Map.update(env.access_counts, key, 1, &amp;(&amp;1 + 1))
      new_env = %{env | access_counts: new_counts, last_accessed: key}
      
      {:ok, [
        {:update_environment, new_env},
        {:send, sender, {:result, value}}
      ]}
    end

    on_message :put, msg, config, env, sender do
      {key, value} = {msg[:key], msg[:value]}
      
      if map_size(env.store) >= config.max_size and not Map.has_key?(env.store, key) do
        {:ok, [{:send, sender, {:error, :store_full}}]}
      else
        new_store = Map.put(env.store, key, value)
        new_env = %{env | store: new_store, last_accessed: key}
        
        {:ok, [
          {:update_environment, new_env},
          {:send, sender, :ack}
        ]}
      end
    end

    on_message :delete, msg, _config, env, sender do
      key = msg[:key]
      new_store = Map.delete(env.store, key)
      new_counts = Map.delete(env.access_counts, key)
      new_env = %{env | store: new_store, access_counts: new_counts}
      
      {:ok, [
        {:update_environment, new_env},
        {:send, sender, :ack}
      ]}
    end

    on_message :list_keys, _msg, _config, env, sender do
      keys = Map.keys(env.store)
      {:ok, [{:send, sender, {:result, keys}}]}
    end

    on_message :clear, _msg, _config, _env, sender do
      new_env = %{store: %{}, access_counts: %{}, last_accessed: nil, active_connections: 0}
      {:ok, [
        {:update_environment, new_env},
        {:send, sender, :ack}
      ]}
    end

    on_message :size, _msg, _config, env, sender do
      size = map_size(env.store)
      {:ok, [{:send, sender, {:result, size}}]}
    end
  end
end

Let’s test the KV store. Spawn with custom configuration:

custom_config = %{access_mode: :read_write, max_size: 5}
{:ok, kv_address} = spawn_engine(MyKVStore, custom_config)
send_message(kv_address, {:put, %{key: :name, value: "Engine System"}})
send_message(kv_address, {:put, %{key: :version, value: "2.0.0"}})
send_message(kv_address, {:put, %{key: :language, value: "Elixir"}})
send_message(kv_address, {:get, %{key: :name}})
send_message(kv_address, {:list_keys, %{}})
send_message(kv_address, {:size, %{}})

IO.puts("KV Store tests sent!")

System Management

Let’s explore the basic system management capabilities:

List all running instances

instances = EngineSystem.list_instances()
for instance <- instances do
  {node_id,engine_id} = instance.address
  {engine_name, eng_version} = instance.spec_key
  IO.puts("  node_id=#{inspect(node_id)}, engine_id=#{inspect(engine_id)} - #{engine_name}-#{eng_version}")
end

Get comprehensive system information

system_info = EngineSystem.get_system_info()
IO.puts("System Information:")
IO.puts("  Running instances: #{system_info.running_instances}")
IO.puts("  Registered specs: #{system_info.total_specs}")

Advanced System Management

Look up engine by address

case EngineSystem.lookup_instance(kv_address) do
  {:ok, info} ->
    IO.puts("Engine found:")
    {engine_name, eng_version} = info.spec_key
    IO.puts("  Name: #{engine_name}")
    IO.puts("  Version: #{eng_version}")
    IO.puts("  Status: #{info.status}")
    IO.puts("  Address: #{inspect(info.address)}")
  error ->
    IO.puts("Error looking up engine: #{inspect(error)}")
end

Spawn with a name for easy lookup

{:ok, named_address} = spawn_engine(SimpleCounter, %{max_count: 50}, %{}, :my_counter)

Look up by name

case EngineSystem.lookup_address_by_name(:my_counter) do
  {:ok, address} ->
    IO.puts("Found named engine at: #{inspect(address)}")
    send_message(address, {:increment, %{}})
    send_message(address, {:get_count, %{}})
  error ->
    IO.puts("Error finding named engine: #{inspect(error)}")
end

Interface Utilities

The system provides utilities for introspecting engine interfaces:

Check if engine supports a message

if EngineSystem.has_message?(:MyKVStore, "2.0.0", :get) do
  IO.puts("MyKVStore supports :get message")
end

Get message fields for a specific message

case EngineSystem.get_message_fields(:MyKVStore, "2.0.0", :put) do
  {:ok, fields} ->
    IO.puts("Fields for :put message: #{inspect(fields)}")
  error ->
    IO.puts("Error getting fields: #{inspect(error)}")
end

Get all supported message tags

tags = EngineSystem.get_message_tags(:MyKVStore, "2.0.0")
IO.puts("MyKVStore supports messages: #{inspect(tags)}")
instance_tags = EngineSystem.get_instance_message_tags(kv_address)
IO.puts("Running instance supports: #{inspect(instance_tags)}")

Message Validation

The system provides message validation before sending:

Validate a message before sending

case EngineSystem.validate_message(kv_address, {:get, %{key: :test}}) do
  :ok ->
    IO.puts("Message is valid!")
    send_message(kv_address, {:get, %{key: :test}})
  {:error, reason} ->
    IO.puts("Message validation failed: #{inspect(reason)}")
end

Try an invalid message

case EngineSystem.validate_message(kv_address, {:invalid_message, %{}}) do
  :ok ->
    IO.puts("Message is valid!")
  {:error, reason} ->
    IO.puts("Expected validation failure: #{inspect(reason)}")
end

Custom Mailbox Engines

You can create custom mailbox engines for specialized message handling:

defengine PriorityMailbox do
  version "1.0.0"
  mode :mailbox  # This is a mailbox engine

  interface do
    message :high_priority, content: :any
    message :normal_priority, content: :any
    message :low_priority, content: :any
  end

  env do
    %{
      high_queue: :queue.new(),
      normal_queue: :queue.new(),
      low_queue: :queue.new()
    }
  end

  behaviour do
    # Mailbox engines handle message queuing differently
    on_message :high_priority, payload, _config, env, _sender do
      new_high_queue = :queue.in(payload, env.high_queue)
      new_env = %{env | high_queue: new_high_queue}
      {:ok, [{:update_environment, new_env}]}
    end

    on_message :normal_priority, payload, _config, env, _sender do
      new_normal_queue = :queue.in(payload, env.normal_queue)
      new_env = %{env | normal_queue: new_normal_queue}
      {:ok, [{:update_environment, new_env}]}
    end

    on_message :low_priority, payload, _config, env, _sender do
      new_low_queue = :queue.in(payload, env.low_queue)
      new_env = %{env | low_queue: new_low_queue}
      {:ok, [{:update_environment, new_env}]}
    end
  end
end