:gen_statem is Awesome
State Machines
Toggle Button State Machine
Event Driven State Machines
> For an Event-Driven State Machine, the input is an event that triggers a state transition and the output is actions executed during the state transition > > — :gen_statem Design Principles
Example: Toggle Button
defmodule ToggleButton do
  # A push button modelled as a two-state (:off / :on) :gen_statem.
  @behaviour :gen_statem

  # ---- Public API -------------------------------------------------------

  # Starts the state machine linked to the calling process.
  def start_link do
    :gen_statem.start_link(__MODULE__, [], [])
  end

  # Synchronously delivers the :push event (the second argument to
  # :gen_statem.call/2 is the event) and returns the reply chosen by the
  # handler of whatever state the button is currently in.
  def push(pid), do: :gen_statem.call(pid, :push)

  # ---- :gen_statem callbacks --------------------------------------------

  # The button starts in the :off state and carries no data (nil).
  @impl :gen_statem
  def init(_opts), do: {:ok, :off, nil}

  # :state_functions means every state is handled by a function with the
  # same name as the state. The alternative, :handle_event_function, sends
  # the events of all states through one single callback.
  @impl :gen_statem
  def callback_mode, do: :state_functions

  # ---- State functions --------------------------------------------------
  #
  # A state function returns what should happen to the machine next
  # (:next_state, :keep_state, :stop, ...) followed by the data and an
  # optional list of actions. Actions are the side effects, e.g. replying
  # to the process that is blocked in :gen_statem.call/2.

  # :off state, :push arrived as a call (someone is waiting for a reply):
  # move to :on, pass the data along unchanged and answer the caller.
  def off({:call, from}, :push, data) do
    {:next_state, :on, data, [{:reply, from, "Turning on"}]}
  end

  # :on state, :push arrived as a call: move back to :off and answer the caller.
  def on({:call, from}, :push, data) do
    {:next_state, :off, data, [{:reply, from, "Turning off"}]}
  end
end
# Tests are triggered explicitly by ExUnit.run/0 at the end of this cell.
ExUnit.start(autorun: false)

defmodule ToggleButtonTest do
  use ExUnit.Case, async: true

  describe "Toggle buttons can be toggled on and off" do
    # Every test in this group gets its own freshly started button as `pid`.
    setup [:start_toggle_button_state_machine]

    test "pushing the button once turns it on", %{pid: button} do
      reply = ToggleButton.push(button)

      assert reply == "Turning on"
    end

    test "pushing the button twice leaves turned off", %{pid: button} do
      _first_reply = ToggleButton.push(button)
      second_reply = ToggleButton.push(button)

      assert second_reply == "Turning off"
    end
  end

  # Setup step: starts a ToggleButton under the test supervisor and exposes
  # its pid in the test context.
  #
  # `start_link_supervised!/2` is _great_: the process is linked to the test,
  # so a crash fails the test, and it is cleaned up automatically afterwards,
  # so tests do not pollute each other.
  defp start_toggle_button_state_machine(_context) do
    child_spec = %{
      id: ToggleButton,
      start: {ToggleButton, :start_link, []}
    }

    %{pid: start_link_supervised!(child_spec)}
  end
end

ExUnit.run()
Message Parsing and Protocols
Example: The BITS Packet Format
> Every packet begins with a standard header: the first three bits encode the packet version, and the next three bits encode the packet type ID. These two values are numbers; all numbers encoded in any packet are represented as binary with the most significant bit first. For example, a version encoded as the binary sequence 100 represents the number 4. > > — Advent of Code 2021, Day 16
The Setup
# Attachment order 1: source first, sink second.
# Uses `assert` instead of a bare match so that a wrong reply shows up as an
# ExUnit assertion failure with a diff rather than as a MatchError.
test "parser is ready once stream and then sink have been attached", %{
  pid: pid,
  stream: stream
} do
  assert BitsParser.attach_source(pid, stream) == {:waiting, "Waiting for sink"}
  assert BitsParser.attach_sink(pid, self()) == {:ok, "Ready"}
end

# Attachment order 2: sink first, source second.
test "parser is ready once sink and then stream have been attached", %{
  pid: pid,
  stream: stream
} do
  assert BitsParser.attach_sink(pid, self()) == {:waiting, "Waiting for source"}
  assert BitsParser.attach_source(pid, stream) == {:ok, "Ready"}
