Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

LiveKit Ingress Service - Management & Lifecycle

ingress_management.livemd

LiveKit Ingress Service - Management & Lifecycle

Mix.install([
  {:livekit, path: "../.."},
  {:kino, "~> 0.12"}
])

Introduction to Ingress Management

This Livebook provides comprehensive management workflows for LiveKit Ingress Service. You’ll learn how to handle the complete lifecycle of ingress endpoints, from creation to monitoring to cleanup.

What You’ll Learn:

  • 🔧 Complete ingress lifecycle management
  • 📊 Advanced monitoring and analytics
  • 🔄 Batch operations for multiple ingress endpoints
  • 🛡️ Security best practices and access control
  • 🗂️ Organization and categorization strategies
  • 🚨 Automated alerting and health checks
  • 📈 Performance optimization and scaling
  • 🧹 Automated cleanup and maintenance

Configuration & Client Setup

# Enhanced configuration form with management options.
#
# Kino.Input.select/2 expects `{value, label}` pairs: the value is what the
# form yields, the label is what the user sees. The mode atoms therefore come
# first so that later cells can match on :development / :production /
# :maintenance.
config_form = Kino.Control.form(
  [
    api_key: Kino.Input.password("LiveKit API Key"),
    api_secret: Kino.Input.password("LiveKit API Secret"),
    url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud"),
    management_mode: Kino.Input.select("Management Mode", [
      development: "Development - Full access with detailed logging",
      production: "Production - Restricted operations with monitoring",
      maintenance: "Maintenance - Cleanup and optimization focus"
    ])
  ],
  submit: "Initialize Management Client"
)
# Establish connection with management context.
#
# The form values, the client and the selected mode are stored in the process
# dictionary so that later cells can read them back with Process.get/1.
#
# NOTE(review): Kino.Control's documented API does not appear to include
# read/1 (form values normally arrive through Kino.Control.subscribe/2 or
# Kino.listen/2) - confirm this call works with the pinned Kino version.
config = Kino.Control.read(config_form)
Process.put(:config, config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:client, client)
    Process.put(:management_mode, config.management_mode)

    IO.puts("βœ… Management Client Connected!")
    IO.puts("🎯 Mode: #{String.upcase(to_string(config.management_mode))}")
    IO.puts("πŸ”§ Server: #{config.url}")

    case config.management_mode do
      :development ->
        IO.puts("πŸš€ Development mode: Full access enabled, detailed logging active")

      :production ->
        IO.puts("🏭 Production mode: Safety checks enabled, monitoring active")

      :maintenance ->
        IO.puts("🧹 Maintenance mode: Cleanup utilities and optimization tools ready")
    end

  {:error, reason} ->
    # inspect/1 keeps this branch from raising when the reason is not a
    # string (tuple, map, struct), matching the other error branches in this
    # notebook.
    IO.puts("❌ Connection failed: #{inspect(reason)}")
    IO.puts("Please verify your credentials and server URL.")
end

Ingress Inventory & Discovery

Let’s start by getting a comprehensive view of all ingress endpoints:

# Advanced inventory system
defmodule IngressInventory do
  @moduledoc """
  Summarises the ingress endpoints returned by
  `Livekit.IngressServiceClient.list_ingress/1`: counts by input type, status
  and room, a rough resource estimate, a health score and recommendations.
  """

  # Per-active-stream planning figures. These are rough estimates, not
  # measurements.
  @bandwidth_mb_per_stream 5
  @cpu_cores_per_stream 0.5
  @cost_usd_per_stream_hour 0.10

  @doc """
  Lists every ingress endpoint through `client` and builds the report.

  On success the return value is the bare report map (not an `{:ok, map}`
  tuple) with the keys `:total_count`, `:by_type`, `:by_status`, `:by_room`,
  `:active_streams`, `:resource_usage`, `:health_summary` and
  `:recommendations`. When listing fails, `{:error, reason}` is returned
  unchanged.
  """
  def analyze_ingress_landscape(client) do
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} -> build_report(response.items)
      {:error, reason} -> {:error, reason}
    end
  end

  # Assembles the report map from the raw endpoint list.
  defp build_report(ingress_list) do
    %{
      total_count: length(ingress_list),
      by_type: Enum.frequencies_by(ingress_list, & &1.input_type),
      by_status: Enum.frequencies_by(ingress_list, &(status_of(&1) || :unknown)),
      by_room: Enum.frequencies_by(ingress_list, & &1.room_name),
      active_streams: count_active_streams(ingress_list),
      resource_usage: estimate_resource_usage(ingress_list),
      health_summary: assess_health(ingress_list),
      recommendations: generate_recommendations(ingress_list)
    }
  end

  # Status of an endpoint, or nil/false when it carries no state.
  defp status_of(ingress), do: ingress.state && ingress.state.status

  defp count_with_status(ingress_list, status) do
    Enum.count(ingress_list, &(status_of(&1) == status))
  end

  defp count_active_streams(ingress_list) do
    count_with_status(ingress_list, :ENDPOINT_PUBLISHING)
  end

  defp estimate_resource_usage(ingress_list) do
    active_count = count_active_streams(ingress_list)

    %{
      # MB/s estimate
      estimated_bandwidth: active_count * @bandwidth_mb_per_stream,
      estimated_cpu_cores: active_count * @cpu_cores_per_stream,
      storage_endpoints: length(ingress_list),
      # USD estimate. Rounded to cents because a raw float product such as
      # 3 * 0.10 would otherwise be displayed as 0.30000000000000004.
      estimated_cost_per_hour: Float.round(active_count * @cost_usd_per_stream_hour, 2)
    }
  end

  defp assess_health([]) do
    %{score: 100, issues: [], status: "No endpoints to monitor"}
  end

  defp assess_health(ingress_list) do
    total = length(ingress_list)
    active = count_active_streams(ingress_list)
    inactive = count_with_status(ingress_list, :ENDPOINT_INACTIVE)
    errors = count_with_status(ingress_list, :ENDPOINT_ERROR)

    # Each errored endpoint costs 20 points and each inactive one 5, floored
    # at zero.
    score = max(0, 100 - errors * 20 - inactive * 5)

    # Entries are prepended, so the resulting list is in reverse order of the
    # checks below.
    issues =
      []
      |> add_if(errors > 0, "#{errors} endpoints in error state")
      |> add_if(inactive > total * 0.5, "High number of inactive endpoints (#{inactive})")
      |> add_if(active == 0, "No active streams detected")

    %{score: score, issues: issues, status: health_label(score)}
  end

  defp health_label(score) do
    cond do
      score >= 90 -> "Excellent"
      score >= 75 -> "Good"
      score >= 60 -> "Fair"
      score >= 40 -> "Poor"
      true -> "Critical"
    end
  end

  # Prepends `item` when `condition` is truthy; shared by issues and
  # recommendations.
  defp add_if(list, condition, item) do
    if condition, do: [item | list], else: list
  end

  defp generate_recommendations(ingress_list) do
    any_endpoints? = ingress_list != []

    []
    |> add_if(
      length(ingress_list) > 50,
      "Consider implementing ingress endpoint archiving for better performance"
    )
    |> add_if(
      any_endpoints? and count_active_streams(ingress_list) == 0,
      "Review inactive endpoints - consider cleanup or troubleshooting"
    )
    |> add_if(
      has_errors?(ingress_list),
      "Investigate error states and implement automated recovery"
    )
    |> add_if(
      has_duplicate_names?(ingress_list),
      "Consider implementing naming conventions to avoid conflicts"
    )
  end

  defp has_errors?(ingress_list) do
    Enum.any?(ingress_list, &(status_of(&1) == :ENDPOINT_ERROR))
  end

  defp has_duplicate_names?(ingress_list) do
    names = Enum.map(ingress_list, & &1.name)
    length(names) != length(Enum.uniq(names))
  end
