Traffic Light
# Install the notebook's dependencies. The application config sets the
# `:storage` option under the `ColouredFlow.Runner.Storage` key to the
# in-memory storage module.
Mix.install(
  [
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    {:kino, "~> 0.14.1"}
  ],
  config: [
    coloured_flow: [
      {ColouredFlow.Runner.Storage, storage: ColouredFlow.Runner.Storage.InMemory}
    ]
  ]
)
Coloured Petri Net
defmodule TrafficLight do
  @moduledoc """
  A two-direction (east-west / north-south) traffic light modelled as a
  coloured Petri net.

  Each direction cycles `red -> green -> yellow -> red`. The `safe_ew` and
  `safe_ns` places couple the two cycles: `turn_green_ew` consumes a token from
  `safe_ew`, which is only produced by `turn_red_ns` (and symmetrically for
  `safe_ns`), so a direction can only turn green after the other has turned red.
  """

  alias ColouredFlow.Definition.ColouredPetriNet
  alias ColouredFlow.Definition.Place
  alias ColouredFlow.Definition.Variable
  alias ColouredFlow.Enactment.Marking

  import ColouredFlow.Builder.DefinitionHelper
  import ColouredFlow.Notation.Colset
  import Record, only: [is_record: 2]

  @flow %ColouredPetriNet{
    colour_sets: [
      colset(unit() :: {})
    ],
    places:
      Enum.map(
        ~w[red_ew green_ew yellow_ew red_ns green_ns yellow_ns safe_ew safe_ns],
        fn name ->
          %Place{name: name, colour_set: :unit}
        end
      ),
    transitions:
      Enum.map(
        ~w[turn_red_ew turn_green_ew turn_yellow_ew turn_red_ns turn_green_ns turn_yellow_ns],
        &build_transition!(name: &1)
      ),
    arcs:
      # East-west cycle: red -> green -> yellow -> red.
      [
        arc(turn_green_ew <~ red_ew :: "bind {1, u}"),
        arc(turn_green_ew ~> green_ew :: "{1, u}"),
        arc(turn_yellow_ew <~ green_ew :: "bind {1, u}"),
        arc(turn_yellow_ew ~> yellow_ew :: "{1, u}"),
        arc(turn_red_ew <~ yellow_ew :: "bind {1, u}"),
        arc(turn_red_ew ~> red_ew :: "{1, u}")
      ] ++
        # North-south cycle: red -> green -> yellow -> red.
        [
          arc(turn_green_ns <~ red_ns :: "bind {1, u}"),
          arc(turn_green_ns ~> green_ns :: "{1, u}"),
          arc(turn_yellow_ns <~ green_ns :: "bind {1, u}"),
          arc(turn_yellow_ns ~> yellow_ns :: "{1, u}"),
          arc(turn_red_ns <~ yellow_ns :: "bind {1, u}"),
          arc(turn_red_ns ~> red_ns :: "{1, u}")
        ] ++
        # Interlock: turning one direction red produces the "safe" token the
        # other direction must consume before it can turn green.
        [
          arc(turn_red_ns ~> safe_ew :: "{1, u}"),
          arc(turn_green_ew <~ safe_ew :: "bind {1, u}"),
          arc(turn_red_ew ~> safe_ns :: "{1, u}"),
          arc(turn_green_ns <~ safe_ns :: "bind {1, u}")
        ],
    variables: [
      %Variable{name: :u, colour_set: :unit}
    ]
  }

  @doc """
  Renders the net as a Mermaid diagram wrapped in a `Kino.Mermaid` widget.
  """
  def to_mermaid do
    @flow
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  @doc """
  Inserts the net definition into the in-memory storage and returns whatever
  `ColouredFlow.Runner.Storage.InMemory.insert_flow!/1` returns.
  """
  def insert_flow do
    ColouredFlow.Runner.Storage.InMemory.insert_flow!(@flow)
  end

  @doc """
  Inserts an enactment of `flow` (a `:flow` record) with the initial markings.

  Initially both directions are red and `safe_ew` is marked, so `turn_green_ew`
  is the only transition with all of its input places marked.
  """
  def insert_enactment(flow) when is_record(flow, :flow) do
    import ColouredFlow.MultiSet, only: :sigils

    ColouredFlow.Runner.Storage.InMemory.insert_enactment!(flow, [
      %Marking{place: "red_ew", tokens: ~MS[{}]},
      %Marking{place: "red_ns", tokens: ~MS[{}]},
      %Marking{place: "safe_ew", tokens: ~MS[{}]}
    ])
  end

  @doc """
  Builds the UI for the lights.

  Returns `{grid, lights}`, where `lights` is a list of
  `{place_name, frame}` tuples (one `Kino.Frame` per colour and direction) and
  `grid` is a two-column `Kino.Layout.grid` with an `EW`/`NS` header row
  followed by the frames.
  """
  def to_kino do
    lights =
      for color <- [:red, :yellow, :green], dir <- [:ew, :ns] do
        {"#{color}_#{dir}", Kino.Frame.new(placeholder: false)}
      end

    # `lights` is keyed by strings, so it is not a keyword list; extract the
    # frames by destructuring the tuples instead of going through `Keyword`.
    frames = Enum.map(lights, fn {_name, frame} -> frame end)

    grid = Kino.Layout.grid([EW, NS] ++ frames, columns: 2)

    {grid, lights}
  end
