Traffic Light
# Notebook setup: install the dependencies and configure coloured_flow.
Mix.install(
[
# Fetched straight from GitHub rather than from a Hex release.
{:coloured_flow, github: "Byzanteam/coloured_flow"},
{:kino, "~> 0.14.1"},
{:kino_excalidraw, "~> 0.5.0"}
],
config: [
coloured_flow: [
{
# Select the in-memory storage implementation for the runner.
ColouredFlow.Runner.Storage,
[
storage: ColouredFlow.Runner.Storage.InMemory
]
}
]
]
)
Coloured Petri Net
# Each of these cells evaluates to Kino's "nothing" value, so no output is rendered.
Kino.nothing()
Kino.nothing()
defmodule TrafficLight do
  # A two-direction (east-west / north-south) traffic light modelled as a
  # coloured Petri net.
  #
  # Each direction cycles red -> green -> yellow -> red. The `safe_ew` and
  # `safe_ns` places act as an interlock: a direction may only turn green while
  # it holds its "safe" token, and that token is only produced when the other
  # direction turns red.
  #
  # Every transition action re-renders the lights and then drives the next
  # transition, so the net keeps cycling on its own once the enactment starts.
  use ColouredFlow.DSL, task_supervisor: TrafficLight.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "TrafficLight"

  # A single colourless token type: the empty tuple.
  colset signal() :: {}

  var s :: signal()

  place :red_ew, :signal
  place :green_ew, :signal
  place :yellow_ew, :signal
  place :red_ns, :signal
  place :green_ns, :signal
  place :yellow_ns, :signal
  place :safe_ew, :signal
  place :safe_ns, :signal

  # Both directions start red; east-west holds the "safe" token and goes first.
  initial_marking :red_ew, ~MS[{}]
  initial_marking :red_ns, ~MS[{}]
  initial_marking :safe_ew, ~MS[{}]

  # East-west: red -> green. Consumes the interlock token `safe_ew`.
  transition :turn_green_ew do
    input :red_ew, bind({1, s})
    input :safe_ew, bind({1, s})
    output :green_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      # Hold green for 10 seconds before moving on to yellow.
      :timer.sleep(10_000)
      TrafficLight.drive_next(event.enactment_id, "turn_yellow_ew")
    end
  end

  # East-west: green -> yellow.
  transition :turn_yellow_ew do
    input :green_ew, bind({1, s})
    output :yellow_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      # Hold yellow for 3 seconds before turning red.
      :timer.sleep(3_000)
      TrafficLight.drive_next(event.enactment_id, "turn_red_ew")
    end
  end

  # East-west: yellow -> red. Hands the interlock over by producing `safe_ns`.
  transition :turn_red_ew do
    input :yellow_ew, bind({1, s})
    output :red_ew, {1, s}
    output :safe_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      TrafficLight.drive_next(event.enactment_id, "turn_green_ns")
    end
  end

  # North-south: red -> green. Consumes the interlock token `safe_ns`.
  transition :turn_green_ns do
    input :red_ns, bind({1, s})
    input :safe_ns, bind({1, s})
    output :green_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      # Hold green for 10 seconds before moving on to yellow.
      :timer.sleep(10_000)
      TrafficLight.drive_next(event.enactment_id, "turn_yellow_ns")
    end
  end

  # North-south: green -> yellow.
  transition :turn_yellow_ns do
    input :green_ns, bind({1, s})
    output :yellow_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      # Hold yellow for 3 seconds before turning red.
      :timer.sleep(3_000)
      TrafficLight.drive_next(event.enactment_id, "turn_red_ns")
    end
  end

  # North-south: yellow -> red. Hands the interlock back by producing `safe_ew`.
  transition :turn_red_ns do
    input :yellow_ns, bind({1, s})
    output :red_ns, {1, s}
    output :safe_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      TrafficLight.drive_next(event.enactment_id, "turn_green_ew")
    end
  end

  # Kick off the cycle: draw the initial state, then fire the first transition.
  on_enactment_start do
    TrafficLight.render(options[:frames], event.markings)
    TrafficLight.drive_next(event.enactment_id, "turn_green_ew")
  end

  @doc """
  Renders the net definition as a Mermaid diagram wrapped in a `Kino.Mermaid`.
  """
  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  @doc """
  Builds the light display.

  Returns `{grid, lights}`, where `lights` is a list of `{place_name, frame}`
  pairs (one `Kino.Frame` per light, keyed by the place name as a string) and
  `grid` is a two-column layout with an "EW" / "NS" header row followed by the
  red, yellow and green rows.
  """
  def to_kino do
    lights =
      for color <- [:red, :yellow, :green], dir <- [:ew, :ns] do
        {"#{color}_#{dir}", Kino.Frame.new(placeholder: false)}
      end

    # `lights` is keyed by strings, so it is not a keyword list and
    # `Keyword.values/1` would raise on it; take the frames out of the pairs
    # directly instead.
    frames = Enum.map(lights, fn {_place_name, frame} -> frame end)

    headers = [Kino.Text.new("EW"), Kino.Text.new("NS")]
    grid = Kino.Layout.grid(headers ++ frames, columns: 2)

    {grid, lights}
  end

  # Redraws every light frame from the current markings.
  #
  # `frames` maps a place name to its `Kino.Frame`; when it is `nil` (no frames
  # were passed in the hook options) rendering is skipped. A light is shown as
  # lit when its place name is a key of `markings`.
  # NOTE(review): assumes `markings` is a map that only has keys for places
  # currently holding tokens — confirm against the runner's event payload.
  @doc false
  def render(nil, _markings), do: :ok

  def render(frames, markings) when is_map(frames) do
    occupied = MapSet.new(Map.keys(markings))

    Enum.each(frames, fn {place_name, frame} ->
      Kino.Frame.render(frame, light_symbol(place_name, MapSet.member?(occupied, place_name)))
    end)
  end

  # Picks the emoji for one light: a black circle when the place is not
  # occupied, otherwise the colour given by the place-name prefix.
  defp light_symbol(_place, false), do: Kino.Text.new("⚫️", terminal: true)

  defp light_symbol(place_name, true) do
    emoji =
      case place_name do
        "red_" <> _ -> "🔴"
        "yellow_" <> _ -> "🟡"
        "green_" <> _ -> "🟢"
        _ -> "⚫️"
      end

    Kino.Text.new(emoji, terminal: true)
  end

  # Fires the named transition for the given enactment, if it is enabled.
  #
  # Looks through the enactment's live workitems for the first one that is
  # `:enabled` and belongs to `transition_name`, then starts and immediately
  # completes it. Returns `:ok` both when the workitem was driven and when no
  # matching workitem exists. Raises a `MatchError` if starting or completing
  # the workitem does not return an `{:ok, _}` tuple.
  @doc false
  def drive_next(enactment_id, transition_name) when is_binary(transition_name) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(fn wi ->
      wi.state == :enabled and wi.binding_element.transition == transition_name
    end)
    |> case do
      nil ->
        :ok

      %{id: workitem_id} ->
        {:ok, _started} = WorkitemTransition.start_workitem(enactment_id, workitem_id)
        {:ok, _completed} = WorkitemTransition.complete_workitem(enactment_id, {workitem_id, []})
        :ok
    end
  end