end
Machines within machines within machines…
defmodule BitsParser do
  # Client API of the parser process. The process itself starts out running
  # WaitForAttachmentsStateMachine as its :gen_statem callback module.

  # Starts the parser process linked to the caller.
  def start_link, do: :gen_statem.start_link(WaitForAttachmentsStateMachine, [], [])

  # Synchronously hands the input `stream` to the parser process `pid`
  # and returns the state machine's reply.
  def attach_source(pid, stream) do
    request = {:attach_source, stream}
    :gen_statem.call(pid, request)
  end

  # Synchronously registers `sink` with the parser process `pid`
  # and returns the state machine's reply.
  def attach_sink(pid, sink) do
    request = {:attach_sink, sink}
    :gen_statem.call(pid, request)
  end
end
defmodule WaitForAttachmentsStateMachine do
  # First-stage state machine: collects a source and a sink (in either order)
  # and, once both are present, hands the process over to
  # BitsPackageParserStateMachine.
  @behaviour :gen_statem

  @impl :gen_statem
  def callback_mode, do: :state_functions

  # Starts in :idle with an empty map as data.
  @impl :gen_statem
  def init(_args), do: {:ok, :idle, %{}}

  # :idle + source attached => remember it under :source, wait for the sink.
  def idle({:call, caller}, {:attach_source, stream}, data) do
    reply = {:reply, caller, {:waiting, "Waiting for sink"}}
    {:next_state, :waiting_for_sink, Map.put(data, :source, stream), [reply]}
  end

  # :idle + sink attached => remember it under :sink, wait for the source.
  def idle({:call, caller}, {:attach_sink, pid}, data) do
    reply = {:reply, caller, {:waiting, "Waiting for source"}}
    {:next_state, :waiting_for_source, Map.put(data, :sink, pid), [reply]}
  end

  # The missing source arrived: both attachments are now present.
  def waiting_for_source({:call, caller}, {:attach_source, source}, data) do
    data
    |> Map.put(:source, source)
    |> hand_over_to_parser(caller)
  end

  # The missing sink arrived: both attachments are now present.
  def waiting_for_sink({:call, caller}, {:attach_sink, pid}, data) do
    data
    |> Map.put(:sink, pid)
    |> hand_over_to_parser(caller)
  end

  # Builds the transition into :ready. The actions, in order:
  #   1. switch the callback module to the packet parser state machine,
  #   2. queue an internal :start_parsing event to get that machine running,
  #   3. reply {:ok, "Ready"} to the caller.
  defp hand_over_to_parser(data, caller) do
    switch_module = {:change_callback_module, BitsPackageParserStateMachine}
    kick_off = {:next_event, :internal, :start_parsing}
    reply = {:reply, caller, {:ok, "Ready"}}

    {:next_state, :ready, data, [switch_module, kick_off, reply]}
  end
end
Split Field Parsing from Handling Packet Structure
defmodule BitsFieldParsers do
  @moduledoc false

  # Pure functions that decode the individual fields of a BITS packet.
  # Each function takes a bitstring of exactly the field's width; any other
  # input raises FunctionClauseError. Integers are decoded unsigned with the
  # most significant bit first (the default for bitstring integer segments).

  # Decodes the 3-bit version field into an integer 0..7.
  def parse_version_field(<<version::3>>) do
    version
  end

  # Decodes the 3-bit type ID field into an atom naming the packet type.
  def parse_type_field(<<field_type::3>>) do
    case field_type do
      0 -> :sum
      1 -> :product
      2 -> :min
      3 -> :max
      4 -> :literal
      5 -> :greater_than
      6 -> :less_than
      7 -> :equals
    end
  end

  # Decodes the 1-bit length type ID:
  #   0 -> :bit_length
  #   1 -> :sub_packet_count
  def parse_sub_packet_length_type(<<0::1>>),
    do: :bit_length

  def parse_sub_packet_length_type(<<1::1>>),
    do: :sub_packet_count

  # Decodes the 11-bit length field that follows a :sub_packet_count
  # length type ID into an integer.
  def parse_sub_packet_length_type_11(<<sub_packet_count::11>>), do: sub_packet_count
