Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

:gen_statem is Awesome

state_machines_are_awesome.livemd

:gen_statem is Awesome

State Machines

Toggle Button State Machine

Event Driven State Machines

> For an Event-Driven State Machine, the input is an event that triggers a state transition and the output is actions executed during the state transition.
>
> — :gen_statem Design Principles

Example: Toggle Button

defmodule ToggleButton do
  # A two-state (:off / :on) toggle button implemented as a :gen_statem.
  # Every :push call flips the state and replies with a message describing
  # the transition that just happened.
  @behaviour :gen_statem

  # Starts the state machine linked to the caller, with empty args and options.
  def start_link, do: :gen_statem.start_link(__MODULE__, [], [])

  # The public API. The second argument to :gen_statem.call/2 is the event
  # that occurred; the call blocks until the state function replies.
  def push(pid) do
    :gen_statem.call(pid, :push)
  end

  # Use one callback function per state, named after the state.
  # (The alternative, :handle_event_function, funnels every state through
  # a single callback.)
  @impl :gen_statem
  def callback_mode, do: :state_functions

  # The machine starts in the :off state and carries no data (nil).
  @impl :gen_statem
  def init(_opts), do: {:ok, :off, nil}

  # State function for :off. A :push *call* (someone is waiting for a reply)
  # moves the machine to :on.
  def off({:call, caller}, :push, data) do
    flip(:on, "Turning on", caller, data)
  end

  # State function for :on. A :push call moves the machine back to :off.
  def on({:call, caller}, :push, data) do
    flip(:off, "Turning off", caller, data)
  end

  # Builds the state-function return value: transition to `target`, pass the
  # data on untouched, and answer the waiting caller through a :reply action
  # (actions are the side effects of a transition).
  defp flip(target, message, caller, data) do
    {:next_state, target, data, [{:reply, caller, message}]}
  end
end
ExUnit.start(autorun: false)

defmodule ToggleButtonTest do
  use ExUnit.Case, async: true

  describe "Toggle buttons can be toggled on and off" do
    setup :start_toggle_button_state_machine

    test "pushing the button once turns it on", %{pid: button} do
      reply = ToggleButton.push(button)

      assert reply == "Turning on"
    end

    test "pushing the button twice leaves turned off", %{pid: button} do
      _first_reply = ToggleButton.push(button)
      second_reply = ToggleButton.push(button)

      assert second_reply == "Turning off"
    end
  end

  # Starts a fresh ToggleButton under the test supervisor and exposes its pid
  # to the tests as `:pid`.
  #
  # `start_link_supervised!/2` links the process to the test, so a crash in the
  # state machine fails the test, and the process is cleaned up automatically
  # afterwards (no test pollution).
  defp start_toggle_button_state_machine(_context) do
    child_spec = %{id: ToggleButton, start: {ToggleButton, :start_link, []}}

    %{pid: start_link_supervised!(child_spec)}
  end
end

ExUnit.run()

Message Parsing and Protocols

Example: The BITS Packet Format

> Every packet begins with a standard header: the first three bits encode the packet version, and the next three bits encode the packet type ID. These two values are numbers; all numbers encoded in any packet are represented as binary with the most significant bit first. For example, a version encoded as the binary sequence 100 represents the number 4.
>
> — Advent of Code 2021, Day 16

The Setup

    # Attaching the source first leaves the machine waiting for a sink;
    # attaching the sink afterwards completes the setup.
    # `assert pattern = expr` is used instead of a bare match so a wrong reply
    # is reported as an ExUnit assertion failure rather than a MatchError.
    test "parser is ready once stream and then sink have been attached", %{
      pid: pid,
      stream: stream
    } do
      assert {:waiting, "Waiting for sink"} = BitsParser.attach_source(pid, stream)
      assert {:ok, "Ready"} = BitsParser.attach_sink(pid, self())
    end

    # Attaching the sink first leaves the machine waiting for a source;
    # attaching the source afterwards completes the setup.
    # `assert pattern = expr` is used instead of a bare match so a wrong reply
    # is reported as an ExUnit assertion failure rather than a MatchError.
    test "parser is ready once sink and then stream have been attached", %{
      pid: pid,
      stream: stream
    } do
      assert {:waiting, "Waiting for source"} = BitsParser.attach_sink(pid, self())
      assert {:ok, "Ready"} = BitsParser.attach_source(pid, stream)
    end

