Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

LiveKit Ingress Service - File Processing & URL Streams

ingress_file_processing.livemd

LiveKit Ingress Service - File Processing & URL Streams

Mix.install([
  {:livekit, path: "../.."},
  {:kino, "~> 0.12"},
  {:jason, "~> 1.4"},
  {:req, "~> 0.4.0"}
])

Introduction to File & URL Ingress

This Livebook demonstrates how to use LiveKit’s Ingress Service for file-based and URL-based streaming. This includes:

  • πŸ“Ή Video file ingestion (MP4, MOV, AVI, etc.)
  • 🎡 Audio file processing (MP3, WAV, AAC, etc.)
  • 🌐 URL stream ingestion (HLS, DASH, direct streams)
  • πŸ“Ί Live stream re-streaming from other platforms
  • πŸ”„ Batch processing workflows for multiple files
  • ⏰ Scheduled content delivery

What You’ll Learn:

  • πŸ”§ Setting up URL ingress endpoints
  • πŸ“ Processing local and remote video files
  • 🎬 Creating automated content workflows
  • πŸ“Š Monitoring file processing progress
  • πŸ› οΈ Troubleshooting file format issues
  • ⚑ Optimizing for different content types

Configuration & Client Setup

# LiveKit configuration
config_form = Kino.Control.form(
  [
    api_key: Kino.Input.password("LiveKit API Key"),
    api_secret: Kino.Input.password("LiveKit API Secret"),
    url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud"),
    room_name: Kino.Input.text("Target Room Name", default: "file-processing-room")
  ],
  submit: "Connect to LiveKit"
)
# Establish connection
config = Kino.Control.read(config_form)
Process.put(:config, config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:client, client)
    IO.puts("βœ… Connected to LiveKit Ingress Service!")
    IO.puts("🎯 Target room: #{config.room_name}")
    
  {:error, reason} ->
    IO.puts("❌ Connection failed: #{reason}")
    IO.puts("Please verify your credentials and server URL.")
end

URL Stream Ingress

Let’s start by creating URL-based ingress endpoints for various stream types:

# URL ingress configuration form
url_form = Kino.Control.form(
  [
    stream_name: Kino.Input.text("Stream Name", default: "url-stream-#{System.system_time(:second)}"),
    source_url: Kino.Input.text("Source URL", default: "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"),
    url_type: Kino.Input.select("URL Type", [
      {"Direct Video File (MP4/MOV)", :direct_video},
      {"HLS Stream (m3u8)", :hls},
      {"DASH Stream", :dash},
      {"YouTube Live Stream", :youtube},
      {"Custom RTMP Stream", :rtmp_url}
    ]),
    participant_identity: Kino.Input.text("Participant Identity", default: "url-streamer"),
    loop_content: Kino.Input.checkbox("Loop Content", default: false)
  ],
  submit: "Create URL Ingress"
)
# Create URL ingress endpoint
url_params = Kino.Control.read(url_form)
client = Process.get(:client)
config = Process.get(:config)

# Validate URL
url_valid = case url_params.url_type do
  :direct_video -> String.ends_with?(url_params.source_url, [".mp4", ".mov", ".avi", ".mkv"])
  :hls -> String.ends_with?(url_params.source_url, ".m3u8")
  :dash -> String.contains?(url_params.source_url, ".mpd")
  :youtube -> String.contains?(url_params.source_url, "youtube.com") || String.contains?(url_params.source_url, "youtu.be")
  :rtmp_url -> String.starts_with?(url_params.source_url, "rtmp://")
  _ -> true
end

if url_valid do
  # Add metadata based on URL type
  metadata = %{
    "source_type" => "url_ingress",
    "url_type" => to_string(url_params.url_type),
    "source_url" => url_params.source_url,
    "loop_enabled" => url_params.loop_content,
    "processing_mode" => "streaming"
  } |> Jason.encode!()

  request = %Livekit.CreateIngressRequest{
    input_type: :URL_INPUT,
    name: url_params.stream_name,
    room_name: config.room_name,
    participant_identity: url_params.participant_identity,
    participant_name: "URL Stream: #{url_params.url_type}",
    participant_metadata: metadata,
    url: url_params.source_url,
    enable_transcoding: true
  }

  case Livekit.IngressServiceClient.create_ingress(client, request) do
    {:ok, ingress} ->
      Process.put(:url_ingress, ingress)
      
      IO.puts("πŸŽ‰ URL Ingress Created Successfully!")
      IO.puts("=" |> String.duplicate(50))
      IO.puts("πŸ“‘ Ingress ID: #{ingress.ingress_id}")
      IO.puts("🏷️  Name: #{ingress.name}")
      IO.puts("πŸ”— Source URL: #{ingress.url}")
      IO.puts("🏠 Room: #{ingress.room_name}")
      IO.puts("πŸ‘€ Participant: #{ingress.participant_identity}")
      IO.puts("🎭 Display Name: #{ingress.participant_name}")
      IO.puts("βš™οΈ  Transcoding: #{ingress.enable_transcoding}")
      IO.puts("πŸ“‹ Type: #{url_params.url_type}")
      IO.puts("=" |> String.duplicate(50))
      
    {:error, reason} ->
      IO.puts("❌ Failed to create URL ingress: #{inspect(reason)}")
  end
else
  IO.puts("⚠️  Invalid URL for selected type. Please check the URL format.")
end

Popular Streaming URLs for Testing

Let’s provide some sample URLs for testing different content types:

IO.puts("🌐 Sample URLs for Testing")
IO.puts("=" |> String.duplicate(40))
IO.puts("")

sample_urls = [
  {
    "πŸ“Ή Direct Video Files",
    [
      "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4",
      "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4", 
      "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4",
      "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4"
    ]
  },
  {
    "🎡 Audio Files",
    [
      "https://www.soundjay.com/misc/sounds/magic-chime-02.mp3",
      "https://www.soundjay.com/buttons/sounds/button-09.mp3"
    ]
  },
  {
    "πŸ“Ί HLS Streams (m3u8)",
    [
      "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_4x3/bipbop_4x3_variant.m3u8",
      "https://devstreaming-cdn.apple.com/videos/streaming/examples/img_bipbop_adv_example_fmp4/master.m3u8"
    ]
  },
  {
    "🌍 Live News Streams",
    [
      "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8",
      "https://d2zihajmogu5jn.cloudfront.net/bipbop-advanced/bipbop_16x9_variant.m3u8"
    ]
  }
]

for {category, urls} <- sample_urls do
  IO.puts("#{category}:")
  for url <- urls do
    IO.puts("  β€’ #{url}")
  end
  IO.puts("")
end

IO.puts("πŸ’‘ **Usage Tips:**")
IO.puts("   β€’ Copy any URL above to test different content types")
IO.puts("   β€’ HLS streams (.m3u8) work well for adaptive streaming")
IO.puts("   β€’ Direct video files are good for testing file processing")
IO.puts("   β€’ Live streams demonstrate real-time ingestion")

File Upload and Processing Workflow

Let’s create a file upload workflow (simulated since we can’t actually upload in Livebook):

# File processing configuration
file_form = Kino.Control.form(
  [
    file_type: Kino.Input.select("File Processing Type", [
      {"Local Video File", :local_video},
      {"Remote Video File", :remote_video},
      {"Batch Processing", :batch},
      {"Scheduled Playback", :scheduled}
    ]),
    remote_file_url: Kino.Input.text("Remote File URL (if applicable)", 
      default: "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4"),
    processing_mode: Kino.Input.select("Processing Mode", [
      {"Stream Once", :once},
      {"Loop Continuously", :loop},
      {"Scheduled Intervals", :scheduled}
    ]),
    video_quality: Kino.Input.select("Video Quality", [
      {"Original Quality", :original},
      {"HD (1080p)", :hd},
      {"SD (720p)", :sd},
      {"Mobile (480p)", :mobile}
    ])
  ],
  submit: "Configure File Processing"
)
# Process file configuration
file_params = Kino.Control.read(file_form)

IO.puts("🎬 File Processing Configuration")
IO.puts("=" |> String.duplicate(40))
IO.puts("")

case file_params.file_type do
  :local_video ->
    IO.puts("πŸ“ **Local Video File Processing:**")
    IO.puts("   1. Place your video file in a web-accessible location")
    IO.puts("   2. Or use a local HTTP server: `python -m http.server 8000`")
    IO.puts("   3. Use URL like: http://localhost:8000/video.mp4")
    IO.puts("   4. Supported formats: MP4, MOV, AVI, MKV, WebM")
    
  :remote_video ->
    IO.puts("🌐 **Remote Video File Processing:**")
    IO.puts("   β€’ Source URL: #{file_params.remote_file_url}")
    IO.puts("   β€’ Processing Mode: #{file_params.processing_mode}")
    IO.puts("   β€’ Target Quality: #{file_params.video_quality}")
    
    # Create ingress for remote file
    client = Process.get(:client)
    config = Process.get(:config)
    
    metadata = %{
      "source_type" => "remote_file",
      "processing_mode" => to_string(file_params.processing_mode),
      "target_quality" => to_string(file_params.video_quality),
      "file_url" => file_params.remote_file_url
    } |> Jason.encode!()

    request = %Livekit.CreateIngressRequest{
      input_type: :URL_INPUT,
      name: "file-proc-#{System.system_time(:second)}",
      room_name: config.room_name,
      participant_identity: "file-processor-#{System.system_time(:second)}",
      participant_name: "File: #{Path.basename(file_params.remote_file_url)}",
      participant_metadata: metadata,
      url: file_params.remote_file_url,
      enable_transcoding: file_params.video_quality != :original
    }

    case Livekit.IngressServiceClient.create_ingress(client, request) do
      {:ok, ingress} ->
        Process.put(:file_ingress, ingress)
        IO.puts("   βœ… File processing ingress created: #{ingress.ingress_id}")
      {:error, reason} ->
        IO.puts("   ❌ Failed to create file ingress: #{inspect(reason)}")
    end
    
  :batch ->
    IO.puts("πŸ“š **Batch Processing Setup:**")
    IO.puts("   β€’ Create multiple ingress endpoints for different files")
    IO.puts("   β€’ Use scheduling to process files in sequence")
    IO.puts("   β€’ Monitor processing status for each file")
    IO.puts("   β€’ Implement error handling and retry logic")
    
  :scheduled ->
    IO.puts("⏰ **Scheduled Playback Configuration:**")
    IO.puts("   β€’ Set up cron-like scheduling for content delivery")
    IO.puts("   β€’ Use external schedulers to trigger ingress creation")
    IO.puts("   β€’ Implement playlist management")
    IO.puts("   β€’ Consider timezone handling for global audiences")
end