end
# Tests are triggered explicitly by ExUnit.run/0 at the end of this cell.
ExUnit.start(autorun: false)

defmodule StateMachinesTest do
  # Unit tests for the pure field parsers in BitsFieldParsers.
  use ExUnit.Case, async: true

  test "it can parse the version field" do
    version = BitsFieldParsers.parse_version_field(<<0b110::3>>)

    assert version == 6
  end

  test "it can parse the type field into atoms" do
    # Every 3-bit type ID paired with the atom it must decode to.
    expectations = [
      {<<0b000::3>>, :sum},
      {<<0b001::3>>, :product},
      {<<0b010::3>>, :min},
      {<<0b011::3>>, :max},
      {<<0b100::3>>, :literal},
      {<<0b101::3>>, :greater_than},
      {<<0b110::3>>, :less_than},
      {<<0b111::3>>, :equals}
    ]

    for {bits, expected_type} <- expectations do
      assert BitsFieldParsers.parse_type_field(bits) == expected_type
    end
  end

  test "it can parse sub-packets length indicator type 15" do
    length_type = BitsFieldParsers.parse_sub_packet_length_type(<<0::1>>)

    assert length_type == :bit_length
  end

  test "it can parse sub-packets length indicator type 11" do
    length_type = BitsFieldParsers.parse_sub_packet_length_type(<<1::1>>)

    assert length_type == :sub_packet_count
  end

  test "it can parse sub-packets length of length type 11" do
    eleven_bits = <<0b10000000001::11>>

    assert BitsFieldParsers.parse_sub_packet_length_type_11(eleven_bits) == 1025
  end
end

ExUnit.run()
Setting up the tests
defmodule BitsParserTest do
  # End-to-end test: attach a sink and a bit stream to a BitsParser process
  # and expect the parsed packet to be sent to the sink.
  use ExUnit.Case, async: true

  describe "Bits Protocol Parser" do
    setup [:start_bits_parser, :setup_sample_data_input_stream]

    test "a parsed package is returned", %{pid: pid, stream: stream} do
      # Using the test process as the sink
      assert BitsParser.attach_sink(pid, self()) == {:waiting, "Waiting for source"}
      assert BitsParser.attach_source(pid, stream) == {:ok, "Ready"}

      # Use assert_receive to assert that BitsParser sent us a parsed packet.
      # The sample data starts with 111 (version 7), 011 (type 3 = :max),
      # 1 (length type: sub-packet count) and 00000000011 (3 sub-packets).
      assert_receive {:package, %{version: 7, type: :max, sub_packet_count: 3}}
    end
  end

  # Setup step: starts a BitsParser under the test supervisor and exposes its
  # pid in the test context.
  defp start_bits_parser(_context) do
    pid =
      start_link_supervised!(%{
        id: BitsParser,
        start: {BitsParser, :start_link, []}
      })

    %{pid: pid}
  end

  # Setup step: exposes a lazy stream of one-bit bitstrings (<<0::1>> or
  # <<1::1>>) read character by character from a string of "0"s and "1"s.
  defp setup_sample_data_input_stream(_context) do
    # Structure grabbed directly from the Stream.resource docs
    stream =
      Stream.resource(
        fn ->
          {:ok, io} = StringIO.open("11101110000000001101010000001100100000100011000001100000")
          io
        end,
        fn io ->
          # Read one character at a time and map it straight to a one-bit
          # bitstring. Anything other than "0"/"1" is a bug in the sample
          # data, so we let it crash with a CaseClauseError.
          case IO.getn(io, "", 1) do
            :eof -> {:halt, io}
            "0" -> {[<<0::1>>], io}
            "1" -> {[<<1::1>>], io}
          end
        end,
        fn io -> StringIO.close(io) end
      )

    %{stream: stream}
  end
