Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Traffic Light

examples/traffic_light.livemd

Traffic Light

# Notebook dependencies. `coloured_flow` is fetched straight from GitHub and
# Kino provides the notebook widgets used below.
#
# The `config:` option sets the `ColouredFlow.Runner.Storage` key of the
# `:coloured_flow` application environment so that the in-memory storage
# module is selected.
Mix.install(
  [
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    {:kino, "~> 0.14.1"}
  ],
  config: [
    coloured_flow: [
      {ColouredFlow.Runner.Storage, storage: ColouredFlow.Runner.Storage.InMemory}
    ]
  ]
)

Coloured Petri Net

defmodule TrafficLight do
  @moduledoc """
  A two-direction (east-west / north-south) traffic light modelled as a
  coloured Petri net.

  Each direction has three places (`red_*`, `green_*`, `yellow_*`) and three
  transitions (`turn_green_*`, `turn_yellow_*`, `turn_red_*`) that move a single
  unit token around the cycle red -> green -> yellow -> red.

  The `safe_ew` and `safe_ns` places interlock the two directions: a
  direction's `turn_green_*` transition also consumes its `safe_*` token, and
  that token is only produced when the *other* direction's `turn_red_*`
  transition fires.
  """

  alias ColouredFlow.Definition.ColouredPetriNet
  alias ColouredFlow.Definition.Place
  alias ColouredFlow.Definition.Variable
  alias ColouredFlow.Enactment.Marking

  import ColouredFlow.Builder.DefinitionHelper
  import ColouredFlow.Notation.Colset

  import Record, only: [is_record: 2]

  # The net definition is built once, at compile time.
  @flow %ColouredPetriNet{
    colour_sets: [
      colset(unit() :: {})
    ],
    places:
      Enum.map(
        ~w[red_ew green_ew yellow_ew red_ns green_ns yellow_ns safe_ew safe_ns],
        fn name ->
          %Place{name: name, colour_set: :unit}
        end
      ),
    transitions:
      Enum.map(
        ~w[turn_red_ew turn_green_ew turn_yellow_ew turn_red_ns turn_green_ns turn_yellow_ns],
        &build_transition!(name: &1)
      ),
    arcs:
      # East-west cycle: red -> green -> yellow -> red.
      [
        arc(turn_green_ew <~ red_ew :: "bind {1, u}"),
        arc(turn_green_ew ~> green_ew :: "{1, u}"),
        arc(turn_yellow_ew <~ green_ew :: "bind {1, u}"),
        arc(turn_yellow_ew ~> yellow_ew :: "{1, u}"),
        arc(turn_red_ew <~ yellow_ew :: "bind {1, u}"),
        arc(turn_red_ew ~> red_ew :: "{1, u}")
      ] ++
        # North-south cycle: red -> green -> yellow -> red.
        [
          arc(turn_green_ns <~ red_ns :: "bind {1, u}"),
          arc(turn_green_ns ~> green_ns :: "{1, u}"),
          arc(turn_yellow_ns <~ green_ns :: "bind {1, u}"),
          arc(turn_yellow_ns ~> yellow_ns :: "{1, u}"),
          arc(turn_red_ns <~ yellow_ns :: "bind {1, u}"),
          arc(turn_red_ns ~> red_ns :: "{1, u}")
        ] ++
        # Interlock: turning one direction red hands the "safe" token to the
        # other direction, which needs it in order to turn green.
        [
          arc(turn_red_ns ~> safe_ew :: "{1, u}"),
          arc(turn_green_ew <~ safe_ew :: "bind {1, u}"),
          arc(turn_red_ew ~> safe_ns :: "{1, u}"),
          arc(turn_green_ns <~ safe_ns :: "bind {1, u}")
        ],
    variables: [
      %Variable{name: :u, colour_set: :unit}
    ]
  }

  @doc """
  Converts the net definition to Mermaid source and wraps it in a
  `Kino.Mermaid` widget.
  """
  def to_mermaid do
    @flow
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  @doc """
  Inserts the net definition into the in-memory storage and returns whatever
  `ColouredFlow.Runner.Storage.InMemory.insert_flow!/1` returns.
  """
  def insert_flow do
    ColouredFlow.Runner.Storage.InMemory.insert_flow!(@flow)
  end

  @doc """
  Inserts an enactment of `flow` (a `:flow` record) into the in-memory storage.

  The initial marking puts one unit token on `red_ew`, `red_ns` and `safe_ew`:
  both directions start red, and only east-west holds the token it needs to
  turn green.
  """
  def insert_enactment(flow) when is_record(flow, :flow) do
    import ColouredFlow.MultiSet, only: :sigils

    ColouredFlow.Runner.Storage.InMemory.insert_enactment!(flow, [
      %Marking{place: "red_ew", tokens: ~MS[{}]},
      %Marking{place: "red_ns", tokens: ~MS[{}]},
      %Marking{place: "safe_ew", tokens: ~MS[{}]}
    ])
  end

  @doc """
  Builds the widgets used to display the lights.

  Returns `{grid, lights}` where `lights` is a list of `{place_name, frame}`
  tuples (`place_name` is a string such as `"red_ew"`) and `grid` is a
  two-column `Kino.Layout.grid/2` holding the `EW` / `NS` header cells followed
  by the frames.
  """
  def to_kino do
    # Colours are the outer generator, so the frames alternate ew, ns, ew, ns, …
    # and line up under the two header cells of the two-column grid.
    lights =
      for color <- [:red, :yellow, :green], dir <- [:ew, :ns] do
        {"#{color}_#{dir}", Kino.Frame.new(placeholder: false)}
      end

    # `lights` is keyed by strings, so it is not a keyword list; take the
    # frames with a plain map instead of `Keyword.values/1`.
    frames = Enum.map(lights, fn {_name, frame} -> frame end)

    grid = Kino.Layout.grid([EW, NS] ++ frames, columns: 2)

    {grid, lights}
  end
