Real-time Telemetry Dashboard
Mix.install([
{:kino, "~> 0.13.0"},
{:kino_vega_lite, "~> 0.1.11"},
{:kino_db, "~> 0.2.7"},
{:vega_lite, "~> 0.1.9"},
{:req, "~> 0.5.0"}
])
Introduction
This notebook monitors the AI Self-Sustaining System's telemetry in real time. It connects to the running Phoenix application node (`self_sustaining@localhost`), reads telemetry and agent coordination data from it, and visualizes system performance.
Connect to Phoenix Application
# Connect to the Phoenix application node
node = :"self_sustaining@localhost"

# Node.connect/1 returns true, false, or :ignored (local node not alive).
# Report a failed connection here, so it is not first noticed as an
# unrelated error in a later cell. The warning is non-fatal on purpose.
case Node.connect(node) do
  true ->
    :ok

  false ->
    IO.puts(:stderr, "Warning: could not connect to #{inspect(node)}; is the application running?")

  :ignored ->
    IO.puts(:stderr, "Warning: the local node is not alive, so #{inspect(node)} cannot be reached")
end

# Get the integration module
alias SelfSustaining.LivebookIntegration
Live Telemetry Data Stream
# Register the current process with the telemetry stream
subscriber = self()
LivebookIntegration.subscribe_to_telemetry_stream(subscriber)

# Frame reserved for real-time updates
frame = Kino.Frame.new()
# Real-time telemetry monitoring
#
# Kino.Input.select/3 takes options as {value, label} pairs. The value comes
# first, so that reading the input yields the atom (:last_hour, ...) that is
# also passed to get_telemetry_data/1 elsewhere in this notebook, while the
# human-readable text is what is shown in the dropdown.
telemetry_data_input =
  Kino.Input.select("Time Range",
    last_hour: "Last Hour",
    last_day: "Last Day",
    last_week: "Last Week"
  )

# NOTE(review): Kino.Input.read/1 expects the input to have been rendered
# already - confirm this runs in a cell after the one that displays the input.
time_range = Kino.Input.read(telemetry_data_input)

# Get telemetry data
telemetry_data = LivebookIntegration.get_telemetry_data(time_range)
# One markdown bullet per event type, e.g. "- some_type: 3 events"
event_type_lines =
  Enum.map_join(telemetry_data.summary.event_types, "\n", fn {type, count} ->
    "- #{type}: #{count} events"
  end)

# Render the summary of the fetched telemetry data
Kino.Markdown.new("""
## Telemetry Summary
- **Total Events**: #{telemetry_data.summary.total_events}
- **Time Range**: #{time_range}
- **Collection Time**: #{telemetry_data.timestamp}
### Event Type Distribution
#{event_type_lines}
""")
Interactive Performance Charts
# Create VegaLite chart for performance visualization
alias VegaLite, as: Vl

# Line chart of event duration over time, one colored series per event type.
# The title is set as a top-level property of the spec: under `config`, the
# `title` entry only supplies styling defaults and carries no text, so a
# title placed there is not drawn.
performance_chart =
  Vl.new(
    width: 800,
    height: 400,
    title: [text: "System Performance Over Time", font_size: 16]
  )
  |> Vl.data_from_values(telemetry_data.charts_data)
  |> Vl.mark(:line, point: true)
  |> Vl.encode_field(:x, "timestamp", type: :temporal, title: "Time")
  |> Vl.encode_field(:y, "duration", type: :quantitative, title: "Duration (ms)")
  |> Vl.encode_field(:color, "event_type", type: :nominal, title: "Event Type")

Kino.VegaLite.new(performance_chart)
Memory Usage Analysis
# Area chart of the "memory" field over time.
# The title is set as a top-level property of the spec: under `config`, the
# `title` entry only supplies styling defaults and carries no text, so a
# title placed there is not drawn.
memory_chart =
  Vl.new(
    width: 800,
    height: 300,
    title: [text: "Memory Usage Trends", font_size: 16]
  )
  |> Vl.data_from_values(telemetry_data.charts_data)
  |> Vl.mark(:area, opacity: 0.7)
  |> Vl.encode_field(:x, "timestamp", type: :temporal, title: "Time")
  |> Vl.encode_field(:y, "memory", type: :quantitative, title: "Memory Usage (bytes)")

Kino.VegaLite.new(memory_chart)
Agent Coordination Metrics
# Fetch the current agent coordination snapshot
coordination_data = LivebookIntegration.get_agent_coordination_data()
coordination_metrics = coordination_data.metrics

# Render the headline coordination numbers
Kino.Markdown.new("""
## Agent Coordination Status
- **Active Agents**: #{coordination_metrics.active_agents}
- **Active Work Items**: #{coordination_metrics.active_work_items}
- **Completed Work Items**: #{coordination_metrics.completed_work_items}
- **Average Completion Time**: #{coordination_metrics.avg_completion_time}ms
### Recent Agent Activity
""")
# Turn one {agent_id, agent_info} entry into a table row, falling back to
# placeholder values when a string key is missing from agent_info.
agent_row = fn {agent_id, agent_info} ->
  %{
    agent_id: agent_id,
    status: Map.get(agent_info, "status", "unknown"),
    last_seen: Map.get(agent_info, "last_seen", "N/A"),
    work_items: Map.get(agent_info, "work_items_claimed", 0)
  }