Machines within machines within machines…..

defmodule BitsParser do
  # Public facade for the BITS parser process. The process itself starts out
  # running WaitForAttachmentsStateMachine as its callback module.

  # Starts the parser process linked to the caller.
  def start_link, do: :gen_statem.start_link(WaitForAttachmentsStateMachine, [], [])

  # Hands the parser the stream it should read from; returns the machine's reply.
  def attach_source(pid, stream), do: request(pid, {:attach_source, stream})

  # Tells the parser which process should receive its output; returns the
  # machine's reply.
  def attach_sink(pid, sink), do: request(pid, {:attach_sink, sink})

  # Both attach operations are synchronous calls into the state machine.
  defp request(pid, event), do: :gen_statem.call(pid, event)
end
defmodule WaitForAttachmentsStateMachine do
  # First stage of the parser process: collects a source and a sink, in either
  # order, and hands control over to BitsPackageParserStateMachine once both
  # are attached.
  #
  # States: :idle -> :waiting_for_sink | :waiting_for_source -> :ready
  # Data:   a map that accumulates the :source and :sink entries.
  @behaviour :gen_statem

  @impl :gen_statem
  def callback_mode, do: :state_functions

  # Start idle with nothing attached.
  @impl :gen_statem
  def init(_args), do: {:ok, :idle, %{}}

  # :idle + source attached -> still missing the sink.
  def idle({:call, caller}, {:attach_source, stream}, data) do
    reply = {:reply, caller, {:waiting, "Waiting for sink"}}

    {:next_state, :waiting_for_sink, Map.put(data, :source, stream), [reply]}
  end

  # :idle + sink attached -> still missing the source.
  def idle({:call, caller}, {:attach_sink, pid}, data) do
    reply = {:reply, caller, {:waiting, "Waiting for source"}}

    {:next_state, :waiting_for_source, Map.put(data, :sink, pid), [reply]}
  end

  # The sink is already known; the source completes the pair.
  def waiting_for_source({:call, caller}, {:attach_source, source}, data) do
    data
    |> Map.put(:source, source)
    |> hand_over_to_parser(caller)
  end

  # The source is already known; the sink completes the pair.
  def waiting_for_sink({:call, caller}, {:attach_sink, pid}, data) do
    data
    |> Map.put(:sink, pid)
    |> hand_over_to_parser(caller)
  end

  # Both attachments are present: move to :ready and let the packet parser
  # take over the running process.
  defp hand_over_to_parser(data, caller) do
    actions = [
      # Swap the callback module, i.e. switch state machines
      {:change_callback_module, BitsPackageParserStateMachine},
      # Queue an internal event so the new state machine starts running
      {:next_event, :internal, :start_parsing},
      # Answer the caller that completed the pair
      {:reply, caller, {:ok, "Ready"}}
    ]

    {:next_state, :ready, data, actions}
  end
end

Split Field Parsing from Handling Packet Structure

defmodule BitsFieldParsers do
  @moduledoc false

  # Pure parsers for the individual header fields of a BITS packet.
  # Each function takes exactly the bits that make up one field (as a
  # bitstring) and returns the decoded value. Input of any other size fails
  # with a FunctionClauseError.

  # Decodes the 3-bit version field into an integer (0..7).
  def parse_version_field(<<version::3>>) do
    version
  end

  # Decodes the 3-bit type ID field into an atom naming the packet type.
  def parse_type_field(<<field_type::3>>) do
    case field_type do
      0 -> :sum
      1 -> :product
      2 -> :min
      3 -> :max
      4 -> :literal
      5 -> :greater_than
      6 -> :less_than
      7 -> :equals
    end
  end

  # Decodes the 1-bit length type indicator:
  # 0 selects :bit_length, 1 selects :sub_packet_count.
  def parse_sub_packet_length_type(<<0::1>>),
    do: :bit_length

  def parse_sub_packet_length_type(<<1::1>>),
    do: :sub_packet_count

  # Decodes the 11-bit length field used by the :sub_packet_count length type
  # into an integer.
  def parse_sub_packet_length_type_11(<<length::11>>), do: length