end

# Show the net definition as a Mermaid diagram (this is the cell's output).
TrafficLight.to_mermaid()

Prepare the storage and supervisor

# Start the in-memory storage under Kino's supervision.
storage_pid = Kino.start_child!(ColouredFlow.Runner.Storage.InMemory)

Kino.inspect("Storage started: #{inspect(storage_pid)}")

# Start the runner supervisor; it is started after the storage process.
supervisor_pid = Kino.start_child!(ColouredFlow.Runner.Supervisor)

Kino.inspect("Runner supervisor started: #{inspect(supervisor_pid)}")

# Make `:ok` the value of the cell instead of the last `Kino.inspect/1` result.
:ok

Workitem handler

defmodule TrafficLight.WorkitemPubSub do
  @moduledoc """
  Forwards runner telemetry events of one enactment to the two directional
  light processes.

  On start the process attaches `handle_event/4` through
  `ColouredFlow.Runner.Telemetry.attach/3`, using its own state
  (`%{enactment_id: ..., lights: %{"ew" => pid, "ns" => pid}}`) as the handler
  options. Every matching event results in a `{:tick, enactment_state}` message
  to both lights; `:stop` events additionally deliver each workitem in the
  event metadata to the light of the matching direction.
  """

  use GenServer

  alias ColouredFlow.Runner.Enactment.Workitem

  @doc """
  Telemetry handler.

  The clauses only match when the `enactment_id` in the event metadata equals
  the `enactment_id` in the handler options; anything else falls through to the
  last clause and is ignored.
  """
  def handle_event(
        [:coloured_flow, :runner, :enactment, _transition],
        _measurements,
        %{enactment_id: enactment_id} = metadata,
        %{enactment_id: enactment_id} = options
      ) do
    publish_tick(metadata.enactment_state, options.lights)
  end

  def handle_event(
        [:coloured_flow, :runner, :enactment, _operation, :stop],
        measurements,
        %{enactment_id: enactment_id} = metadata,
        %{enactment_id: enactment_id} = options
      ) do
    Enum.each(metadata.workitems, fn workitem ->
      light = get_light(workitem, options.lights)

      # Converted per workitem on purpose: `:system_time` is only required
      # when there is at least one workitem to deliver.
      time = measurements |> Map.fetch!(:system_time) |> DateTime.from_unix!(:native)

      Process.send(light, {:workitem, workitem, time}, [])
    end)

    publish_tick(metadata.enactment_state, options.lights)
  end

  def handle_event(
        [:coloured_flow, :runner, :enactment, _operation, _event],
        _measurements,
        %{enactment_id: enactment_id} = metadata,
        %{enactment_id: enactment_id} = options
      ) do
    publish_tick(metadata.enactment_state, options.lights)
  end

  def handle_event(_event_name, _measurements, _metadata, _options) do
    # ignore events from other enactments
    :ok
  end

  # Sends the current enactment state to every light process.
  defp publish_tick(enactment_state, lights) do
    lights
    |> Map.values()
    |> Enum.each(fn light ->
      Process.send(light, {:tick, enactment_state}, [])
    end)
  end

  @doc """
  Starts the process, registered under the module name.

  `init_arg` is a keyword list that must contain `:enactment_id`, `:ew_pid` and
  `:ns_pid`.
  """
  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl GenServer
  def init(init_arg) do
    enactment_id = Keyword.fetch!(init_arg, :enactment_id)
    ew_pid = Keyword.fetch!(init_arg, :ew_pid)
    ns_pid = Keyword.fetch!(init_arg, :ns_pid)

    # Keyed by the direction suffix of the transition names (see get_light/2).
    lights = %{"ew" => ew_pid, "ns" => ns_pid}
    state = %{enactment_id: enactment_id, lights: lights}

    # Detach first so a handler left over under the same id does not make the
    # attach below fail (presumably for re-evaluating the notebook cell).
    ColouredFlow.Runner.Telemetry.detach(__MODULE__)
    :ok = ColouredFlow.Runner.Telemetry.attach(__MODULE__, &__MODULE__.handle_event/4, state)

    {:ok, state}
  end

  # Picks the light process whose direction matches the suffix of the
  # workitem's transition name ("turn_<color>_<dir>").
  defp get_light(%Workitem{} = workitem, lights) do
    dir =
      case workitem.binding_element.transition do
        "turn_red_" <> dir -> dir
        "turn_yellow_" <> dir -> dir
        "turn_green_" <> dir -> dir
      end

    Map.fetch!(lights, dir)
  end