IO.puts("")
IO.puts("πŸ”§ **Advanced File Processing Options:**")
IO.puts("   β€’ Transcoding: Convert between formats automatically")
IO.puts("   β€’ Quality adaptation: Multiple bitrates for different devices")
IO.puts("   β€’ Content analysis: Extract metadata and thumbnails")
IO.puts("   β€’ Error recovery: Automatic retry on processing failures")

FFmpeg Integration for File Processing

Let’s generate FFmpeg commands for advanced file processing:

# FFmpeg file processing generator
ffmpeg_form = Kino.Control.form(
  [
    operation: Kino.Input.select("FFmpeg Operation", [
      {"Stream file to LiveKit", :stream_file},
      {"Convert format before streaming", :convert_stream},
      {"Extract audio from video", :extract_audio},
      {"Create video thumbnail", :thumbnail},
      {"Batch process multiple files", :batch_process}
    ]),
    input_file: Kino.Input.text("Input File Path", default: "/path/to/input/video.mp4"),
    output_format: Kino.Input.select("Output Format", [
      {"Same as input", :same},
      {"MP4 (H.264)", :mp4},
      {"WebM (VP9)", :webm},
      {"FLV (for RTMP)", :flv}
    ])
  ],
  submit: "Generate FFmpeg Command"
)
# Generate FFmpeg commands
ffmpeg_params = Kino.Control.read(ffmpeg_form)
ingress = Process.get(:url_ingress) || Process.get(:file_ingress)

IO.puts("πŸ”§ FFmpeg File Processing Commands")
IO.puts("=" |> String.duplicate(50))
IO.puts("")

case ffmpeg_params.operation do
  :stream_file ->
    if ingress &amp;&amp; ingress.input_type == :URL_INPUT do
      # For URL ingress, we can stream directly
      command = """
      ffmpeg -re -i "#{ffmpeg_params.input_file}" \\
        -c:v libx264 -preset veryfast -tune zerolatency \\
        -c:a aac -ar 48000 \\
        -f flv "#{ingress.url}"
      """
      
      IO.puts("πŸ“Ί **Stream File to LiveKit URL Ingress:**")
      IO.puts("```bash")
      IO.puts(command)
      IO.puts("```")
      
    else
      IO.puts("⚠️  Create a URL ingress endpoint first")
    end
    
  :convert_stream ->
    format_settings = case ffmpeg_params.output_format do
      :mp4 -> "-c:v libx264 -c:a aac -f mp4"
      :webm -> "-c:v libvpx-vp9 -c:a libvorbis -f webm"
      :flv -> "-c:v libx264 -c:a aac -f flv"
      :same -> "-c copy"
    end
    
    command = """
    ffmpeg -i "#{ffmpeg_params.input_file}" \\
      #{format_settings} \\
      -movflags +faststart \\
      "output.#{ffmpeg_params.output_format}"
    """
    
    IO.puts("πŸ”„ **Convert File Format:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")
    
  :extract_audio ->
    command = """
    ffmpeg -i "#{ffmpeg_params.input_file}" \\
      -vn -c:a aac -b:a 192k \\
      "#{Path.rootname(ffmpeg_params.input_file)}.aac"
    """
    
    IO.puts("🎡 **Extract Audio from Video:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")
    
  :thumbnail ->
    command = """
    ffmpeg -i "#{ffmpeg_params.input_file}" \\
      -vf "thumbnail,scale=320:240" \\
      -frames:v 1 \\
      "#{Path.rootname(ffmpeg_params.input_file)}_thumb.jpg"
    """
    
    IO.puts("πŸ“Έ **Generate Video Thumbnail:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")
    
  :batch_process ->
    IO.puts("πŸ“š **Batch Process Multiple Files:**")
    IO.puts("```bash")
    IO.puts("# Process all MP4 files in a directory")
    IO.puts("for file in *.mp4; do")
    IO.puts("    echo \"Processing: $file\"")
    IO.puts("    ffmpeg -i \"$file\" \\")
    IO.puts("      -c:v libx264 -preset fast \\")
    IO.puts("      -c:a aac -b:a 192k \\")
    IO.puts("      \"processed_${file}\"")
    IO.puts("done")
    IO.puts("```")
end

IO.puts("")
IO.puts("πŸ’‘ **FFmpeg Tips for File Processing:**")
IO.puts("   β€’ Use -re flag to read input at native frame rate")
IO.puts("   β€’ Add -threads 0 for automatic thread optimization")
IO.puts("   β€’ Use -preset fast/medium/slow for encoding speed vs quality")
IO.puts("   β€’ Monitor with -progress for long operations")
IO.puts("   β€’ Use -y flag to overwrite output files automatically")

Content Validation and Analysis

Let’s create tools to validate and analyze content before processing:

# Content analysis module
defmodule ContentAnalyzer do
  def analyze_url(url) do
    try do
      case Req.head(url, connect_options: [timeout: 5000]) do
        {:ok, %{status: 200, headers: headers}} ->
          content_type = get_header(headers, "content-type")
          content_length = get_header(headers, "content-length")
          
          %{
            valid: true,
            content_type: content_type,
            content_length: content_length,
            size_mb: if(content_length, do: String.to_integer(content_length) / 1_048_576, else: nil),
            media_type: classify_media_type(content_type)
          }
          
        {:ok, %{status: status}} ->
          %{valid: false, error: "HTTP #{status}"}
          
        {:error, reason} ->
          %{valid: false, error: inspect(reason)}
      end
    rescue
      e -> %{valid: false, error: "Request failed: #{inspect(e)}"}
    end
  end
  
  defp get_header(headers, name) do
    case Enum.find(headers, fn {key, _} -> String.downcase(key) == name end) do
      {_, value} -> value
      nil -> nil
    end
  end
  
  defp classify_media_type(content_type) when is_binary(content_type) do
    cond do
      String.contains?(content_type, "video/") -> :video
      String.contains?(content_type, "audio/") -> :audio
      String.contains?(content_type, "application/vnd.apple.mpegurl") -> :hls
      String.contains?(content_type, "application/dash+xml") -> :dash
      true -> :unknown
    end
  end
  
  defp classify_media_type(_), do: :unknown