end
ExUnit.start(autorun: false)

defmodule StateMachinesTest do
  use ExUnit.Case, async: true

  test "it can parse the version field" do
    version_bits = <<1::1, 1::1, 0::1>>

    assert BitsFieldParsers.parse_version_field(version_bits) == 6
  end

  test "it can parse the type field into atoms" do
    # Every 3-bit type ID paired with the atom it should decode to
    expectations = [
      {<<0::1, 0::1, 0::1>>, :sum},
      {<<0::1, 0::1, 1::1>>, :product},
      {<<0::1, 1::1, 0::1>>, :min},
      {<<0::1, 1::1, 1::1>>, :max},
      {<<1::1, 0::1, 0::1>>, :literal},
      {<<1::1, 0::1, 1::1>>, :greater_than},
      {<<1::1, 1::1, 0::1>>, :less_than},
      {<<1::1, 1::1, 1::1>>, :equals}
    ]

    for {type_bits, expected_type} <- expectations do
      assert BitsFieldParsers.parse_type_field(type_bits) == expected_type
    end
  end

  test "it can parse sub-packets length indicator type 15" do
    indicator = <<0::1>>

    assert BitsFieldParsers.parse_sub_packet_length_type(indicator) == :bit_length
  end

  test "it can parse sub-packets length indicator type 11" do
    indicator = <<1::1>>

    assert BitsFieldParsers.parse_sub_packet_length_type(indicator) == :sub_packet_count
  end

  test "it can parse sub-packets length of length type 11" do
    length_bits = <<0b10000000001::11>>

    assert BitsFieldParsers.parse_sub_packet_length_type_11(length_bits) == 1025
  end
end

ExUnit.run()

Setting up the tests

defmodule BitsParserTest do
  use ExUnit.Case, async: true

  describe "Bits Protocol Parser" do
    setup [:start_bits_parser, :setup_sample_data_input_stream]

    test "a parsed package is returned", %{pid: pid, stream: stream} do
      # Using the test process as the sink
      assert {:waiting, "Waiting for source"} = BitsParser.attach_sink(pid, self())
      assert {:ok, "Ready"} = BitsParser.attach_source(pid, stream)

      # Use assert_receive to assert that BitsParser sent us a parsed packet.
      assert_receive {:package, %{version: 7, type: :max, sub_packet_count: 3}}
    end
  end

  # Starts a BitsParser under the test supervisor and exposes its pid as `:pid`.
  defp start_bits_parser(_context) do
    pid =
      start_link_supervised!(%{
        id: BitsParser,
        start: {BitsParser, :start_link, []}
      })

    %{pid: pid}
  end

  # Builds a lazy stream that emits the sample packet one bit at a time and
  # exposes it as `:stream`. Each element is a one-bit bitstring.
  defp setup_sample_data_input_stream(_context) do
    # Example grabbed directly from the Stream.resource docs
    stream =
      Stream.resource(
        fn ->
          {:ok, pid} = StringIO.open("11101110000000001101010000001100100000100011000001100000")
          pid
        end,
        fn pid ->
          case IO.getn(pid, "", 1) do
            :eof -> {:halt, pid}
            # Read one character ("0" or "1"), parse it as a base-2 integer
            # and pack it into a one-bit bitstring
            char -> {[<<String.to_integer(char, 2)::1>>], pid}
          end
        end,
        fn pid -> StringIO.close(pid) end
      )

    %{stream: stream}
  end
end

And Finally: The Parsing StateMachine