end

# Print the inventory report for the client stored by the setup cell.
client = Process.get(:client)

case IngressInventory.analyze_ingress_landscape(client) do
  {:error, reason} ->
    IO.puts("❌ Failed to analyze ingress landscape: #{inspect(reason)}")

  %{} = analysis ->
    health = analysis.health_summary
    usage = analysis.resource_usage

    # Status marker shown next to each status count.
    status_emoji = fn
      :ENDPOINT_PUBLISHING -> "🟒"
      :ENDPOINT_INACTIVE -> "πŸ”΄"
      :ENDPOINT_BUFFERING -> "🟑"
      :ENDPOINT_ERROR -> "❌"
      _ -> "βšͺ"
    end

    IO.puts("πŸ“Š Ingress Landscape Analysis")
    IO.puts(String.duplicate("=", 50))
    IO.puts("")

    IO.puts("πŸ“ˆ **Overview:**")
    IO.puts("   Total Endpoints: #{analysis.total_count}")
    IO.puts("   Active Streams: #{analysis.active_streams}")
    IO.puts("   Health Score: #{health.score}% (#{health.status})")
    IO.puts("")

    IO.puts("πŸ”§ **By Input Type:**")

    Enum.each(analysis.by_type, fn {type, count} ->
      IO.puts("   #{type}: #{count} endpoints")
    end)

    IO.puts("")

    IO.puts("πŸ“Š **By Status:**")

    Enum.each(analysis.by_status, fn {status, count} ->
      IO.puts("   #{status_emoji.(status)} #{status}: #{count}")
    end)

    IO.puts("")

    # Ten busiest rooms, largest endpoint count first.
    IO.puts("🏠 **By Room:**")

    analysis.by_room
    |> Enum.sort_by(fn {_room, count} -> -count end)
    |> Enum.take(10)
    |> Enum.each(fn {room, count} ->
      IO.puts("   #{room}: #{count} endpoint(s)")
    end)

    IO.puts("")

    IO.puts("πŸ’° **Resource Estimates:**")
    IO.puts("   Estimated Bandwidth: #{usage.estimated_bandwidth} MB/s")
    IO.puts("   Estimated CPU Cores: #{usage.estimated_cpu_cores}")
    IO.puts("   Estimated Cost/Hour: $#{usage.estimated_cost_per_hour}")
    IO.puts("")

    if not Enum.empty?(health.issues) do
      IO.puts("⚠️  **Health Issues:**")
      Enum.each(health.issues, fn issue -> IO.puts("   β€’ #{issue}") end)
      IO.puts("")
    end

    # Last expression of the branch: Enum.map keeps the cell result the same
    # shape as a comprehension would (a list of :ok, or nil when empty).
    if not Enum.empty?(analysis.recommendations) do
      IO.puts("πŸ’‘ **Recommendations:**")
      Enum.map(analysis.recommendations, fn rec -> IO.puts("   β€’ #{rec}") end)
    end
end

Batch Operations Management

Let’s create powerful batch operation capabilities:

# Batch operations management system.
#
# Kino.Input.select/2 expects `{value, label}` pairs, so the operation atoms
# come first; BatchOperations.execute_batch_operation/3 dispatches on them.
batch_form = Kino.Control.form(
  [
    operation: Kino.Input.select("Batch Operation", [
      create_batch: "Create multiple test ingress endpoints",
      update_batch: "Update ingress metadata in batch",
      health_check: "Monitor all endpoints health",
      cleanup_inactive: "Cleanup inactive endpoints",
      archive_old: "Archive old endpoints",
      migrate_room: "Migrate endpoints to new room"
    ]),
    batch_size: Kino.Input.number("Batch Size (for create)", default: 5),
    room_pattern: Kino.Input.text("Room Pattern (for create)", default: "batch-test-room-{id}"),
    target_room: Kino.Input.text("Target Room (for migrate)", default: "new-target-room"),
    inactive_threshold_hours: Kino.Input.number("Inactive Threshold (hours)", default: 24),
    # Defaults to a preview so a destructive operation is never the first run.
    dry_run: Kino.Input.checkbox("Dry Run (preview only)", default: true)
  ],
  submit: "Execute Batch Operation"
)
# Batch operations executor
defmodule BatchOperations do
  @moduledoc """
  Runs the batch operation selected in the batch form against the ingress
  service.
  """

  @doc """
  Dispatches `operation` to its implementation.

  `params` is the map of batch form values; the keys read by the operations
  are `:batch_size`, `:room_pattern`, `:target_room`,
  `:inactive_threshold_hours` and `:dry_run`.

  Returns `{:ok, results}` or `{:error, reason}`. An unrecognised operation
  yields `{:error, "Unknown operation"}`.
  """
  def execute_batch_operation(client, operation, params) do
    case operation do
      :create_batch -> create_batch_ingress(client, params)
      :update_batch -> update_batch_metadata(client, params)
      :health_check -> perform_health_checks(client, params)
      :cleanup_inactive -> cleanup_inactive_endpoints(client, params)
      :archive_old -> archive_old_endpoints(client, params)
      :migrate_room -> migrate_endpoints_to_room(client, params)
      _ -> {:error, "Unknown operation"}
    end
  end

  # Creates `batch_size` RTMP test endpoints, one room per endpoint.
  defp create_batch_ingress(client, %{batch_size: batch_size} = params)
       when is_integer(batch_size) and batch_size >= 0 do
    IO.puts("πŸš€ Creating #{params.batch_size} test ingress endpoints...")

    # Computed once so every endpoint of the batch carries the same id, even
    # when the loop crosses a second boundary.
    batch_id = System.system_time(:second)

    # The explicit //1 step makes a batch size of 0 an empty range; a plain
    # 1..0 would count down and create two endpoints.
    results =
      for i <- 1..batch_size//1 do
        room_name = String.replace(params.room_pattern, "{id}", Integer.to_string(i))

        request = %Livekit.CreateIngressRequest{
          input_type: :RTMP_INPUT,
          name: "batch-test-#{batch_id}-#{i}",
          room_name: room_name,
          participant_identity: "batch-participant-#{i}",
          participant_name: "Batch Test Stream #{i}",
          participant_metadata:
            Jason.encode!(%{
              "batch_id" => batch_id,
              "sequence" => i,
              "created_by" => "livebook_batch_operation"
            })
        }

        case Livekit.IngressServiceClient.create_ingress(client, request) do
          {:ok, ingress} ->
            IO.puts("  βœ… Created #{ingress.name} β†’ #{ingress.room_name}")
            {:ok, ingress}

          {:error, reason} ->
            IO.puts("  ❌ Failed to create ingress #{i}: #{inspect(reason)}")
            {:error, reason}
        end
      end

    successful = Enum.count(results, fn {status, _} -> status == :ok end)
    IO.puts("πŸ“Š Batch creation complete: #{successful}/#{params.batch_size} successful")

    {:ok, results}
  end

  # A cleared number input or a fractional value cannot drive the range above.
  defp create_batch_ingress(_client, params) do
    {:error, "Batch size must be a non-negative integer, got: #{inspect(params[:batch_size])}"}
  end

  # This operation is offered by the form and dispatched above, but no
  # implementation exists in this notebook. It reports that explicitly so the
  # module compiles and the caller prints a clear failure.
  defp update_batch_metadata(_client, _params) do
    {:error, "Batch metadata update is not implemented in this notebook"}
  end

  # See update_batch_metadata/2: offered by the form, not implemented here.
  defp archive_old_endpoints(_client, _params) do
    {:error, "Endpoint archiving is not implemented in this notebook"}
  end

  # Prints one health line per endpoint and a summary; returns
  # `{:ok, [{ingress_id, health_status}]}`.
  defp perform_health_checks(client, _params) do
    IO.puts("πŸ₯ Performing comprehensive health checks...")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        results =
          for ingress <- response.items do
            health_status = check_individual_health(ingress)

            case health_status.status do
              :healthy ->
                IO.puts("  βœ… #{ingress.name}: Healthy")

              :warning ->
                IO.puts("  ⚠️  #{ingress.name}: #{health_status.message}")

              :critical ->
                IO.puts("  ❌ #{ingress.name}: #{health_status.message}")
            end

            {ingress.ingress_id, health_status}
          end

        healthy = Enum.count(results, fn {_, status} -> status.status == :healthy end)
        warning = Enum.count(results, fn {_, status} -> status.status == :warning end)
        critical = Enum.count(results, fn {_, status} -> status.status == :critical end)

        IO.puts("")
        IO.puts("πŸ“Š Health Check Summary:")
        IO.puts("   βœ… Healthy: #{healthy}")
        IO.puts("   ⚠️  Warning: #{warning}")
        IO.puts("   ❌ Critical: #{critical}")

        {:ok, results}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Maps an endpoint's state to `%{status: :healthy | :warning | :critical,
  # message: String.t()}`.
  defp check_individual_health(ingress) do
    cond do
      !ingress.state ->
        %{status: :warning, message: "No state information available"}

      ingress.state.status == :ENDPOINT_ERROR ->
        # An empty string is truthy in Elixir, so `error || "Unknown error"`
        # would never fall back for a blank error field.
        error_msg =
          case ingress.state.error do
            blank when blank in [nil, ""] -> "Unknown error"
            message -> message
          end

        %{status: :critical, message: "Error state: #{error_msg}"}

      ingress.state.status == :ENDPOINT_INACTIVE ->
        %{status: :warning, message: "Endpoint inactive"}

      ingress.state.status == :ENDPOINT_PUBLISHING ->
        %{status: :healthy, message: "Publishing successfully"}

      true ->
        %{status: :warning, message: "Unknown status: #{ingress.state.status}"}
    end
  end

  # Lists (dry run) or deletes endpoints that have been inactive for longer
  # than `params.inactive_threshold_hours`. Returns the matched endpoints.
  defp cleanup_inactive_endpoints(client, params) do
    threshold_ms = params.inactive_threshold_hours * 60 * 60 * 1000
    current_time = System.system_time(:millisecond)

    IO.puts("🧹 Finding inactive endpoints (threshold: #{params.inactive_threshold_hours}h)...")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        inactive_endpoints =
          Enum.filter(response.items, fn ingress ->
            inactive_for_threshold?(ingress, current_time, threshold_ms)
          end)

        IO.puts("πŸ“ Found #{length(inactive_endpoints)} inactive endpoints")

        if params.dry_run do
          IO.puts("πŸ” DRY RUN - Would delete:")

          for endpoint <- inactive_endpoints do
            IO.puts("  β€’ #{endpoint.name} (#{endpoint.ingress_id})")
          end
        else
          IO.puts("πŸ—‘οΈ  Deleting inactive endpoints...")

          results =
            for endpoint <- inactive_endpoints do
              request = %Livekit.DeleteIngressRequest{ingress_id: endpoint.ingress_id}

              case Livekit.IngressServiceClient.delete_ingress(client, request) do
                {:ok, _} ->
                  IO.puts("  βœ… Deleted: #{endpoint.name}")
                  :ok

                {:error, reason} ->
                  IO.puts("  ❌ Failed to delete #{endpoint.name}: #{inspect(reason)}")
                  :error
              end
            end

          successful = Enum.count(results, &(&1 == :ok))
          IO.puts("πŸ“Š Cleanup complete: #{successful}/#{length(inactive_endpoints)} deleted")
        end

        {:ok, inactive_endpoints}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # True when the endpoint is inactive and either never started or started
  # more than `threshold_ms` before `current_time`.
  # NOTE(review): assumes state.started_at is a Unix timestamp in
  # milliseconds, like current_time - confirm against the LiveKit
  # IngressState definition.
  defp inactive_for_threshold?(ingress, current_time, threshold_ms) do
    cond do
      !ingress.state -> false
      ingress.state.status != :ENDPOINT_INACTIVE -> false
      !ingress.state.started_at || ingress.state.started_at == 0 -> true
      true -> current_time - ingress.state.started_at > threshold_ms
    end
  end

  # Previews a room migration. Nothing is updated; the listed endpoints are
  # returned as-is.
  defp migrate_endpoints_to_room(client, params) do
    IO.puts("🚚 Migrating endpoints to room: #{params.target_room}")

    if params.dry_run do
      IO.puts("πŸ” DRY RUN - Migration preview only")
    end

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        # Note: This would require update_ingress functionality
        # For now, we'll simulate the operation
        IO.puts("⚠️  Note: Room migration requires ingress update functionality")
        IO.puts("   This operation would update #{length(response.items)} endpoints")
        IO.puts("   Target room: #{params.target_room}")

        {:ok, response.items}

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Run whichever operation the batch form currently selects and report the
# outcome.
# NOTE(review): Kino.Control's documented API does not appear to include
# read/1 - confirm this call works with the pinned Kino version.
batch_params = Kino.Control.read(batch_form)
client = Process.get(:client)