end

# Content validation form
validation_form = Kino.Control.form(
  [
    url_to_analyze: Kino.Input.text("URL to Analyze", 
      default: "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4")
  ],
  submit: "Analyze Content"
)
# Perform content analysis
validation_params = Kino.Control.read(validation_form)

IO.puts("πŸ” Content Analysis Results")
IO.puts("=" |> String.duplicate(40))
IO.puts("")

analysis = ContentAnalyzer.analyze_url(validation_params.url_to_analyze)

if analysis.valid do
  IO.puts("βœ… **Content is accessible and valid**")
  IO.puts("   πŸ“„ Content Type: #{analysis.content_type}")
  IO.puts("   🎬 Media Type: #{analysis.media_type}")
  
  if analysis.content_length do
    IO.puts("   πŸ“ Size: #{analysis.content_length} bytes")
    if analysis.size_mb do
      IO.puts("   πŸ’Ύ Size: #{Float.round(analysis.size_mb, 2)} MB")
    end
  end
  
  IO.puts("")
  
  # Provide recommendations based on analysis
  case analysis.media_type do
    :video ->
      IO.puts("πŸ“Ή **Video Content Recommendations:**")
      IO.puts("   β€’ Good for URL ingress streaming")
      IO.puts("   β€’ Consider transcoding for mobile compatibility")
      IO.puts("   β€’ Monitor bandwidth usage for large files")
      
    :audio ->
      IO.puts("🎡 **Audio Content Recommendations:**")
      IO.puts("   β€’ Perfect for audio-only streaming")
      IO.puts("   β€’ Lower bandwidth requirements")
      IO.puts("   β€’ Consider adding static image for visual element")
      
    :hls ->
      IO.puts("πŸ“Ί **HLS Stream Recommendations:**")
      IO.puts("   β€’ Excellent for adaptive streaming")
      IO.puts("   β€’ Built-in quality adaptation")
      IO.puts("   β€’ Good for live content ingestion")
      
    :dash ->
      IO.puts("🎯 **DASH Stream Recommendations:**")
      IO.puts("   β€’ Modern adaptive streaming format")
      IO.puts("   β€’ Supports multiple codecs")
      IO.puts("   β€’ Check browser compatibility")
      
    _ ->
      IO.puts("❓ **Unknown Content Type:**")
      IO.puts("   β€’ Verify the URL is a valid media stream")
      IO.puts("   β€’ Check if authentication is required")
      IO.puts("   β€’ Test with direct browser access")
  end
  
else
  IO.puts("❌ **Content analysis failed**")
  IO.puts("   Error: #{analysis.error}")
  IO.puts("")
  IO.puts("πŸ› οΈ  **Troubleshooting suggestions:**")
  IO.puts("   β€’ Check if the URL is publicly accessible")
  IO.puts("   β€’ Verify the URL format is correct")
  IO.puts("   β€’ Test the URL in a browser first")
  IO.puts("   β€’ Check if authentication is required")
  IO.puts("   β€’ Ensure the server supports HEAD requests")
end

Batch Processing Workflow

Let’s create a batch processing system for multiple files:

# Batch processing configuration
batch_form = Kino.Control.form(
  [
    batch_urls: Kino.Input.textarea("URLs to Process (one per line)", default: """
https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4
https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4
https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4
"""),
    batch_mode: Kino.Input.select("Processing Mode", [
      {"Sequential (one at a time)", :sequential},
      {"Parallel (all at once)", :parallel},
      {"Scheduled (with delays)", :scheduled}
    ]),
    delay_seconds: Kino.Input.number("Delay between files (seconds)", default: 30)
  ],
  submit: "Start Batch Processing"
)
# Execute batch processing
batch_params = Kino.Control.read(batch_form)
client = Process.get(:client)
config = Process.get(:config)

# Parse URLs
urls = batch_params.batch_urls
  |> String.split("\n")
  |> Enum.map(&amp;String.trim/1)
  |> Enum.reject(&amp;(&amp;1 == ""))

IO.puts("πŸš€ Batch Processing Configuration")
IO.puts("=" |> String.duplicate(40))
IO.puts("   πŸ“ Files to process: #{length(urls)}")
IO.puts("   πŸ”„ Processing mode: #{batch_params.batch_mode}")
if batch_params.batch_mode == :scheduled do
  IO.puts("   ⏰ Delay between files: #{batch_params.delay_seconds} seconds")
end
IO.puts("")

