LiveKit Ingress Service - File Processing & URL Streams
Mix.install([
{:livekit, path: "../.."},
{:kino, "~> 0.12"},
{:jason, "~> 1.4"},
{:req, "~> 0.4.0"}
])
Introduction to File & URL Ingress
This Livebook demonstrates how to use LiveKit’s Ingress Service for file-based and URL-based streaming. This includes:
- 📹 Video file ingestion (MP4, MOV, AVI, etc.)
- 🎵 Audio file processing (MP3, WAV, AAC, etc.)
- 🌐 URL stream ingestion (HLS, DASH, direct streams)
- 📺 Live stream re-streaming from other platforms
- 🔄 Batch processing workflows for multiple files
- ⏰ Scheduled content delivery
What You’ll Learn:
- 🔧 Setting up URL ingress endpoints
- 📁 Processing local and remote video files
- 🎬 Creating automated content workflows
- 📊 Monitoring file processing progress
- 🛠️ Troubleshooting file format issues
- ⚡ Optimizing for different content types
Configuration & Client Setup
# Connection form: API credentials, server URL and the target room name that
# the later cells read back via `config.room_name`.
config_form =
  [
    api_key: Kino.Input.password("LiveKit API Key"),
    api_secret: Kino.Input.password("LiveKit API Secret"),
    url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud"),
    room_name: Kino.Input.text("Target Room Name", default: "file-processing-room")
  ]
  |> Kino.Control.form(submit: "Connect to LiveKit")