client
|> BatchOperations.execute_batch_operation(batch_params.operation, batch_params)
|> case do
  {:error, reason} ->
    IO.puts("")
    IO.puts("❌ Batch operation failed: #{inspect(reason)}")

  {:ok, _results} ->
    IO.puts("")
    IO.puts("βœ… Batch operation completed successfully!")
end

Advanced Monitoring Dashboard

Let’s create a comprehensive monitoring dashboard:

# Real-time monitoring dashboard
# The frame is created up front so the monitoring task below can re-render
# into it on every refresh.
monitoring_frame = Kino.Frame.new()

# Enhanced monitoring system
defmodule AdvancedMonitoring do
  @moduledoc """
  Renders the ingress management dashboard as a Markdown string.
  """

  @doc """
  Builds the dashboard Markdown for one refresh.

  `iteration` is the refresh counter shown in the header and defaults to 1.
  When `IngressInventory.analyze_ingress_landscape/1` returns an error tuple,
  an error page is returned instead of the dashboard.
  """
  def generate_dashboard(client, iteration \\ 1) do
    timestamp = DateTime.to_string(DateTime.utc_now())

    case IngressInventory.analyze_ingress_landscape(client) do
      {:error, reason} -> render_failure(reason, iteration, timestamp)
      %{} = analysis -> render_dashboard(analysis, iteration, timestamp)
    end
  end

  # Full dashboard page for a successful analysis.
  defp render_dashboard(analysis, iteration, timestamp) do
    health_indicator = health_badge(analysis.health_summary.score)

    """
    # πŸ“Š LiveKit Ingress Management Dashboard

    **Update ##{iteration}** | **Last Refresh:** #{timestamp}

    ## System Health: #{health_indicator} (#{analysis.health_summary.score}%)

    ### πŸ“ˆ Overview Metrics
    | Metric | Value | Status |
    |--------|-------|--------|
    | **Total Endpoints** | #{analysis.total_count} | #{status_indicator(analysis.total_count > 0)} |
    | **Active Streams** | #{analysis.active_streams} | #{status_indicator(analysis.active_streams > 0)} |
    | **Health Score** | #{analysis.health_summary.score}% | #{health_indicator} |
    | **Est. Bandwidth** | #{analysis.resource_usage.estimated_bandwidth} MB/s | #{bandwidth_status(analysis.resource_usage.estimated_bandwidth)} |
    | **Est. Cost/Hour** | $#{analysis.resource_usage.estimated_cost_per_hour} | πŸ’° |

    ### πŸ”§ Endpoint Distribution

    **By Input Type:**
    #{format_distribution(analysis.by_type)}

    **By Status:**
    #{format_status_distribution(analysis.by_status)}

    **Top Rooms by Endpoint Count:**
    #{format_room_distribution(analysis.by_room)}

    ### ⚠️ Current Issues
    #{format_issues(analysis.health_summary.issues)}

    ### πŸ’‘ Recommendations
    #{format_recommendations(analysis.recommendations)}

    ### πŸŽ›οΈ Quick Actions
    - πŸ”„ **Refresh Dashboard**: Re-run this monitoring cell
    - 🧹 **Cleanup**: Use the batch operations above to clean inactive endpoints
    - πŸ“Š **Health Check**: Run comprehensive health check on all endpoints
    - 🚨 **Alerts**: Set up automated monitoring and alerting

    ---
    *Dashboard auto-refreshes every 10 seconds for 2 minutes*
    """
  end

  # Error page shown when the analysis could not be produced.
  defp render_failure(reason, iteration, timestamp) do
    """
    # ❌ Monitoring Dashboard Error

    **Update ##{iteration}** | **Last Refresh:** #{timestamp}

    Failed to generate dashboard: #{inspect(reason)}

    Please check your connection and configuration.
    """
  end

  # Coloured label for the overall health score.
  defp health_badge(score) do
    cond do
      score >= 90 -> "🟒 EXCELLENT"
      score >= 75 -> "🟑 GOOD"
      score >= 60 -> "🟠 FAIR"
      score >= 40 -> "πŸ”΄ POOR"
      true -> "πŸ’€ CRITICAL"
    end
  end

  defp status_indicator(condition) do
    if condition, do: "βœ…", else: "❌"
  end

  # Guards are tried top to bottom, so each band only sees values the
  # previous ones rejected.
  defp bandwidth_status(bandwidth) when bandwidth == 0, do: "βšͺ No Active Streams"
  defp bandwidth_status(bandwidth) when bandwidth <= 10, do: "🟒 Low"
  defp bandwidth_status(bandwidth) when bandwidth <= 50, do: "🟑 Medium"
  defp bandwidth_status(bandwidth) when bandwidth <= 100, do: "🟠 High"
  defp bandwidth_status(_bandwidth), do: "πŸ”΄ Very High"

  defp status_emoji(:ENDPOINT_PUBLISHING), do: "🟒"
  defp status_emoji(:ENDPOINT_INACTIVE), do: "πŸ”΄"
  defp status_emoji(:ENDPOINT_BUFFERING), do: "🟑"
  defp status_emoji(:ENDPOINT_ERROR), do: "❌"
  defp status_emoji(_other), do: "βšͺ"

  defp format_distribution(counts) when map_size(counts) == 0, do: "*No endpoints found*"

  defp format_distribution(counts) do
    Enum.map_join(counts, "\n", fn {type, count} -> "- **#{type}**: #{count} endpoints" end)
  end

  defp format_status_distribution(counts) when map_size(counts) == 0,
    do: "*No status information*"

  defp format_status_distribution(counts) do
    Enum.map_join(counts, "\n", fn {status, count} ->
      "- #{status_emoji(status)} **#{status}**: #{count}"
    end)
  end

  defp format_room_distribution(counts) when map_size(counts) == 0, do: "*No rooms found*"

  # Five busiest rooms, largest endpoint count first.
  defp format_room_distribution(counts) do
    counts
    |> Enum.sort_by(fn {_room, count} -> -count end)
    |> Enum.take(5)
    |> Enum.map_join("\n", fn {room, count} -> "- **#{room}**: #{count} endpoint(s)" end)
  end

  defp format_issues([]), do: "*No issues detected* βœ…"
  defp format_issues(issues), do: Enum.map_join(issues, "\n", &"- ⚠️ #{&1}")

  defp format_recommendations([]), do: "*No recommendations at this time*"

  defp format_recommendations(recommendations) do
    Enum.map_join(recommendations, "\n", &"- πŸ’‘ #{&1}")
  end