case batch_params.batch_mode do
  :sequential ->
    IO.puts("πŸ“‹ **Sequential Processing Results:**")
    
    created_ingresses = for {url, index} <- Enum.with_index(urls, 1) do
      IO.puts("   #{index}. Processing: #{Path.basename(url)}")
      
      metadata = %{
        "batch_processing" => true,
        "batch_index" => index,
        "batch_total" => length(urls),
        "source_url" => url
      } |> Jason.encode!()

      request = %Livekit.CreateIngressRequest{
        input_type: :URL_INPUT,
        name: "batch-#{index}-#{System.system_time(:second)}",
        room_name: config.room_name,
        participant_identity: "batch-processor-#{index}",
        participant_name: "Batch #{index}: #{Path.basename(url)}",
        participant_metadata: metadata,
        url: url,
        enable_transcoding: true
      }

      case Livekit.IngressServiceClient.create_ingress(client, request) do
        {:ok, ingress} ->
          IO.puts("      βœ… Created: #{ingress.ingress_id}")
          ingress
        {:error, reason} ->
          IO.puts("      ❌ Failed: #{inspect(reason)}")
          nil
      end
    end
    
    successful = Enum.count(created_ingresses, &amp; &amp;1)
    IO.puts("")
    IO.puts("πŸ“Š **Batch Results:** #{successful}/#{length(urls)} successful")
    Process.put(:batch_ingresses, Enum.reject(created_ingresses, &amp;is_nil/1))
    
  :parallel ->
    IO.puts("⚑ **Parallel Processing (Creating all ingresses simultaneously):**")
    
    tasks = for {url, index} <- Enum.with_index(urls, 1) do
      Task.async(fn ->
        metadata = %{
          "batch_processing" => true,
          "batch_index" => index,
          "batch_total" => length(urls),
          "source_url" => url,
          "processing_mode" => "parallel"
        } |> Jason.encode!()

        request = %Livekit.CreateIngressRequest{
          input_type: :URL_INPUT,
          name: "parallel-#{index}-#{System.system_time(:second)}",
          room_name: config.room_name,
          participant_identity: "parallel-processor-#{index}",
          participant_name: "Parallel #{index}: #{Path.basename(url)}",
          participant_metadata: metadata,
          url: url,
          enable_transcoding: true
        }

        case Livekit.IngressServiceClient.create_ingress(client, request) do
          {:ok, ingress} ->
            {index, :success, ingress}
          {:error, reason} ->
            {index, :error, reason}
        end
      end)
    end
    
    results = Task.await_many(tasks, 30_000) # 30 second timeout
    
    for {index, status, result} <- results do
      case status do
        :success ->
          IO.puts("   #{index}. βœ… #{result.name} (#{result.ingress_id})")
        :error ->
          IO.puts("   #{index}. ❌ Failed: #{inspect(result)}")
      end
    end
    
    successful_ingresses = for {_index, :success, ingress} <- results, do: ingress
    IO.puts("")
    IO.puts("πŸ“Š **Parallel Results:** #{length(successful_ingresses)}/#{length(urls)} successful")
    Process.put(:batch_ingresses, successful_ingresses)
    
  :scheduled ->
    IO.puts("⏰ **Scheduled Processing (with delays):**")
    IO.puts("   This would be implemented with a scheduler in a real application.")
    IO.puts("   For demonstration, showing the concept:")
    IO.puts("")
    
    for {url, index} <- Enum.with_index(urls, 1) do
      scheduled_time = DateTime.add(DateTime.utc_now(), (index - 1) * batch_params.delay_seconds, :second)
      IO.puts("   #{index}. #{Path.basename(url)}")
      IO.puts("      ⏰ Scheduled for: #{DateTime.to_string(scheduled_time)}")
      IO.puts("      πŸ”— URL: #{url}")
      IO.puts("")
    end
    
    IO.puts("πŸ’‘ **Scheduled Processing Implementation:**")
    IO.puts("   β€’ Use Elixir's Process.send_after/3 for delays")
    IO.puts("   β€’ Implement with GenServer for state management")
    IO.puts("   β€’ Add error recovery and retry logic")
    IO.puts("   β€’ Monitor processing status and completion")
end

Stream Status Monitoring

Let’s create a comprehensive monitoring system for file processing:

defmodule FileProcessingMonitor do
  def monitor_batch_ingresses(client, ingresses) do
    results = for ingress <- ingresses do
      case Livekit.IngressServiceClient.list_ingress(client) do
        {:ok, response} ->
          current = Enum.find(response.items, fn i -> i.ingress_id == ingress.ingress_id end)
          
          if current do
            %{
              ingress_id: ingress.ingress_id,
              name: ingress.name,
              status: current.state &amp;&amp; current.state.status,
              tracks: current.state &amp;&amp; current.state.tracks || [],
              error: current.state &amp;&amp; current.state.error
            }
          else
            %{ingress_id: ingress.ingress_id, name: ingress.name, status: :not_found}
          end
          
        {:error, reason} ->
          %{ingress_id: ingress.ingress_id, name: ingress.name, status: :error, error: reason}
      end
    end
    
    results
  end
  
  def format_status_summary(monitors) do
    status_counts = monitors
    |> Enum.group_by(&amp; &amp;1.status)
    |> Enum.map(fn {status, items} -> {status, length(items)} end)
    |> Map.new()
    
    total = length(monitors)
    
    %{
      total: total,
      counts: status_counts,
      active: Map.get(status_counts, :ENDPOINT_PUBLISHING, 0),
      inactive: Map.get(status_counts, :ENDPOINT_INACTIVE, 0),
      errors: Map.get(status_counts, :ENDPOINT_ERROR, 0)
    }
  end
end

# Create monitoring display
monitoring_frame = Kino.Frame.new()