# Establish connection.
#
# Reads the submitted form values, stores them in the process dictionary under
# `:config`, and on success stores the ingress client under `:client` so the
# following cells can fetch both with `Process.get/1`.
#
# NOTE(review): confirm that `Kino.Control.read/1` exists in the pinned Kino
# release; form submissions are normally consumed with `Kino.listen/2` or
# `Kino.Control.stream/1` — TODO verify.
config = Kino.Control.read(config_form)
Process.put(:config, config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:client, client)
    IO.puts("β
Connected to LiveKit Ingress Service!")
    IO.puts("π― Target room: #{config.room_name}")

  {:error, reason} ->
    # The error term is not guaranteed to implement String.Chars; interpolating
    # it raw would raise instead of reporting the failure. Plain strings are
    # printed unchanged, anything else goes through inspect/1 like the other cells.
    details = if is_binary(reason), do: reason, else: inspect(reason)
    IO.puts("β Connection failed: #{details}")
    IO.puts("Please verify your credentials and server URL.")
end
URL Stream Ingress
Let’s start by creating URL-based ingress endpoints for various stream types:
# URL ingress configuration form.
#
# `Kino.Input.select/3` takes its options as `{value, label}` pairs. The atoms
# therefore have to come first so that reading `url_type` yields
# `:direct_video`, `:hls`, ... — the values the creation cell matches on.
url_form =
  Kino.Control.form(
    [
      stream_name:
        Kino.Input.text("Stream Name", default: "url-stream-#{System.system_time(:second)}"),
      source_url:
        Kino.Input.text("Source URL",
          default:
            "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
        ),
      url_type:
        Kino.Input.select("URL Type",
          direct_video: "Direct Video File (MP4/MOV)",
          hls: "HLS Stream (m3u8)",
          dash: "DASH Stream",
          youtube: "YouTube Live Stream",
          rtmp_url: "Custom RTMP Stream"
        ),
      participant_identity: Kino.Input.text("Participant Identity", default: "url-streamer"),
      loop_content: Kino.Input.checkbox("Loop Content", default: false)
    ],
    submit: "Create URL Ingress"
  )
# Create a URL ingress endpoint from the submitted form values.
#
# Reads `:client` and `:config` from the process dictionary (stored by the
# connection cell) and, on success, stores the created ingress under
# `:url_ingress`.
url_params = Kino.Control.read(url_form)
client = Process.get(:client)
config = Process.get(:config)

# Validate URL.
# Extension checks run case-insensitively against the URL *path* as well as the
# full URL, so inputs such as "clip.MP4" or "live.m3u8?token=abc" are accepted
# while everything the plain suffix check accepted still passes.
lowered_url = String.downcase(url_params.source_url)
url_path = String.downcase(URI.parse(url_params.source_url).path || "")

has_extension? = fn extensions ->
  String.ends_with?(url_path, extensions) or String.ends_with?(lowered_url, extensions)
end

url_valid =
  case url_params.url_type do
    :direct_video ->
      has_extension?.([".mp4", ".mov", ".avi", ".mkv"])

    :hls ->
      has_extension?.([".m3u8"])

    :dash ->
      String.contains?(lowered_url, ".mpd")

    :youtube ->
      String.contains?(lowered_url, "youtube.com") || String.contains?(lowered_url, "youtu.be")

    :rtmp_url ->
      String.starts_with?(lowered_url, "rtmp://")

    # Unknown types are not validated.
    _ ->
      true
  end

if url_valid do
  # Metadata attached to the participant, JSON-encoded.
  metadata =
    Jason.encode!(%{
      "source_type" => "url_ingress",
      "url_type" => to_string(url_params.url_type),
      "source_url" => url_params.source_url,
      "loop_enabled" => url_params.loop_content,
      "processing_mode" => "streaming"
    })

  request = %Livekit.CreateIngressRequest{
    input_type: :URL_INPUT,
    name: url_params.stream_name,
    room_name: config.room_name,
    participant_identity: url_params.participant_identity,
    participant_name: "URL Stream: #{url_params.url_type}",
    participant_metadata: metadata,
    url: url_params.source_url,
    enable_transcoding: true
  }

  case Livekit.IngressServiceClient.create_ingress(client, request) do
    {:ok, ingress} ->
      Process.put(:url_ingress, ingress)
      IO.puts("π URL Ingress Created Successfully!")
      IO.puts("=" |> String.duplicate(50))
      IO.puts("π‘ Ingress ID: #{ingress.ingress_id}")
      IO.puts("π·οΈ Name: #{ingress.name}")
      IO.puts("π Source URL: #{ingress.url}")
      IO.puts("π Room: #{ingress.room_name}")
      IO.puts("π€ Participant: #{ingress.participant_identity}")
      IO.puts("π Display Name: #{ingress.participant_name}")
      IO.puts("βοΈ Transcoding: #{ingress.enable_transcoding}")
      IO.puts("π Type: #{url_params.url_type}")
      IO.puts("=" |> String.duplicate(50))

    {:error, reason} ->
      IO.puts("β Failed to create URL ingress: #{inspect(reason)}")
  end
else
  IO.puts("β οΈ Invalid URL for selected type. Please check the URL format.")
end
Popular Streaming URLs for Testing
Let’s provide some sample URLs for testing different content types:
# Print a catalogue of sample media URLs grouped by content type, followed by
# a few usage tips.
Enum.each(["π Sample URLs for Testing", String.duplicate("=", 40), ""], &IO.puts/1)

# Each entry is {category heading, list of URLs}.
sample_urls = [
  {"πΉ Direct Video Files",
   [
     "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4",
     "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4",
     "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4",
     "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4"
   ]},
  {"π΅ Audio Files",
   [
     "https://www.soundjay.com/misc/sounds/magic-chime-02.mp3",
     "https://www.soundjay.com/buttons/sounds/button-09.mp3"
   ]},
  {"πΊ HLS Streams (m3u8)",
   [
     "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_4x3/bipbop_4x3_variant.m3u8",
     "https://devstreaming-cdn.apple.com/videos/streaming/examples/img_bipbop_adv_example_fmp4/master.m3u8"
   ]},
  {"π Live News Streams",
   [
     "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8",
     "https://d2zihajmogu5jn.cloudfront.net/bipbop-advanced/bipbop_16x9_variant.m3u8"
   ]}
]

Enum.each(sample_urls, fn {heading, links} ->
  IO.puts("#{heading}:")
  Enum.each(links, fn link -> IO.puts(" β’ #{link}") end)
  IO.puts("")
end)

[
  "π‘ **Usage Tips:**",
  " β’ Copy any URL above to test different content types",
  " β’ HLS streams (.m3u8) work well for adaptive streaming",
  " β’ Direct video files are good for testing file processing",
  " β’ Live streams demonstrate real-time ingestion"
]
|> Enum.each(&IO.puts/1)
File Upload and Processing Workflow
Let’s create a file upload workflow (simulated since we can’t actually upload in Livebook):
# File processing configuration form.
#
# `Kino.Input.select/3` options are `{value, label}` pairs; the atoms come
# first so that the processing cell receives `:local_video`, `:once`,
# `:original`, ... rather than the display labels.
file_form =
  Kino.Control.form(
    [
      file_type:
        Kino.Input.select("File Processing Type",
          local_video: "Local Video File",
          remote_video: "Remote Video File",
          batch: "Batch Processing",
          scheduled: "Scheduled Playback"
        ),
      remote_file_url:
        Kino.Input.text("Remote File URL (if applicable)",
          default:
            "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4"
        ),
      processing_mode:
        Kino.Input.select("Processing Mode",
          once: "Stream Once",
          loop: "Loop Continuously",
          scheduled: "Scheduled Intervals"
        ),
      video_quality:
        Kino.Input.select("Video Quality",
          original: "Original Quality",
          hd: "HD (1080p)",
          sd: "SD (720p)",
          mobile: "Mobile (480p)"
        )
    ],
    submit: "Configure File Processing"
  )
# Print guidance for the selected file-processing type. For `:remote_video`
# an ingress is also created and stored under `:file_ingress` in the process
# dictionary.
file_params = Kino.Control.read(file_form)

Enum.each(["π¬ File Processing Configuration", String.duplicate("=", 40), ""], &IO.puts/1)

case file_params.file_type do
  :local_video ->
    Enum.each(
      [
        "π **Local Video File Processing:**",
        " 1. Place your video file in a web-accessible location",
        " 2. Or use a local HTTP server: `python -m http.server 8000`",
        " 3. Use URL like: http://localhost:8000/video.mp4",
        " 4. Supported formats: MP4, MOV, AVI, MKV, WebM"
      ],
      &IO.puts/1
    )

  :remote_video ->
    remote_url = file_params.remote_file_url

    IO.puts("π **Remote Video File Processing:**")
    IO.puts(" β’ Source URL: #{remote_url}")
    IO.puts(" β’ Processing Mode: #{file_params.processing_mode}")
    IO.puts(" β’ Target Quality: #{file_params.video_quality}")

    # Client and config were stored by the connection cell.
    lk_client = Process.get(:client)
    lk_config = Process.get(:config)

    encoded_metadata =
      Jason.encode!(%{
        "source_type" => "remote_file",
        "processing_mode" => to_string(file_params.processing_mode),
        "target_quality" => to_string(file_params.video_quality),
        "file_url" => remote_url
      })

    # Transcoding is requested for every quality except :original.
    ingress_request = %Livekit.CreateIngressRequest{
      input_type: :URL_INPUT,
      name: "file-proc-#{System.system_time(:second)}",
      room_name: lk_config.room_name,
      participant_identity: "file-processor-#{System.system_time(:second)}",
      participant_name: "File: #{Path.basename(remote_url)}",
      participant_metadata: encoded_metadata,
      url: remote_url,
      enable_transcoding: file_params.video_quality != :original
    }

    case Livekit.IngressServiceClient.create_ingress(lk_client, ingress_request) do
      {:error, reason} ->
        IO.puts(" β Failed to create file ingress: #{inspect(reason)}")

      {:ok, created} ->
        Process.put(:file_ingress, created)
        IO.puts(" β
File processing ingress created: #{created.ingress_id}")
    end

  :batch ->
    Enum.each(
      [
        "π **Batch Processing Setup:**",
        " β’ Create multiple ingress endpoints for different files",
        " β’ Use scheduling to process files in sequence",
        " β’ Monitor processing status for each file",
        " β’ Implement error handling and retry logic"
      ],
      &IO.puts/1
    )

  :scheduled ->
    Enum.each(
      [
        "β° **Scheduled Playback Configuration:**",
        " β’ Set up cron-like scheduling for content delivery",
        " β’ Use external schedulers to trigger ingress creation",
        " β’ Implement playlist management",
        " β’ Consider timezone handling for global audiences"
      ],
      &IO.puts/1
    )
end

[
  "",
  "π§ **Advanced File Processing Options:**",
  " β’ Transcoding: Convert between formats automatically",
  " β’ Quality adaptation: Multiple bitrates for different devices",
  " β’ Content analysis: Extract metadata and thumbnails",
  " β’ Error recovery: Automatic retry on processing failures"
]
|> Enum.each(&IO.puts/1)
FFmpeg Integration for File Processing
Let’s generate FFmpeg commands for advanced file processing:
# FFmpeg command generator form.
#
# `Kino.Input.select/3` options are `{value, label}` pairs; the atoms come
# first so that the generator cell receives `:stream_file`, `:mp4`, ... and
# its `case` clauses can match.
ffmpeg_form =
  Kino.Control.form(
    [
      operation:
        Kino.Input.select("FFmpeg Operation",
          stream_file: "Stream file to LiveKit",
          convert_stream: "Convert format before streaming",
          extract_audio: "Extract audio from video",
          thumbnail: "Create video thumbnail",
          batch_process: "Batch process multiple files"
        ),
      input_file: Kino.Input.text("Input File Path", default: "/path/to/input/video.mp4"),
      output_format:
        Kino.Input.select("Output Format",
          same: "Same as input",
          mp4: "MP4 (H.264)",
          webm: "WebM (VP9)",
          flv: "FLV (for RTMP)"
        )
    ],
    submit: "Generate FFmpeg Command"
  )
# Generate FFmpeg commands for the operation selected in the form.
#
# Uses the most recently created ingress (`:url_ingress`, falling back to
# `:file_ingress`) from the process dictionary for the streaming example.
ffmpeg_params = Kino.Control.read(ffmpeg_form)
ingress = Process.get(:url_ingress) || Process.get(:file_ingress)

IO.puts("π§ FFmpeg File Processing Commands")
IO.puts("=" |> String.duplicate(50))
IO.puts("")

case ffmpeg_params.operation do
  :stream_file ->
    # NOTE(review): for a :URL_INPUT ingress created by this notebook,
    # `ingress.url` is the *source* URL that was passed in the create request,
    # so this command pushes FLV to the media source rather than to a LiveKit
    # endpoint. Pushing with FFmpeg presumably needs a push-type ingress
    # (e.g. RTMP) — TODO confirm and rework this branch.
    if ingress && ingress.input_type == :URL_INPUT do
      command = """
      ffmpeg -re -i "#{ffmpeg_params.input_file}" \\
      -c:v libx264 -preset veryfast -tune zerolatency \\
      -c:a aac -ar 48000 \\
      -f flv "#{ingress.url}"
      """

      IO.puts("πΊ **Stream File to LiveKit URL Ingress:**")
      IO.puts("```bash")
      IO.puts(command)
      IO.puts("```")
    else
      IO.puts("β οΈ Create a URL ingress endpoint first")
    end

  :convert_stream ->
    format_settings =
      case ffmpeg_params.output_format do
        :mp4 -> "-c:v libx264 -c:a aac -f mp4"
        :webm -> "-c:v libvpx-vp9 -c:a libvorbis -f webm"
        :flv -> "-c:v libx264 -c:a aac -f flv"
        :same -> "-c copy"
      end

    # "Same as input" keeps the source container, so reuse the input file's
    # extension instead of producing a file literally named "output.same".
    # Matroska is the fallback when the input path has no extension.
    output_ext =
      case ffmpeg_params.output_format do
        :same ->
          case Path.extname(ffmpeg_params.input_file) do
            "." <> ext -> ext
            _ -> "mkv"
          end

        format ->
          to_string(format)
      end

    # `-movflags +faststart` is an option of the MP4/MOV muxer family only, so
    # it is left out for other containers.
    faststart =
      if String.downcase(output_ext) in ["mp4", "mov", "m4v"], do: "-movflags +faststart"

    # Lines are joined with a trailing backslash continuation; the final
    # newline keeps the blank line before the closing code fence.
    command =
      [
        ~s(ffmpeg -i "#{ffmpeg_params.input_file}"),
        format_settings,
        faststart,
        ~s("output.#{output_ext}")
      ]
      |> Enum.reject(&is_nil/1)
      |> Enum.join(" \\\n")
      |> Kernel.<>("\n")

    IO.puts("π **Convert File Format:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")

  :extract_audio ->
    command = """
    ffmpeg -i "#{ffmpeg_params.input_file}" \\
    -vn -c:a aac -b:a 192k \\
    "#{Path.rootname(ffmpeg_params.input_file)}.aac"
    """

    IO.puts("π΅ **Extract Audio from Video:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")

  :thumbnail ->
    command = """
    ffmpeg -i "#{ffmpeg_params.input_file}" \\
    -vf "thumbnail,scale=320:240" \\
    -frames:v 1 \\
    "#{Path.rootname(ffmpeg_params.input_file)}_thumb.jpg"
    """

    IO.puts("πΈ **Generate Video Thumbnail:**")
    IO.puts("```bash")
    IO.puts(command)
    IO.puts("```")

  :batch_process ->
    IO.puts("π **Batch Process Multiple Files:**")
    IO.puts("```bash")
    IO.puts("# Process all MP4 files in a directory")
    IO.puts("for file in *.mp4; do")
    IO.puts(" echo \"Processing: $file\"")
    IO.puts(" ffmpeg -i \"$file\" \\")
    IO.puts(" -c:v libx264 -preset fast \\")
    IO.puts(" -c:a aac -b:a 192k \\")
    IO.puts(" \"processed_${file}\"")
    IO.puts("done")
    IO.puts("```")
end

IO.puts("")
IO.puts("π‘ **FFmpeg Tips for File Processing:**")
IO.puts(" β’ Use -re flag to read input at native frame rate")
IO.puts(" β’ Add -threads 0 for automatic thread optimization")
IO.puts(" β’ Use -preset fast/medium/slow for encoding speed vs quality")
IO.puts(" β’ Monitor with -progress for long operations")
IO.puts(" β’ Use -y flag to overwrite output files automatically")
Content Validation and Analysis
Let’s create tools to validate and analyze content before processing:
# Content analysis module
defmodule ContentAnalyzer do
  @moduledoc """
  Probes a media URL with an HTTP HEAD request and summarizes what the
  response headers reveal about the content.
  """

  @bytes_per_mb 1_048_576

  @doc """
  Issues a HEAD request for `url` (5 second connect timeout).

  Returns `%{valid: true, content_type: _, content_length: _, size_mb: _,
  media_type: _}` for a 200 response, and `%{valid: false, error: message}`
  for any other status, a transport error, or an exception raised while
  performing the request.
  """
  def analyze_url(url) do
    case Req.head(url, connect_options: [timeout: 5000]) do
      {:ok, %{status: 200, headers: headers}} -> summarize_headers(headers)
      {:ok, %{status: status}} -> %{valid: false, error: "HTTP #{status}"}
      {:error, reason} -> %{valid: false, error: inspect(reason)}
    end
  rescue
    # Boundary with an external service: any exception is reported as an
    # invalid result instead of crashing the notebook cell.
    e -> %{valid: false, error: "Request failed: #{inspect(e)}"}
  end

  @doc """
  Builds the successful analysis map from response `headers`.

  `headers` may be a map of `name => [values]` (the shape Req 0.4 uses) or a
  list of `{name, value}` tuples. Header names are matched
  case-insensitively; for list values the first entry is used.

  `size_mb` is `nil` when the content length is missing or not an integer.
  """
  def summarize_headers(headers) do
    content_type = get_header(headers, "content-type")
    content_length = get_header(headers, "content-length")

    %{
      valid: true,
      content_type: content_type,
      content_length: content_length,
      size_mb: size_in_mb(content_length),
      media_type: classify_media_type(content_type)
    }
  end

  # Returns the header value as a single string, or nil when it is absent.
  # List values (Req 0.4 header maps) are collapsed to their first element;
  # without this the type classification and size parsing never see a binary.
  defp get_header(headers, name) do
    case Enum.find(headers, fn {key, _} -> String.downcase(key) == name end) do
      {_, [first | _]} -> first
      {_, []} -> nil
      {_, value} -> value
      nil -> nil
    end
  end

  # Converts a Content-Length string to megabytes; nil when it cannot be parsed.
  defp size_in_mb(content_length) when is_binary(content_length) do
    case Integer.parse(String.trim(content_length)) do
      {bytes, ""} -> bytes / @bytes_per_mb
      _ -> nil
    end
  end

  defp size_in_mb(_), do: nil

  # Maps a Content-Type value onto a coarse media category.
  defp classify_media_type(content_type) when is_binary(content_type) do
    cond do
      String.contains?(content_type, "video/") -> :video
      String.contains?(content_type, "audio/") -> :audio
      String.contains?(content_type, "application/vnd.apple.mpegurl") -> :hls
      String.contains?(content_type, "application/dash+xml") -> :dash
      true -> :unknown
    end
  end

  defp classify_media_type(_), do: :unknown
end
# Form with a single field: the URL handed to `ContentAnalyzer.analyze_url/1`.
validation_form =
  [
    url_to_analyze:
      Kino.Input.text("URL to Analyze",
        default:
          "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
      )
  ]
  |> Kino.Control.form(submit: "Analyze Content")
# Analyze the submitted URL and print either the findings plus
# media-type-specific recommendations, or the error with troubleshooting hints.
validation_params = Kino.Control.read(validation_form)

Enum.each(["π Content Analysis Results", String.duplicate("=", 40), ""], &IO.puts/1)

analysis = ContentAnalyzer.analyze_url(validation_params.url_to_analyze)

if analysis.valid do
  IO.puts("β
**Content is accessible and valid**")
  IO.puts(" π Content Type: #{analysis.content_type}")
  IO.puts(" π¬ Media Type: #{analysis.media_type}")

  # Size lines are only printed when the server reported a content length.
  if analysis.content_length do
    IO.puts(" π Size: #{analysis.content_length} bytes")

    if analysis.size_mb do
      IO.puts(" πΎ Size: #{Float.round(analysis.size_mb, 2)} MB")
    end
  end

  IO.puts("")

  # Pick the heading and tips for the detected media type, then print them.
  {heading, tips} =
    case analysis.media_type do
      :video ->
        {"πΉ **Video Content Recommendations:**",
         [
           " β’ Good for URL ingress streaming",
           " β’ Consider transcoding for mobile compatibility",
           " β’ Monitor bandwidth usage for large files"
         ]}

      :audio ->
        {"π΅ **Audio Content Recommendations:**",
         [
           " β’ Perfect for audio-only streaming",
           " β’ Lower bandwidth requirements",
           " β’ Consider adding static image for visual element"
         ]}

      :hls ->
        {"πΊ **HLS Stream Recommendations:**",
         [
           " β’ Excellent for adaptive streaming",
           " β’ Built-in quality adaptation",
           " β’ Good for live content ingestion"
         ]}

      :dash ->
        {"π― **DASH Stream Recommendations:**",
         [
           " β’ Modern adaptive streaming format",
           " β’ Supports multiple codecs",
           " β’ Check browser compatibility"
         ]}

      _ ->
        {"β **Unknown Content Type:**",
         [
           " β’ Verify the URL is a valid media stream",
           " β’ Check if authentication is required",
           " β’ Test with direct browser access"
         ]}
    end

  IO.puts(heading)
  Enum.each(tips, &IO.puts/1)
else
  [
    "β **Content analysis failed**",
    " Error: #{analysis.error}",
    "",
    "π οΈ **Troubleshooting suggestions:**",
    " β’ Check if the URL is publicly accessible",
    " β’ Verify the URL format is correct",
    " β’ Test the URL in a browser first",
    " β’ Check if authentication is required",
    " β’ Ensure the server supports HEAD requests"
  ]
  |> Enum.each(&IO.puts/1)
end
Batch Processing Workflow
Let’s create a batch processing system for multiple files:
# Batch processing configuration form.
#
# `Kino.Input.select/3` options are `{value, label}` pairs; the atoms come
# first so that the execution cell receives `:sequential`, `:parallel` or
# `:scheduled` and its `case` clauses can match.
batch_form =
  Kino.Control.form(
    [
      batch_urls:
        Kino.Input.textarea("URLs to Process (one per line)",
          default: """
          https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4
          https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4
          https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4
          """
        ),
      batch_mode:
        Kino.Input.select("Processing Mode",
          sequential: "Sequential (one at a time)",
          parallel: "Parallel (all at once)",
          scheduled: "Scheduled (with delays)"
        ),
      delay_seconds: Kino.Input.number("Delay between files (seconds)", default: 30)
    ],
    submit: "Start Batch Processing"
  )
# Execute batch processing.
#
# Creates one URL ingress per submitted URL (sequentially or in parallel) and
# stores the successfully created ingresses under `:batch_ingresses` in the
# process dictionary. The `:scheduled` mode only prints the planned schedule.
batch_params = Kino.Control.read(batch_form)
client = Process.get(:client)
config = Process.get(:config)

# Parse URLs: one per line, surrounding whitespace and blank lines ignored.
urls =
  batch_params.batch_urls
  |> String.split("\n")
  |> Enum.map(&String.trim/1)
  |> Enum.reject(&(&1 == ""))

# Computed once instead of re-walking the list for every file.
total = length(urls)

# A number input can be empty (nil) or fractional, while DateTime.add/3 only
# accepts integer amounts; normalize to whole seconds, treating "empty" as 0.
delay_seconds =
  case batch_params.delay_seconds do
    seconds when is_number(seconds) -> round(seconds)
    _ -> 0
  end

IO.puts("π Batch Processing Configuration")
IO.puts("=" |> String.duplicate(40))
IO.puts(" π Files to process: #{total}")
IO.puts(" π Processing mode: #{batch_params.batch_mode}")

if batch_params.batch_mode == :scheduled do
  IO.puts(" β° Delay between files: #{delay_seconds} seconds")
end

IO.puts("")

case batch_params.batch_mode do
  :sequential ->
    IO.puts("π **Sequential Processing Results:**")

    # Each element is the created ingress, or nil when creation failed.
    created_ingresses =
      for {url, index} <- Enum.with_index(urls, 1) do
        IO.puts(" #{index}. Processing: #{Path.basename(url)}")

        metadata =
          Jason.encode!(%{
            "batch_processing" => true,
            "batch_index" => index,
            "batch_total" => total,
            "source_url" => url
          })

        request = %Livekit.CreateIngressRequest{
          input_type: :URL_INPUT,
          name: "batch-#{index}-#{System.system_time(:second)}",
          room_name: config.room_name,
          participant_identity: "batch-processor-#{index}",
          participant_name: "Batch #{index}: #{Path.basename(url)}",
          participant_metadata: metadata,
          url: url,
          enable_transcoding: true
        }

        case Livekit.IngressServiceClient.create_ingress(client, request) do
          {:ok, ingress} ->
            IO.puts(" β
Created: #{ingress.ingress_id}")
            ingress

          {:error, reason} ->
            IO.puts(" β Failed: #{inspect(reason)}")
            nil
        end
      end

    succeeded = Enum.reject(created_ingresses, &is_nil/1)
    IO.puts("")
    IO.puts("π **Batch Results:** #{length(succeeded)}/#{total} successful")
    Process.put(:batch_ingresses, succeeded)

  :parallel ->
    IO.puts("β‘ **Parallel Processing (Creating all ingresses simultaneously):**")

    # One task per URL; each returns {index, :success | :error, payload}.
    tasks =
      for {url, index} <- Enum.with_index(urls, 1) do
        Task.async(fn ->
          metadata =
            Jason.encode!(%{
              "batch_processing" => true,
              "batch_index" => index,
              "batch_total" => total,
              "source_url" => url,
              "processing_mode" => "parallel"
            })

          request = %Livekit.CreateIngressRequest{
            input_type: :URL_INPUT,
            name: "parallel-#{index}-#{System.system_time(:second)}",
            room_name: config.room_name,
            participant_identity: "parallel-processor-#{index}",
            participant_name: "Parallel #{index}: #{Path.basename(url)}",
            participant_metadata: metadata,
            url: url,
            enable_transcoding: true
          }

          case Livekit.IngressServiceClient.create_ingress(client, request) do
            {:ok, ingress} -> {index, :success, ingress}
            {:error, reason} -> {index, :error, reason}
          end
        end)
      end

    # 30 second timeout for the whole batch.
    results = Task.await_many(tasks, 30_000)

    for {index, status, result} <- results do
      case status do
        :success ->
          IO.puts(" #{index}. β
#{result.name} (#{result.ingress_id})")

        :error ->
          IO.puts(" #{index}. β Failed: #{inspect(result)}")
      end
    end

    successful_ingresses = for {_index, :success, ingress} <- results, do: ingress
    IO.puts("")
    IO.puts("π **Parallel Results:** #{length(successful_ingresses)}/#{total} successful")
    Process.put(:batch_ingresses, successful_ingresses)

  :scheduled ->
    IO.puts("β° **Scheduled Processing (with delays):**")
    IO.puts(" This would be implemented with a scheduler in a real application.")
    IO.puts(" For demonstration, showing the concept:")
    IO.puts("")

    # The first file is scheduled for "now"; each following one is offset by
    # another `delay_seconds`.
    for {url, index} <- Enum.with_index(urls, 1) do
      scheduled_time = DateTime.add(DateTime.utc_now(), (index - 1) * delay_seconds, :second)
      IO.puts(" #{index}. #{Path.basename(url)}")
      IO.puts(" β° Scheduled for: #{DateTime.to_string(scheduled_time)}")
      IO.puts(" π URL: #{url}")
      IO.puts("")
    end

    IO.puts("π‘ **Scheduled Processing Implementation:**")
    IO.puts(" β’ Use Elixir's Process.send_after/3 for delays")
    IO.puts(" β’ Implement with GenServer for state management")
    IO.puts(" β’ Add error recovery and retry logic")
    IO.puts(" β’ Monitor processing status and completion")
end
Stream Status Monitoring
Let’s create a comprehensive monitoring system for file processing:
defmodule FileProcessingMonitor do
  @moduledoc """
  Helpers for polling the state of previously created ingresses and
  summarizing the results.
  """

  @doc """
  Looks up the current state of each ingress in `ingresses`.

  Returns one map per ingress, always with the keys `:ingress_id`, `:name`,
  `:status`, `:tracks` and `:error`:

    * `:status` is the reported endpoint status, `:not_found` when the ingress
      is missing from the listing, or `:error` when the listing call failed
      (the failure reason is then in `:error`);
    * `:tracks` is always a list.

  The listing is fetched once for the whole batch. An empty `ingresses` list
  returns `[]` without contacting the server.
  """
  def monitor_batch_ingresses(_client, []), do: []

  def monitor_batch_ingresses(client, ingresses) do
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        Enum.map(ingresses, fn ingress ->
          response.items
          |> Enum.find(fn item -> item.ingress_id == ingress.ingress_id end)
          |> describe(ingress)
        end)

      {:error, reason} ->
        Enum.map(ingresses, &entry(&1, :error, [], reason))
    end
  end

  @doc """
  Aggregates monitor entries by status.

  Returns `%{total: _, counts: %{status => count}, active: _, inactive: _,
  errors: _}`, where `active`, `inactive` and `errors` count the
  `:ENDPOINT_PUBLISHING`, `:ENDPOINT_INACTIVE` and `:ENDPOINT_ERROR` statuses.
  """
  def format_status_summary(monitors) do
    status_counts = Enum.frequencies_by(monitors, & &1.status)

    %{
      total: length(monitors),
      counts: status_counts,
      active: Map.get(status_counts, :ENDPOINT_PUBLISHING, 0),
      inactive: Map.get(status_counts, :ENDPOINT_INACTIVE, 0),
      errors: Map.get(status_counts, :ENDPOINT_ERROR, 0)
    }
  end

  # Builds the entry for an ingress that is absent from the listing.
  defp describe(nil, ingress), do: entry(ingress, :not_found, [], nil)

  # Builds the entry from the listed ingress; `state` may be unset.
  defp describe(current, ingress) do
    state = current.state

    entry(
      ingress,
      state && state.status,
      (state && state.tracks) || [],
      state && state.error
    )
  end

  # Every entry has the same keys so consumers can read them unconditionally.
  defp entry(ingress, status, tracks, error) do
    %{
      ingress_id: ingress.ingress_id,
      name: ingress.name,
      status: status,
      tracks: tracks,
      error: error
    }
  end
end
# Create monitoring display
monitoring_frame = Kino.Frame.new()

# The process dictionary belongs to the process that wrote it. These values
# must be read here, in the cell's own process, and captured by the task
# closure: calling Process.get/1 inside the task would read the task's own
# (empty) dictionary, so monitoring would never start.
monitored_ingresses = Process.get(:batch_ingresses) || []
monitor_client = Process.get(:client)

# Renders one markdown bullet for a monitor entry. `:tracks` and `:error` are
# read with Map.get/2 because not every entry is guaranteed to carry them.
describe_monitor = fn monitor ->
  status_emoji =
    case monitor.status do
      :ENDPOINT_PUBLISHING -> "π’"
      :ENDPOINT_INACTIVE -> "π΄"
      :ENDPOINT_BUFFERING -> "π‘"
      :ENDPOINT_ERROR -> "β"
      :ENDPOINT_COMPLETE -> "β
"
      _ -> "βͺ"
    end

  tracks = Map.get(monitor, :tracks) || []
  track_info = if tracks != [], do: " (#{length(tracks)} tracks)", else: ""

  error_info =
    case Map.get(monitor, :error) do
      empty when empty in [nil, false, ""] -> ""
      message when is_binary(message) -> " - Error: #{message}"
      # Failure reasons from the listing call may be arbitrary terms.
      other -> " - Error: #{inspect(other)}"
    end

  "- #{status_emoji} **#{monitor.name}**#{track_info}#{error_info}"
end

# Start batch monitoring task
batch_monitor_task =
  Task.async(fn ->
    if monitored_ingresses != [] && monitor_client do
      # Monitor for 2 minutes (24 * 5 seconds)
      Enum.each(1..24, fn iteration ->
        timestamp = DateTime.utc_now() |> DateTime.to_string()

        monitor_results =
          FileProcessingMonitor.monitor_batch_ingresses(monitor_client, monitored_ingresses)

        summary = FileProcessingMonitor.format_status_summary(monitor_results)

        content = """
        ## π File Processing Monitor (Update ##{iteration})
        **Last Update:** #{timestamp}
        **Total Files:** #{summary.total}
        ### π Status Summary:
        - π’ **Publishing:** #{summary.active} files
        - π΄ **Inactive:** #{summary.inactive} files
        - β **Errors:** #{summary.errors} files
        ### π Detailed Status:
        #{Enum.map_join(monitor_results, "\n", describe_monitor)}
        ### π‘ Tips:
        - Files may take 10-30 seconds to start publishing
        - Large files require more processing time
        - Check source URLs if files remain inactive
        - Monitor network connectivity for streaming files
        """

        Kino.Frame.render(monitoring_frame, Kino.Markdown.new(content))
        # Update every 5 seconds
        Process.sleep(5000)
      end)

      # Final monitoring summary
      final_results =
        FileProcessingMonitor.monitor_batch_ingresses(monitor_client, monitored_ingresses)

      final_summary = FileProcessingMonitor.format_status_summary(final_results)

      Kino.Frame.render(
        monitoring_frame,
        Kino.Markdown.new("""
        ## π Final Batch Processing Summary
        **Processing Complete!**
        ### π Final Results:
        - π’ **Publishing:** #{final_summary.active} files
        - π΄ **Inactive:** #{final_summary.inactive} files
        - β **Errors:** #{final_summary.errors} files
        **Success Rate:** #{if final_summary.total > 0, do: Float.round(final_summary.active / final_summary.total * 100, 1), else: 0}%
        Re-run this cell to start a new monitoring session.
        """)
      )
    else
      Kino.Frame.render(
        monitoring_frame,
        Kino.Markdown.new("""
        ## β οΈ Batch Monitoring Not Available
        Please create some batch ingress endpoints first using the batch processing section above.
        """)
      )
    end
  end)

monitoring_frame
Advanced File Processing Techniques
# Static reference text: advanced file-processing techniques with example
# FFmpeg/ffprobe commands, printed one line at a time.
[
  "π¬ Advanced File Processing Techniques",
  String.duplicate("=", 50),
  "",
  "π¬ **Video Processing Optimizations:**",
  "",
  "1οΈβ£ **Multi-bitrate Streaming**",
  " β’ Generate multiple quality versions",
  " β’ Adaptive bitrate based on client capability",
  " β’ Reduce bandwidth usage for mobile users",
  "",
  " FFmpeg Command:",
  " ```bash",
  " ffmpeg -i input.mp4 \\",
  " -map 0:v -map 0:a -map 0:v -map 0:a \\",
  " -c:v:0 libx264 -b:v:0 4000k -s:v:0 1920x1080 \\",
  " -c:v:1 libx264 -b:v:1 2000k -s:v:1 1280x720 \\",
  " -c:a:0 aac -b:a:0 192k \\",
  " -c:a:1 aac -b:a:1 128k \\",
  " -f hls -hls_time 4 -hls_list_size 0 \\",
  " -var_stream_map \"v:0,a:0 v:1,a:1\" \\",
  " -master_pl_name master.m3u8 \\",
  " stream_%v.m3u8",
  " ```",
  "",
  "2οΈβ£ **Content Analysis and Metadata Extraction**",
  " β’ Extract video duration, resolution, bitrate",
  " β’ Generate thumbnails at specific timestamps",
  " β’ Analyze audio levels and quality",
  " β’ Extract subtitles and closed captions",
  "",
  " FFmpeg Commands:",
  " ```bash",
  " # Get video information",
  " ffprobe -v quiet -print_format json -show_format -show_streams input.mp4",
  "",
  " # Generate thumbnails every 10 seconds",
  " ffmpeg -i input.mp4 -vf fps=1/10 thumb_%03d.jpg",
  "",
  " # Extract audio waveform",
  " ffmpeg -i input.mp4 -filter_complex showwavespic=s=1920x1080 waveform.png",
  " ```",
  "",
  "3οΈβ£ **Automated Quality Control**",
  " β’ Detect corrupted or incomplete files",
  " β’ Validate audio/video sync",
  " β’ Check for proper encoding parameters",
  " β’ Ensure files meet streaming requirements",
  "",
  "4οΈβ£ **Content Security and DRM**",
  " β’ Encrypt video content for secure delivery",
  " β’ Implement token-based access control",
  " β’ Watermark videos for copyright protection",
  " β’ Geographic content restrictions",
  "",
  "5οΈβ£ **Performance Optimization**",
  " β’ Use hardware acceleration (GPU encoding)",
  " β’ Parallel processing for batch operations",
  " β’ Caching frequently accessed content",
  " β’ Content Delivery Network (CDN) integration"
]
|> Enum.each(&IO.puts/1)
Troubleshooting Common Issues
# Static reference text: common file-processing problems, their remedies and
# a few diagnostic shell commands, printed one line at a time.
[
  "π οΈ File Processing Troubleshooting Guide",
  String.duplicate("=", 50),
  "",
  "β **Common Problems and Solutions:**",
  "",
  "1οΈβ£ **File Not Found / Access Denied**",
  " β’ Verify URL is publicly accessible",
  " β’ Check for authentication requirements",
  " β’ Test URL in browser first",
  " β’ Ensure proper CORS headers for web content",
  "",
  "2οΈβ£ **Unsupported File Format**",
  " β’ Convert to supported formats (MP4, WebM, etc.)",
  " β’ Check codec compatibility",
  " β’ Use FFmpeg for format conversion",
  " β’ Verify container format vs codec format",
  "",
  "3οΈβ£ **Large File Processing Issues**",
  " β’ Break large files into segments",
  " β’ Use progressive download",
  " β’ Implement resume capability",
  " β’ Monitor memory usage during processing",
  "",
  "4οΈβ£ **Streaming Performance Problems**",
  " β’ Reduce bitrate for better streaming",
  " β’ Use adaptive streaming (HLS/DASH)",
  " β’ Implement proper buffering",
  " β’ Monitor network bandwidth",
  "",
  "5οΈβ£ **Audio/Video Sync Issues**",
  " β’ Check source file for sync problems",
  " β’ Use FFmpeg to fix timing issues",
  " β’ Verify frame rate consistency",
  " β’ Check audio sample rate compatibility",
  "",
  "π§ **Diagnostic Commands:**",
  "",
  "**Test file accessibility:**",
  "```bash",
  "curl -I \"https://example.com/video.mp4\"",
  "```",
  "",
  "**Analyze file properties:**",
  "```bash",
  "ffprobe -v quiet -print_format json -show_format -show_streams file.mp4",
  "```",
  "",
  "**Test streaming performance:**",
  "```bash",
  "ffmpeg -i file.mp4 -f null - 2>&1 | grep fps",
  "```",
  "",
  "**Check audio/video sync:**",
  "```bash",
  "ffmpeg -i file.mp4 -vf \"showinfo\" -f null - 2>&1 | grep pts_time",
  "```"
]
|> Enum.each(&IO.puts/1)
Cleanup and Resource Management
# File processing cleanup form.
#
# `Kino.Input.select/3` options are `{value, label}` pairs; the atoms come
# first so that the cleanup cell receives `:list_url`, `:delete_batch`, ...
# and its `case` clauses can match.
cleanup_form =
  Kino.Control.form(
    [
      cleanup_action:
        Kino.Input.select("Cleanup Action",
          list_url: "List all URL/file ingress endpoints",
          delete_batch: "Delete batch processing endpoints",
          delete_all_files: "Delete all file processing endpoints",
          delete_specific: "Delete specific ingress by ID"
        ),
      specific_id: Kino.Input.text("Ingress ID (for specific deletion)", default: "")
    ],
    submit: "Execute Cleanup"
  )
# Execute the cleanup action selected in the form: list URL ingresses, delete
# the tracked batch ingresses, delete every ingress this notebook's naming
# scheme identifies, or delete a single ingress by id.
cleanup_params = Kino.Control.read(cleanup_form)
client = Process.get(:client)

# Deletes each ingress, printing one line per result, and returns the
# ingresses that could NOT be deleted.
delete_ingresses = fn ingresses ->
  Enum.reject(ingresses, fn ingress ->
    request = %Livekit.DeleteIngressRequest{ingress_id: ingress.ingress_id}

    case Livekit.IngressServiceClient.delete_ingress(client, request) do
      {:ok, _} ->
        IO.puts(" β
Deleted: #{ingress.name}")
        true

      {:error, reason} ->
        IO.puts(" β Failed to delete #{ingress.name}: #{inspect(reason)}")
        false
    end
  end)
end

case cleanup_params.cleanup_action do
  :list_url ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        url_ingresses =
          Enum.filter(response.items, fn ingress -> ingress.input_type == :URL_INPUT end)

        IO.puts("π URL/File Ingress Endpoints (#{length(url_ingresses)} total):")

        for ingress <- url_ingresses do
          status_emoji =
            case ingress.state && ingress.state.status do
              :ENDPOINT_PUBLISHING -> "π’"
              :ENDPOINT_INACTIVE -> "π΄"
              :ENDPOINT_BUFFERING -> "π‘"
              :ENDPOINT_ERROR -> "β"
              _ -> "βͺ"
            end

          # Only mark the URL as truncated when it actually was.
          url = to_string(ingress.url)

          shown_url =
            if String.length(url) > 60, do: String.slice(url, 0, 60) <> "...", else: url

          IO.puts(" #{status_emoji} #{ingress.name}")
          IO.puts(" ID: #{ingress.ingress_id}")
          IO.puts(" URL: #{shown_url}")
          IO.puts(" Room: #{ingress.room_name}")
          IO.puts("")
        end

      {:error, reason} ->
        IO.puts("β Failed to list ingress: #{inspect(reason)}")
    end

  :delete_batch ->
    batch_ingresses = Process.get(:batch_ingresses) || []

    if Enum.empty?(batch_ingresses) do
      IO.puts("βͺ No batch ingress endpoints to clean up")
    else
      IO.puts("π§Ή Cleaning up #{length(batch_ingresses)} batch endpoints...")

      # Keep tracking whatever could not be deleted so that a later run can
      # retry it instead of losing the reference.
      case delete_ingresses.(batch_ingresses) do
        [] -> Process.delete(:batch_ingresses)
        remaining -> Process.put(:batch_ingresses, remaining)
      end

      IO.puts("β¨ Batch cleanup complete")
    end

  :delete_all_files ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        # Name fragments used by the ingress-creating cells of this notebook
        # ("url-stream-*", "file-proc-*", "batch-*", "parallel-*").
        name_markers = ["file", "batch", "parallel", "url"]

        file_ingresses =
          Enum.filter(response.items, fn ingress ->
            ingress.input_type == :URL_INPUT && String.contains?(ingress.name, name_markers)
          end)

        if Enum.empty?(file_ingresses) do
          IO.puts("β¨ No file processing endpoints to clean up")
        else
          IO.puts("π§Ή Cleaning up #{length(file_ingresses)} file processing endpoints...")
          delete_ingresses.(file_ingresses)
        end

      {:error, reason} ->
        IO.puts("β Failed to cleanup: #{inspect(reason)}")
    end

  :delete_specific ->
    # Ignore stray whitespace from copy/paste around the id.
    ingress_id = String.trim(cleanup_params.specific_id)

    if ingress_id != "" do
      request = %Livekit.DeleteIngressRequest{ingress_id: ingress_id}

      case Livekit.IngressServiceClient.delete_ingress(client, request) do
        {:ok, deleted} ->
          IO.puts("β
Deleted ingress: #{deleted.name} (#{deleted.ingress_id})")

        {:error, reason} ->
          IO.puts("β Failed to delete: #{inspect(reason)}")
      end
    else
      IO.puts("β οΈ Please provide an ingress ID to delete")
    end
end
Summary and Best Practices
# Static closing text: recap of the tutorial, best practices, next steps and
# further reading, printed one line at a time.
[
  "π File Processing Tutorial Complete!",
  String.duplicate("=", 50),
  "",
  "β
**What You've Accomplished:**",
  " β’ Created URL ingress endpoints for various content types",
  " β’ Analyzed and validated remote content",
  " β’ Generated FFmpeg commands for file processing",
  " β’ Implemented batch processing workflows",
  " β’ Set up comprehensive monitoring systems",
  " β’ Learned troubleshooting techniques",
  "",
  "π **Best Practices for File Processing:**",
  "",
  "1οΈβ£ **Content Validation**",
  " β’ Always validate URLs before creating ingress",
  " β’ Check file formats and codec compatibility",
  " β’ Test accessibility and authentication requirements",
  " β’ Verify content length and quality",
  "",
  "2οΈβ£ **Processing Optimization**",
  " β’ Use appropriate transcoding settings",
  " β’ Implement adaptive bitrate for different devices",
  " β’ Monitor processing performance and errors",
  " β’ Consider hardware acceleration for large volumes",
  "",
  "3οΈβ£ **Batch Operations**",
  " β’ Process files in manageable batches",
  " β’ Implement proper error handling and retries",
  " β’ Monitor system resources during batch processing",
  " β’ Use scheduling for non-urgent processing",
  "",
  "4οΈβ£ **Monitoring and Maintenance**",
  " β’ Monitor ingress endpoint status regularly",
  " β’ Clean up completed or failed endpoints",
  " β’ Log processing metrics for analysis",
  " β’ Set up alerts for processing failures",
  "",
  "π **Recommended Next Steps:**",
  " 1. Try the Ingress Management Livebook for automation",
  " 2. Explore the Troubleshooting Livebook for advanced debugging",
  " 3. Implement custom file processing workflows",
  " 4. Set up monitoring dashboards",
  " 5. Create automated content delivery pipelines",
  "",
  "π **Additional Resources:**",
  " β’ FFmpeg Documentation: https://ffmpeg.org/documentation.html",
  " β’ HLS Specification: https://tools.ietf.org/html/rfc8216",
  " β’ DASH Standard: https://dashif.org/",
  " β’ LiveKit Ingress Guide: https://docs.livekit.io/ingress/"
]
|> Enum.each(&IO.puts/1)