end
And Finally: The Parsing StateMachine
defmodule BitsPackageParserStateMachine do
  # Second-stage state machine: parses the header of a BITS packet from the
  # bit stream in `data.source`, one field per state, and sends the result to
  # `data.sink`. Each state function consumes its field, stores the advanced
  # stream back into the data, and queues an internal :continue_parse event
  # to drive the machine into the next state.
  @behaviour :gen_statem

  # Does not actually get called when you set module as callback module
  # on an already running StateMachine
  @impl :gen_statem
  def init(_args) do
    {:ok, :ready, nil}
  end

  @impl :gen_statem
  def callback_mode(), do: :state_functions

  # This is the state we hit with the "hand over" event
  def ready(:internal, :start_parsing, data) do
    # First part of packet is the version field. Use action to push us to that state.
    {:next_state, :parse_version, data, [{:next_event, :internal, :continue_parse}]}
  end

  def parse_version(:internal, _, data) do
    # Grab three bits from the stream
    {version_bits, tail} = take_bits(data.source, 3)
    version_parsed = BitsFieldParsers.parse_version_field(version_bits)

    updated_data =
      data
      # Update the state with the "forwarded" stream
      |> Map.put(:source, tail)
      # Start building the new packet
      |> Map.put(:current_packet, %{version: version_parsed})

    # Next step is parsing the packet type
    {:next_state, :parse_type, updated_data, [{:next_event, :internal, :continue_parse}]}
  end

  def parse_type(:internal, _, data) do
    # Type is three bits
    {type_bits, tail} = take_bits(data.source, 3)
    type_parsed = BitsFieldParsers.parse_type_field(type_bits)

    updated_data =
      data
      # Remember to send the "forwarded" stream forward
      |> Map.put(:source, tail)
      # Update the current packet
      |> put_in([:current_packet, :type], type_parsed)

    case type_parsed do
      :literal ->
        raise "we'll handle these later"

      _ ->
        # Next we need to figure out which length type this packet has
        {:next_state, :parse_length_type, updated_data,
         [{:next_event, :internal, :continue_parse}]}
    end
  end

  def parse_length_type(:internal, _, data) do
    # Length indicator is just one bit
    {length_type_bit, tail} = take_bits(data.source, 1)
    length_type = BitsFieldParsers.parse_sub_packet_length_type(length_type_bit)

    # Remember to send the "forwarded" stream forward
    updated_data = Map.put(data, :source, tail)

    # Select the next state based on which length type we're looking at
    case length_type do
      :bit_length ->
        {:next_state, :parse_length_type_15, updated_data,
         [{:next_event, :internal, :continue_parse}]}

      :sub_packet_count ->
        {:next_state, :parse_length_type_11, updated_data,
         [{:next_event, :internal, :continue_parse}]}
    end
  end

  def parse_length_type_11(:internal, _, data) do
    # The sub-packet count is an 11-bit field
    {length_bits, tail} = take_bits(data.source, 11)
    sub_packet_count = BitsFieldParsers.parse_sub_packet_length_type_11(length_bits)

    updated_data =
      data
      |> Map.put(:source, tail)
      |> put_in([:current_packet, :sub_packet_count], sub_packet_count)

    # Pretend that we're done and send the parsed packet downstream
    send(data.sink, {:package, updated_data.current_packet})

    # Transition to next state... but this is where we stop today.
    # NOTE: updated_data is deliberately not stored here; once parsing
    # continues past this point, return it in a :next_state tuple instead.
    {:keep_state_and_data, []}
  end

  def parse_length_type_15(:internal, _, _data) do
    raise "Not implemented"
  end

  # Takes `count` elements (each expected to be a bitstring) off the front of
  # `source` and concatenates them into one bitstring.
  # Returns {field_bits, remaining_stream}.
  # StreamSplit is super neat for when you need to read through a stream
  # piece by piece.
  defp take_bits(source, count) do
    {bits, tail} = StreamSplit.take_and_drop(source, count)

    field = Enum.reduce(bits, <<>>, fn bit, acc -> <<acc::bitstring, bit::bitstring>> end)

    # Crash right here with a MatchError if the stream ran dry mid-field,
    # rather than somewhere inside a field parser.
    ^count = bit_size(field)

    {field, tail}
  end
end
Recap
Separating concerns
Separate the concern of parsing a field from the concern of parsing the packet.
Easier to test
Field parsers will mostly be pure functions. And small. And easy to test 🎉
The state functions
are just functions. You should unit test them.
Is it instantaneously decodable?
If the parser can be described as a StateMachine
then you can start parsing as soon as you have the first bits.
This can make a difference.
ExUnit is just awesome
-
start_link_supervised
-
assert_receive
/assert_received