end

# Start monitoring dashboard.
#
# The process dictionary belongs to a single process. The client was stored
# by the cell's process, so it must be read here and captured by the task
# closure; a Process.get/1 call inside the task would run in the task's own
# process and always return nil.
client = Process.get(:client)

monitoring_task = Task.async(fn ->
  if client do
    # 12 refreshes, 10 seconds apart: a two-minute session.
    Enum.each(1..12, fn iteration ->
      content = AdvancedMonitoring.generate_dashboard(client, iteration)
      Kino.Frame.render(monitoring_frame, Kino.Markdown.new(content))
      Process.sleep(10_000) # Wait 10 seconds
    end)

    # Final message
    Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
    # πŸ“Š Monitoring Session Complete

    **Dashboard monitoring session finished.**

    Re-run the monitoring cell to start a new session, or use the batch operations above for management tasks.

    ### Next Steps:
    - Review any issues or recommendations from the monitoring
    - Use batch operations for cleanup or optimization
    - Set up automated monitoring for production use
    """))
  else
    Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
    # ⚠️ Monitoring Not Available

    Please initialize the management client first.
    """))
  end
end)

# Last expression of the cell, so the frame is what gets displayed.
monitoring_frame

Security & Access Management

# Security best practices and access control.
#
# Prints a static list of recommendations plus a short assessment derived
# from the mode and server URL stored by the setup cell.
IO.puts("πŸ” Security & Access Management Best Practices")
IO.puts(String.duplicate("=", 60))
IO.puts("")

management_mode = Process.get(:management_mode)
config = Process.get(:config)

IO.puts("🎯 **Current Configuration Security Assessment:**")
IO.puts("")

# Check management mode security
case management_mode do
  :development ->
    IO.puts("⚠️  **Development Mode Active**")
    IO.puts("   β€’ Full access enabled - appropriate for development only")
    IO.puts("   β€’ Detailed logging active - may expose sensitive information")
    IO.puts("   β€’ Consider switching to production mode for live environments")

  :production ->
    IO.puts("βœ… **Production Mode Active**")
    IO.puts("   β€’ Safety checks enabled")
    IO.puts("   β€’ Monitoring active with appropriate logging levels")
    IO.puts("   β€’ Recommended configuration for live environments")

  :maintenance ->
    IO.puts("🧹 **Maintenance Mode Active**")
    IO.puts("   β€’ Cleanup utilities enabled")
    IO.puts("   β€’ Limited operational access")
    IO.puts("   β€’ Good for scheduled maintenance windows")

  _ ->
    # Reached when the setup cell has not stored a mode yet; without this
    # clause the cell would stop with a CaseClauseError.
    IO.puts("❌ **Management mode not set**")
    IO.puts("   Initialize the management client first.")
end

IO.puts("")
IO.puts("πŸ”‘ **API Security Recommendations:**")
IO.puts("")
IO.puts("βœ… **Access Token Best Practices:**")
IO.puts("   β€’ Use least-privilege principle - only grant ingress_admin when needed")
IO.puts("   β€’ Set appropriate token expiration times (recommended: 24 hours max)")
IO.puts("   β€’ Rotate API keys regularly (quarterly recommended)")
IO.puts("   β€’ Store credentials securely (environment variables, secret managers)")
IO.puts("")

IO.puts("πŸ›‘οΈ **Network Security:**")
IO.puts("   β€’ Always use WSS/HTTPS endpoints in production")
IO.puts("   β€’ Implement IP whitelisting for management access")
IO.puts("   β€’ Use VPN or private networks for administrative operations")
IO.puts("   β€’ Monitor API access logs for unusual patterns")
IO.puts("")

IO.puts("πŸ“Š **Operational Security:**")
IO.puts("   β€’ Implement ingress endpoint naming conventions")
IO.puts("   β€’ Use metadata for tracking and auditing")
IO.puts("   β€’ Set up automated cleanup for temporary endpoints")
IO.puts("   β€’ Monitor resource usage and set alerts for anomalies")
IO.puts("")

IO.puts("🚨 **Incident Response:**")
IO.puts("   β€’ Have procedures for revoking compromised endpoints")
IO.puts("   β€’ Implement emergency shutdown capabilities")
IO.puts("   β€’ Log all management operations for audit trails")
IO.puts("   β€’ Set up monitoring alerts for suspicious activity")

# Generate a security checklist based on current setup
IO.puts("")
IO.puts("πŸ“‹ **Security Checklist for Your Setup:**")

# The checklist item covers both secure schemes, so both are accepted. A
# missing config or URL counts as not secure instead of raising.
server_url = if config, do: config.url

secure_endpoint? =
  is_binary(server_url) and String.starts_with?(server_url, ["wss://", "https://"])

# Only the URL and mode entries are computed; the rest are fixed reminders.
checklist_items = [
  {"API credentials stored securely", true},
  {"Using HTTPS/WSS endpoints", secure_endpoint?},
  {"Production mode enabled for live use", management_mode == :production},
  {"Monitoring dashboard active", true},
  {"Batch operations configured safely", true}
]

