About Finite Automata with Elixir/Erlang

Mix.install([
  {:machinery, "~> 1.0.0"},
  {:gen_state_machine, "~> 3.0"},
  {:xstate, "~> 0.1.0"},
  {:jason, "~> 1.4"},
  {:fsmx, "~> 0.4.0"},
  {:kino_db, "~> 0.2.1"},
  {:ecto_sqlite3, "~> 0.9.1"},
  {:ecto_sql, "~> 3.9"},
  {:kino_kroki, "~> 0.1"}
])

Introduction

State machines, or finite automata, are designed to recognize patterns (cf. Wikipedia). A machine M is described by a finite number of states and inputs, and by a transition function t which takes a state and an input (equivalently, an event) and returns a state and some actions: $t: S\times I \to (A,S)$. The machine describes all possible transitions, that is, every change of state in response to an input.

A transition is meant to Run To Completion. One benefit of the state machine pattern is that it can trigger side-effects (the actions $A$ above) before or after a transition. Typical side-effects are updating a database, starting timers, or sending messages such as emails. Actions on the database are meant to be atomic: they are executed within a transaction so that they can be rolled back.

> There exists a pattern, sagas, defined to manage Long Lived Transactions (LLT). Sage is an Elixir implementation. It allows you to secure distributed transactions: it transforms an LLT guarded by a succession of with clauses and error handling into an execution pipeline where you define a “compensation” for each transaction to enable safe rollbacks.
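
To make the idea of a “compensation” concrete, here is a minimal sketch of a saga runner in plain Elixir. It does not use Sage and is not Sage's API: the module `MiniSaga` and the step shape `{name, action, compensate}` are assumptions made for this illustration only.

```elixir
defmodule MiniSaga do
  # Hypothetical step shape: {name, action, compensate}
  #   action.(acc)        returns {:ok, value} or {:error, reason}
  #   compensate.(value)  undoes the effect of a successful action
  def execute(steps), do: run(steps, [], %{})

  # All steps succeeded: return the accumulated results.
  defp run([], _done, acc), do: {:ok, acc}

  defp run([{name, action, compensate} | rest], done, acc) do
    case action.(acc) do
      {:ok, value} ->
        # Remember how to undo this step, then continue.
        run(rest, [{compensate, value} | done], Map.put(acc, name, value))

      {:error, reason} ->
        # Undo the completed steps, most recent first.
        Enum.each(done, fn {comp, value} -> comp.(value) end)
        {:error, name, reason}
    end
  end
end
```

Each step only describes how to undo itself, so the runner can roll back whatever was completed when a later step fails.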

The finite state machine formalism can be interesting when your process graph is not only a sequential series of events, but has some cycles, branches or timeouts. Erlang has a built-in solution, :gen_statem, and its Elixir wrapper is GenStateMachine.

Below are presented:

  • a (simple) “struct” event-based module implementation of a state machine, with the “door” example,
  • the “door” implemented with the libraries Machinery, gen_statem and Fsmx. The main difference is that you need to code every transition with gen_statem, whilst the others rely on a map of transitions,
  • the “door” with an Ecto-based implementation with Fsmx, using a local serverless SQLite session. We use an atomic transaction that can be rolled back,
  • a time-event based process using GenStateMachine for the “inactive module” process, where you control when you logout an inactive user,
  • an NFA example with a regex, built with gen_statem.

DFA: the Door model

DFA or Deterministic Finite Automata: given a state and an input, there exists only one output or next state. For this reason it is said to be “deterministic”. The words “input” and “event” are equivalent here.

Let’s take the door example; it can be opened, locked or unlocked.

stateDiagram-v2
direction LR
  locked-->unlocked: unlock
  unlocked-->opened: open
  opened-->unlocked: close
  unlocked-->locked: lock

The map of our transitions $t: S\times E\to S$ is:

%{
  {:locked, :unlock} => :unlocked,
  {:unlocked, :lock} => :locked,
  {:unlocked, :open} => :opened,
  {:opened, :close} => :unlocked,
  {"*", :any}        => :trapped         #<-- :dead_end
}

Not all possible combinations are declared above. For example, when the state :locked receives the event :unlock, the state changes to :unlocked, but we don’t know the effect of :close on :locked. Depending on your needs, you may consider that the effect of any undeclared state/event pair is:

  • either the identity (the state does not change),
  • or a trap state ❌ where you raise an error for a non-existing transition.
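
The two policies can be sketched as a lookup in the transition map with a fallback. This is only an illustration: the module name `DoorTable` and the `policy` argument are hypothetical and not part of any library.

```elixir
defmodule DoorTable do
  # The declared transitions of the door: {state, event} => next state.
  @transitions %{
    {:locked, :unlock} => :unlocked,
    {:unlocked, :lock} => :locked,
    {:unlocked, :open} => :opened,
    {:opened, :close} => :unlocked
  }

  # `policy` (hypothetical option) decides what an undeclared pair does:
  #   :trap     -> error tuple
  #   :identity -> the state is left unchanged
  def next(state, event, policy \\ :trap) do
    case {Map.fetch(@transitions, {state, event}), policy} do
      {{:ok, new_state}, _} -> {:ok, new_state}
      {:error, :identity} -> {:ok, state}
      {:error, :trap} -> {:error, :trapped}
    end
  end
end
```

For example, `DoorTable.next(:locked, :close)` is trapped, whereas `DoorTable.next(:locked, :close, :identity)` leaves the door locked.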

You have several ways to represent this:

  • an example of state transition diagram:
    stateDiagram-v2
    direction LR

    opened --> unlocked: close
    opened --> ❌: lock, unlock, open
    locked --> unlocked: unlock
    locked --> ❌: lock, open, close
    unlocked --> opened: open
    unlocked --> unlocked: unlock, close
    unlocked --> locked: lock
  • an example of the corresponding state/event transition table:

| effect of event on state | unlocked | locked | opened |
| --- | --- | --- | --- |
| :unlock | id | -> unlocked | ❌ |
| :lock | -> locked | ❌ | ❌ |
| :open | -> opened | ❌ | ❌ |
| :close | id | ❌ | -> unlocked |

Struct based implementation

As said before, the transition function t takes this struct (the current state) and an event, and returns the next state plus possibly an action: $t: S\times E \to (A,S)$. We will code the following function:

   handle_event: (%State{state: current}, event)  ->  %State{state: new_state}

Thanks to pattern matching, Elixir makes it easy to code all of the allowed transitions. For simplicity, all the effects of inputs on the states that are not listed as admissible are considered as trapped, in other words will generate an error tuple.

The module below is just a bunch of functions and does not persist the state. This means it is usable only to test if a given sequence of events is allowed.

defmodule SDoor do
  @moduledoc """
  Simple implementation of a struct and pattern matching (cf "source")
  """
  require Logger
  defstruct state: :locked

  @doc """
  Takes a %SDoor{} struct with a given state, and an event, and returns
  the struct with updated state.
    iex> SDoor.handle_event(%SDoor{state: :unlocked}, :open)
    {:ok, %SDoor{state: :opened}}
    iex> SDoor.handle_event(%SDoor{state: :locked}, :open)
    {:error, :event_not_allowed}
  """
  def handle_event(%SDoor{state: :locked} = door, :unlock) do
    {:ok, %SDoor{door | state: :unlocked}}
  end

  def handle_event(%SDoor{state: :unlocked} = door, :open) do
    {:ok, %SDoor{door | state: :opened}}
  end

  def handle_event(%SDoor{state: :opened} = door, :close) do
    {:ok, %SDoor{door | state: :unlocked}}
  end

  def handle_event(%SDoor{state: :unlocked} = door, :lock) do
    {:ok, %SDoor{door | state: :locked}}
  end

  # all other combinations state/event are not permitted.
  def handle_event(_, _), do: {:error, :event_not_allowed}
end

We can only test the validity of a sequence of events: [unlock]->[open]->[lock]. The last one is meant to fail.

%SDoor{}
|> SDoor.handle_event(:unlock)
|> elem(1)
|> SDoor.handle_event(:open)
|> elem(1)
|> SDoor.handle_event(:lock)
|> dbg()
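
The `elem(1)` pipeline above works as long as only the last event fails. As a hedged alternative, the sketch below folds a list of events with `Enum.reduce_while/3` and stops at the first rejected event. `EventRunner` is a hypothetical helper; it accepts any handler of the form `(state, event) -> {:ok, new_state} | {:error, reason}`.

```elixir
defmodule EventRunner do
  # handler: (state, event) -> {:ok, new_state} | {:error, reason}
  # Returns {:ok, final_state}, or an error tuple that also reports the
  # state we were in and the event that was rejected.
  def run(initial, events, handler) do
    Enum.reduce_while(events, {:ok, initial}, fn event, {:ok, state} ->
      case handler.(state, event) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, reason, state, event}}
      end
    end)
  end
end
```

With the module above, this could be called as `EventRunner.run(%SDoor{}, [:unlock, :open, :lock], &SDoor.handle_event/2)`.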

If we want to manage state, we can introduce an Agent:

defmodule ADoor do
  use Agent
  require Logger

  def start(init), do: Agent.start(fn -> init end)
  def state(pid), do: Agent.get(pid, & &1)
  def set(pid, state), do: Agent.update(pid, fn _ -> state end)
end

We then use the Agent to manage the state struct in a function handle_event that pattern matches on every possible combination current_state x event -> next_state. This simple pattern builds a simple state machine.

defmodule AgentDoor do
  defstruct state: :locked

  def start, do: ADoor.start(%__MODULE__{})
  def start(init), do: ADoor.start(%__MODULE__{state: init})

  def state(pid), do: ADoor.state(pid)
  def set(pid, state), do: ADoor.set(pid, %__MODULE__{state: state})

  def handle_event(%__MODULE__{state: :locked}, pid, :unlock) do
    set(pid, :unlocked)
    {:ok, state(pid)}
  end

  def handle_event(%__MODULE__{state: :unlocked}, pid, :open) do
    set(pid, :opened)
    {:ok, state(pid)}
  end

  def handle_event(%__MODULE__{state: :opened}, pid, :close) do
    set(pid, :unlocked)
    {:ok, state(pid)}
  end

  def handle_event(%__MODULE__{state: :unlocked}, pid, :lock) do
    set(pid, :locked)
    {:ok, state(pid)}
  end

  def handle_event(_, _, _), do: {:error, :event_not_allowed}
end

We can pass a sequence of events (the last transition is meant to fail) and retrieve the state:

{:ok, pid} = AgentDoor.start()

result =
  AgentDoor.state(pid)
  |> AgentDoor.handle_event(pid, :unlock)
  |> elem(1)
  |> AgentDoor.handle_event(pid, :open)
  |> elem(1)
  |> AgentDoor.handle_event(pid, :lock)

{
  result,
  AgentDoor.state(pid)
}
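
In the Agent version above, the state is read in one call and written in another, so two concurrent callers could interleave. A possible variation, sketched here with a hypothetical module `AtomicDoor`, does the lookup and the update in a single `Agent.get_and_update/2` call, which runs inside the Agent process.

```elixir
defmodule AtomicDoor do
  @transitions %{
    {:locked, :unlock} => :unlocked,
    {:unlocked, :lock} => :locked,
    {:unlocked, :open} => :opened,
    {:opened, :close} => :unlocked
  }

  def start(initial \\ :locked), do: Agent.start(fn -> initial end)
  def state(pid), do: Agent.get(pid, & &1)

  # The check and the update happen in one step inside the Agent,
  # so no other caller can change the state in between.
  def handle_event(pid, event) do
    Agent.get_and_update(pid, fn state ->
      case Map.fetch(@transitions, {state, event}) do
        {:ok, next} -> {{:ok, next}, next}
        :error -> {{:error, :event_not_allowed}, state}
      end
    end)
  end
end
```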

Machinery

We can use the more robust package Machinery; it is based on a struct that holds the state. Its author has also written an article about it. Instead of evaluating an event on the state, you use a transitions map:

transitions: %{
  "locked" => "unlocked",
  "unlocked" => ["locked", "opened"],
  "opened" => "unlocked"
}

You check whether a target state can be reached from the current state with Machinery.transition_to/3:

iex> Machinery.transition_to(%MyModule{}, MyModule, "next_state")
{:ok, new_state_struct}
#or
{:error, "Transition to this state isn't declared"}

> Note that with Machinery, you cannot consider identity actions because you would have multiple transitions for the same state.
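
To illustrate what “declared” means for such a map, here is a small sketch of the check in plain Elixir. It is not Machinery's implementation; the module `TargetCheck` is hypothetical and only shows how a value that is either a single state or a list of states can be handled with `List.wrap/1`.

```elixir
defmodule TargetCheck do
  # Same shape as the transitions map above: a state maps to one target
  # state or to a list of target states.
  @transitions %{
    "locked" => "unlocked",
    "unlocked" => ["locked", "opened"],
    "opened" => "unlocked"
  }

  # True when `target` is declared as reachable from `current`.
  def allowed?(current, target) do
    @transitions
    |> Map.get(current, [])
    |> List.wrap()
    |> Enum.member?(target)
  end
end
```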

The struct can be an Ecto schema so you can get persistence into a database.

Lastly, besides being declarative, you use strings as state identifiers.

Below we display the same “deterministic” door process using the simple Machinery declarative style.

A transition typically has side-effects: $S\times E\to (A,S)$. We can add anything useful to the struct (a name, a counter…) that can be changed. For example, the struct holds a counter that is incremented by 1 each time the state goes to "opened". We also implement an async action triggered when the state becomes "unlocked". The obvious problem with this simple code is that some action could run to completion whilst our transition doesn’t, in particular if the action runs concurrently (i.e. async) and you can’t cancel it.

defmodule MachineryDoor do
  use Machinery,
    # field: :door_state <-- you can alias the "state" here
    # The first state declared will be considered the initial state.
    states: ["locked", "opened", "unlocked"],
    transitions: %{
      "locked" => "unlocked",
      "unlocked" => ["locked", "opened"],
      "opened" => "unlocked"
    }
end

#### business logic module ####
defmodule MDoor do
  require Logger
  defstruct [:name, count: 0]

  def after_transition(%MDoor{} = struct, "opened") do
    Map.update!(struct, :count, &(&1 + 1))
  end

  def after_transition(%MDoor{} = struct, "unlocked") do
    Task.async(&async_task/0) |> Task.await()
    struct
  end

  def async_task, do: Task.async(fn -> Logger.debug("warn the guard____") end)
end

We use Machinery.transition_to/3 to declare the next state we want to reach. We will test a pipeline of state transitions: [locked]->[unlocked]->[opened]->[locked]. The last one is meant to fail.

%MDoor{name: "kitchen"}
|> Machinery.transition_to(MachineryDoor, "unlocked")
|> elem(1)
|> Machinery.transition_to(MachineryDoor, "opened")
|> elem(1)
|> Machinery.transition_to(MachineryDoor, "locked")
|> dbg()

Ecto based example: Atomic side-effects with Fsmx

We want the state (and other meaningful data) to be persisted to a database. We will use Ecto.Multi to run side-effects, such as sending emails or notifications, within transactions.

The package Fsmx is convenient for this usage of running transactions with Ecto.

> The Fsmx authors insist on the danger of running side-effects.

Set-up the SQLite database and Ecto.Repo

With Postgres, you need to run a server and have disk access. With SQLite3, we can run a local serverless RDB.

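  1. We open the database under the name ":memory" and set up the SQLite driver Exqlite to connect to the instance. This will give us a conn. Note that SQLite only treats the exact name ":memory:" as a private in-memory database; the name ":memory" used here is an ordinary file name, which is what lets the raw connection and the Repo below see the same table.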

  2. We then create the table.

defmodule Door.Conn do
  def start do
    Exqlite.Sqlite3.open(":memory")
  end

  def create_table(conn) do
    Exqlite.Sqlite3.execute(
      conn,
      "CREATE TABLE IF NOT EXISTS doors (
          id integer primary key, 
          state string,
          count integer,
          inserted_at string,
          updated_at string
          )"
    )
  end

  def reset_table(conn) do
    Exqlite.Sqlite3.execute(conn, "DROP TABLE IF EXISTS doors")
    create_table(conn)
  end
end

{:ok, conn} = Door.Conn.start()
Door.Conn.create_table(conn)

The block below is useful to reset the database when you re-run the code several times.

# reset the database to erase the table
Door.Conn.reset_table(conn)
  • We define the Repo. We need the adapter EctoSQLite3 to convert Ecto routines into SQLite ones. When using the Ecto adapter, the code is database-agnostic, so we can switch to, say, Postgres by changing the config and not the code.
defmodule Door.Repo do
  use Ecto.Repo, adapter: Ecto.Adapters.SQLite3, otp_app: :noop
end

We start the repo, to which we pass the database (you normally start the repo by passing it as a child to the Application supervisor).

opts = [database: ":memory", default_chunk_size: 100]

case Door.Repo.start_link(opts) do
  {:ok, pid} -> {:ok, pid}
  {:error, {_, pid}} -> {:ok, pid}
end
  • Set-up of the Ecto.Schema
defmodule EctoDoor do
  use Ecto.Schema
  require Logger

  schema "doors" do
    field(:state, :string, default: "locked")
    field(:count, :integer, default: 0)
    timestamps()
  end

  use Fsmx.Struct, fsm: EDoor
end
  • finally, the module that contains the business logic:
#### business logic module ##########
defmodule EDoor do
  use Fsmx.Struct,
    transitions: %{
      "locked" => "unlocked",
      "unlocked" => ["locked", "opened"],
      "opened" => "unlocked"
    }

  require Logger

  def before_transition(door, "locked", "unlocked") do
    chgset = transition_changeset(door, "locked", "unlocked", %{count: door.count + 1})
    notify("door is unlocked")
    {:ok, chgset}
  end

  def before_transition(door, "unlocked", "opened") do
    notify("door is open")
    chgset = transition_changeset(door, "unlocked", "opened", %{count: door.count + 10})
    {:ok, chgset}
  end

  # send a message/email etc..  when door enters the state "open"
  def after_transition_multi(_door, _, "opened") do
    notify("door is open from multi")
    {:ok, nil}
  end

  def transition_changeset(%EctoDoor{} = door, _from, _to, params) do
    Ecto.Changeset.cast(door, params, [:count, :inserted_at, :updated_at])
  end

  # we spawn a task to run async notifications, emails...
  defp notify(msg), do: Task.async(fn -> send_email(msg) end) |> Task.await()
  defp send_email(msg), do: Task.async(fn -> Logger.info("#{msg}________________") end)
end

We want to perform the following actions on transitions:

stateDiagram-v2
direction LR

  locked --> unlocked: unlock
  note right of locked
    on [locked] to [unlocked]
    incr count +1 & send email
    ok
  end note
  unlocked --> opened: open
  note left of opened
    on [unlocked] to [opened]
    inc count +10 & send email
    ok
  end note

Since we use schemas, we will use Fsmx.transition_changeset to build a changeset to persist to the database. If the transition is not successful, the changeset will not be valid.

We will test a non-atomic pipeline of state transitions [locked]->[unlocked]->[locked]->[unlocked]. Every step is declared in the transitions map, so none of them should be rejected. Note that, in general, emails sent from a callback can go out even when the transition itself fails. We should get a count of 2 and 2 messages sent via the before_transition callback.

init_door =
  Ecto.Changeset.cast(%EctoDoor{state: "locked"}, %{}, [])
  |> Door.Repo.insert_or_update!()

Fsmx.transition_changeset(init_door, "unlocked")
|> Door.Repo.update!()
|> Fsmx.transition_changeset("locked")
|> Door.Repo.update!()
|> Fsmx.transition_changeset("unlocked")
|> Door.Repo.update!()
|> dbg()

We check what is persisted:

Door.Repo.all(EctoDoor) |> List.last()

We create an atomic transaction for the transition [unlocked]->[opened] with Fsmx.transition_multi and Ecto.Multi. The before_transition callback is triggered and we should get a count of 10. The after_transition_multi callback should also be triggered, and we receive a message.

init_door =
  Ecto.Changeset.cast(%EctoDoor{state: "unlocked"}, %{}, [])
  |> Door.Repo.insert!()

# one transition
Ecto.Multi.new()
|> Fsmx.transition_multi(init_door, "t1", "opened")
|> Door.Repo.transaction()

We check if it is persisted:

Door.Repo.all(EctoDoor) |> List.last()

We now create an atomic pipeline for the same state transitions [locked]->[unlocked]->[opened]. We use Ecto.Multi.merge to chain. If the second transition fails, the first transition can still be completed, but the last email won’t be sent.

> we would normally start a job to send a mail (with Oban or EctoJob) rather than sending it with a transaction.

init_door =
  Ecto.Changeset.cast(%EctoDoor{state: "locked"}, %{}, [])
  |> Door.Repo.insert!()

# two chained transitions
Ecto.Multi.new()
|> Fsmx.transition_multi(init_door, "t1", "unlocked")
|> Ecto.Multi.merge(fn %{"t1" => unlocked} ->
  Ecto.Multi.new()
  |> Fsmx.transition_multi(unlocked, "t2", "opened")
end)
|> Door.Repo.transaction()

Let’s check

Door.Repo.all(EctoDoor) |> List.last()

Ecto based transitions vs “raw” transactions

> Below is an example of Exqlite transactions. You define a query in 2 or 3 steps: prepare, bind if inserting or updating data, and then run it with step.

# INSERT DATA
{:ok, statement} = Exqlite.Sqlite3.prepare(conn, "insert into doors (state) values (?1)")
Exqlite.Sqlite3.bind(conn, statement, ["unlocked"])
Exqlite.Sqlite3.step(conn, statement)

{:ok, statement2} = Exqlite.Sqlite3.prepare(conn, "select * from doors")
Exqlite.Sqlite3.step(conn, statement2)

# Run several times `.step` to see other results.

To be compared with Ecto.Repo which is database agnostic:

# EctoDoor is the schema module; EDoor only holds the business logic
Door.Repo.insert(%EctoDoor{state: "unlocked"})
Door.Repo.all(EctoDoor)

Erlang’s “gen_statem” module

We will use the :gen_statem module (from Erlang). It is more process-oriented than data-oriented. It is a GenServer-like behaviour, so it is designed for event-driven automata.

Two callback modes are supported:

  • :state_functions: each state (atoms only) is a callback function,
  • :handle_event_function: there is one callback function for all states.

You declare the chosen callback_mode in your module. In our case, it is :handle_event_function.
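
For comparison, here is a minimal sketch of the same door in the other mode, :state_functions, written directly against Erlang's :gen_statem. The module name `StateFunDoor` and the reply shapes are choices made for this illustration. Each state is a function named after the state, taking the event type, the event content and the data.

```elixir
defmodule StateFunDoor do
  @behaviour :gen_statem

  def start, do: :gen_statem.start(__MODULE__, :ok, [])

  @impl :gen_statem
  def callback_mode, do: :state_functions

  @impl :gen_statem
  def init(:ok), do: {:ok, :locked, 0}

  # State :locked
  def locked({:call, from}, :unlock, data),
    do: {:next_state, :unlocked, data, [{:reply, from, {:ok, :unlocked}}]}

  def locked(type, event, data), do: reject(type, event, data)

  # State :unlocked (the data counts how many times the door was opened)
  def unlocked({:call, from}, :lock, data),
    do: {:next_state, :locked, data, [{:reply, from, {:ok, :locked}}]}

  def unlocked({:call, from}, :open, data),
    do: {:next_state, :opened, data + 1, [{:reply, from, {:ok, :opened}}]}

  def unlocked(type, event, data), do: reject(type, event, data)

  # State :opened
  def opened({:call, from}, :close, data),
    do: {:next_state, :unlocked, data, [{:reply, from, {:ok, :unlocked}}]}

  def opened(type, event, data), do: reject(type, event, data)

  # Undeclared events keep the state; calls still get a reply.
  defp reject({:call, from}, _event, _data),
    do: {:keep_state_and_data, [{:reply, from, {:error, :invalid_transition}}]}

  defp reject(_type, _event, _data), do: :keep_state_and_data
end
```

The catch-all clause has to be repeated per state here, which is one reason the single handle_event callback can be more compact for small machines.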

Instead of using :gen_statem directly, we will use the Elixir library GenStateMachine. The client function is:

:gen_statem.call(pid, :event) <=> GenStateMachine.call(pid, :event)

The callback is handle_event. You pattern match on an event and a state and return the next state and a mandatory reply. The generic clause for a call is:

handle_event({:call, from}, :event, :current_state, data)

The return value contains a list of so-called “actions”: “these transition actions can be invoked by returning them from the callback”. Replying to the caller is mandatory.

{:next_state, :next_possible_state, data, [{:reply, from, reply}]}

Let’s implement the door with :gen_statem. We implement the 4 events (the “verbs”) on the 3 states in the handle_event callback:

defmodule GSDoor do
  # @behaviour GSm
  use GenStateMachine, callback_mode: :handle_event_function
  alias GenStateMachine, as: GSm
  # Logger.info/1 is a macro, so the module has to require Logger
  require Logger

  def start, do: GSm.start(__MODULE__, :ok, [])
  def init(_), do: {:ok, :locked, 0}

  def terminate(reason, _state, _data),
    do: IO.inspect(" :door terminated with reason - #{reason}")

  def handle_event({:call, from}, :unlock, :locked, data) do
    Task.async(&call_guard/0) |> Task.await()
    {:next_state, :unlocked, data, [{:reply, from, {:ok, :unlocked}}]}
  end

  def handle_event({:call, from}, :lock, :unlocked, data) do
    {:next_state, :locked, data, [{:reply, from, {:ok, :locked}}]}
  end

  def handle_event({:call, from}, :open, :unlocked, data) do
    data = data + 1
    {:next_state, :opened, data, [{:reply, from, {:ok, :opened, data}}]}
  end

  def handle_event({:call, from}, :close, :opened, data) do
    {:next_state, :unlocked, data, [{:reply, from, {:ok, :unlocked}}]}
  end

  def handle_event({:call, from}, _event, _content, data) do
    {:keep_state, data, [{:reply, from, {:error, :invalid_transition}}]}
  end

  def call_guard, do: Task.async(fn -> Logger.info("warn the guards___") end)
end

> Note the side-effects, and add messages in the returned tuple: we count the number of times the door has been opened, and return the count.

> With the synchronous .call, all actions must finish before the callback returns, respecting the standard Run To Completion semantics of event processing. If we run a Task.async (as a demo), it must run to completion, so we await it. This is normal since we want the operation to be atomic.

We can test it:

{:ok, pid} = GSDoor.start()
alias GenStateMachine, as: GSm

[:unlock, :close, :open, :open, :close, :open, :close, :lock, :unlock, :open]
|> Enum.map(fn event ->
  :sys.get_state(pid) |> IO.inspect(label: :check_state)
  GSm.call(pid, event)
end)

Time-based example: Inactive user

We want some pages to be accessible only to logged-in users. For safety, we also want to log them out after four minutes of inactivity. The UX designer asks to show a warning one minute before logging out to inform the user.

This is a deterministic event-driven process where we have to consider timers and event-listeners.

    stateDiagram-v2
    direction LR

    user_listener--> user_listener: key,mouse

    idle --> logged_out: 1_min_inactivity
    note right of idle 
        on transition,
        send push notification
        _
    end note
   
    logged_out --> logged_in: login 
    logged_in --> idle: 3_min_inactivity
    idle --> logged_in: activate

We use Erlang’s :gen_statem module, more precisely the package GenStateMachine, to implement this, since the event handler can return “timeout” actions (see event-timeouts).

The event timeout is a transition action: it starts a timer which generates a timeout event on expiration, and it is cancelled by any incoming event.

{:next_state, :logged_in, data, 
    [{:timeout, @timer1, :first_timeout}]
}

There is no need to set up your own timers or counters. You add the optional action {:timeout, time_in_ms, :identifier} to the callback response. It will be pattern matched by a handle_event(:timeout, :identifier, state, data) callback. The event timeout is chosen because any event that arrives cancels it.

When a user logs in, the state transitions on that event and the callback returns the event-timeout action. The timer runs as long as no other event cancels it, and it ends with a state transition from “logged_in” to “idle”. Any activity cancels the timer, so the “logged_in” state can be held as long as the session is not “idle”. The transition to the “idle” state is monitored by the module and the messages can be captured in a handle_event(:info, ...) clause. This callback returns another event timeout, now targeting the logged_out state. We can easily insert push notifications to alert the user.
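
The cancelling behaviour of the event timeout can be seen in isolation with a very small machine. This is a sketch under assumptions: the module `TimeoutDemo`, the states :active and :idle and the :ping event are made up for the illustration, and it uses :gen_statem directly with a short delay passed at start.

```elixir
defmodule TimeoutDemo do
  @behaviour :gen_statem

  # `ms` is the inactivity delay in milliseconds; it is kept as the data.
  def start(ms), do: :gen_statem.start(__MODULE__, ms, [])

  @impl :gen_statem
  def callback_mode, do: :handle_event_function

  @impl :gen_statem
  # Arm the event timeout as soon as the machine starts.
  def init(ms), do: {:ok, :active, ms, [{:timeout, ms, :inactivity}]}

  @impl :gen_statem
  # Any incoming event cancels the pending event timeout, so we re-arm it.
  def handle_event({:call, from}, :ping, :active, ms),
    do: {:keep_state_and_data, [{:reply, from, :pong}, {:timeout, ms, :inactivity}]}

  def handle_event({:call, from}, :ping, :idle, _ms),
    do: {:keep_state_and_data, [{:reply, from, :too_late}]}

  # Fires only if no event arrived during `ms` milliseconds.
  def handle_event(:timeout, :inactivity, :active, ms),
    do: {:next_state, :idle, ms}
end
```

As long as :ping calls keep arriving faster than the delay, the machine stays :active; once the delay elapses without any event, it moves to :idle.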

defmodule Inactive do
  use GenStateMachine, callback_mode: :handle_event_function
  alias GenStateMachine, as: GSm

  require Logger
  @timer1 5_000
  @timer2 5_000

  def start(name: user), do: GSm.start(__MODULE__, user, [:debug])
  def init(user), do: {:ok, :logged_out, %{name: user, state: :logged_out, pid: nil}}

  def terminate(reason, _state, _data) do
    push_notification("Your session has expired  due to inactivity")
    Logger.debug(" :inactive terminated with reason - #{inspect(reason)}")
  end

  # login handler triggers first timeout countdown
  def handle_event({:call, from}, :login, :logged_out, data) do
    {pid, _} = from
    Logger.debug("--- Logged in ---")
    data = %{data | state: :logged_in, pid: pid}

    {:next_state, :logged_in, data,
     [{:reply, from, :logged_in}, {:timeout, @timer1, :first_timeout}]}
  end

  # activate session from any moment, transitions to :logged_in
  def handle_event({:call, from}, :activate, _, data) do
    data = %{data | state: :logged_in}
    push_notification(":-- User reactive. Reset session timers")

    {:next_state, :logged_in, data,
     [{:reply, from, :renewed_session}, {:timeout, @timer1, :first_timeout}]}
  end

  # first notification when state transitions to :idle
  def handle_event(:timeout, :first_timeout, :logged_in, data) do
    push_notification(
      "-- Dear #{data.name}, your session will expire in one minute due to inactivity"
    )

    data = %{data | state: :idle}
    {:next_state, :idle, data}
  end

  # handler to push notification and send signal :shutdown after second timeout
  def handle_event(:timeout, :second_timeout, :idle, _data) do
    Logger.info(":-- Session expired")
    {:stop, :shutdown, :normal}
  end

  # when session activated after transition to :idle
  def handle_event(:timeout, :second_timeout, :logged_in, data) do
    {:next_state, :logged_in, data, [{:timeout, @timer1, :first_timeout}]}
  end

  # catch messages
  def handle_event(:info, {_ref, :ok}, _, data) do
    {:keep_state, data}
  end

  def handle_event(:info, {:DOWN, _, _, _, :normal} = _msg, status, data) do
    {:next_state, status, data, [{:timeout, @timer2, :second_timeout}]}
  end

  # push notifier
  def push_notification(message) do
    Task.async(fn -> Logger.info("#{inspect(message)}") end)
  end
end

A user logs into the app and then stays idle. We watch what is happening.

{:ok, pid} = Inactive.start(name: "John")
alias GenStateMachine, as: GS
GS.call(pid, :login)

Let’s run a random sequence to simulate a user’s activity:

{:ok, pid} = Inactive.start(name: "John")
alias GenStateMachine, as: GSm
GSm.call(pid, :login)

spawn(fn ->
  for _ <- 1..3 do
    if Process.alive?(pid) do
      GSm.call(pid, :activate)
      (Enum.random(2..20) * 1000) |> Process.sleep()
    end
  end
end)

NFA: Regex example

This is the most common type of machine used to represent how “real world” systems work, such as IoT, robots…

An NFA or “Non-deterministic Finite Automaton” is a machine designed to filter a set of responses, ending in an acceptable state or not. With NF machines, some states can have multiple next states for the same input.

Every DFA is an NFA, but not all NFAs are DFAs. The good news is that every NFA can be converted to a DFA. End of the game? Not really, it is complicated to do so, unless the case is simple :)

The most common examples of NFA come from string analysis, such as regexes. Given an acceptance criterion, we look for which strings are accepted or not by our machine.

Let’s say we want to build a finite state machine that can recognize any strings of letters from the set Q = {"a","b","c","d"} that:

  • starts with “a”
  • can have zero or many “b” and “c”
  • finishes with the next letter.

The accepted strings can be, for example, “abbbc”, “accd”, “abc”, “ad”. The equivalent regex is ~r/(a(c*d))|(a(b*c))/:

  String.match?(string, ~r/(a(c*d))|(a(b*c))/)

We have an NFA because the initial state has multiple next states (M1 and M2) under the input a:

  stateDiagram-v2
  direction LR
  [*]--> M1: a
  M1 --> M1: b
  M1 --> [*]: c
  [*]-->M2: a
  M2-->M2: c
  M2-->[*]: d

The transition table of the NFA is:

| | a | b | c | d |
| --- | --- | --- | --- | --- |
| I | M1,M2 | x | x | x |
| M1 | x | M1 | D | x |
| M2 | x | x | M2 | D |
| D | x | x | x | D |

The transition table converted into a DFA takes into account that a state can only have one next state. If there are several, you combine them into a new one. Here, we combine M1 and M2 into a new state, M12. Then we evaluate where M12 goes under each input and take the union of the resulting states.

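| | a | b | c | d |
| --- | --- | --- | --- | --- |
| I | M12 | x | x | x |
| M12 | x | M1 | M2D | D |
| M2D | x | x | M2 | D |
| M1 | x | M1 | D | x |
| M2 | x | x | M2 | D |
| D | x | x | x | D |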
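
The conversion described above (often called the subset construction) can be sketched in a few lines. This is an illustration, not a general-purpose tool: the module `Subset` is hypothetical, a DFA state is represented as a sorted list of NFA states (so M12 is `[:m1, :m2]` and M2D is `[:d, :m2]`), and the self-loop of D on d is left out so that D is a pure end state.

```elixir
defmodule Subset do
  # NFA moves: {state, input} => list of possible next states.
  @nfa %{
    {:i, :a} => [:m1, :m2],
    {:m1, :b} => [:m1],
    {:m1, :c} => [:d],
    {:m2, :c} => [:m2],
    {:m2, :d} => [:d]
  }
  @alphabet [:a, :b, :c, :d]

  # Union of the NFA moves of every member of a DFA state, as a sorted list.
  def move(states, input) do
    states
    |> Enum.flat_map(&Map.get(@nfa, {&1, input}, []))
    |> Enum.uniq()
    |> Enum.sort()
  end

  # Explore the DFA states reachable from the start state.
  def to_dfa(start \\ [:i]), do: explore([start], MapSet.new(), %{})

  defp explore([], _seen, dfa), do: dfa

  defp explore([state | rest], seen, dfa) do
    if MapSet.member?(seen, state) do
      explore(rest, seen, dfa)
    else
      # Keep only the inputs that lead somewhere.
      edges =
        Enum.flat_map(@alphabet, fn input ->
          case move(state, input) do
            [] -> []
            target -> [{{state, input}, target}]
          end
        end)

      targets = Enum.map(edges, fn {_key, target} -> target end)
      explore(rest ++ targets, MapSet.put(seen, state), Map.merge(dfa, Map.new(edges)))
    end
  end
end
```

Calling `Subset.to_dfa()` returns a map from `{dfa_state, input}` to the next DFA state, for example `{[:m1, :m2], :c} => [:d, :m2]`.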

The corresponding DFA of our machine is:

  stateDiagram-v2
  direction LR
    [*]-->M12: a
    M12-->M1: b
    M12-->M2D: c
    M12-->[*]: d
    M2D-->M2: c
    M2D-->[*]: d
    M1-->M1: b
    M1-->[*]: c
    M2-->M2: c
    M2-->[*]: d

You observe of course an increase in the number of states.

{event, state} -> new_state
{:a, :i}-> :m12
{:c, :m12} -> :m2d
{:b, :m12} -> :m1
{:d, :m12} -> :d
{:c, :m2d} -> :m2
{:d, :m2d} -> :d
{:c, :m2} -> :m2
{:d, :m2} -> :d
{:b, :m1}-> :m1
{:c, :m1} -> :d
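
Before wiring this into a process, the list above can be checked as a pure function. The sketch below is an assumption-laden illustration: the module `TableDFA` is hypothetical, and it treats :d and :m2d as the accepting states (the states that contain D). Unlike the unanchored regex, it consumes the whole string, so it behaves like ~r/^(ac*d|ab*c)$/.

```elixir
defmodule TableDFA do
  # {event, state} => new_state, copied from the list above.
  @transitions %{
    {:a, :i} => :m12,
    {:c, :m12} => :m2d,
    {:b, :m12} => :m1,
    {:d, :m12} => :d,
    {:c, :m2d} => :m2,
    {:d, :m2d} => :d,
    {:c, :m2} => :m2,
    {:d, :m2} => :d,
    {:b, :m1} => :m1,
    {:c, :m1} => :d
  }
  # Assumption: the states that contain D are accepting.
  @accepting [:d, :m2d]
  # Explicit lookup, so arbitrary input never creates new atoms.
  @letters %{"a" => :a, "b" => :b, "c" => :c, "d" => :d}

  def accepts?(string) do
    final =
      string
      |> String.graphemes()
      |> Enum.reduce_while(:i, fn letter, state ->
        case Map.fetch(@transitions, {Map.get(@letters, letter), state}) do
          {:ok, next} -> {:cont, next}
          :error -> {:halt, :trapped}
        end
      end)

    final in @accepting
  end
end
```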

We can code this with :gen_statem where we use atoms. We have 10 transitions to handle.

defmodule DFA do
  @behaviour :gen_statem

  def start, do: :gen_statem.start(__MODULE__, :ok, [])

  @impl :gen_statem
  def init(_), do: {:ok, :i, nil}

  @impl :gen_statem
  def terminate(reason, _state, _data),
    do: IO.inspect(" :door terminated with reason - #{reason}")

  @impl :gen_statem
  def callback_mode, do: :handle_event_function

  defp handle(data, letter, next, from, step) do
    data = data <> letter
    {:next_state, next, data, [{:reply, from, {step, data}}]}
  end

  @impl :gen_statem
  def handle_event({:call, from}, :a, :i, _), do: handle("", "a", :m12, from, :continue)

  def handle_event({:call, from}, :b, :m12, data), do: handle(data, "b", :m1, from, :continue)

  def handle_event({:call, from}, :c, :m12, data), do: handle(data, "c", :m2d, from, :ok)

  def handle_event({:call, from}, :d, :m12, data), do: handle(data, "d", :d, from, :ok)

  # {:c, :m2d} -> :m2, as in the transition list
  def handle_event({:call, from}, :c, :m2d, data), do: handle(data, "c", :m2, from, :continue)

  def handle_event({:call, from}, :d, :m2d, data), do: handle(data, "d", :d, from, :ok)

  def handle_event({:call, from}, :c, :m2, data), do: handle(data, "c", :m2, from, :continue)

  def handle_event({:call, from}, :d, :m2, data), do: handle(data, "d", :d, from, :ok)

  def handle_event({:call, from}, :b, :m1, data), do: handle(data, "b", :m1, from, :continue)

  def handle_event({:call, from}, :c, :m1, data), do: handle(data, "c", :d, from, :ok)

  def handle_event({:call, from}, _event, _content, data) do
    {:keep_state, data, [{:reply, from, {:error, :invalid_transition}}]}
  end
end

Let’s test whether we have built the regex ~r/(a(c*d))|(a(b*c))/:

s1 = [:a, :c, :b, :b, :c, :e, :c, :d, :f]
s2 = [:a, :f, :b, :b, :c, :e, :c, :d, :f]
s3 = [:a, :c, :b, :c, :d, :d, :d]
s4 = [:a, :b, :b, :d]
s5 = [:a, :c, :c, :c]
s6 = [:a, :c, :c, :c, :d]

defmodule Run do
  alias :gen_statem, as: GS

  def test(s) do
    {:ok, pid} = DFA.start()

    Enum.reduce(s, "", fn letter, acc ->
      case GS.call(pid, letter) do
        {:continue, string} -> {:partial_match, string}
        {:ok, string} -> {:match, string}
        {:error, _} -> acc
      end
    end)
  end
end

{
  Run.test(s1),
  Run.test(s2),
  Run.test(s3),
  Run.test(s4),
  Run.test(s5),
  Run.test(s6)
}