end
defmodule TrafficLight.DirectionalLight do
  @moduledoc """
  Drives and renders the lights of one direction.

  The process reacts to three messages:

    * `{:workitem, workitem, time}` - an enabled workitem is started and its
      completion is scheduled after the delay configured for its colour;
      workitems in any other state are ignored.
    * `{:turn, workitem}` - the scheduled completion; completes the workitem.
    * `{:tick, enactment_state}` - re-renders every frame of this direction
      according to the places present in `enactment_state.markings`.
  """

  use GenServer

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Enactment.Workitem

  @doc """
  Starts a light process.

  `init_arg` is a keyword list with:

    * `:name` - the name the process is registered under
    * `:enactment_id` - the enactment whose workitems are started/completed
    * `:direction` - an atom whose string form is the place-name suffix of
      this direction (e.g. `:ew`)
    * `:lights` - an enumerable of `{place_name, frame}` tuples to render into
  """
  def start_link(init_arg) when is_list(init_arg) do
    {name, init_arg} = Keyword.pop!(init_arg, :name)

    GenServer.start_link(__MODULE__, init_arg, name: name)
  end

  @impl GenServer
  def init(init_arg) do
    enactment_id = Keyword.fetch!(init_arg, :enactment_id)
    direction = Keyword.fetch!(init_arg, :direction)
    lights = Keyword.fetch!(init_arg, :lights)

    {
      :ok,
      %{
        enactment_id: enactment_id,
        direction: direction,
        lights: lights
      }
    }
  end

  @impl GenServer
  def handle_info({:workitem, %Workitem{state: :enabled} = workitem, inserted_at}, state) do
    started_workitem = start_workitem(state.enactment_id, workitem.id)

    schedule_turn(started_workitem, inserted_at)

    {:noreply, state}
  end

  def handle_info({:workitem, %Workitem{}, _time}, state) do
    # ignore other workitems

    {:noreply, state}
  end

  def handle_info({:turn, %Workitem{} = workitem}, state) do
    {:ok, _completed_workitem} =
      WorkitemTransition.complete_workitem(state.enactment_id, {workitem.id, []})

    {:noreply, state}
  end

  def handle_info({:tick, enactment_state}, state) do
    direction = Atom.to_string(state.direction)

    # Only the places of this direction that currently appear in the markings.
    marking_places =
      enactment_state
      |> Map.get(:markings)
      |> Map.keys()
      |> Enum.filter(&String.ends_with?(&1, direction))

    control(marking_places, state.lights)

    {:noreply, state}
  end

  # Starts the workitem and returns the started workitem; crashes the process
  # if the runner does not answer with `{:ok, workitem}`.
  defp start_workitem(enactment_id, workitem_id) do
    {:ok, started_workitem} =
      WorkitemTransition.start_workitem(enactment_id, workitem_id)

    started_workitem
  end

  # Milliseconds between the timestamp that came with a workitem and the
  # completion of that workitem, keyed by the colour the transition turns to.
  @color_delay %{
    red: 3_000,
    yellow: 10_000,
    green: 0
  }

  # Schedules the `{:turn, workitem}` message. The time already elapsed since
  # `started_at` is subtracted from the configured delay, never going below 0.
  # NOTE(review): `started_at` arrives as a `DateTime`; this relies on it being
  # in UTC for the comparison with `NaiveDateTime.utc_now/0` - confirm.
  defp schedule_turn(%Workitem{} = workitem, started_at) do
    color = get_color(workitem)
    delay = Map.fetch!(@color_delay, color)

    elapsed = NaiveDateTime.diff(NaiveDateTime.utc_now(), started_at, :millisecond)
    remaining = max(delay - elapsed, 0)

    Process.send_after(self(), {:turn, workitem}, remaining)
  end

  # Derives the target colour from the transition name ("turn_<color>_<dir>").
  defp get_color(%Workitem{} = workitem) do
    case workitem.binding_element.transition do
      "turn_red" <> _dir -> :red
      "turn_yellow" <> _dir -> :yellow
      "turn_green" <> _dir -> :green
    end
  end

  # Renders every frame: lit when its place is among `marking_places`, dark
  # otherwise.
  defp control(marking_places, lights) do
    Enum.each(lights, fn {place_name, frame} ->
      light_symbol = light_symbol(place_name, place_name in marking_places)

      Kino.Frame.render(frame, light_symbol)
    end)
  end

  defp light_symbol(color, on?)

  defp light_symbol(color, true) do
    emoji =
      case color do
        "red" <> _dir -> "🔴"
        "yellow" <> _dir -> "🟡"
        "green" <> _dir -> "🟢"
      end

    Kino.Text.new(emoji, terminal: true)
  end

  defp light_symbol(_color, false) do
    Kino.Text.new("⚫️", terminal: true)
  end