# Start batch monitoring task
batch_monitor_task = Task.async(fn ->
  batch_ingresses = Process.get(:batch_ingresses) || []
  client = Process.get(:client)
  
  if !Enum.empty?(batch_ingresses) &amp;&amp; client do
    Enum.each(1..24, fn iteration -> # Monitor for 2 minutes (24 * 5 seconds)
      timestamp = DateTime.utc_now() |> DateTime.to_string()
      
      monitor_results = FileProcessingMonitor.monitor_batch_ingresses(client, batch_ingresses)
      summary = FileProcessingMonitor.format_status_summary(monitor_results)
      
      content = """
      ## πŸ“Š File Processing Monitor (Update ##{iteration})
      **Last Update:** #{timestamp}
      **Total Files:** #{summary.total}
      
      ### πŸ“ˆ Status Summary:
      - 🟒 **Publishing:** #{summary.active} files
      - πŸ”΄ **Inactive:** #{summary.inactive} files
      - ❌ **Errors:** #{summary.errors} files
      
      ### πŸ“‹ Detailed Status:
      #{for monitor <- monitor_results do
        status_emoji = case monitor.status do
          :ENDPOINT_PUBLISHING -> "🟒"
          :ENDPOINT_INACTIVE -> "πŸ”΄"
          :ENDPOINT_BUFFERING -> "🟑"
          :ENDPOINT_ERROR -> "❌"
          :ENDPOINT_COMPLETE -> "βœ…"
          _ -> "βšͺ"
        end
        
        track_info = if length(monitor.tracks) > 0 do
          " (#{length(monitor.tracks)} tracks)"
        else
          ""
        end
        
        error_info = if monitor.error &amp;&amp; monitor.error != "" do
          " - Error: #{monitor.error}"
        else
          ""
        end
        
        "- #{status_emoji} **#{monitor.name}**#{track_info}#{error_info}"
      end |> Enum.join("\n")}
      
      ### πŸ’‘ Tips:
      - Files may take 10-30 seconds to start publishing
      - Large files require more processing time
      - Check source URLs if files remain inactive
      - Monitor network connectivity for streaming files
      """
      
      Kino.Frame.render(monitoring_frame, Kino.Markdown.new(content))
      Process.sleep(5000) # Update every 5 seconds
    end)
    
    # Final monitoring summary
    final_results = FileProcessingMonitor.monitor_batch_ingresses(client, batch_ingresses)
    final_summary = FileProcessingMonitor.format_status_summary(final_results)
    
    Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
    ## πŸ“Š Final Batch Processing Summary
    
    **Processing Complete!**
    
    ### πŸ“ˆ Final Results:
    - 🟒 **Publishing:** #{final_summary.active} files
    - πŸ”΄ **Inactive:** #{final_summary.inactive} files  
    - ❌ **Errors:** #{final_summary.errors} files
    
    **Success Rate:** #{if final_summary.total > 0, do: Float.round(final_summary.active / final_summary.total * 100, 1), else: 0}%
    
    Re-run this cell to start a new monitoring session.
    """))
  else
    Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
    ## ⚠️  Batch Monitoring Not Available
    Please create some batch ingress endpoints first using the batch processing section above.
    """))
  end
end)

monitoring_frame

Advanced File Processing Techniques

IO.puts("πŸ”¬ Advanced File Processing Techniques")
IO.puts("=" |> String.duplicate(50))
IO.puts("")

IO.puts("🎬 **Video Processing Optimizations:**")
IO.puts("")

IO.puts("1️⃣ **Multi-bitrate Streaming**")
IO.puts("   β€’ Generate multiple quality versions")
IO.puts("   β€’ Adaptive bitrate based on client capability")
IO.puts("   β€’ Reduce bandwidth usage for mobile users")
IO.puts("")
IO.puts("   FFmpeg Command:")
IO.puts("   ```bash")
IO.puts("   ffmpeg -i input.mp4 \\")
IO.puts("     -map 0:v -map 0:a -map 0:v -map 0:a \\")
IO.puts("     -c:v:0 libx264 -b:v:0 4000k -s:v:0 1920x1080 \\")
IO.puts("     -c:v:1 libx264 -b:v:1 2000k -s:v:1 1280x720 \\")
IO.puts("     -c:a:0 aac -b:a:0 192k \\")
IO.puts("     -c:a:1 aac -b:a:1 128k \\")
IO.puts("     -f hls -hls_time 4 -hls_list_size 0 \\")
IO.puts("     -var_stream_map \"v:0,a:0 v:1,a:1\" \\")
IO.puts("     -master_pl_name master.m3u8 \\")
IO.puts("     stream_%v.m3u8")
IO.puts("   ```")
IO.puts("")

IO.puts("2️⃣ **Content Analysis and Metadata Extraction**")
IO.puts("   β€’ Extract video duration, resolution, bitrate")
IO.puts("   β€’ Generate thumbnails at specific timestamps")
IO.puts("   β€’ Analyze audio levels and quality")
IO.puts("   β€’ Extract subtitles and closed captions")
IO.puts("")
IO.puts("   FFmpeg Commands:")
IO.puts("   ```bash")
IO.puts("   # Get video information")
IO.puts("   ffprobe -v quiet -print_format json -show_format -show_streams input.mp4")
IO.puts("")
IO.puts("   # Generate thumbnails every 10 seconds")
IO.puts("   ffmpeg -i input.mp4 -vf fps=1/10 thumb_%03d.jpg")
IO.puts("")
IO.puts("   # Extract audio waveform")
IO.puts("   ffmpeg -i input.mp4 -filter_complex showwavespic=s=1920x1080 waveform.png")
IO.puts("   ```")
IO.puts("")