end
Prepare the storage and supervisor
# Start the in-memory storage and then the runner supervisor through
# Kino.start_child!/1, reporting each pid right after its process is started.
storage_pid =
  ColouredFlow.Runner.Storage.InMemory
  |> Kino.start_child!()
  |> tap(fn pid -> Kino.inspect("Storage started: #{inspect(pid)}") end)

supervisor_pid =
  ColouredFlow.Runner.Supervisor
  |> Kino.start_child!()
  |> tap(fn pid -> Kino.inspect("Runner supervisor started: #{inspect(pid)}") end)

:ok
defmodule TrafficLight.Supervisor do
  @moduledoc """
  Supervises the `TrafficLight.TaskSup` task supervisor.

  The supervisor is registered under its own module name. Calling
  `start_link/1` while an instance is already registered stops that instance
  first, so the call can be repeated safely.
  """

  use Supervisor

  @doc """
  Starts the supervisor, replacing any instance already registered under this
  module's name.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(init_arg \\ []) when is_list(init_arg) do
    # Stop a previously registered instance so the name is free again.
    case Process.whereis(__MODULE__) do
      nil -> :ok
      _running -> Supervisor.stop(__MODULE__)
    end

    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl Supervisor
  def init(_init_arg) do
    children = [
      {Task.Supervisor, name: TrafficLight.TaskSup}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
Run
# Run the traffic light: store the net, create an enactment, build the display
# and start the enactment with the display frames passed to the lifecycle hooks.

# Only the macros are imported; `flow/2` and `enactment/2` below rely on this import.
import ColouredFlow.Runner.Storage.InMemory, only: :macros
alias ColouredFlow.Runner.Storage.InMemory
# Store the net definition, then create an enactment for the stored flow.
flow = InMemory.insert_flow!(TrafficLight.cpnet())
{:ok, enactment_record} = TrafficLight.insert_enactment(flow(flow, :id))
enactment_id = enactment(enactment_record, :id)
# `lights` is a list of {place_name, frame} pairs; the hooks receive it as a map.
{grid, lights} = TrafficLight.to_kino()
frames = Map.new(lights)
# Starts TrafficLight.TaskSup, which the TrafficLight module names as its task supervisor.
Kino.start_child!(TrafficLight.Supervisor)
{:ok, enactment_pid} =
TrafficLight.start_enactment(enactment_id,
# The keyword list here is what the hooks read as `options` (`options[:frames]`).
lifecycle_hooks: {TrafficLight, [frames: frames]}
)
Kino.inspect("Enactment is running: #{inspect(enactment_pid)}")
# Returned last so the light grid is rendered as the cell output.
grid