defmodule BitsPackageParserStateMachine do
  # Second stage of the parser process: walks through the header of a BITS
  # packet one field at a time, with one state per field.
  #
  # Expects the data map to hold :source (a stream emitting one-bit bitstrings)
  # and :sink (the pid that receives the parsed packet). Each state consumes
  # its field from the stream, stores the remaining stream back under :source,
  # and queues an internal event to drive the next state.
  @behaviour :gen_statem

  # Does not actually get called when you set module as callback module
  # on an already running StateMachine
  @impl :gen_statem
  def init(_args) do
    {:ok, :ready, nil}
  end

  @impl :gen_statem
  def callback_mode(), do: :state_functions

  # This is the state we hit with the "hand over" event
  def ready(:internal, :start_parsing, data) do
    # First part of packet is the version field. Use action to push us to that state.
    {:next_state, :parse_version, data, [{:next_event, :internal, :continue_parse}]}
  end

  def parse_version(:internal, _, data) do
    # Grab three bits from the Stream
    {version_bits, tail} = take_bits(data.source, 3)

    version_parsed = BitsFieldParsers.parse_version_field(version_bits)

    updated_data =
      data
      # Update the state with the "forwarded" stream
      |> Map.put(:source, tail)
      # Start building the new packet
      |> Map.put(:current_packet, %{version: version_parsed})

    # Next step is parsing the packet type
    {:next_state, :parse_type, updated_data, [{:next_event, :internal, :continue_parse}]}
  end

  def parse_type(:internal, _, data) do
    # Type is three bits
    {type_bits, tail} = take_bits(data.source, 3)

    type_parsed = BitsFieldParsers.parse_type_field(type_bits)

    updated_data =
      data
      # Remember to send the "forwarded" stream forward
      |> Map.put(:source, tail)
      # Update the current packet
      |> put_in([:current_packet, :type], type_parsed)

    case type_parsed do
      :literal ->
        raise "we'll handle these later"

      _ ->
        # Next we need to figure out which length type this packet has
        {:next_state, :parse_length_type, updated_data,
         [{:next_event, :internal, :continue_parse}]}
    end
  end

  def parse_length_type(:internal, _, data) do
    # Length indicator is just one bit
    {indicator_bit, tail} = take_bits(data.source, 1)

    length_type = BitsFieldParsers.parse_sub_packet_length_type(indicator_bit)

    updated_data =
      data
      # Remember to send the "forwarded" stream forward
      |> Map.put(:source, tail)

    # Select the next state based on which length type we're looking at
    case length_type do
      :bit_length ->
        {:next_state, :parse_length_type_15, updated_data,
         [{:next_event, :internal, :continue_parse}]}

      :sub_packet_count ->
        {:next_state, :parse_length_type_11, updated_data,
         [{:next_event, :internal, :continue_parse}]}
    end
  end

  def parse_length_type_11(:internal, _, data) do
    # The sub-packet count is an 11-bit field
    {length_bits, tail} = take_bits(data.source, 11)

    length = BitsFieldParsers.parse_sub_packet_length_type_11(length_bits)

    updated_data =
      data
      |> Map.put(:source, tail)
      |> put_in([:current_packet, :sub_packet_count], length)

    # Pretend that we're done and send the parsed packet downstream
    send(data.sink, {:package, updated_data.current_packet})

    # This is where we stop today: stay in this state, but keep the updated
    # data so the forwarded stream and the parsed packet are not thrown away.
    {:keep_state, updated_data}
  end

  def parse_length_type_15(:internal, _, _data) do
    raise "Not implemented"
  end

  # Reads `count` elements from the stream and joins them into one bitstring.
  # Returns `{bits, tail}` where `tail` is the stream continuing after the
  # consumed elements.
  # StreamSplit is super neat for when you need to read through a stream.
  defp take_bits(source, count) do
    {bits, tail} = StreamSplit.take_and_drop(source, count)

    # Crash right here if the stream ran dry before the whole field was read
    ^count = length(bits)

    {join_bits(bits), tail}
  end

  # Concatenates a list of bitstrings into a single bitstring, in order.
  defp join_bits(bits) do
    Enum.reduce(bits, <<>>, fn bit, acc -> <<acc::bitstring, bit::bitstring>> end)
  end
end

Recap

Separating concerns

Separate the concern of parsing a field from the concern of parsing the packet.

Easier to test

Field parsers will mostly be pure functions. And small. And easy to test 🎉

The state functions are just functions. You should unit test them.

Is it instantaneously decodable?

If the parser can be described as a StateMachine then you can start parsing as soon as you have the first bits. This can make a difference.

ExUnit is just awesome

  • start_link_supervised
  • assert_receive / assert_received

Additional Resources

Official Docs

Other Resources