IO.puts("3️⃣ **Automated Quality Control**")
IO.puts("   β€’ Detect corrupted or incomplete files")
IO.puts("   β€’ Validate audio/video sync")
IO.puts("   β€’ Check for proper encoding parameters")
IO.puts("   β€’ Ensure files meet streaming requirements")
IO.puts("")

IO.puts("4️⃣ **Content Security and DRM**")
IO.puts("   β€’ Encrypt video content for secure delivery")
IO.puts("   β€’ Implement token-based access control")
IO.puts("   β€’ Watermark videos for copyright protection")
IO.puts("   β€’ Geographic content restrictions")
IO.puts("")

IO.puts("5️⃣ **Performance Optimization**")
IO.puts("   β€’ Use hardware acceleration (GPU encoding)")
IO.puts("   β€’ Parallel processing for batch operations")
IO.puts("   β€’ Caching frequently accessed content")
IO.puts("   β€’ Content Delivery Network (CDN) integration")

Troubleshooting Common Issues

IO.puts("πŸ› οΈ  File Processing Troubleshooting Guide")
IO.puts("=" |> String.duplicate(50))
IO.puts("")

IO.puts("❌ **Common Problems and Solutions:**")
IO.puts("")

IO.puts("1️⃣ **File Not Found / Access Denied**")
IO.puts("   β€’ Verify URL is publicly accessible")
IO.puts("   β€’ Check for authentication requirements")
IO.puts("   β€’ Test URL in browser first")
IO.puts("   β€’ Ensure proper CORS headers for web content")
IO.puts("")

IO.puts("2️⃣ **Unsupported File Format**")
IO.puts("   β€’ Convert to supported formats (MP4, WebM, etc.)")
IO.puts("   β€’ Check codec compatibility")
IO.puts("   β€’ Use FFmpeg for format conversion")
IO.puts("   β€’ Verify container format vs codec format")
IO.puts("")

IO.puts("3️⃣ **Large File Processing Issues**")
IO.puts("   β€’ Break large files into segments")
IO.puts("   β€’ Use progressive download")
IO.puts("   β€’ Implement resume capability")
IO.puts("   β€’ Monitor memory usage during processing")
IO.puts("")

IO.puts("4️⃣ **Streaming Performance Problems**")
IO.puts("   β€’ Reduce bitrate for better streaming")
IO.puts("   β€’ Use adaptive streaming (HLS/DASH)")
IO.puts("   β€’ Implement proper buffering")
IO.puts("   β€’ Monitor network bandwidth")
IO.puts("")

IO.puts("5️⃣ **Audio/Video Sync Issues**")
IO.puts("   β€’ Check source file for sync problems")
IO.puts("   β€’ Use FFmpeg to fix timing issues")
IO.puts("   β€’ Verify frame rate consistency")
IO.puts("   β€’ Check audio sample rate compatibility")
IO.puts("")

IO.puts("πŸ”§ **Diagnostic Commands:**")
IO.puts("")

IO.puts("**Test file accessibility:**")
IO.puts("```bash")
IO.puts("curl -I \"https://example.com/video.mp4\"")
IO.puts("```")
IO.puts("")

IO.puts("**Analyze file properties:**") 
IO.puts("```bash")
IO.puts("ffprobe -v quiet -print_format json -show_format -show_streams file.mp4")
IO.puts("```")
IO.puts("")

IO.puts("**Test streaming performance:**")
IO.puts("```bash")
IO.puts("ffmpeg -i file.mp4 -f null - 2>&1 | grep fps")
IO.puts("```")
IO.puts("")

IO.puts("**Check audio/video sync:**")
IO.puts("```bash")
IO.puts("ffmpeg -i file.mp4 -vf \"showinfo\" -f null - 2>&1 | grep pts_time")
IO.puts("```")

Cleanup and Resource Management

# File processing cleanup
cleanup_form = Kino.Control.form(
  [
    cleanup_action: Kino.Input.select("Cleanup Action", [
      {"List all URL/file ingress endpoints", :list_url},
      {"Delete batch processing endpoints", :delete_batch},
      {"Delete all file processing endpoints", :delete_all_files},
      {"Delete specific ingress by ID", :delete_specific}
    ]),
    specific_id: Kino.Input.text("Ingress ID (for specific deletion)", default: "")
  ],
  submit: "Execute Cleanup"
)
# Execute cleanup operations
cleanup_params = Kino.Control.read(cleanup_form)
client = Process.get(:client)

