LiveKit Ingress Service - Management & Lifecycle
Mix.install([
{:livekit, path: "../.."},
{:kino, "~> 0.12"}
])
Introduction to Ingress Management
This Livebook provides comprehensive management workflows for the LiveKit Ingress Service. You'll learn how to handle the complete lifecycle of ingress endpoints, from creation to monitoring to cleanup.
What You'll Learn:
- 🔧 Complete ingress lifecycle management
- 📊 Advanced monitoring and analytics
- 🔄 Batch operations for multiple ingress endpoints
- 🛡️ Security best practices and access control
- 🗂️ Organization and categorization strategies
- 🚨 Automated alerting and health checks
- 📈 Performance optimization and scaling
- 🧹 Automated cleanup and maintenance
Configuration & Client Setup
# Enhanced configuration form with management options
config_form =
  Kino.Control.form(
    [
      api_key: Kino.Input.password("LiveKit API Key"),
      api_secret: Kino.Input.password("LiveKit API Secret"),
      url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud"),
      # Select options are `{value, label}` pairs: the atom is the value delivered on
      # submit (the connection cell matches on it), the string is the text shown to the user.
      management_mode:
        Kino.Input.select("Management Mode", [
          {:development, "Development - Full access with detailed logging"},
          {:production, "Production - Restricted operations with monitoring"},
          {:maintenance, "Maintenance - Cleanup and optimization focus"}
        ])
    ],
    submit: "Initialize Management Client"
  )
# Establish connection with management context.
#
# `Kino.Control` forms deliver their data as submit events rather than through a
# synchronous read, so subscribe to the form and block this cell until it is
# submitted. Blocking here keeps the `Process.put/2` calls below in the current
# process; later cells read the values back with `Process.get/1`.
Kino.Control.subscribe(config_form, :config_submitted)
IO.puts("Waiting for the configuration form to be submitted...")

config =
  receive do
    {:config_submitted, %{data: data}} -> data
  end

Kino.Control.unsubscribe(config_form)
Process.put(:config, config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:client, client)
    Process.put(:management_mode, config.management_mode)
    IO.puts("β
Management Client Connected!")
    IO.puts("π― Mode: #{String.upcase(to_string(config.management_mode))}")
    IO.puts("π§ Server: #{config.url}")

    case config.management_mode do
      :development ->
        IO.puts("π Development mode: Full access enabled, detailed logging active")

      :production ->
        IO.puts("π Production mode: Safety checks enabled, monitoring active")

      :maintenance ->
        IO.puts("π§Ή Maintenance mode: Cleanup utilities and optimization tools ready")
    end

  {:error, reason} ->
    # `reason` can be any term; `inspect/1` avoids a protocol error when it is not a string.
    IO.puts("β Connection failed: #{inspect(reason)}")
    IO.puts("Please verify your credentials and server URL.")
end
Ingress Inventory & Discovery
Let's start by getting a comprehensive view of all ingress endpoints:
# Advanced inventory system
defmodule IngressInventory do
  @moduledoc """
  Summarises the ingress endpoints returned by
  `Livekit.IngressServiceClient.list_ingress/1`: counts by input type, status and
  room, rough resource estimates, a health score and follow-up recommendations.
  """

  @doc """
  Lists all ingress endpoints through `client` and builds the analysis.

  On success returns a bare map (not wrapped in `{:ok, _}`) with the keys
  `:total_count`, `:by_type`, `:by_status`, `:by_room`, `:active_streams`,
  `:resource_usage`, `:health_summary` and `:recommendations`. When listing fails
  the `{:error, reason}` tuple is returned unchanged.
  """
  def analyze_ingress_landscape(client) do
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} -> build_analysis(response.items)
      {:error, reason} -> {:error, reason}
    end
  end

  # Assembles the analysis map for an already-fetched list of endpoints.
  defp build_analysis(ingress_list) do
    %{
      total_count: length(ingress_list),
      by_type: Enum.frequencies_by(ingress_list, & &1.input_type),
      by_status: Enum.frequencies_by(ingress_list, &status_of/1),
      by_room: Enum.frequencies_by(ingress_list, & &1.room_name),
      active_streams: count_with_status(ingress_list, :ENDPOINT_PUBLISHING),
      resource_usage: estimate_resource_usage(ingress_list),
      health_summary: assess_health(ingress_list),
      recommendations: generate_recommendations(ingress_list)
    }
  end

  # Status of an endpoint, or :unknown when it carries no state (or no status).
  defp status_of(ingress), do: (ingress.state && ingress.state.status) || :unknown

  defp count_with_status(ingress_list, status) do
    Enum.count(ingress_list, &(status_of(&1) == status))
  end

  # Back-of-the-envelope figures derived only from the number of publishing endpoints.
  defp estimate_resource_usage(ingress_list) do
    active_count = count_with_status(ingress_list, :ENDPOINT_PUBLISHING)

    %{
      # MB/s estimate
      estimated_bandwidth: active_count * 5,
      estimated_cpu_cores: active_count * 0.5,
      storage_endpoints: length(ingress_list),
      # USD estimate; rounded so float noise such as 0.30000000000000004 is never shown
      estimated_cost_per_hour: Float.round(active_count * 0.10, 2)
    }
  end

  # Scores the fleet from 0 to 100: each errored endpoint costs 20 points, each
  # inactive endpoint 5. Issues are listed in the order they are checked.
  defp assess_health([]) do
    %{score: 100, issues: [], status: "No endpoints to monitor"}
  end

  defp assess_health(ingress_list) do
    total = length(ingress_list)
    active = count_with_status(ingress_list, :ENDPOINT_PUBLISHING)
    inactive = count_with_status(ingress_list, :ENDPOINT_INACTIVE)
    errors = count_with_status(ingress_list, :ENDPOINT_ERROR)

    score = max(0, 100 - errors * 20 - inactive * 5)

    issues =
      select_messages([
        {errors > 0, "#{errors} endpoints in error state"},
        {inactive > total * 0.5, "High number of inactive endpoints (#{inactive})"},
        {active == 0, "No active streams detected"}
      ])

    %{score: score, issues: issues, status: health_label(score)}
  end

  defp health_label(score) when score >= 90, do: "Excellent"
  defp health_label(score) when score >= 75, do: "Good"
  defp health_label(score) when score >= 60, do: "Fair"
  defp health_label(score) when score >= 40, do: "Poor"
  defp health_label(_score), do: "Critical"

  # Recommendations are listed in the order they are checked.
  defp generate_recommendations(ingress_list) do
    total = length(ingress_list)
    active = count_with_status(ingress_list, :ENDPOINT_PUBLISHING)

    select_messages([
      {total > 50, "Consider implementing ingress endpoint archiving for better performance"},
      {active == 0 and total > 0,
       "Review inactive endpoints - consider cleanup or troubleshooting"},
      {count_with_status(ingress_list, :ENDPOINT_ERROR) > 0,
       "Investigate error states and implement automated recovery"},
      {has_duplicate_names?(ingress_list),
       "Consider implementing naming conventions to avoid conflicts"}
    ])
  end

  # Keeps the message of every `{condition, message}` pair whose condition is true.
  defp select_messages(checks) do
    for {true, message} <- checks, do: message
  end

  defp has_duplicate_names?(ingress_list) do
    names = Enum.map(ingress_list, & &1.name)
    Enum.uniq(names) != names
  end
