Powered by AppSignal & Oban Pro

Forge Service Dashboard

elixir/service_dashboard.livemd

Forge Service Dashboard

# Livebook setup - copy this entire cell to run
Mix.install([
  {:zenohex, "~> 0.7.2"}
])

Service Dashboard

Monitor active Zenoh liveliness tokens in the Forge fabric.

defmodule Forge.ServiceDashboard do
  @moduledoc """
  Console dashboard that prints liveliness changes for keys under
  `forge/services/**`.

  NOTE(review): every `Zenohex.*` call in this module is reproduced exactly as
  it was written; their signatures and return values are not visible here, so
  confirm them against the Zenohex version pinned in the setup cell.
  """

  # Key expression covering every service liveliness token we watch.
  @services_key_expr "forge/services/**"

  # Header lines printed once, in order, before any updates arrive.
  @banner [
    "Forge Service Dashboard",
    "========================",
    "Active AI Services:",
    ""
  ]

  @doc """
  Runs the dashboard.

  Opens a session, declares the liveliness subscriber, prints the banner,
  issues the initial liveliness query and finally hands the subscriber to the
  update loop.
  """
  def run() do
    {:ok, session} = Zenohex.open()

    # Declared before the banner is printed, matching the original ordering.
    subscriber =
      Zenohex.Session.declare_subscriber(session, @services_key_expr, liveliness: true)

    Enum.each(@banner, &IO.puts/1)

    query_current_liveliness(session)
    loop(subscriber)
  end

  # Issues a liveliness `get` over the watched key expression.
  # NOTE(review): the value returned by the `get` is handed back to `run/0`,
  # which discards it, so nothing from this query is printed -- confirm intent.
  defp query_current_liveliness(session) do
    dashboard_queryable =
      Zenohex.Session.declare_queryable(session, "forge/services/dashboard-query")

    Zenohex.Session.get(session, @services_key_expr,
      liveliness: true,
      queryable: dashboard_queryable
    )
  end

  # Hands every incoming sample to `print_change/1`.
  defp loop(subscriber) do
    Zenohex.Subscriber.loop(subscriber, &print_change/1)
  end

  # Prints "[+] key" for a `:put` sample and "[-] key" for a `:delete` sample.
  defp print_change(sample) do
    marker =
      case sample.kind do
        :put -> "[+]"
        :delete -> "[-]"
      end

    IO.puts("#{marker} #{sample.key_expr}")
  end
end

# Run the dashboard (will run continuously)
# Forge.ServiceDashboard.run()

Interactive Dashboard

# For Livebook, we can create a more interactive version
defmodule InteractiveDashboard do
  @moduledoc """
  GenServer that tracks which `forge/services/**` liveliness keys are active.

  Updates are consumed as `{:zenoh_sample, sample}` messages, where `sample`
  exposes `:kind` (`:put` or `:delete`) and `:key_expr`. The tracked keys are
  kept as a duplicate-free list, most recently added first.

  NOTE(review): the `Zenohex.*` calls in `init/1` and the `{:zenoh_sample, _}`
  message shape are kept exactly as originally written; confirm both against
  the Zenohex version pinned in the setup cell.
  """

  use GenServer

  require Logger

  ## Client API

  @doc """
  Starts the dashboard process, registered under the module name.

  The argument is ignored; it is accepted so the module also works with the
  default `child_spec/1` generated by `use GenServer`.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Returns the list of currently tracked service keys (newest first).
  """
  def get_services() do
    GenServer.call(__MODULE__, :get_services)
  end

  ## Server callbacks

  @impl true
  def init([]) do
    {:ok, session} = Zenohex.open()
    subscriber = Zenohex.Session.declare_subscriber(session, "forge/services/**", liveliness: true)

    # Query initial state.
    # NOTE(review): the result of this `get` is discarded, so `services` still
    # starts out empty -- confirm how replies are expected to be delivered.
    queryable = Zenohex.Session.declare_queryable(session, "forge/services/dashboard-query")
    Zenohex.Session.get(session, "forge/services/**", liveliness: true, queryable: queryable)

    {:ok, %{session: session, subscriber: subscriber, services: []}}
  end

  @impl true
  def handle_info({:zenoh_sample, %{kind: kind, key_expr: key_expr}}, state)
      when kind in [:put, :delete] do
    {:noreply, %{state | services: apply_sample(kind, key_expr, state.services)}}
  end

  # Anything else (unknown sample kinds, stray messages) must not crash the
  # server; log it and keep the current state.
  def handle_info(message, state) do
    Logger.warning("InteractiveDashboard ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_call(:get_services, _from, state) do
    {:reply, state.services, state}
  end

  ## Helpers

  # A repeated :put for an already tracked key must not add a second entry;
  # otherwise a single :delete would leave a stale copy behind.
  defp apply_sample(:put, key_expr, services) do
    IO.puts("[+] #{key_expr}")
    if key_expr in services, do: services, else: [key_expr | services]
  end

  defp apply_sample(:delete, key_expr, services) do
    IO.puts("[-] #{key_expr}")
    List.delete(services, key_expr)
  end
end

# Start the interactive dashboard.
# The process is registered under its module name, so starting it while a
# previous instance is still alive returns `{:error, {:already_started, pid}}`.
# Treat that as success so re-running this cell does not fail the match.
_pid =
  case InteractiveDashboard.start_link() do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

Current Services

# Fetch the list of service keys currently tracked by the dashboard process
services = InteractiveDashboard.get_services()

IO.puts("Currently Active Services:")
IO.puts("==========================")

if Enum.empty?(services) do
  IO.puts("No active services detected.")
else
  Enum.each(services, fn service ->
    IO.puts("- #{service}")
  end)
end

# "\n" is a real newline separating the list from the total; the doubled
# backslash used before printed the two characters `\` and `n` instead.
IO.puts("\nTotal: #{length(services)} services")

Usage Instructions

  1. Setup: Run the first cell to install the Zenohex dependency
  2. Start Dashboard: Uncomment `Forge.ServiceDashboard.run()` and run that cell (it runs continuously while printing updates)
  3. Monitor: The dashboard will show services appearing/disappearing in real-time
  4. Interactive: Use the interactive version to programmatically query services

Notes

  • Requires Zenoh network connectivity
  • Monitors liveliness tokens under forge/services/**
  • Shows real-time service discovery and health monitoring