case cleanup_params.cleanup_action do
  :list_url ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        url_ingresses = Enum.filter(response.items, fn ingress ->
          ingress.input_type == :URL_INPUT
        end)
        
        IO.puts("πŸ“ URL/File Ingress Endpoints (#{length(url_ingresses)} total):")
        for ingress <- url_ingresses do
          status_emoji = case ingress.state &amp;&amp; ingress.state.status do
            :ENDPOINT_PUBLISHING -> "🟒"
            :ENDPOINT_INACTIVE -> "πŸ”΄"
            :ENDPOINT_BUFFERING -> "🟑"
            :ENDPOINT_ERROR -> "❌"
            _ -> "βšͺ"
          end
          
          IO.puts("  #{status_emoji} #{ingress.name}")
          IO.puts("     ID: #{ingress.ingress_id}")
          IO.puts("     URL: #{String.slice(ingress.url, 0, 60)}...")
          IO.puts("     Room: #{ingress.room_name}")
          IO.puts("")
        end
        
      {:error, reason} ->
        IO.puts("❌ Failed to list ingress: #{inspect(reason)}")
    end
    
  :delete_batch ->
    batch_ingresses = Process.get(:batch_ingresses) || []
    
    if Enum.empty?(batch_ingresses) do
      IO.puts("βšͺ No batch ingress endpoints to clean up")
    else
      IO.puts("🧹 Cleaning up #{length(batch_ingresses)} batch endpoints...")
      
      for ingress <- batch_ingresses do
        request = %Livekit.DeleteIngressRequest{ingress_id: ingress.ingress_id}
        case Livekit.IngressServiceClient.delete_ingress(client, request) do
          {:ok, _} ->
            IO.puts("  βœ… Deleted: #{ingress.name}")
          {:error, reason} ->
            IO.puts("  ❌ Failed to delete #{ingress.name}: #{inspect(reason)}")
        end
      end
      
      Process.delete(:batch_ingresses)
      IO.puts("✨ Batch cleanup complete")
    end
    
  :delete_all_files ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        file_ingresses = Enum.filter(response.items, fn ingress ->
          ingress.input_type == :URL_INPUT &amp;&amp; 
          (String.contains?(ingress.name, "file") || 
           String.contains?(ingress.name, "batch") ||
           String.contains?(ingress.name, "url"))
        end)
        
        if Enum.empty?(file_ingresses) do
          IO.puts("✨ No file processing endpoints to clean up")
        else
          IO.puts("🧹 Cleaning up #{length(file_ingresses)} file processing endpoints...")
          
          for ingress <- file_ingresses do
            request = %Livekit.DeleteIngressRequest{ingress_id: ingress.ingress_id}
            case Livekit.IngressServiceClient.delete_ingress(client, request) do
              {:ok, _} ->
                IO.puts("  βœ… Deleted: #{ingress.name}")
              {:error, reason} ->
                IO.puts("  ❌ Failed to delete #{ingress.name}: #{inspect(reason)}")
            end
          end
        end
        
      {:error, reason} ->
        IO.puts("❌ Failed to cleanup: #{inspect(reason)}")
    end
    
  :delete_specific ->
    if cleanup_params.specific_id != "" do
      request = %Livekit.DeleteIngressRequest{ingress_id: cleanup_params.specific_id}
      case Livekit.IngressServiceClient.delete_ingress(client, request) do
        {:ok, deleted} ->
          IO.puts("βœ… Deleted ingress: #{deleted.name} (#{deleted.ingress_id})")
        {:error, reason} ->
          IO.puts("❌ Failed to delete: #{inspect(reason)}")
      end
    else
      IO.puts("⚠️  Please provide an ingress ID to delete")
    end
end

Summary and Best Practices

IO.puts("πŸŽ‰ File Processing Tutorial Complete!")
IO.puts("=" |> String.duplicate(50))
IO.puts("")

IO.puts("βœ… **What You've Accomplished:**")
IO.puts("   β€’ Created URL ingress endpoints for various content types")
IO.puts("   β€’ Analyzed and validated remote content")
IO.puts("   β€’ Generated FFmpeg commands for file processing")
IO.puts("   β€’ Implemented batch processing workflows")
IO.puts("   β€’ Set up comprehensive monitoring systems")
IO.puts("   β€’ Learned troubleshooting techniques")
IO.puts("")

IO.puts("πŸ“‹ **Best Practices for File Processing:**")
IO.puts("")

IO.puts("1️⃣ **Content Validation**")
IO.puts("   β€’ Always validate URLs before creating ingress")
IO.puts("   β€’ Check file formats and codec compatibility")
IO.puts("   β€’ Test accessibility and authentication requirements")
IO.puts("   β€’ Verify content length and quality")
IO.puts("")

IO.puts("2️⃣ **Processing Optimization**")
IO.puts("   β€’ Use appropriate transcoding settings")
IO.puts("   β€’ Implement adaptive bitrate for different devices")
IO.puts("   β€’ Monitor processing performance and errors")
IO.puts("   β€’ Consider hardware acceleration for large volumes")
IO.puts("")

IO.puts("3️⃣ **Batch Operations**")
IO.puts("   β€’ Process files in manageable batches")
IO.puts("   β€’ Implement proper error handling and retries")
IO.puts("   β€’ Monitor system resources during batch processing")
IO.puts("   β€’ Use scheduling for non-urgent processing")
IO.puts("")

IO.puts("4️⃣ **Monitoring and Maintenance**")
IO.puts("   β€’ Monitor ingress endpoint status regularly")
IO.puts("   β€’ Clean up completed or failed endpoints")
IO.puts("   β€’ Log processing metrics for analysis")
IO.puts("   β€’ Set up alerts for processing failures")
IO.puts("")

IO.puts("πŸš€ **Recommended Next Steps:**")
IO.puts("   1. Try the Ingress Management Livebook for automation")
IO.puts("   2. Explore the Troubleshooting Livebook for advanced debugging")
IO.puts("   3. Implement custom file processing workflows")
IO.puts("   4. Set up monitoring dashboards")
IO.puts("   5. Create automated content delivery pipelines")
IO.puts("")

IO.puts("πŸ“š **Additional Resources:**")
IO.puts("   β€’ FFmpeg Documentation: https://ffmpeg.org/documentation.html")
IO.puts("   β€’ HLS Specification: https://tools.ietf.org/html/rfc8216")
IO.puts("   β€’ DASH Standard: https://dashif.org/")
IO.puts("   β€’ LiveKit Ingress Guide: https://docs.livekit.io/ingress/")