for {item, status} <- checklist_items do
  status_icon = if status, do: "βœ…", else: "❌"
  IO.puts("   #{status_icon} #{item}")
end

Performance Optimization & Scaling

# Performance optimization strategies
defmodule PerformanceOptimizer do
  @moduledoc """
  Turns an inventory analysis into performance recommendations.
  """

  @doc """
  Returns a list of `{issue, solution}` string pairs for every threshold the
  analysis exceeds.

  Reads `analysis.total_count`, `analysis.active_streams`,
  `analysis.resource_usage.estimated_bandwidth` and
  `analysis.health_summary.score`. Matches are prepended as they are found,
  so the result is in reverse order of the checks; an analysis that trips no
  check yields `[]`.
  """
  def generate_optimization_report(analysis) do
    checks = [
      {analysis.total_count > 100, "High endpoint count detected",
       "Consider implementing endpoint archiving or pagination"},
      {analysis.active_streams > 20, "High concurrent stream count",
       "Monitor resource usage and consider load balancing"},
      {analysis.resource_usage.estimated_bandwidth > 100, "High bandwidth usage detected",
       "Implement quality controls and bandwidth limits"},
      {analysis.health_summary.score < 80, "Health score below optimal",
       "Investigate error states and implement automated recovery"}
    ]

    Enum.reduce(checks, [], fn
      {true, issue, solution}, found -> [{issue, solution} | found]
      {false, _issue, _solution}, found -> found
    end)
  end
end

# Print current metrics, threshold-based recommendations and the scaling
# advice that matches the endpoint count.
IO.puts("⚑ Performance Optimization & Scaling Guide")
IO.puts(String.duplicate("=", 50))
IO.puts("")

client = Process.get(:client)

case IngressInventory.analyze_ingress_landscape(client) do
  {:error, reason} ->
    IO.puts("❌ Unable to analyze performance: #{inspect(reason)}")

  %{} = analysis ->
    IO.puts("πŸ“Š **Current Performance Metrics:**")
    IO.puts("   Total Endpoints: #{analysis.total_count}")
    IO.puts("   Active Streams: #{analysis.active_streams}")
    IO.puts("   Est. Bandwidth: #{analysis.resource_usage.estimated_bandwidth} MB/s")
    IO.puts("   Est. CPU Cores: #{analysis.resource_usage.estimated_cpu_cores}")
    IO.puts("   Health Score: #{analysis.health_summary.score}%")
    IO.puts("")

    # Performance optimization recommendations
    case PerformanceOptimizer.generate_optimization_report(analysis) do
      [] ->
        IO.puts("βœ… **Performance Status: Optimal**")
        IO.puts("   No immediate performance concerns detected.")

      perf_recommendations ->
        IO.puts("⚑ **Performance Optimization Recommendations:**")

        Enum.each(perf_recommendations, fn {issue, solution} ->
          IO.puts("   β€’ **Issue**: #{issue}")
          IO.puts("     **Solution**: #{solution}")
          IO.puts("")
        end)
    end

    IO.puts("πŸš€ **Scaling Strategies:**")
    IO.puts("")

    # Pick the advice for the first tier whose upper bound the endpoint count
    # stays under.
    scaling_advice =
      cond do
        analysis.total_count < 10 ->
          [
            "πŸ“ˆ **Small Scale (< 10 endpoints)**",
            "   β€’ Current setup is appropriate",
            "   β€’ Focus on monitoring and health checks",
            "   β€’ Consider automation for growth"
          ]

        analysis.total_count < 50 ->
          [
            "πŸ“Š **Medium Scale (10-50 endpoints)**",
            "   β€’ Implement batch operations for efficiency",
            "   β€’ Set up automated monitoring dashboards",
            "   β€’ Consider endpoint lifecycle management",
            "   β€’ Implement naming conventions and organization"
          ]

        analysis.total_count < 200 ->
          [
            "🏭 **Large Scale (50-200 endpoints)**",
            "   β€’ Implement endpoint archiving strategies",
            "   β€’ Use advanced monitoring and alerting",
            "   β€’ Consider load balancing and regional distribution",
            "   β€’ Implement automated scaling policies"
          ]

        true ->
          [
            "🌐 **Enterprise Scale (200+ endpoints)**",
            "   β€’ Implement multi-region management",
            "   β€’ Use advanced orchestration tools",
            "   β€’ Consider microservices architecture",
            "   β€’ Implement comprehensive automation"
          ]
      end

    Enum.each(scaling_advice, &IO.puts/1)

    IO.puts("")

    Enum.each(
      [
        "πŸ’‘ **General Performance Tips:**",
        "   β€’ Use appropriate bitrates for stream quality vs. bandwidth",
        "   β€’ Implement connection pooling for API calls",
        "   β€’ Cache ingress configurations when possible",
        "   β€’ Use batch operations instead of individual API calls",
        "   β€’ Monitor and optimize network latency",
        "   β€’ Implement graceful degradation for high load scenarios"
      ],
      &IO.puts/1
    )
end

Automated Maintenance & Cleanup