end

Run

import ColouredFlow.Runner.Storage.InMemory, only: :macros

# Persist the net and one enactment of it in the in-memory storage.
flow = TrafficLight.insert_flow()
enactment = TrafficLight.insert_enactment(flow)

{grid, lights} = TrafficLight.to_kino()

# `enactment/2` is the record accessor macro imported from the storage module.
enactment_id = enactment(enactment, :id)

# Split the `{place_name, frame}` tuples by direction suffix.
groups =
  Enum.group_by(lights, fn {name, _frame} ->
    cond do
      String.ends_with?(name, "ew") -> :ew
      String.ends_with?(name, "ns") -> :ns
      true -> raise "Unknown light #{inspect(name)}"
    end
  end)

# One light process per direction, each rendering into its own frames.
ew_pid =
  Kino.start_child!({
    TrafficLight.DirectionalLight,
    [
      name: TrafficLight.EWLight,
      enactment_id: enactment_id,
      direction: :ew,
      lights: groups.ew
    ]
  })

ns_pid =
  Kino.start_child!({
    TrafficLight.DirectionalLight,
    [
      name: TrafficLight.NSLight,
      enactment_id: enactment_id,
      direction: :ns,
      lights: groups.ns
    ]
  })

# The telemetry forwarder is started before the enactment so that it is
# already attached when the enactment begins emitting events.
Kino.start_child!({
  TrafficLight.WorkitemPubSub,
  [
    enactment_id: enactment_id,
    ew_pid: ew_pid,
    ns_pid: ns_pid
  ]
})

{:ok, enactment_pid} =
  ColouredFlow.Runner.Enactment.Supervisor.start_enactment(enactment_id)

Kino.inspect("Enactment is running: #{inspect(enactment_pid)}")

# The grid of light frames is the cell's output.
grid