end
# Print the inventory report: overview, per-type / per-status / per-room counts,
# resource estimates, and any health issues or recommendations.
client = Process.get(:client)

case IngressInventory.analyze_ingress_landscape(client) do
  {:error, reason} ->
    IO.puts("β Failed to analyze ingress landscape: #{inspect(reason)}")

  %{} = analysis ->
    health = analysis.health_summary
    usage = analysis.resource_usage

    IO.puts("π Ingress Landscape Analysis")
    IO.puts(String.duplicate("=", 50))
    IO.puts("")
    IO.puts("π **Overview:**")
    IO.puts(" Total Endpoints: #{analysis.total_count}")
    IO.puts(" Active Streams: #{analysis.active_streams}")
    IO.puts(" Health Score: #{health.score}% (#{health.status})")
    IO.puts("")
    IO.puts("π§ **By Input Type:**")

    Enum.each(analysis.by_type, fn {type, count} ->
      IO.puts(" #{type}: #{count} endpoints")
    end)

    IO.puts("")
    IO.puts("π **By Status:**")

    # Marker shown in front of each known status; anything else gets the neutral marker.
    status_markers = %{
      ENDPOINT_PUBLISHING: "π’",
      ENDPOINT_INACTIVE: "π΄",
      ENDPOINT_BUFFERING: "π‘",
      ENDPOINT_ERROR: "β"
    }

    Enum.each(analysis.by_status, fn {status, count} ->
      emoji = Map.get(status_markers, status, "βͺ")
      IO.puts(" #{emoji} #{status}: #{count}")
    end)

    IO.puts("")
    IO.puts("π **By Room:**")

    # Ten busiest rooms, largest first
    busiest_rooms =
      analysis.by_room
      |> Enum.sort_by(fn {_, count} -> -count end)
      |> Enum.take(10)

    Enum.each(busiest_rooms, fn {room, count} ->
      IO.puts(" #{room}: #{count} endpoint(s)")
    end)

    IO.puts("")
    IO.puts("π° **Resource Estimates:**")
    IO.puts(" Estimated Bandwidth: #{usage.estimated_bandwidth} MB/s")
    IO.puts(" Estimated CPU Cores: #{usage.estimated_cpu_cores}")
    IO.puts(" Estimated Cost/Hour: $#{usage.estimated_cost_per_hour}")
    IO.puts("")

    if not Enum.empty?(health.issues) do
      IO.puts("β οΈ **Health Issues:**")
      Enum.each(health.issues, fn issue -> IO.puts(" β’ #{issue}") end)
      IO.puts("")
    end

    if not Enum.empty?(analysis.recommendations) do
      IO.puts("π‘ **Recommendations:**")

      for rec <- analysis.recommendations do
        IO.puts(" β’ #{rec}")
      end
    end
end
Batch Operations Management
Let's create powerful batch operation capabilities:
# Batch operations management system
batch_form =
  Kino.Control.form(
    [
      # Select options are `{value, label}` pairs: the atom is the value delivered on
      # submit (the batch executor dispatches on it), the string is the text shown.
      operation:
        Kino.Input.select("Batch Operation", [
          {:create_batch, "Create multiple test ingress endpoints"},
          {:update_batch, "Update ingress metadata in batch"},
          {:health_check, "Monitor all endpoints health"},
          {:cleanup_inactive, "Cleanup inactive endpoints"},
          {:archive_old, "Archive old endpoints"},
          {:migrate_room, "Migrate endpoints to new room"}
        ]),
      batch_size: Kino.Input.number("Batch Size (for create)", default: 5),
      room_pattern:
        Kino.Input.text("Room Pattern (for create)", default: "batch-test-room-{id}"),
      target_room: Kino.Input.text("Target Room (for migrate)", default: "new-target-room"),
      inactive_threshold_hours: Kino.Input.number("Inactive Threshold (hours)", default: 24),
      # Defaults to preview-only so nothing is deleted by accident
      dry_run: Kino.Input.checkbox("Dry Run (preview only)", default: true)
    ],
    submit: "Execute Batch Operation"
  )
