Powered by AppSignal & Oban Pro

Traffic Light

examples/traffic_light.livemd

Traffic Light

Mix.install(
  [
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    {:kino, "~> 0.14.1"},
    {:kino_excalidraw, "~> 0.5.0"}
  ],
  config: [
    coloured_flow: [
      {
        ColouredFlow.Runner.Storage,
        [
          storage: ColouredFlow.Runner.Storage.InMemory
        ]
      }
    ]
  ]
)

Coloured Petri Net

Kino.nothing()
Kino.nothing()
defmodule TrafficLight do
  use ColouredFlow.DSL, task_supervisor: TrafficLight.TaskSup

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Storage

  name "TrafficLight"

  colset signal() :: {}

  var s :: signal()

  place :red_ew, :signal
  place :green_ew, :signal
  place :yellow_ew, :signal
  place :red_ns, :signal
  place :green_ns, :signal
  place :yellow_ns, :signal
  place :safe_ew, :signal
  place :safe_ns, :signal

  initial_marking :red_ew, ~MS[{}]
  initial_marking :red_ns, ~MS[{}]
  initial_marking :safe_ew, ~MS[{}]

  transition :turn_green_ew do
    input :red_ew, bind({1, s})
    input :safe_ew, bind({1, s})
    output :green_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      :timer.sleep(10_000)
      TrafficLight.drive_next(event.enactment_id, "turn_yellow_ew")
    end
  end

  transition :turn_yellow_ew do
    input :green_ew, bind({1, s})
    output :yellow_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      :timer.sleep(3_000)
      TrafficLight.drive_next(event.enactment_id, "turn_red_ew")
    end
  end

  transition :turn_red_ew do
    input :yellow_ew, bind({1, s})
    output :red_ew, {1, s}
    output :safe_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      TrafficLight.drive_next(event.enactment_id, "turn_green_ns")
    end
  end

  transition :turn_green_ns do
    input :red_ns, bind({1, s})
    input :safe_ns, bind({1, s})
    output :green_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      :timer.sleep(10_000)
      TrafficLight.drive_next(event.enactment_id, "turn_yellow_ns")
    end
  end

  transition :turn_yellow_ns do
    input :green_ns, bind({1, s})
    output :yellow_ns, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      :timer.sleep(3_000)
      TrafficLight.drive_next(event.enactment_id, "turn_red_ns")
    end
  end

  transition :turn_red_ns do
    input :yellow_ns, bind({1, s})
    output :red_ns, {1, s}
    output :safe_ew, {1, s}

    action do
      TrafficLight.render(options[:frames], event.markings)
      TrafficLight.drive_next(event.enactment_id, "turn_green_ew")
    end
  end

  on_enactment_start do
    TrafficLight.render(options[:frames], event.markings)
    TrafficLight.drive_next(event.enactment_id, "turn_green_ew")
  end

  def to_mermaid do
    cpnet()
    |> ColouredFlow.Definition.Presentation.to_mermaid()
    |> Kino.Mermaid.new()
  end

  def to_kino do
    lights =
      for color <- [:red, :yellow, :green], dir <- [:ew, :ns] do
        {"#{color}_#{dir}", Kino.Frame.new(placeholder: false)}
      end

    headers = [Kino.Text.new("EW"), Kino.Text.new("NS")]
    grid = Kino.Layout.grid(headers ++ Keyword.values(lights), columns: 2)

    {grid, lights}
  end

  @doc false
  def render(nil, _markings), do: :ok

  def render(frames, markings) when is_map(frames) do
    occupied = MapSet.new(Map.keys(markings))

    Enum.each(frames, fn {place_name, frame} ->
      Kino.Frame.render(frame, light_symbol(place_name, MapSet.member?(occupied, place_name)))
    end)
  end

  defp light_symbol(_place, false), do: Kino.Text.new("⚫️", terminal: true)

  defp light_symbol(place_name, true) do
    emoji =
      case place_name do
        "red_" <> _ -> "🔴"
        "yellow_" <> _ -> "🟡"
        "green_" <> _ -> "🟢"
        _ -> "⚫️"
      end

    Kino.Text.new(emoji, terminal: true)
  end

  @doc false
  def drive_next(enactment_id, transition_name) when is_binary(transition_name) do
    enactment_id
    |> Storage.list_live_workitems()
    |> Enum.find(fn wi ->
      wi.state == :enabled and wi.binding_element.transition == transition_name
    end)
    |> case do
      nil ->
        :ok

      %{id: workitem_id} ->
        {:ok, _started} = WorkitemTransition.start_workitem(enactment_id, workitem_id)
        {:ok, _completed} = WorkitemTransition.complete_workitem(enactment_id, {workitem_id, []})
        :ok
    end
  end
end

Prepare the storage and supervisor

storage_pid = Kino.start_child!(ColouredFlow.Runner.Storage.InMemory)

Kino.inspect("Storage started: #{inspect(storage_pid)}")

supervisor_pid = Kino.start_child!(ColouredFlow.Runner.Supervisor)

Kino.inspect("Runner supervisor started: #{inspect(supervisor_pid)}")

:ok
defmodule TrafficLight.Supervisor do
  use Supervisor

  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(init_arg \\ []) when is_list(init_arg) do
    Process.whereis(__MODULE__) &amp;&amp; Supervisor.stop(__MODULE__)
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl Supervisor
  def init(_init_arg) do
    Supervisor.init(
      [{Task.Supervisor, name: TrafficLight.TaskSup}],
      strategy: :one_for_one
    )
  end
end

Run

import ColouredFlow.Runner.Storage.InMemory, only: :macros

alias ColouredFlow.Runner.Storage.InMemory

flow = InMemory.insert_flow!(TrafficLight.cpnet())
{:ok, enactment_record} = TrafficLight.insert_enactment(flow(flow, :id))
enactment_id = enactment(enactment_record, :id)

{grid, lights} = TrafficLight.to_kino()
frames = Map.new(lights)

Kino.start_child!(TrafficLight.Supervisor)

{:ok, enactment_pid} =
  TrafficLight.start_enactment(enactment_id,
    lifecycle_hooks: {TrafficLight, [frames: frames]}
  )

Kino.inspect("Enactment is running: #{inspect(enactment_pid)}")

grid