# Automated maintenance system.
#
# Kino.Input.select/2 expects `{value, label}` pairs, so the maintenance type
# atoms come first; MaintenanceSystem.run_maintenance/3 dispatches on them.
maintenance_form = Kino.Control.form(
  [
    maintenance_type: Kino.Input.select("Maintenance Type", [
      daily_cleanup: "Daily cleanup routine",
      weekly_optimization: "Weekly optimization",
      monthly_deep_clean: "Monthly deep clean",
      emergency_cleanup: "Emergency cleanup"
    ]),
    # Off by default so cleanup requires an explicit opt-in.
    auto_confirm: Kino.Input.checkbox("Auto-confirm operations (careful!)", default: false),
    cleanup_threshold_hours: Kino.Input.number("Inactive cleanup threshold (hours)", default: 48),
    keep_recent_count: Kino.Input.number("Keep N most recent endpoints", default: 10)
  ],
  submit: "Run Maintenance"
)
# Maintenance executor
defmodule MaintenanceSystem do
  @moduledoc """
  Runs the maintenance routines selectable in the maintenance form against a
  LiveKit ingress client.

  Each routine prints a progress report with `IO.puts/1`. The daily and monthly
  routines delete endpoints only when `params.auto_confirm` is truthy; the
  weekly routine only reports, and the emergency routine only lists candidates.
  """

  # Name fragments (lower-case) that mark an endpoint for the daily cleanup.
  @daily_test_patterns ["test", "demo", "livebook"]

  # Broader set of name fragments used by the emergency scan.
  @emergency_test_patterns ["test", "demo", "livebook", "batch", "temp"]

  @ms_per_hour 60 * 60 * 1000

  # Inactive endpoints older than this are reported as orphaned (7 days).
  @orphan_threshold_ms 7 * 24 * @ms_per_hour

  @doc """
  Runs the maintenance routine identified by `type`.

  ## Parameters

    * `client` - ingress service client passed to `Livekit.IngressServiceClient`.
    * `type` - one of `:daily_cleanup`, `:weekly_optimization`,
      `:monthly_deep_clean` or `:emergency_cleanup`.
    * `params` - map with `:auto_confirm`, `:cleanup_threshold_hours` and
      `:keep_recent_count` (the submitted maintenance form data).

  Always returns `:ok`; failures are reported on standard output. Raises
  `CaseClauseError` for an unknown `type`.
  """
  def run_maintenance(client, type, params) do
    IO.puts("🔧 Starting #{type} maintenance routine...")
    IO.puts("Time: #{DateTime.utc_now() |> DateTime.to_string()}")
    IO.puts("")

    case type do
      :daily_cleanup -> run_daily_cleanup(client, params)
      :weekly_optimization -> run_weekly_optimization(client, params)
      :monthly_deep_clean -> run_monthly_deep_clean(client, params)
      :emergency_cleanup -> run_emergency_cleanup(client, params)
    end

    :ok
  end

  # Daily routine: validates the threshold, then cleans up matching endpoints.
  defp run_daily_cleanup(client, params) do
    IO.puts("📋 Daily Cleanup Routine")
    IO.puts("- Checking for inactive endpoints")
    IO.puts("- Cleaning up test resources")
    IO.puts("- Updating health metrics")

    case hours_to_ms(params.cleanup_threshold_hours) do
      {:ok, threshold_ms} ->
        cleanup_inactive(client, threshold_ms, params.auto_confirm)

      :error ->
        IO.puts(
          "❌ Invalid cleanup threshold: #{inspect(params.cleanup_threshold_hours)} " <>
            "(expected a non-negative number of hours)"
        )
    end
  end

  # Lists endpoints and removes (or only reports) those selected for daily cleanup.
  defp cleanup_inactive(client, threshold_ms, auto_confirm) do
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        now_ms = System.system_time(:millisecond)

        inactive_endpoints =
          Enum.filter(response.items, fn ingress ->
            daily_test_endpoint?(ingress) ||
              inactive_longer_than?(ingress, now_ms, threshold_ms)
          end)

        IO.puts("Found #{length(inactive_endpoints)} endpoints for cleanup")

        if auto_confirm do
          cleanup_endpoints(client, inactive_endpoints)
        else
          IO.puts("Manual confirmation required - use batch cleanup operations")
        end

      {:error, reason} ->
        IO.puts("❌ Failed to list endpoints: #{inspect(reason)}")
    end
  end

  # Weekly routine: prints the inventory analysis and threshold-based advice.
  # The form parameters are not used by this routine.
  defp run_weekly_optimization(client, _params) do
    IO.puts("⚡ Weekly Optimization Routine")
    IO.puts("- Analyzing performance metrics")
    IO.puts("- Optimizing endpoint distribution")
    IO.puts("- Updating configurations")

    case IngressInventory.analyze_ingress_landscape(client) do
      %{} = analysis ->
        IO.puts("📊 Performance Analysis:")
        IO.puts("   Health Score: #{analysis.health_summary.score}%")
        IO.puts("   Total Endpoints: #{analysis.total_count}")
        IO.puts("   Active Streams: #{analysis.active_streams}")

        if analysis.health_summary.score < 80 do
          IO.puts("⚠️  Health score below optimal - investigation recommended")
        end

        if analysis.total_count > 100 do
          IO.puts("📦 Consider implementing endpoint archiving")
        end

      {:error, reason} ->
        IO.puts("❌ Failed to analyze: #{inspect(reason)}")
    end
  end

  # Monthly routine: validates the keep count, then audits all endpoints.
  defp run_monthly_deep_clean(client, params) do
    IO.puts("🧹 Monthly Deep Clean Routine")
    IO.puts("- Comprehensive endpoint audit")
    IO.puts("- Performance optimization")
    IO.puts("- Configuration updates")

    case keep_count(params.keep_recent_count) do
      {:ok, count} ->
        audit_endpoints(client, count, params.auto_confirm)

      :error ->
        IO.puts(
          "❌ Invalid keep count: #{inspect(params.keep_recent_count)} " <>
            "(expected a non-negative number)"
        )
    end
  end

  # Keeps the `count` most recently started endpoints; the rest are reported and,
  # with `auto_confirm`, deleted.
  defp audit_endpoints(client, count, auto_confirm) do
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        total_endpoints = length(response.items)
        IO.puts("📋 Auditing #{total_endpoints} endpoints")

        if total_endpoints > count do
          # Sort newest first, then split once instead of subtracting lists.
          {endpoints_to_keep, endpoints_to_remove} =
            response.items
            |> Enum.sort_by(&started_at/1, :desc)
            |> Enum.split(count)

          IO.puts("📦 Would archive #{length(endpoints_to_remove)} old endpoints")
          IO.puts("✅ Keeping #{length(endpoints_to_keep)} recent endpoints")

          if auto_confirm do
            cleanup_endpoints(client, endpoints_to_remove)
          end
        else
          IO.puts("✅ Endpoint count within limits (#{total_endpoints})")
        end

      {:error, reason} ->
        IO.puts("❌ Failed to audit endpoints: #{inspect(reason)}")
    end
  end

  # Emergency routine: lists test, errored and orphaned endpoints. It never
  # deletes anything; removal is left to a manual step.
  defp run_emergency_cleanup(client, _params) do
    IO.puts("🚨 Emergency Cleanup Routine")
    IO.puts("- Removing all test endpoints")
    IO.puts("- Stopping error state endpoints")
    IO.puts("- Emergency resource recovery")

    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        now_ms = System.system_time(:millisecond)

        emergency_endpoints =
          Enum.filter(response.items, fn ingress ->
            test_endpoint?(ingress) ||
              error_state?(ingress) ||
              inactive_longer_than?(ingress, now_ms, @orphan_threshold_ms)
          end)

        IO.puts("🚨 Found #{length(emergency_endpoints)} endpoints requiring emergency cleanup")

        Enum.each(emergency_endpoints, fn endpoint ->
          IO.puts("   • #{endpoint.name} (#{endpoint.ingress_id})")
        end)

        IO.puts("⚠️  Emergency cleanup requires manual confirmation")

      {:error, reason} ->
        IO.puts("❌ Failed emergency scan: #{inspect(reason)}")
    end
  end

  # Converts the form's hour threshold to milliseconds. The number input can be
  # empty (nil), which would otherwise raise in the arithmetic.
  defp hours_to_ms(hours) when is_number(hours) and hours >= 0, do: {:ok, hours * @ms_per_hour}
  defp hours_to_ms(_hours), do: :error

  # Normalizes the form's keep count to a non-negative integer. A float would
  # crash `Enum.split/2`, and a negative count would keep the oldest endpoints.
  defp keep_count(count) when is_number(count) and count >= 0, do: {:ok, trunc(count)}
  defp keep_count(_count), do: :error

  # Start timestamp used for recency ordering; endpoints without state sort as 0.
  defp started_at(ingress), do: (ingress.state && ingress.state.started_at) || 0

  # NOTE(review): this matches on the name only, regardless of the endpoint's
  # age or state, so the daily routine also selects active endpoints whose
  # name contains one of the patterns - confirm this is intended.
  defp daily_test_endpoint?(ingress) do
    String.contains?(String.downcase(ingress.name), @daily_test_patterns)
  end

  defp test_endpoint?(ingress) do
    String.contains?(String.downcase(ingress.name), @emergency_test_patterns)
  end

  defp error_state?(%{state: %{status: :ENDPOINT_ERROR}}), do: true
  defp error_state?(_ingress), do: false

  # True when the endpoint is inactive, has a non-zero start timestamp, and
  # started more than `threshold_ms` before `now_ms`.
  # NOTE(review): assumes `started_at` is epoch milliseconds - TODO confirm
  # against the LiveKit API; if it is reported in another unit (e.g.
  # nanoseconds) this age check never matches.
  defp inactive_longer_than?(
         %{state: %{status: :ENDPOINT_INACTIVE, started_at: started_at}},
         now_ms,
         threshold_ms
       )
       when is_integer(started_at) and started_at != 0 do
    now_ms - started_at > threshold_ms
  end

  defp inactive_longer_than?(_ingress, _now_ms, _threshold_ms), do: false

  # Deletes each endpoint, reporting success or failure per endpoint. A failed
  # delete does not stop the remaining deletions.
  defp cleanup_endpoints(client, endpoints) do
    IO.puts("🗑️  Cleaning up #{length(endpoints)} endpoints...")

    Enum.each(endpoints, fn endpoint ->
      request = %Livekit.DeleteIngressRequest{ingress_id: endpoint.ingress_id}

      case Livekit.IngressServiceClient.delete_ingress(client, request) do
        {:ok, _} ->
          IO.puts("  ✅ Cleaned: #{endpoint.name}")

        {:error, reason} ->
          IO.puts("  ❌ Failed to clean #{endpoint.name}: #{inspect(reason)}")
      end
    end)
  end