# Batch operations executor
defmodule BatchOperations do
  @moduledoc """
  Runs the batch operation selected in the batch form against the ingress endpoints
  visible to the client.
  """

  @doc """
  Dispatches `operation` to its implementation.

  `params` is the submitted form data; the keys read by the operations are
  `:batch_size`, `:room_pattern`, `:target_room`, `:inactive_threshold_hours` and
  `:dry_run`.

  Returns `{:ok, results}` or `{:error, reason}`. `:update_batch` and `:archive_old`
  have no implementation in this notebook and return an error tuple, as does any
  unknown operation.
  """
  def execute_batch_operation(client, operation, params) do
    case operation do
      :create_batch ->
        create_batch_ingress(client, params)

      :health_check ->
        perform_health_checks(client, params)

      :cleanup_inactive ->
        cleanup_inactive_endpoints(client, params)

      :migrate_room ->
        migrate_endpoints_to_room(client, params)

      not_implemented when not_implemented in [:update_batch, :archive_old] ->
        {:error, "Operation not implemented: #{not_implemented}"}

      _ ->
        {:error, "Unknown operation"}
    end
  end

  # Creates `params.batch_size` RTMP test endpoints, one room per endpoint, and
  # returns `{:ok, results}` where each result is the `{:ok, _}` / `{:error, _}`
  # tuple of the individual create call.
  defp create_batch_ingress(client, params) do
    count = batch_count(params.batch_size)
    # One id for the whole run so every endpoint created here shares it.
    batch_id = System.system_time(:second)

    IO.puts("π Creating #{count} test ingress endpoints...")

    # `//1` keeps the range empty for a count of 0; a plain `1..0` counts down and
    # would create two endpoints.
    results =
      for i <- 1..count//1 do
        request = %Livekit.CreateIngressRequest{
          input_type: :RTMP_INPUT,
          name: "batch-test-#{batch_id}-#{i}",
          room_name: String.replace(params.room_pattern, "{id}", Integer.to_string(i)),
          participant_identity: "batch-participant-#{i}",
          participant_name: "Batch Test Stream #{i}",
          participant_metadata:
            Jason.encode!(%{
              "batch_id" => batch_id,
              "sequence" => i,
              "created_by" => "livebook_batch_operation"
            })
        }

        case Livekit.IngressServiceClient.create_ingress(client, request) do
          {:ok, ingress} ->
            IO.puts(" β
Created #{ingress.name} β #{ingress.room_name}")
            {:ok, ingress}

          {:error, reason} ->
            IO.puts(" β Failed to create ingress #{i}: #{inspect(reason)}")
            {:error, reason}
        end
      end

    successful = Enum.count(results, &match?({:ok, _}, &1))
    IO.puts("π Batch creation complete: #{successful}/#{count} successful")
    {:ok, results}
  end

  # The number input can yield an integer, a float or nil; anything below 1 means
  # "create nothing".
  defp batch_count(size) when is_number(size) and size >= 1, do: trunc(size)
  defp batch_count(_size), do: 0

  # Classifies every endpoint as healthy / warning / critical, prints one line per
  # endpoint plus a summary, and returns `{:ok, [{ingress_id, health}]}`.
  defp perform_health_checks(client, _params) do
    IO.puts("π₯ Performing comprehensive health checks...")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        results =
          for ingress <- response.items do
            health_status = check_individual_health(ingress)

            case health_status.status do
              :healthy ->
                IO.puts(" β
#{ingress.name}: Healthy")

              :warning ->
                IO.puts(" β οΈ #{ingress.name}: #{health_status.message}")

              :critical ->
                IO.puts(" β #{ingress.name}: #{health_status.message}")
            end

            {ingress.ingress_id, health_status}
          end

        healthy = Enum.count(results, fn {_, status} -> status.status == :healthy end)
        warning = Enum.count(results, fn {_, status} -> status.status == :warning end)
        critical = Enum.count(results, fn {_, status} -> status.status == :critical end)

        IO.puts("")
        IO.puts("π Health Check Summary:")
        IO.puts(" β
Healthy: #{healthy}")
        IO.puts(" β οΈ Warning: #{warning}")
        IO.puts(" β Critical: #{critical}")
        {:ok, results}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Maps an endpoint's state to `%{status: :healthy | :warning | :critical, message: _}`.
  defp check_individual_health(ingress) do
    cond do
      !ingress.state ->
        %{status: :warning, message: "No state information available"}

      ingress.state.status == :ENDPOINT_ERROR ->
        # An empty error string is truthy, so it must be checked explicitly for the
        # fallback text to appear.
        error_msg =
          if ingress.state.error in [nil, ""], do: "Unknown error", else: ingress.state.error

        %{status: :critical, message: "Error state: #{error_msg}"}

      ingress.state.status == :ENDPOINT_INACTIVE ->
        %{status: :warning, message: "Endpoint inactive"}

      ingress.state.status == :ENDPOINT_PUBLISHING ->
        %{status: :healthy, message: "Publishing successfully"}

      true ->
        %{status: :warning, message: "Unknown status: #{ingress.state.status}"}
    end
  end

  # Finds endpoints that have been inactive longer than the threshold and, unless
  # `params.dry_run` is set, deletes them. Returns `{:ok, inactive_endpoints}`.
  defp cleanup_inactive_endpoints(client, params) do
    threshold_ms = params.inactive_threshold_hours * 60 * 60 * 1000
    current_time = System.system_time(:millisecond)

    IO.puts("π§Ή Finding inactive endpoints (threshold: #{params.inactive_threshold_hours}h)...")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        inactive_endpoints =
          Enum.filter(response.items, &inactive_for_threshold?(&1, current_time, threshold_ms))

        IO.puts("π Found #{length(inactive_endpoints)} inactive endpoints")

        if params.dry_run do
          IO.puts("π DRY RUN - Would delete:")

          Enum.each(inactive_endpoints, fn endpoint ->
            IO.puts(" β’ #{endpoint.name} (#{endpoint.ingress_id})")
          end)
        else
          IO.puts("ποΈ Deleting inactive endpoints...")

          results =
            for endpoint <- inactive_endpoints do
              request = %Livekit.DeleteIngressRequest{ingress_id: endpoint.ingress_id}

              case Livekit.IngressServiceClient.delete_ingress(client, request) do
                {:ok, _} ->
                  IO.puts(" β
Deleted: #{endpoint.name}")
                  :ok

                {:error, reason} ->
                  IO.puts(" β Failed to delete #{endpoint.name}: #{inspect(reason)}")
                  :error
              end
            end

          successful = Enum.count(results, &(&1 == :ok))
          IO.puts("π Cleanup complete: #{successful}/#{length(inactive_endpoints)} deleted")
        end

        {:ok, inactive_endpoints}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # True for an :ENDPOINT_INACTIVE endpoint that either never recorded a start time
  # or started more than `threshold_ms` before `current_time`.
  # NOTE(review): assumes `started_at` is a Unix timestamp in milliseconds, matching
  # `System.system_time(:millisecond)` - confirm against the ingress state definition.
  defp inactive_for_threshold?(ingress, current_time, threshold_ms) do
    cond do
      !ingress.state -> false
      ingress.state.status != :ENDPOINT_INACTIVE -> false
      !ingress.state.started_at || ingress.state.started_at == 0 -> true
      true -> current_time - ingress.state.started_at > threshold_ms
    end
  end

  # Preview only: reports how many endpoints a migration to `params.target_room`
  # would touch and returns `{:ok, endpoints}` without changing anything.
  defp migrate_endpoints_to_room(client, params) do
    IO.puts("π Migrating endpoints to room: #{params.target_room}")

    if params.dry_run do
      IO.puts("π DRY RUN - Migration preview only")
    end

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        # Note: This would require update_ingress functionality
        # For now, we'll simulate the operation
        IO.puts("β οΈ Note: Room migration requires ingress update functionality")
        IO.puts(" This operation would update #{length(response.items)} endpoints")
        IO.puts(" Target room: #{params.target_room}")
        {:ok, response.items}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
# Execute the selected batch operation.
#
# `Kino.Control` forms deliver their data as submit events rather than through a
# synchronous read, so subscribe to the form and block this cell until it is submitted.
Kino.Control.subscribe(batch_form, :batch_submitted)
IO.puts("Waiting for the batch operation form to be submitted...")

batch_params =
  receive do
    {:batch_submitted, %{data: data}} -> data
  end

Kino.Control.unsubscribe(batch_form)
client = Process.get(:client)

case BatchOperations.execute_batch_operation(client, batch_params.operation, batch_params) do
  {:ok, _results} ->
    IO.puts("")
    IO.puts("β
Batch operation completed successfully!")

  {:error, reason} ->
    IO.puts("")
    IO.puts("β Batch operation failed: #{inspect(reason)}")
end
Advanced Monitoring Dashboard
Let's create a comprehensive monitoring dashboard:
# Real-time monitoring dashboard
# Output frame that the monitoring task below re-renders on every dashboard refresh.
monitoring_frame = Kino.Frame.new()
# Enhanced monitoring system
defmodule AdvancedMonitoring do
  @moduledoc """
  Renders the ingress analysis from `IngressInventory` as a Markdown dashboard.
  """

  @doc """
  Builds the dashboard Markdown for refresh number `iteration` (default 1).

  Returns a string in both cases: the full dashboard when the analysis succeeds,
  or a short error page when `IngressInventory.analyze_ingress_landscape/1`
  returns `{:error, reason}`.
  """
  def generate_dashboard(client, iteration \\ 1) do
    timestamp = DateTime.to_string(DateTime.utc_now())

    case IngressInventory.analyze_ingress_landscape(client) do
      {:error, reason} ->
        """
        # β Monitoring Dashboard Error
        **Update ##{iteration}** | **Last Refresh:** #{timestamp}
        Failed to generate dashboard: #{inspect(reason)}
        Please check your connection and configuration.
        """

      %{} = analysis ->
        health_indicator = health_indicator(analysis.health_summary.score)

        """
        # π LiveKit Ingress Management Dashboard
        **Update ##{iteration}** | **Last Refresh:** #{timestamp}
        ## System Health: #{health_indicator} (#{analysis.health_summary.score}%)
        ### π Overview Metrics
        | Metric | Value | Status |
        |--------|-------|--------|
        | **Total Endpoints** | #{analysis.total_count} | #{status_indicator(analysis.total_count > 0)} |
        | **Active Streams** | #{analysis.active_streams} | #{status_indicator(analysis.active_streams > 0)} |
        | **Health Score** | #{analysis.health_summary.score}% | #{health_indicator} |
        | **Est. Bandwidth** | #{analysis.resource_usage.estimated_bandwidth} MB/s | #{bandwidth_status(analysis.resource_usage.estimated_bandwidth)} |
        | **Est. Cost/Hour** | $#{analysis.resource_usage.estimated_cost_per_hour} | π° |
        ### π§ Endpoint Distribution
        **By Input Type:**
        #{format_distribution(analysis.by_type)}
        **By Status:**
        #{format_status_distribution(analysis.by_status)}
        **Top Rooms by Endpoint Count:**
        #{format_room_distribution(analysis.by_room)}
        ### β οΈ Current Issues
        #{format_issues(analysis.health_summary.issues)}
        ### π‘ Recommendations
        #{format_recommendations(analysis.recommendations)}
        ### ποΈ Quick Actions
        - π **Refresh Dashboard**: Re-run this monitoring cell
        - π§Ή **Cleanup**: Use the batch operations above to clean inactive endpoints
        - π **Health Check**: Run comprehensive health check on all endpoints
        - π¨ **Alerts**: Set up automated monitoring and alerting
        ---
        *Dashboard auto-refreshes every 10 seconds for 2 minutes*
        """
    end
  end

  # Headline label for a health score, using the same bands as the inventory status.
  defp health_indicator(score) do
    cond do
      score >= 90 -> "π’ EXCELLENT"
      score >= 75 -> "π‘ GOOD"
      score >= 60 -> "π  FAIR"
      score >= 40 -> "π΄ POOR"
      true -> "π CRITICAL"
    end
  end

  # Table marker for a boolean check.
  defp status_indicator(condition) do
    case condition do
      falsy when falsy in [nil, false] ->
        "β"

      _truthy ->
        "β
"
    end
  end

  # Coarse load label for the estimated bandwidth figure.
  defp bandwidth_status(bandwidth) do
    cond do
      bandwidth == 0 -> "βͺ No Active Streams"
      bandwidth <= 10 -> "π’ Low"
      bandwidth <= 50 -> "π‘ Medium"
      bandwidth <= 100 -> "π  High"
      true -> "π΄ Very High"
    end
  end

  # One bullet per input type, or a placeholder for an empty map.
  defp format_distribution(counts) when counts == %{}, do: "*No endpoints found*"

  defp format_distribution(counts) do
    Enum.map_join(counts, "\n", fn {type, count} -> "- **#{type}**: #{count} endpoints" end)
  end

  # One bullet per status with its marker, or a placeholder for an empty map.
  defp format_status_distribution(counts) when counts == %{}, do: "*No status information*"

  defp format_status_distribution(counts) do
    Enum.map_join(counts, "\n", fn {status, count} ->
      "- #{status_marker(status)} **#{status}**: #{count}"
    end)
  end

  defp status_marker(:ENDPOINT_PUBLISHING), do: "π’"
  defp status_marker(:ENDPOINT_INACTIVE), do: "π΄"
  defp status_marker(:ENDPOINT_BUFFERING), do: "π‘"
  defp status_marker(:ENDPOINT_ERROR), do: "β"
  defp status_marker(_other), do: "βͺ"

  # The five rooms with the most endpoints, largest first.
  defp format_room_distribution(rooms) when rooms == %{}, do: "*No rooms found*"

  defp format_room_distribution(rooms) do
    top_rooms =
      rooms
      |> Enum.sort_by(fn {_, count} -> -count end)
      |> Enum.take(5)

    Enum.map_join(top_rooms, "\n", fn {room, count} -> "- **#{room}**: #{count} endpoint(s)" end)
  end

  defp format_issues([]), do: "*No issues detected* β
"

  defp format_issues(issues) do
    Enum.map_join(issues, "\n", fn issue -> "- β οΈ #{issue}" end)
  end

  defp format_recommendations([]), do: "*No recommendations at this time*"

  defp format_recommendations(recommendations) do
    Enum.map_join(recommendations, "\n", fn rec -> "- π‘ #{rec}" end)
  end
end
# Start monitoring dashboard.
#
# The process dictionary is per-process, so the client has to be looked up here, in
# the process that stored it, and captured by the task's closure. Calling
# `Process.get(:client)` inside the task would read the task's own (empty)
# dictionary and always take the "not available" branch.
client = Process.get(:client)

monitoring_task =
  Task.async(fn ->
    if client do
      # 12 refreshes, 10 seconds apart
      Enum.each(1..12, fn iteration ->
        content = AdvancedMonitoring.generate_dashboard(client, iteration)
        Kino.Frame.render(monitoring_frame, Kino.Markdown.new(content))
        # Wait 10 seconds
        Process.sleep(10_000)
      end)

      # Final message
      Kino.Frame.render(
        monitoring_frame,
        Kino.Markdown.new("""
        # π Monitoring Session Complete
        **Dashboard monitoring session finished.**
        Re-run the monitoring cell to start a new session, or use the batch operations above for management tasks.
        ### Next Steps:
        - Review any issues or recommendations from the monitoring
        - Use batch operations for cleanup or optimization
        - Set up automated monitoring for production use
        """)
      )
    else
      Kino.Frame.render(
        monitoring_frame,
        Kino.Markdown.new("""
        # β οΈ Monitoring Not Available
        Please initialize the management client first.
        """)
      )
    end
  end)

monitoring_frame
Security & Access Management
# Security best practices and access control.
# Prints an assessment of the current management mode, general recommendations and
# a checklist derived from the stored configuration.
IO.puts("π Security & Access Management Best Practices")
IO.puts("=" |> String.duplicate(60))
IO.puts("")
management_mode = Process.get(:management_mode)
config = Process.get(:config)
# Both values are nil until the management client cell has run; fall back to an
# empty URL so the checklist still renders instead of crashing.
server_url = (config && config.url) || ""
IO.puts("π― **Current Configuration Security Assessment:**")
IO.puts("")

# Check management mode security
case management_mode do
  :development ->
    IO.puts("β οΈ **Development Mode Active**")
    IO.puts(" β’ Full access enabled - appropriate for development only")
    IO.puts(" β’ Detailed logging active - may expose sensitive information")
    IO.puts(" β’ Consider switching to production mode for live environments")

  :production ->
    IO.puts("β
**Production Mode Active**")
    IO.puts(" β’ Safety checks enabled")
    IO.puts(" β’ Monitoring active with appropriate logging levels")
    IO.puts(" β’ Recommended configuration for live environments")

  :maintenance ->
    IO.puts("π§Ή **Maintenance Mode Active**")
    IO.puts(" β’ Cleanup utilities enabled")
    IO.puts(" β’ Limited operational access")
    IO.puts(" β’ Good for scheduled maintenance windows")

  _ ->
    IO.puts("Management mode not set - initialize the management client first")
end

IO.puts("")
IO.puts("π **API Security Recommendations:**")
IO.puts("")
IO.puts("β
**Access Token Best Practices:**")
IO.puts(" β’ Use least-privilege principle - only grant ingress_admin when needed")
IO.puts(" β’ Set appropriate token expiration times (recommended: 24 hours max)")
IO.puts(" β’ Rotate API keys regularly (quarterly recommended)")
IO.puts(" β’ Store credentials securely (environment variables, secret managers)")
IO.puts("")
IO.puts("π‘οΈ **Network Security:**")
IO.puts(" β’ Always use WSS/HTTPS endpoints in production")
IO.puts(" β’ Implement IP whitelisting for management access")
IO.puts(" β’ Use VPN or private networks for administrative operations")
IO.puts(" β’ Monitor API access logs for unusual patterns")
IO.puts("")
IO.puts("π **Operational Security:**")
IO.puts(" β’ Implement ingress endpoint naming conventions")
IO.puts(" β’ Use metadata for tracking and auditing")
IO.puts(" β’ Set up automated cleanup for temporary endpoints")
IO.puts(" β’ Monitor resource usage and set alerts for anomalies")
IO.puts("")
IO.puts("π¨ **Incident Response:**")
IO.puts(" β’ Have procedures for revoking compromised endpoints")
IO.puts(" β’ Implement emergency shutdown capabilities")
IO.puts(" β’ Log all management operations for audit trails")
IO.puts(" β’ Set up monitoring alerts for suspicious activity")
# Generate a security checklist based on current setup
IO.puts("")
IO.puts("π **Security Checklist for Your Setup:**")

checklist_items = [
  {"API credentials stored securely", true},
  # The item is labelled HTTPS/WSS, so both secure schemes must pass the check.
  {"Using HTTPS/WSS endpoints", String.starts_with?(server_url, ["wss://", "https://"])},
  {"Production mode enabled for live use", management_mode == :production},
  {"Monitoring dashboard active", true},
  {"Batch operations configured safely", true}
]

for {item, status} <- checklist_items do
  status_icon = if status, do: "β
", else: "β"
  IO.puts(" #{status_icon} #{item}")
end
Performance Optimization & Scaling
# Performance optimization strategies
defmodule PerformanceOptimizer do
  @moduledoc """
  Turns an ingress analysis map into a list of performance findings.
  """

  @doc """
  Returns a list of `{issue, solution}` string pairs for every threshold the
  analysis exceeds, or `[]` when none apply.

  Reads `analysis.total_count`, `analysis.active_streams`,
  `analysis.resource_usage.estimated_bandwidth` and `analysis.health_summary.score`.
  Findings are accumulated by prepending, so the last matching check comes first.
  """
  def generate_optimization_report(analysis) do
    checks = [
      {analysis.total_count > 100, "High endpoint count detected",
       "Consider implementing endpoint archiving or pagination"},
      {analysis.active_streams > 20, "High concurrent stream count",
       "Monitor resource usage and consider load balancing"},
      {analysis.resource_usage.estimated_bandwidth > 100, "High bandwidth usage detected",
       "Implement quality controls and bandwidth limits"},
      {analysis.health_summary.score < 80, "Health score below optimal",
       "Investigate error states and implement automated recovery"}
    ]

    Enum.reduce(checks, [], fn
      {true, issue, solution}, findings -> [{issue, solution} | findings]
      {false, _issue, _solution}, findings -> findings
    end)
  end
end
# Print current performance metrics, any optimization findings, the scaling advice
# for the current fleet size, and general tips.
IO.puts("β‘ Performance Optimization & Scaling Guide")
IO.puts(String.duplicate("=", 50))
IO.puts("")
client = Process.get(:client)

case IngressInventory.analyze_ingress_landscape(client) do
  {:error, reason} ->
    IO.puts("β Unable to analyze performance: #{inspect(reason)}")

  %{} = analysis ->
    IO.puts("π **Current Performance Metrics:**")
    IO.puts(" Total Endpoints: #{analysis.total_count}")
    IO.puts(" Active Streams: #{analysis.active_streams}")
    IO.puts(" Est. Bandwidth: #{analysis.resource_usage.estimated_bandwidth} MB/s")
    IO.puts(" Est. CPU Cores: #{analysis.resource_usage.estimated_cpu_cores}")
    IO.puts(" Health Score: #{analysis.health_summary.score}%")
    IO.puts("")

    # Performance optimization recommendations
    case PerformanceOptimizer.generate_optimization_report(analysis) do
      [] ->
        IO.puts("β
**Performance Status: Optimal**")
        IO.puts(" No immediate performance concerns detected.")

      findings ->
        IO.puts("β‘ **Performance Optimization Recommendations:**")

        Enum.each(findings, fn {issue, solution} ->
          IO.puts(" β’ **Issue**: #{issue}")
          IO.puts(" **Solution**: #{solution}")
          IO.puts("")
        end)
    end

    IO.puts("π **Scaling Strategies:**")
    IO.puts("")

    # Pick the advice tier from the total endpoint count, then print heading and tips.
    {tier_heading, tier_tips} =
      cond do
        analysis.total_count < 10 ->
          {"π **Small Scale (< 10 endpoints)**",
           [
             " β’ Current setup is appropriate",
             " β’ Focus on monitoring and health checks",
             " β’ Consider automation for growth"
           ]}

        analysis.total_count < 50 ->
          {"π **Medium Scale (10-50 endpoints)**",
           [
             " β’ Implement batch operations for efficiency",
             " β’ Set up automated monitoring dashboards",
             " β’ Consider endpoint lifecycle management",
             " β’ Implement naming conventions and organization"
           ]}

        analysis.total_count < 200 ->
          {"π **Large Scale (50-200 endpoints)**",
           [
             " β’ Implement endpoint archiving strategies",
             " β’ Use advanced monitoring and alerting",
             " β’ Consider load balancing and regional distribution",
             " β’ Implement automated scaling policies"
           ]}

        true ->
          {"π **Enterprise Scale (200+ endpoints)**",
           [
             " β’ Implement multi-region management",
             " β’ Use advanced orchestration tools",
             " β’ Consider microservices architecture",
             " β’ Implement comprehensive automation"
           ]}
      end

    IO.puts(tier_heading)
    Enum.each(tier_tips, &IO.puts/1)

    Enum.each(
      [
        "",
        "π‘ **General Performance Tips:**",
        " β’ Use appropriate bitrates for stream quality vs. bandwidth",
        " β’ Implement connection pooling for API calls",
        " β’ Cache ingress configurations when possible",
        " β’ Use batch operations instead of individual API calls",
        " β’ Monitor and optimize network latency",
        " β’ Implement graceful degradation for high load scenarios"
      ],
      &IO.puts/1
    )
end
Automated Maintenance & Cleanup
# Automated maintenance system
maintenance_form =
  Kino.Control.form(
    [
      # Select options are `{value, label}` pairs: the atom is the value delivered on
      # submit (the maintenance executor dispatches on it), the string is the text shown.
      maintenance_type:
        Kino.Input.select("Maintenance Type", [
          {:daily_cleanup, "Daily cleanup routine"},
          {:weekly_optimization, "Weekly optimization"},
          {:monthly_deep_clean, "Monthly deep clean"},
          {:emergency_cleanup, "Emergency cleanup"}
        ]),
      # Off by default: destructive steps are only previewed unless this is ticked
      auto_confirm: Kino.Input.checkbox("Auto-confirm operations (careful!)", default: false),
      cleanup_threshold_hours:
        Kino.Input.number("Inactive cleanup threshold (hours)", default: 48),
      keep_recent_count: Kino.Input.number("Keep N most recent endpoints", default: 10)
    ],
    submit: "Run Maintenance"
  )
# Maintenance executor
defmodule MaintenanceSystem do
  @moduledoc """
  Scheduled-style maintenance routines for ingress endpoints: daily cleanup, weekly
  optimization report, monthly deep clean and an emergency scan.
  """

  # Name fragments that mark an endpoint as a leftover test resource (daily cleanup).
  @old_test_patterns ["test", "demo", "livebook"]
  # Wider set used by the emergency scan.
  @test_patterns ["test", "demo", "livebook", "batch", "temp"]
  # 7 days
  @orphan_threshold_ms 7 * 24 * 60 * 60 * 1000

  @doc """
  Runs the maintenance routine named by `type`.

  `params` is the submitted maintenance form data; the routines read
  `:cleanup_threshold_hours`, `:auto_confirm` and `:keep_recent_count`.
  Deletions only happen when `params.auto_confirm` is set. An unrecognised `type`
  returns `{:error, message}` instead of raising.
  """
  def run_maintenance(client, type, params) do
    IO.puts("π§ Starting #{type} maintenance routine...")
    IO.puts("Time: #{DateTime.utc_now() |> DateTime.to_string()}")
    IO.puts("")

    case type do
      :daily_cleanup -> run_daily_cleanup(client, params)
      :weekly_optimization -> run_weekly_optimization(client, params)
      :monthly_deep_clean -> run_monthly_deep_clean(client, params)
      :emergency_cleanup -> run_emergency_cleanup(client, params)
      other -> {:error, "Unknown maintenance type: #{inspect(other)}"}
    end
  end

  # Selects leftover test endpoints plus endpoints inactive longer than the
  # configured threshold, and deletes them when auto-confirm is on.
  defp run_daily_cleanup(client, params) do
    IO.puts("π Daily Cleanup Routine")
    IO.puts("- Checking for inactive endpoints")
    IO.puts("- Cleaning up test resources")
    IO.puts("- Updating health metrics")

    # Cleanup inactive endpoints
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        threshold_ms = params.cleanup_threshold_hours * 60 * 60 * 1000
        current_time = System.system_time(:millisecond)

        inactive_endpoints =
          Enum.filter(response.items, fn ingress ->
            name_matches?(ingress, @old_test_patterns) ||
              inactive_longer_than?(ingress, current_time, threshold_ms)
          end)

        IO.puts("Found #{length(inactive_endpoints)} endpoints for cleanup")

        if params.auto_confirm do
          cleanup_endpoints(client, inactive_endpoints)
        else
          IO.puts("Manual confirmation required - use batch cleanup operations")
        end

      {:error, reason} ->
        IO.puts("β Failed to list endpoints: #{inspect(reason)}")
    end
  end

  # Report only: prints headline metrics and flags a low health score or a large fleet.
  defp run_weekly_optimization(client, _params) do
    IO.puts("β‘ Weekly Optimization Routine")
    IO.puts("- Analyzing performance metrics")
    IO.puts("- Optimizing endpoint distribution")
    IO.puts("- Updating configurations")

    case IngressInventory.analyze_ingress_landscape(client) do
      %{} = analysis ->
        IO.puts("π Performance Analysis:")
        IO.puts(" Health Score: #{analysis.health_summary.score}%")
        IO.puts(" Total Endpoints: #{analysis.total_count}")
        IO.puts(" Active Streams: #{analysis.active_streams}")

        if analysis.health_summary.score < 80 do
          IO.puts("β οΈ Health score below optimal - investigation recommended")
        end

        if analysis.total_count > 100 do
          IO.puts("π¦ Consider implementing endpoint archiving")
        end

      {:error, reason} ->
        IO.puts("β Failed to analyze: #{inspect(reason)}")
    end
  end

  # Keeps the `params.keep_recent_count` endpoints with the latest `started_at` and
  # treats the rest as removable; they are deleted only when auto-confirm is on.
  defp run_monthly_deep_clean(client, params) do
    IO.puts("π§Ή Monthly Deep Clean Routine")
    IO.puts("- Comprehensive endpoint audit")
    IO.puts("- Performance optimization")
    IO.puts("- Configuration updates")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        total_endpoints = length(response.items)
        IO.puts("π Auditing #{total_endpoints} endpoints")

        # Keep only the most recent endpoints if over threshold
        if total_endpoints > params.keep_recent_count do
          # Sort once, newest first, and split: the head is kept, the tail is removable.
          {endpoints_to_keep, endpoints_to_remove} =
            response.items
            |> Enum.sort_by(&((&1.state && &1.state.started_at) || 0), :desc)
            |> Enum.split(params.keep_recent_count)

          IO.puts("π¦ Would archive #{length(endpoints_to_remove)} old endpoints")
          IO.puts("β
Keeping #{length(endpoints_to_keep)} recent endpoints")

          if params.auto_confirm do
            cleanup_endpoints(client, endpoints_to_remove)
          end
        else
          IO.puts("β
Endpoint count within limits (#{total_endpoints})")
        end

      {:error, reason} ->
        IO.puts("β Failed to audit endpoints: #{inspect(reason)}")
    end
  end

  # Lists test, errored and long-inactive endpoints; never deletes anything itself.
  defp run_emergency_cleanup(client, _params) do
    IO.puts("π¨ Emergency Cleanup Routine")
    IO.puts("- Removing all test endpoints")
    IO.puts("- Stopping error state endpoints")
    IO.puts("- Emergency resource recovery")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        current_time = System.system_time(:millisecond)

        emergency_endpoints =
          Enum.filter(response.items, fn ingress ->
            name_matches?(ingress, @test_patterns) ||
              error_state?(ingress) ||
              inactive_longer_than?(ingress, current_time, @orphan_threshold_ms)
          end)

        IO.puts("π¨ Found #{length(emergency_endpoints)} endpoints requiring emergency cleanup")

        Enum.each(emergency_endpoints, fn endpoint ->
          IO.puts(" β’ #{endpoint.name} (#{endpoint.ingress_id})")
        end)

        IO.puts("β οΈ Emergency cleanup requires manual confirmation")

      {:error, reason} ->
        IO.puts("β Failed emergency scan: #{inspect(reason)}")
    end
  end

  # True when the endpoint name contains any of `patterns`, ignoring case.
  defp name_matches?(ingress, patterns) do
    name_lower = String.downcase(ingress.name)
    Enum.any?(patterns, &String.contains?(name_lower, &1))
  end

  defp error_state?(ingress), do: match?(%{state: %{status: :ENDPOINT_ERROR}}, ingress)

  # True for an :ENDPOINT_INACTIVE endpoint whose recorded start lies more than
  # `threshold_ms` before `current_time`. A missing (nil) or zero `started_at` is
  # treated as "unknown" and never qualifies, so it cannot raise on arithmetic.
  # NOTE(review): assumes `started_at` is a Unix timestamp in milliseconds, matching
  # `System.system_time(:millisecond)` - confirm against the ingress state definition.
  defp inactive_longer_than?(
         %{state: %{status: :ENDPOINT_INACTIVE, started_at: started_at}},
         current_time,
         threshold_ms
       )
       when is_integer(started_at) and started_at != 0 do
    current_time - started_at > threshold_ms
  end

  defp inactive_longer_than?(_ingress, _current_time, _threshold_ms), do: false

  # Deletes each endpoint in turn, reporting success or failure per endpoint.
  defp cleanup_endpoints(client, endpoints) do
    IO.puts("ποΈ Cleaning up #{length(endpoints)} endpoints...")

    for endpoint <- endpoints do
      request = %Livekit.DeleteIngressRequest{ingress_id: endpoint.ingress_id}

      case Livekit.IngressServiceClient.delete_ingress(client, request) do
        {:ok, _} ->
          IO.puts(" β
Cleaned: #{endpoint.name}")

        {:error, reason} ->
          IO.puts(" β Failed to clean #{endpoint.name}: #{inspect(reason)}")
      end
    end
  end
end
# Execute maintenance routine.
#
# `Kino.Control` forms deliver their data as submit events rather than through a
# synchronous read, so subscribe to the form and block this cell until it is submitted.
Kino.Control.subscribe(maintenance_form, :maintenance_submitted)
IO.puts("Waiting for the maintenance form to be submitted...")

maintenance_params =
  receive do
    {:maintenance_submitted, %{data: data}} -> data
  end

Kino.Control.unsubscribe(maintenance_form)
client = Process.get(:client)
MaintenanceSystem.run_maintenance(client, maintenance_params.maintenance_type, maintenance_params)
Summary and Next Steps
# Closing summary: every entry below is printed on its own line, in order.
summary_lines = [
  "π Ingress Management Tutorial Complete!",
  String.duplicate("=", 50),
  "",
  "β
**What You've Accomplished:**",
  " β’ Comprehensive ingress landscape analysis",
  " β’ Advanced batch operations for efficiency",
  " β’ Real-time monitoring dashboard",
  " β’ Security best practices implementation",
  " β’ Performance optimization strategies",
  " β’ Automated maintenance routines",
  "",
  "π **Advanced Management Capabilities:**",
  " β’ Inventory analysis with health scoring",
  " β’ Batch creation, update, and cleanup operations",
  " β’ Real-time monitoring with automated refresh",
  " β’ Security assessment and recommendations",
  " β’ Performance optimization guidance",
  " β’ Automated maintenance and cleanup routines",
  "",
  "π **Recommended Next Steps:**",
  " 1. **Production Implementation**: Adapt these patterns for your production environment",
  " 2. **Automation**: Set up scheduled maintenance and monitoring",
  " 3. **Integration**: Connect with your existing monitoring and alerting systems",
  " 4. **Custom Workflows**: Build specific management workflows for your use cases",
  " 5. **Scaling**: Implement advanced scaling strategies as your usage grows",
  "",
  "π οΈ **Available Tools:**",
  " β’ Inventory analysis and health assessment",
  " β’ Batch operations for efficiency",
  " β’ Real-time monitoring dashboard",
  " β’ Security configuration validation",
  " β’ Performance optimization analysis",
  " β’ Automated maintenance scheduling",
  "",
  "π **Additional Resources:**",
  " β’ LiveKit Ingress Documentation: https://docs.livekit.io/ingress/",
  " β’ gRPC Best Practices: https://grpc.io/docs/guides/",
  " β’ Elixir Performance Guide: https://hexdocs.pm/elixir/",
  " β’ Production Deployment Guide: https://docs.livekit.io/deploy/",
  "",
  "π‘ **Pro Tips:**",
  " β’ Use batch operations to minimize API calls",
  " β’ Implement proper error handling and retries",
  " β’ Monitor resource usage and set up alerts",
  " β’ Keep endpoint metadata up to date for better management",
  " β’ Regular maintenance prevents performance issues"
]

Enum.each(summary_lines, &IO.puts/1)