end
# Display the net as a Mermaid diagram (the call returns a `Kino.Mermaid` widget).
TrafficLight.to_mermaid()
Prepare the storage and supervisor
# Start the in-memory storage first, then the runner supervisor, both through
# `Kino.start_child!/1`, and print the pid of each.
storage_pid = Kino.start_child!(ColouredFlow.Runner.Storage.InMemory)
Kino.inspect("Storage started: #{inspect(storage_pid)}")
supervisor_pid = Kino.start_child!(ColouredFlow.Runner.Supervisor)
Kino.inspect("Runner supervisor started: #{inspect(supervisor_pid)}")
# Make `:ok` the value of this section instead of the last `Kino.inspect/1` result.
:ok
Workitem handler
defmodule TrafficLight.WorkitemPubSub do
  @moduledoc """
  Forwards runner telemetry events of one enactment to the two directional
  light processes.

  On start it (re-)attaches `handle_event/4` as a telemetry handler through
  `ColouredFlow.Runner.Telemetry`, passing `%{enactment_id: _, lights: _}` as
  the handler options. Every matching event broadcasts a `{:tick, state}`
  message to both lights; `:stop` events additionally route each workitem in
  the metadata to the light of its direction.
  """

  use GenServer

  alias ColouredFlow.Runner.Enactment.Workitem

  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl GenServer
  def init(init_arg) do
    # Fetch the required options in a fixed order so the first missing one raises.
    [enactment_id, ew_pid, ns_pid] =
      Enum.map([:enactment_id, :ew_pid, :ns_pid], &Keyword.fetch!(init_arg, &1))

    state = %{
      enactment_id: enactment_id,
      lights: %{"ew" => ew_pid, "ns" => ns_pid}
    }

    # Drop a handler left over from a previous start before attaching again.
    ColouredFlow.Runner.Telemetry.detach(__MODULE__)
    :ok = ColouredFlow.Runner.Telemetry.attach(__MODULE__, &__MODULE__.handle_event/4, state)

    {:ok, state}
  end

  # Telemetry handler. The first three clauses only match when the event
  # metadata and the handler options carry the same `enactment_id`.

  # Four-segment enactment events: refresh the lights.
  def handle_event(
        [:coloured_flow, :runner, :enactment, _transition],
        _measurements,
        metadata = %{enactment_id: id},
        options = %{enactment_id: id}
      ) do
    publish_tick(metadata.enactment_state, options.lights)
  end

  # `:stop` events: hand each workitem to its light, then refresh the lights.
  def handle_event(
        [:coloured_flow, :runner, :enactment, _operation, :stop],
        measurements,
        metadata = %{enactment_id: id},
        options = %{enactment_id: id}
      ) do
    Enum.each(metadata.workitems, &deliver_workitem(&1, measurements, options))
    publish_tick(metadata.enactment_state, options.lights)
  end

  # Any other five-segment enactment event: refresh the lights.
  def handle_event(
        [:coloured_flow, :runner, :enactment, _operation, _event],
        _measurements,
        metadata = %{enactment_id: id},
        options = %{enactment_id: id}
      ) do
    publish_tick(metadata.enactment_state, options.lights)
  end

  def handle_event(_event_name, _measurements, _metadata, _options) do
    # ignore events from other enactments
    :ok
  end

  # Sends `{:workitem, workitem, time}` to the light owning the workitem's
  # direction; `time` is the event's `:system_time` converted to a `DateTime`.
  defp deliver_workitem(workitem, measurements, options) do
    target = get_light(workitem, options.lights)
    time = measurements |> Map.fetch!(:system_time) |> DateTime.from_unix!(:native)
    Process.send(target, {:workitem, workitem, time}, [])
  end

  # Broadcasts the current enactment state to every light process.
  defp publish_tick(enactment_state, lights) do
    Enum.each(Map.values(lights), fn pid ->
      Process.send(pid, {:tick, enactment_state}, [])
    end)
  end

  # Looks up the light process by the direction suffix of the transition name.
  defp get_light(%Workitem{} = workitem, lights) do
    direction =
      case workitem.binding_element.transition do
        "turn_red_" <> suffix -> suffix
        "turn_yellow_" <> suffix -> suffix
        "turn_green_" <> suffix -> suffix
      end

    Map.fetch!(lights, direction)
  end
