Forge Service Dashboard
# Livebook setup - copy this entire cell to run
Mix.install([
{:zenohex, "~> 0.7.2"}
])
Service Dashboard
Monitor active Zenoh liveliness tokens in the Forge fabric.
defmodule Forge.ServiceDashboard do
  @moduledoc """
  Console dashboard that prints liveliness changes under `forge/services/**`.

  `run/0` opens a Zenoh session, declares a liveliness subscriber, prints a
  banner, issues a one-off liveliness query and then hands the subscriber to
  `Zenohex.Subscriber.loop/2`, printing `[+] key` for every `:put` sample and
  `[-] key` for every `:delete` sample.

  NOTE(review): the Zenohex calls (`Zenohex.open/0`, the `liveliness:` option,
  `Zenohex.Subscriber.loop/2`) are kept exactly as originally written; confirm
  them against the API of the Zenohex version pinned in `Mix.install`.
  """

  @services_key "forge/services/**"
  @query_key "forge/services/dashboard-query"

  @banner [
    "Forge Service Dashboard",
    "========================",
    "Active AI Services:",
    ""
  ]

  @doc """
  Opens a session, prints the dashboard banner and starts reporting changes.

  Returns whatever `Zenohex.Subscriber.loop/2` returns. Raises `MatchError`
  when `Zenohex.open/0` does not return `{:ok, session}`.
  """
  def run() do
    {:ok, session} = Zenohex.open()

    # The subscriber is declared before the banner and the initial query.
    subscriber = Zenohex.Session.declare_subscriber(session, @services_key, liveliness: true)

    Enum.each(@banner, &IO.puts/1)

    # NOTE(review): the result of this query is not printed or stored here.
    query_current_liveliness(session)

    watch(subscriber)
  end

  # Declares the dashboard queryable and issues a liveliness `get` over the
  # services key expression; the `get` result is this function's return value.
  defp query_current_liveliness(session) do
    opts = [
      liveliness: true,
      queryable: Zenohex.Session.declare_queryable(session, @query_key)
    ]

    Zenohex.Session.get(session, @services_key, opts)
  end

  # Hands every sample received by the subscriber to `report/1`.
  defp watch(subscriber) do
    Zenohex.Subscriber.loop(subscriber, &report/1)
  end

  # Prints one line per sample; any kind other than :put / :delete raises
  # CaseClauseError.
  defp report(sample) do
    marker =
      case sample.kind do
        :put -> "[+]"
        :delete -> "[-]"
      end

    IO.puts("#{marker} #{sample.key_expr}")
  end
end
# Run the dashboard (will run continuously)
# Forge.ServiceDashboard.run()
Interactive Dashboard
# For Livebook, we can create a more interactive version
defmodule InteractiveDashboard do
  @moduledoc """
  GenServer that tracks the keys of active liveliness tokens under
  `forge/services/**`.

  The state is a map with `:session`, `:subscriber` and `:services`.
  `:services` is a duplicate-free list of key expressions, most recently
  added first.

  NOTE(review): the Zenohex calls in `init/1` and the `{:zenoh_sample, sample}`
  message shape are kept as originally written; confirm both against the
  pinned Zenohex version. Messages that do not match are reported on stderr
  instead of crashing the server, so a mismatch stays visible.
  """

  use GenServer

  @services_key "forge/services/**"
  @query_key "forge/services/dashboard-query"

  ## Client API

  @doc """
  Starts the dashboard, registered under the module name.

  The argument is ignored. It exists so that the default `child_spec/1`
  generated by `use GenServer`, which calls `start_link/1`, can start this
  module under a supervisor; `start_link/0` keeps working as before.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Returns the list of service keys currently considered alive.
  """
  @spec get_services() :: [term()]
  def get_services() do
    GenServer.call(__MODULE__, :get_services)
  end

  ## Server callbacks

  @impl true
  def init([]) do
    {:ok, session} = Zenohex.open()
    subscriber = Zenohex.Session.declare_subscriber(session, @services_key, liveliness: true)

    # Query initial state. NOTE(review): the return value is not used here.
    queryable = Zenohex.Session.declare_queryable(session, @query_key)
    Zenohex.Session.get(session, @services_key, liveliness: true, queryable: queryable)

    {:ok, %{session: session, subscriber: subscriber, services: []}}
  end

  @impl true
  def handle_call(:get_services, _from, state) do
    {:reply, state.services, state}
  end

  @impl true
  def handle_info({:zenoh_sample, %{kind: :put, key_expr: key}}, state) do
    IO.puts("[+] #{key}")
    {:noreply, %{state | services: add_service(state.services, key)}}
  end

  def handle_info({:zenoh_sample, %{kind: :delete, key_expr: key}}, state) do
    IO.puts("[-] #{key}")
    # Drop every occurrence so a deleted key can never linger in the list.
    {:noreply, %{state | services: Enum.reject(state.services, &(&1 == key))}}
  end

  # Defining handle_info/2 replaces the default from `use GenServer`, so
  # without this clause any other message would crash the server.
  def handle_info(message, state) do
    IO.puts(:stderr, "InteractiveDashboard ignored unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Prepends `key` unless it is already tracked, keeping the list duplicate-free.
  defp add_service(services, key) do
    if key in services, do: services, else: [key | services]
  end
end
# Start the interactive dashboard.
# The server is registered under its module name, so re-evaluating this cell
# while it is still running yields {:error, {:already_started, pid}}; reuse
# that process instead of failing the match.
{:ok, _pid} =
  case InteractiveDashboard.start_link() do
    {:error, {:already_started, pid}} -> {:ok, pid}
    result -> result
  end
Current Services
# Fetch the service keys tracked by the running InteractiveDashboard process
services = InteractiveDashboard.get_services()

IO.puts("Currently Active Services:")
IO.puts("==========================")

case services do
  [] -> IO.puts("No active services detected.")
  _ -> Enum.each(services, &IO.puts("- #{&1}"))
end

# "\n" rather than "\\n": the latter prints a literal backslash followed by "n"
# instead of a blank line before the total.
IO.puts("\nTotal: #{length(services)} services")
Usage Instructions
- Setup: Run the first cell to install the Zenohex dependency
- Start Dashboard: Uncomment and run the service dashboard cell
- Monitor: The dashboard will show services appearing/disappearing in real-time
- Interactive: Use the interactive version to programmatically query services
Notes
- Requires Zenoh network connectivity
- Monitors liveliness tokens under `forge/services/**`
- Shows real-time service discovery and health monitoring