end

# Agent coordination table
coordination_table_data = Enum.map(coordination_data.agents, agent_row)

Kino.DataTable.new(coordination_table_data, name: "Active Agents")
Live Event Monitor
# Create live event monitor
event_monitor = Kino.Frame.new()

# Re-query the last hour of telemetry and redraw the frame with the first
# ten events returned, stamped with the current UTC time.
update_monitor = fn ->
  current_time = DateTime.utc_now() |> DateTime.to_string()
  latest_data = LivebookIntegration.get_telemetry_data(:last_hour)

  # Each event becomes a markdown list item; lines separated only by a single
  # newline would be merged into one paragraph when rendered.
  recent_events =
    latest_data.events
    |> Enum.take(10)
    |> Enum.map_join("\n", fn event ->
      "- #{event.timestamp} - #{List.last(event.event)} (#{event.measurements.duration}ms)"
    end)

  content =
    Kino.Markdown.new("""
    ## Live Events (Updated: #{current_time})

    #{recent_events}
    """)

  Kino.Frame.render(event_monitor, content)
end

# Draw once right away so the frame is not empty until the first tick.
update_monitor.()

# Update every 5 seconds. Kino.listen/2 with an interval replaces the
# un-awaited Task.async loop: the listener is tied to this cell, so
# re-evaluating the cell does not leave an older refresh loop running.
Kino.listen(5_000, fn _tick -> update_monitor.() end)

event_monitor
System Health Check
# System health indicators
#
# Builds a markdown table with four health metrics. Memory usage and process
# count come from a fresh get_performance_data/0 call; the telemetry event
# count and active agent count come from the telemetry_data and
# coordination_data values already bound in this notebook.
health_check = fn ->
  performance_data = LivebookIntegration.get_performance_data()
  system_metrics = performance_data.system_metrics

  health_status = %{
    memory_usage: system_metrics.memory_usage.total,
    process_count: system_metrics.process_count,
    telemetry_events: length(telemetry_data.events),
    agents_active: coordination_data.metrics.active_agents
  }

  # Status labels are computed up front so that every table row stays on a
  # single line; a line break inside a row breaks the markdown table.
  memory_mb = Float.round(health_status.memory_usage / 1024 / 1024, 2)
  memory_status = if health_status.memory_usage < 1_000_000_000, do: "✅ Good", else: "⚠️ High"
  process_status = if health_status.process_count < 100_000, do: "✅ Normal", else: "⚠️ High"
  telemetry_status = if health_status.telemetry_events > 0, do: "✅ Active", else: "❌ No Data"
  agents_status = if health_status.agents_active > 0, do: "✅ Working", else: "⚠️ Idle"

  Kino.Markdown.new("""
  ## System Health Status

  | Metric | Value | Status |
  |--------|-------|--------|
  | Memory Usage | #{memory_mb} MB | #{memory_status} |
  | Process Count | #{health_status.process_count} | #{process_status} |
  | Telemetry Events | #{health_status.telemetry_events} | #{telemetry_status} |
  | Active Agents | #{health_status.agents_active} | #{agents_status} |
  """)
end

health_check.()
Export Data for Analysis
# Export current telemetry data
export_button = Kino.Control.button("Export Telemetry Data")
export_output = Kino.Frame.new()

# On every button click, write the telemetry and coordination data already
# bound in this notebook, plus freshly fetched performance data, to a JSON
# file and show the outcome in export_output.
Kino.Control.stream(export_button)
|> Kino.listen(fn _event ->
  timestamp = DateTime.utc_now() |> DateTime.to_iso8601()

  # ISO-8601 timestamps contain ":", which is not a valid filename character
  # on every platform, so it is replaced in the filename only.
  filename = "telemetry_export_#{String.replace(timestamp, ":", "-")}.json"

  export_data = %{
    timestamp: timestamp,
    telemetry: telemetry_data,
    coordination: coordination_data,
    system_health: LivebookIntegration.get_performance_data()
  }

  # Save to file. The non-raising variants are used so that a failure is
  # shown to the user instead of raising inside the listener.
  result =
    with {:ok, json} <- Jason.encode(export_data, pretty: true) do
      File.write(filename, json)
    end

  content =
    case result do
      :ok ->
        Kino.Markdown.new("✅ Data exported to `#{filename}`")

      {:error, reason} ->
        detail = if is_exception(reason), do: Exception.message(reason), else: inspect(reason)
        Kino.Markdown.new("❌ Export failed: #{detail}")
    end

  Kino.Frame.render(export_output, content)
end)

Kino.Layout.grid([export_button, export_output], columns: 1)