end

# Execute maintenance routine
#
# Form values are delivered as submit events, so the routine runs from a
# listener each time "Run Maintenance" is pressed. The client is fetched here,
# outside the callback, because the listener runs in a separate process that
# does not share this process dictionary.
client = Process.get(:client)

Kino.listen(maintenance_form, fn %{data: maintenance_params} ->
  MaintenanceSystem.run_maintenance(
    client,
    maintenance_params.maintenance_type,
    maintenance_params
  )
end)

Summary and Next Steps

# Closing summary of the tutorial: accomplishments, capabilities, next steps,
# tools, resources and tips, printed as plain text.
IO.puts("🎉 Ingress Management Tutorial Complete!")
IO.puts(String.duplicate("=", 50))
IO.puts("")

IO.puts("✅ **What You've Accomplished:**")
IO.puts("   • Comprehensive ingress landscape analysis")
IO.puts("   • Advanced batch operations for efficiency")
IO.puts("   • Real-time monitoring dashboard")
IO.puts("   • Security best practices implementation")
IO.puts("   • Performance optimization strategies")
IO.puts("   • Automated maintenance routines")
IO.puts("")

IO.puts("🚀 **Advanced Management Capabilities:**")
IO.puts("   • Inventory analysis with health scoring")
IO.puts("   • Batch creation, update, and cleanup operations")
IO.puts("   • Real-time monitoring with automated refresh")
IO.puts("   • Security assessment and recommendations")
IO.puts("   • Performance optimization guidance")
IO.puts("   • Automated maintenance and cleanup routines")
IO.puts("")

IO.puts("📈 **Recommended Next Steps:**")
IO.puts("   1. **Production Implementation**: Adapt these patterns for your production environment")
IO.puts("   2. **Automation**: Set up scheduled maintenance and monitoring")
IO.puts("   3. **Integration**: Connect with your existing monitoring and alerting systems")
IO.puts("   4. **Custom Workflows**: Build specific management workflows for your use cases")
IO.puts("   5. **Scaling**: Implement advanced scaling strategies as your usage grows")
IO.puts("")

IO.puts("🛠️ **Available Tools:**")
IO.puts("   • Inventory analysis and health assessment")
IO.puts("   • Batch operations for efficiency")
IO.puts("   • Real-time monitoring dashboard")
IO.puts("   • Security configuration validation")
IO.puts("   • Performance optimization analysis")
IO.puts("   • Automated maintenance scheduling")
IO.puts("")

IO.puts("📚 **Additional Resources:**")
IO.puts("   • LiveKit Ingress Documentation: https://docs.livekit.io/ingress/")
IO.puts("   • gRPC Best Practices: https://grpc.io/docs/guides/")
IO.puts("   • Elixir Performance Guide: https://hexdocs.pm/elixir/")
IO.puts("   • Production Deployment Guide: https://docs.livekit.io/deploy/")
IO.puts("")

IO.puts("💡 **Pro Tips:**")
IO.puts("   • Use batch operations to minimize API calls")
IO.puts("   • Implement proper error handling and retries")
IO.puts("   • Monitor resource usage and set up alerts")
IO.puts("   • Keep endpoint metadata up to date for better management")
IO.puts("   • Regular maintenance prevents performance issues")