LiveKit Ingress Service - RTMP Streaming Tutorial
Mix.install([
{:livekit, path: "../.."},
{:kino, "~> 0.12"}
])
Introduction to RTMP Ingress
This Livebook provides a comprehensive tutorial on setting up RTMP (Real-Time Messaging Protocol) streaming with LiveKitβs Ingress Service. RTMP is the traditional streaming protocol used by platforms like Twitch, YouTube Live, and Facebook Live.
What Youβll Learn:
- π₯ Setting up RTMP ingress endpoints
- π§ Configuring OBS Studio for RTMP streaming
- βοΈ Using FFmpeg for programmatic streaming
- ποΈ Advanced RTMP configuration options
- π Monitoring stream health and performance
- π οΈ Troubleshooting common RTMP issues
Configuration & Client Setup
# Configuration form for LiveKit connection
config_form = Kino.Control.form(
[
api_key: Kino.Input.password("LiveKit API Key"),
api_secret: Kino.Input.password("LiveKit API Secret"),
url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud"),
room_name: Kino.Input.text("Target Room Name", default: "rtmp-demo-room")
],
submit: "Connect to LiveKit"
)
# Establish connection
config = Kino.Control.read(config_form)
Process.put(:config, config)
case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
{:ok, client} ->
Process.put(:client, client)
IO.puts("β
Connected to LiveKit Ingress Service!")
IO.puts("π― Target room: #{config.room_name}")
{:error, reason} ->
IO.puts("β Connection failed: #{reason}")
IO.puts("Please verify your credentials and server URL.")
end
Creating an RTMP Ingress Endpoint
Letβs create a comprehensive RTMP ingress with all the configuration options:
# RTMP Ingress Configuration Form
rtmp_form = Kino.Control.form(
[
name: Kino.Input.text("Stream Name", default: "obs-stream-#{System.system_time(:second)}"),
participant_identity: Kino.Input.text("Streamer Identity", default: "streamer-1"),
participant_name: Kino.Input.text("Streamer Display Name", default: "Demo Streamer"),
participant_metadata: Kino.Input.text("Metadata (JSON)", default: "{\"source\": \"OBS Studio\", \"quality\": \"HD\"}"),
enable_transcoding: Kino.Input.checkbox("Enable Transcoding", default: true)
],
submit: "Create RTMP Ingress"
)
# Create the RTMP ingress endpoint
rtmp_params = Kino.Control.read(rtmp_form)
client = Process.get(:client)
config = Process.get(:config)
request = %Livekit.CreateIngressRequest{
input_type: :RTMP_INPUT,
name: rtmp_params.name,
room_name: config.room_name,
participant_identity: rtmp_params.participant_identity,
participant_name: rtmp_params.participant_name,
participant_metadata: rtmp_params.participant_metadata,
enable_transcoding: rtmp_params.enable_transcoding
}
case Livekit.IngressServiceClient.create_ingress(client, request) do
{:ok, ingress} ->
Process.put(:rtmp_ingress, ingress)
IO.puts("π RTMP Ingress Created Successfully!")
IO.puts("=" |> String.duplicate(50))
IO.puts("π‘ Ingress ID: #{ingress.ingress_id}")
IO.puts("π·οΈ Name: #{ingress.name}")
IO.puts("π RTMP URL: #{ingress.url}")
IO.puts("π Stream Key: #{ingress.stream_key}")
IO.puts("π Room: #{ingress.room_name}")
IO.puts("π€ Participant: #{ingress.participant_identity}")
IO.puts("π Display Name: #{ingress.participant_name}")
IO.puts("π Metadata: #{ingress.participant_metadata}")
IO.puts("βοΈ Transcoding: #{ingress.enable_transcoding}")
IO.puts("=" |> String.duplicate(50))
{:error, reason} ->
IO.puts("β Failed to create RTMP ingress: #{inspect(reason)}")
end
OBS Studio Configuration Guide
Now that we have our RTMP endpoint, letβs set up OBS Studio to stream to it:
ingress = Process.get(:rtmp_ingress)
if ingress do
IO.puts("π₯ OBS Studio Configuration Guide")
IO.puts("=" |> String.duplicate(40))
IO.puts("")
IO.puts("1οΈβ£ Open OBS Studio")
IO.puts("")
IO.puts("2οΈβ£ Go to Settings β Stream")
IO.puts("")
IO.puts("3οΈβ£ Configure the following settings:")
IO.puts(" Service: Custom")
IO.puts(" Server: #{ingress.url}")
IO.puts(" Stream Key: #{ingress.stream_key}")
IO.puts("")
IO.puts("4οΈβ£ Recommended encoding settings:")
IO.puts(" Encoder: x264")
IO.puts(" Rate Control: CBR")
IO.puts(" Bitrate: 2500 Kbps (adjust based on your upload speed)")
IO.puts(" Keyframe Interval: 2 seconds")
IO.puts("")
IO.puts("5οΈβ£ Audio settings:")
IO.puts(" Sample Rate: 48 kHz")
IO.puts(" Channels: Stereo")
IO.puts(" Bitrate: 160 Kbps")
IO.puts("")
IO.puts("6οΈβ£ Click 'Start Streaming' in OBS to begin!")
IO.puts("")
IO.puts("β οΈ Make sure your room '#{ingress.room_name}' is created in LiveKit")
else
IO.puts("β οΈ Please create an RTMP ingress first")
end
FFmpeg Command Generation
For programmatic streaming, letβs generate FFmpeg commands:
# FFmpeg command generator
ffmpeg_form = Kino.Control.form(
[
input_source: Kino.Input.select("Input Source", [
{"Test Pattern (color bars)", :test_pattern},
{"Video File", :file},
{"Webcam", :webcam},
{"Screen Capture", :screen}
]),
video_file_path: Kino.Input.text("Video File Path (if file selected)", default: "/path/to/video.mp4"),
video_bitrate: Kino.Input.number("Video Bitrate (kbps)", default: 2500),
audio_bitrate: Kino.Input.number("Audio Bitrate (kbps)", default: 160),
framerate: Kino.Input.number("Frame Rate (fps)", default: 30),
resolution: Kino.Input.select("Resolution", [
{"1920x1080 (1080p)", "1920x1080"},
{"1280x720 (720p)", "1280x720"},
{"854x480 (480p)", "854x480"}
])
],
submit: "Generate FFmpeg Command"
)
# Generate and display FFmpeg commands
ffmpeg_params = Kino.Control.read(ffmpeg_form)
ingress = Process.get(:rtmp_ingress)
if ingress do
# Base FFmpeg parameters
base_cmd = "ffmpeg"
# Input source configuration
input_cmd = case ffmpeg_params.input_source do
:test_pattern ->
"-f lavfi -i testsrc2=size=#{ffmpeg_params.resolution}:rate=#{ffmpeg_params.framerate} -f lavfi -i sine=frequency=1000:sample_rate=48000"
:file ->
"-i \"#{ffmpeg_params.video_file_path}\""
:webcam ->
# macOS/Linux webcam (adjust device as needed)
"-f avfoundation -i \"0:0\"" # macOS
# For Linux: "-f v4l2 -i /dev/video0 -f alsa -i default"
:screen ->
# macOS screen capture
"-f avfoundation -i \"1:0\"" # macOS (screen 1, audio device 0)
# For Linux: "-f x11grab -i :0.0"
end
# Video encoding settings
video_cmd = "-c:v libx264 -preset veryfast -tune zerolatency -b:v #{ffmpeg_params.video_bitrate}k -maxrate #{ffmpeg_params.video_bitrate * 1.2}k -bufsize #{ffmpeg_params.video_bitrate * 2}k -pix_fmt yuv420p -g #{ffmpeg_params.framerate * 2} -r #{ffmpeg_params.framerate}"
# Audio encoding settings
audio_cmd = "-c:a aac -b:a #{ffmpeg_params.audio_bitrate}k -ar 48000 -ac 2"
# RTMP output settings
output_cmd = "-f flv \"#{ingress.url}/#{ingress.stream_key}\""
# Complete command
complete_command = "#{base_cmd} #{input_cmd} #{video_cmd} #{audio_cmd} #{output_cmd}"
IO.puts("π§ Generated FFmpeg Command:")
IO.puts("=" |> String.duplicate(60))
IO.puts("")
IO.puts(complete_command)
IO.puts("")
IO.puts("π Copy and paste this command into your terminal to start streaming!")
IO.puts("")
IO.puts("π‘ Additional FFmpeg Tips:")
IO.puts(" β’ Add -loglevel info for detailed logging")
IO.puts(" β’ Use -t 60 to limit stream to 60 seconds for testing")
IO.puts(" β’ Add -re flag before input to read at native frame rate")
IO.puts(" β’ Use -threads 0 to auto-detect optimal thread count")
else
IO.puts("β οΈ Please create an RTMP ingress first")
end
Stream Monitoring and Health Check
Letβs monitor our RTMP stream and check its health:
# Stream monitoring function
defmodule StreamMonitor do
def check_stream_status(client, ingress_id) do
case Livekit.IngressServiceClient.list_ingress(client) do
{:ok, response} ->
ingress = Enum.find(response.items, fn i -> i.ingress_id == ingress_id end)
if ingress do
status = ingress.state && ingress.state.status
status_info = case status do
:ENDPOINT_INACTIVE -> {"π΄", "Inactive - No stream detected"}
:ENDPOINT_BUFFERING -> {"π‘", "Buffering - Stream starting up"}
:ENDPOINT_PUBLISHING -> {"π’", "Publishing - Stream is live!"}
:ENDPOINT_ERROR -> {"β", "Error - Stream encountered issues"}
:ENDPOINT_COMPLETE -> {"β
", "Complete - Stream finished"}
_ -> {"βͺ", "Unknown status"}
end
{emoji, description} = status_info
%{
status: status,
emoji: emoji,
description: description,
ingress: ingress
}
else
{:error, "Ingress not found"}
end
{:error, reason} ->
{:error, reason}
end
end
end
# Monitor current stream
ingress = Process.get(:rtmp_ingress)
client = Process.get(:client)
if ingress do
case StreamMonitor.check_stream_status(client, ingress.ingress_id) do
%{emoji: emoji, description: desc, ingress: current_ingress} ->
IO.puts("π Stream Status Monitor")
IO.puts("=" |> String.duplicate(30))
IO.puts("#{emoji} Status: #{desc}")
IO.puts("π·οΈ Name: #{current_ingress.name}")
IO.puts("π ID: #{current_ingress.ingress_id}")
IO.puts("π Room: #{current_ingress.room_name}")
IO.puts("π€ Participant: #{current_ingress.participant_identity}")
if current_ingress.state do
state = current_ingress.state
IO.puts("")
IO.puts("π Stream Details:")
if state.started_at && state.started_at > 0 do
started_time = DateTime.from_unix!(state.started_at, :millisecond)
IO.puts(" β° Started: #{DateTime.to_string(started_time)}")
end
if state.ended_at && state.ended_at > 0 do
ended_time = DateTime.from_unix!(state.ended_at, :millisecond)
IO.puts(" π Ended: #{DateTime.to_string(ended_time)}")
end
if state.error && state.error != "" do
IO.puts(" β οΈ Error: #{state.error}")
end
end
{:error, reason} ->
IO.puts("β Failed to check stream status: #{inspect(reason)}")
end
else
IO.puts("β οΈ No RTMP ingress to monitor. Please create one first.")
end
Real-time Stream Monitoring
Letβs create a real-time monitoring widget that refreshes automatically:
# Create a monitoring frame that updates every 5 seconds
monitoring_frame = Kino.Frame.new()
# Monitoring task
monitoring_task = Task.async(fn ->
ingress = Process.get(:rtmp_ingress)
client = Process.get(:client)
if ingress && client do
Enum.each(1..12, fn iteration -> # Monitor for 1 minute (12 * 5 seconds)
timestamp = DateTime.utc_now() |> DateTime.to_string()
content = case StreamMonitor.check_stream_status(client, ingress.ingress_id) do
%{emoji: emoji, description: desc, ingress: current_ingress} ->
"""
## π Live Stream Monitor (Update ##{iteration})
**Last Update:** #{timestamp}
#{emoji} **Status:** #{desc}
- **Stream ID:** #{current_ingress.ingress_id}
- **Room:** #{current_ingress.room_name}
- **Participant:** #{current_ingress.participant_identity}
### Connection Details:
- **RTMP URL:** `#{current_ingress.url}`
- **Stream Key:** `#{current_ingress.stream_key}`
### Quick Actions:
- Start your OBS/FFmpeg stream now!
- Check that your room exists in LiveKit
- Verify your network connection
"""
{:error, reason} ->
"""
## β Monitoring Error (Update ##{iteration})
**Last Update:** #{timestamp}
Failed to check stream status: #{inspect(reason)}
"""
end
Kino.Frame.render(monitoring_frame, Kino.Markdown.new(content))
Process.sleep(5000) # Wait 5 seconds
end)
# Final update
Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
## π Monitoring Complete
**Monitoring session finished.**
Re-run this cell to start a new monitoring session.
"""))
else
Kino.Frame.render(monitoring_frame, Kino.Markdown.new("""
## β οΈ Monitoring Not Available
Please create an RTMP ingress first.
"""))
end
end)
monitoring_frame
Advanced RTMP Configuration
Letβs explore advanced RTMP ingress configurations:
# Advanced configuration form
advanced_form = Kino.Control.form(
[
stream_name: Kino.Input.text("Advanced Stream Name", default: "advanced-rtmp-#{System.system_time(:second)}"),
enable_transcoding: Kino.Input.checkbox("Enable Video Transcoding", default: true),
participant_name: Kino.Input.text("Display Name", default: "Advanced Streamer"),
metadata: Kino.Input.textarea("Custom Metadata (JSON)", default: """
{
"streamer_type": "professional",
"camera_model": "Sony A7S III",
"streaming_software": "OBS Studio",
"bitrate_target": "4000kbps",
"encoding": "x264"
}""")
],
submit: "Create Advanced RTMP Ingress"
)
# Create advanced RTMP ingress
advanced_params = Kino.Control.read(advanced_form)
client = Process.get(:client)
config = Process.get(:config)
# Parse and validate metadata JSON
metadata = try do
Jason.decode!(advanced_params.metadata)
advanced_params.metadata
rescue
_ -> "{\"note\": \"Invalid JSON provided, using fallback\"}"
end
request = %Livekit.CreateIngressRequest{
input_type: :RTMP_INPUT,
name: advanced_params.stream_name,
room_name: config.room_name,
participant_identity: "advanced-streamer-#{System.system_time(:second)}",
participant_name: advanced_params.participant_name,
participant_metadata: metadata,
enable_transcoding: advanced_params.enable_transcoding
}
case Livekit.IngressServiceClient.create_ingress(client, request) do
{:ok, ingress} ->
IO.puts("π Advanced RTMP Ingress Created!")
IO.puts("=" |> String.duplicate(50))
IO.puts("π ID: #{ingress.ingress_id}")
IO.puts("π·οΈ Name: #{ingress.name}")
IO.puts("π RTMP URL: #{ingress.url}")
IO.puts("π Stream Key: #{ingress.stream_key}")
IO.puts("π€ Identity: #{ingress.participant_identity}")
IO.puts("π Display Name: #{ingress.participant_name}")
IO.puts("βοΈ Transcoding: #{ingress.enable_transcoding}")
IO.puts("")
IO.puts("π Metadata:")
IO.puts(metadata)
{:error, reason} ->
IO.puts("β Failed to create advanced RTMP ingress: #{inspect(reason)}")
end
Troubleshooting Common RTMP Issues
IO.puts("π οΈ RTMP Troubleshooting Guide")
IO.puts("=" |> String.duplicate(40))
IO.puts("")
IO.puts("π **Common Issues and Solutions:**")
IO.puts("")
IO.puts("1οΈβ£ **Connection Refused / Can't Connect**")
IO.puts(" β’ Verify RTMP URL and stream key are correct")
IO.puts(" β’ Check firewall settings (port 1935 for RTMP)")
IO.puts(" β’ Ensure LiveKit server is accessible")
IO.puts(" β’ Try using RTMPS (secure RTMP) if available")
IO.puts("")
IO.puts("2οΈβ£ **Stream Keeps Disconnecting**")
IO.puts(" β’ Check network stability and bandwidth")
IO.puts(" β’ Reduce bitrate in OBS/FFmpeg")
IO.puts(" β’ Increase keyframe interval (2-4 seconds)")
IO.puts(" β’ Use CBR (Constant Bitrate) instead of VBR")
IO.puts("")
IO.puts("3οΈβ£ **Poor Video Quality**")
IO.puts(" β’ Increase video bitrate (balance with upload speed)")
IO.puts(" β’ Check CPU usage (use hardware encoder if available)")
IO.puts(" β’ Verify resolution and framerate settings")
IO.puts(" β’ Use appropriate encoder preset (fast/medium/slow)")
IO.puts("")
IO.puts("4οΈβ£ **Audio Issues**")
IO.puts(" β’ Set audio sample rate to 48kHz")
IO.puts(" β’ Use stereo audio (2 channels)")
IO.puts(" β’ Check audio bitrate (128-320 kbps)")
IO.puts(" β’ Verify audio source is not muted/disabled")
IO.puts("")
IO.puts("5οΈβ£ **Ingress Shows as Inactive**")
IO.puts(" β’ Confirm stream is actually running in OBS/FFmpeg")
IO.puts(" β’ Check if target room exists in LiveKit")
IO.puts(" β’ Verify participant identity is unique")
IO.puts(" β’ Wait 10-15 seconds for status to update")
IO.puts("")
IO.puts("π§ **Debug Commands:**")
IO.puts("")
IO.puts("**Test with FFmpeg:**")
IO.puts("```bash")
IO.puts("ffmpeg -re -f lavfi -i testsrc2 -f lavfi -i sine \\")
IO.puts(" -c:v libx264 -preset veryfast -b:v 1000k \\")
IO.puts(" -c:a aac -b:a 128k \\")
IO.puts(" -f flv rtmp://your-rtmp-url/your-stream-key")
IO.puts("```")
IO.puts("")
IO.puts("**Check network connectivity:**")
IO.puts("```bash")
IO.puts("telnet your-livekit-server.com 1935")
IO.puts("```")
Performance Optimization Tips
IO.puts("β‘ RTMP Performance Optimization")
IO.puts("=" |> String.duplicate(40))
IO.puts("")
IO.puts("π― **Bitrate Guidelines:**")
IO.puts(" β’ 1080p 30fps: 4000-6000 kbps")
IO.puts(" β’ 720p 30fps: 2000-4000 kbps")
IO.puts(" β’ 480p 30fps: 1000-2000 kbps")
IO.puts(" β’ Audio: 128-320 kbps")
IO.puts("")
IO.puts("π§ **OBS Studio Optimization:**")
IO.puts(" β’ Use x264 encoder with 'veryfast' preset")
IO.puts(" β’ Enable hardware encoding if available (NVENC/QuickSync)")
IO.puts(" β’ Set keyframe interval to 2 seconds")
IO.puts(" β’ Use CBR rate control")
IO.puts(" β’ Enable 'Enforce streaming service settings'")
IO.puts("")
IO.puts("π **Network Optimization:**")
IO.puts(" β’ Use wired connection when possible")
IO.puts(" β’ Ensure upload speed is 1.5x your bitrate")
IO.puts(" β’ Close bandwidth-heavy applications")
IO.puts(" β’ Consider using QoS on your router")
IO.puts("")
IO.puts("π» **System Optimization:**")
IO.puts(" β’ Close unnecessary applications")
IO.puts(" β’ Monitor CPU and GPU usage")
IO.puts(" β’ Use dedicated streaming PC if possible")
IO.puts(" β’ Ensure adequate cooling/thermal management")
Cleanup and Resource Management
# Cleanup form for managing ingress endpoints
cleanup_form = Kino.Control.form(
[
action: Kino.Input.select("Cleanup Action", [
{"List all my ingress endpoints", :list},
{"Delete specific ingress", :delete_specific},
{"Delete all test ingress endpoints", :delete_test_ingress}
]),
ingress_id_to_delete: Kino.Input.text("Ingress ID to delete (if specific)", default: "")
],
submit: "Execute Cleanup"
)
# Execute cleanup operations
cleanup_params = Kino.Control.read(cleanup_form)
client = Process.get(:client)
case cleanup_params.action do
:list ->
case Livekit.IngressServiceClient.list_ingress(client) do
{:ok, response} ->
IO.puts("π All Ingress Endpoints (#{length(response.items)} total):")
for ingress <- response.items do
status_emoji = case ingress.state && ingress.state.status do
:ENDPOINT_PUBLISHING -> "π’"
:ENDPOINT_INACTIVE -> "π΄"
:ENDPOINT_BUFFERING -> "π‘"
_ -> "βͺ"
end
IO.puts(" #{status_emoji} #{ingress.name}")
IO.puts(" ID: #{ingress.ingress_id}")
IO.puts(" Type: #{ingress.input_type}")
IO.puts(" Room: #{ingress.room_name}")
IO.puts("")
end
{:error, reason} ->
IO.puts("β Failed to list ingress: #{inspect(reason)}")
end
:delete_specific ->
if cleanup_params.ingress_id_to_delete != "" do
request = %Livekit.DeleteIngressRequest{ingress_id: cleanup_params.ingress_id_to_delete}
case Livekit.IngressServiceClient.delete_ingress(client, request) do
{:ok, deleted} ->
IO.puts("β
Deleted ingress: #{deleted.name} (#{deleted.ingress_id})")
{:error, reason} ->
IO.puts("β Failed to delete: #{inspect(reason)}")
end
else
IO.puts("β οΈ Please provide an ingress ID to delete")
end
:delete_test_ingress ->
case Livekit.IngressServiceClient.list_ingress(client) do
{:ok, response} ->
test_ingress = Enum.filter(response.items, fn ingress ->
String.contains?(ingress.name, "demo") ||
String.contains?(ingress.name, "test") ||
String.contains?(ingress.name, "rtmp") ||
String.contains?(ingress.name, "obs")
end)
if Enum.empty?(test_ingress) do
IO.puts("β¨ No test ingress endpoints found to clean up")
else
IO.puts("π§Ή Found #{length(test_ingress)} test ingress endpoints to clean up:")
for ingress <- test_ingress do
request = %Livekit.DeleteIngressRequest{ingress_id: ingress.ingress_id}
case Livekit.IngressServiceClient.delete_ingress(client, request) do
{:ok, _} ->
IO.puts(" β
Deleted: #{ingress.name}")
{:error, reason} ->
IO.puts(" β Failed to delete #{ingress.name}: #{inspect(reason)}")
end
end
end
{:error, reason} ->
IO.puts("β Failed to list ingress for cleanup: #{inspect(reason)}")
end
end
Summary and Next Steps
IO.puts("π RTMP Streaming Tutorial Complete!")
IO.puts("=" |> String.duplicate(40))
IO.puts("")
IO.puts("β
**What You've Accomplished:**")
IO.puts(" β’ Created RTMP ingress endpoints")
IO.puts(" β’ Generated OBS Studio configurations")
IO.puts(" β’ Built FFmpeg streaming commands")
IO.puts(" β’ Monitored stream health in real-time")
IO.puts(" β’ Learned troubleshooting techniques")
IO.puts(" β’ Optimized streaming performance")
IO.puts("")
IO.puts("π **Recommended Next Steps:**")
IO.puts(" 1. Try the WebRTC Ingress Livebook for low-latency streaming")
IO.puts(" 2. Explore File Processing Livebook for batch video ingestion")
IO.puts(" 3. Set up automated ingress management workflows")
IO.puts(" 4. Implement custom monitoring and alerting")
IO.puts("")
IO.puts("π **Additional Resources:**")
IO.puts(" β’ OBS Studio: https://obsproject.com/")
IO.puts(" β’ FFmpeg Documentation: https://ffmpeg.org/documentation.html")
IO.puts(" β’ LiveKit RTMP Guide: https://docs.livekit.io/ingress/rtmp/")
IO.puts(" β’ RTMP Specification: https://rtmp.veriskope.com/docs/spec/")