end
defmodule TrafficLight.DirectionalLight do
  @moduledoc """
  Controls the lamps of a single direction.

  The process reacts to three kinds of messages:

    * `{:workitem, workitem, time}` - an enabled workitem is started right away
      and its completion is scheduled; workitems in any other state are ignored
    * `{:turn, workitem}` - completes the previously started workitem
    * `{:tick, enactment_state}` - re-renders every frame of this direction
      according to the places currently present in the markings
  """

  use GenServer

  alias ColouredFlow.Runner.Enactment.Workitem
  alias ColouredFlow.Runner.Enactment.WorkitemTransition

  # Milliseconds between the timestamp delivered with a workitem and its
  # completion, keyed by the colour the workitem's transition turns to.
  @color_delay %{red: 3_000, yellow: 10_000, green: 0}

  def start_link(init_arg) when is_list(init_arg) do
    {name, rest} = Keyword.pop!(init_arg, :name)
    GenServer.start_link(__MODULE__, rest, name: name)
  end

  @impl GenServer
  def init(init_arg) do
    # All three options are required; the state is a map with exactly these keys.
    state =
      Map.new([:enactment_id, :direction, :lights], fn key ->
        {key, Keyword.fetch!(init_arg, key)}
      end)

    {:ok, state}
  end

  @impl GenServer
  def handle_info({:workitem, %Workitem{state: :enabled} = workitem, inserted_at}, state) do
    {:ok, started} = WorkitemTransition.start_workitem(state.enactment_id, workitem.id)
    schedule_turn(started, inserted_at)

    {:noreply, state}
  end

  def handle_info({:workitem, %Workitem{}, _time}, state) do
    # ignore other workitems
    {:noreply, state}
  end

  def handle_info({:turn, %Workitem{} = workitem}, state) do
    {:ok, _completed} =
      WorkitemTransition.complete_workitem(state.enactment_id, {workitem.id, []})

    {:noreply, state}
  end

  def handle_info({:tick, enactment_state}, state) do
    suffix = Atom.to_string(state.direction)

    # Names of the marked places that belong to this direction.
    marked =
      for place <- Map.keys(Map.get(enactment_state, :markings)),
          String.ends_with?(place, suffix),
          do: place

    control(marked, state.lights)

    {:noreply, state}
  end

  # Schedules the `{:turn, workitem}` message so that it fires `@color_delay`
  # milliseconds after `started_at`, or immediately if that moment has passed.
  defp schedule_turn(%Workitem{} = workitem, started_at) do
    delay = Map.fetch!(@color_delay, get_color(workitem))
    elapsed = NaiveDateTime.diff(NaiveDateTime.utc_now(), started_at, :millisecond)
    remaining = max(delay - elapsed, 0)

    Process.send_after(self(), {:turn, workitem}, remaining)
  end

  # Colour the workitem's transition turns the light to.
  defp get_color(%Workitem{} = workitem) do
    case workitem.binding_element.transition do
      "turn_red" <> _rest -> :red
      "turn_yellow" <> _rest -> :yellow
      "turn_green" <> _rest -> :green
    end
  end

  # Renders each frame as lit when its place is among `marking_places`.
  defp control(marking_places, lights) do
    Enum.each(lights, fn {place_name, frame} ->
      lit? = place_name in marking_places
      Kino.Frame.render(frame, light_symbol(place_name, lit?))
    end)
  end

  # Text widget for one lamp: the coloured emoji when on, a black circle when off.
  defp light_symbol(_color, false) do
    Kino.Text.new("⚫️", terminal: true)
  end

  defp light_symbol(color, true) do
    color
    |> emoji()
    |> Kino.Text.new(terminal: true)
  end

  defp emoji(place_name) do
    case place_name do
      "red" <> _rest -> "🔴"
      "yellow" <> _rest -> "🟡"
      "green" <> _rest -> "🟢"
    end
  end
end
Run
# NOTE(review): presumably this import provides the `enactment/2` record macro
# used below - confirm against the storage module.
import ColouredFlow.Runner.Storage.InMemory, only: :macros

# Persist the definition and an enactment of it, and build the UI frames.
flow = TrafficLight.insert_flow()
enactment = TrafficLight.insert_enactment(flow)
{grid, lights} = TrafficLight.to_kino()
enactment_id = enactment(enactment, :id)

# Maps a light name to its direction by suffix; unknown names are rejected.
direction_of = fn name ->
  Enum.find([:ew, :ns], &String.ends_with?(name, Atom.to_string(&1))) ||
    raise "Unknown light #{inspect(name)}"
end

groups = Enum.group_by(lights, fn {name, _frame} -> direction_of.(name) end)

# Not referenced by the code visible in this section.
light_options = [enactment_id: enactment_id]

# Starts the light process of one direction under the given registered name.
start_light = fn name, direction ->
  Kino.start_child!({
    TrafficLight.DirectionalLight,
    [
      name: name,
      enactment_id: enactment_id,
      direction: direction,
      lights: Map.fetch!(groups, direction)
    ]
  })
end

ew_pid = start_light.(TrafficLight.EWLight, :ew)
ns_pid = start_light.(TrafficLight.NSLight, :ns)

# The telemetry bridge needs both light pids, so it is started after them and
# before the enactment itself.
Kino.start_child!({
  TrafficLight.WorkitemPubSub,
  [enactment_id: enactment_id, ew_pid: ew_pid, ns_pid: ns_pid]
})

{:ok, enactment_pid} =
  ColouredFlow.Runner.Enactment.Supervisor.start_enactment(enactment_id)

Kino.inspect("Enactment is running: #{inspect(enactment_pid)}")

# Render the grid of lights as the result of this section.
grid