LiveKit Ingress Service - Basic Setup
Mix.install([
{:livekit, path: "../.."},
{:kino, "~> 0.12"}
])
Overview
This Livebook demonstrates the basic usage of LiveKit’s Ingress Service, which allows you to bring external streams into LiveKit rooms. The Ingress Service supports multiple input types:
- RTMP streams: Traditional streaming protocol
- WHIP streams: WebRTC-HTTP Ingress Protocol
- URL streams: Direct URL-based inputs (HLS, etc.)
Configuration Setup
Let’s start by setting up our LiveKit configuration. You’ll need your LiveKit server credentials.
# Credential form: the API key and secret are entered through password inputs,
# the server URL through a plain text input with a placeholder default.
config_fields = [
  api_key: Kino.Input.password("API Key"),
  api_secret: Kino.Input.password("API Secret"),
  url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud")
]

config_form = Kino.Control.form(config_fields, submit: "Save Configuration")
# Read the submitted form values and keep them in the process dictionary under
# :livekit_config so that later cells can fetch them with Process.get/1.
# NOTE(review): Kino.Control.read/1 does not appear in the Kino.Control API as
# far as I can tell (form values normally arrive as submit events through
# Kino.listen/2 or Kino.Control.stream/1) — confirm against the pinned Kino version.
config = Kino.Control.read(config_form)
Process.put(:livekit_config, config)

IO.puts("✅ Configuration saved!")

# Echo the stored settings with the API secret masked.
config
|> Map.put(:api_secret, "[HIDDEN]")
|> IO.inspect()
Creating an Ingress Service Client
Let’s create our ingress service client using the configured credentials:
# Build the ingress client from the configuration stored under :livekit_config
# and keep it in the process dictionary under :ingress_client for later cells.
config = Process.get(:livekit_config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:ingress_client, client)
    IO.puts("✅ Successfully connected to LiveKit Ingress Service!")

  {:error, reason} ->
    # inspect/1 keeps the error report from crashing when the reason is a
    # tuple, map or struct (none of which implement String.Chars); this also
    # matches how every other cell in this notebook prints error reasons.
    IO.puts("❌ Failed to connect: #{inspect(reason)}")
    IO.puts("Please check your configuration and try again.")
end
Basic Ingress Operations
1. List Existing Ingress Endpoints
Let’s start by checking what ingress endpoints already exist:
# List every ingress endpoint known to the server and print a short summary
# of each one, or a notice when there are none.
client = Process.get(:ingress_client)

case Livekit.IngressServiceClient.list_ingress(client) do
  {:ok, response} ->
    case response.items do
      [] ->
        IO.puts("📭 No ingress endpoints found.")

      items ->
        IO.puts("📡 Found #{length(items)} ingress endpoint(s):")

        # Enum.each instead of a `for` comprehension: printing is a pure side
        # effect, so there is no point in building (and having the cell
        # return) a list of :ok values.
        Enum.each(items, fn ingress ->
          IO.puts(" - ID: #{ingress.ingress_id}")
          IO.puts(" Name: #{ingress.name}")
          IO.puts(" Type: #{ingress.input_type}")
          IO.puts(" Room: #{ingress.room_name}")
          # state may be nil, in which case the short-circuit prints nothing.
          IO.puts(" Status: #{ingress.state && ingress.state.status}")
          IO.puts("")
        end)
    end

  {:error, reason} ->
    IO.puts("❌ Failed to list ingress endpoints: #{inspect(reason)}")
end
2. Create a Simple RTMP Ingress
Let’s create a basic RTMP ingress endpoint:
# Form collecting the three values needed to create the RTMP ingress:
# its name, the target room and the identity of the publishing participant.
ingress_fields = [
  name: Kino.Input.text("Ingress Name", default: "livebook-demo"),
  room_name: Kino.Input.text("Room Name", default: "demo-room"),
  participant_identity: Kino.Input.text("Participant Identity", default: "rtmp-streamer")
]

ingress_form = Kino.Control.form(ingress_fields, submit: "Create RTMP Ingress")
# Create the RTMP ingress from the form values and remember the result under
# :created_ingress so a later cell can recognise it.
# NOTE(review): Kino.Control.read/1 does not appear in the Kino.Control API as
# far as I can tell — confirm against the pinned Kino version.
client = Process.get(:ingress_client)
ingress_params = Kino.Control.read(ingress_form)

request = %Livekit.CreateIngressRequest{
  input_type: :RTMP_INPUT,
  name: ingress_params.name,
  room_name: ingress_params.room_name,
  participant_identity: ingress_params.participant_identity
}

result = Livekit.IngressServiceClient.create_ingress(client, request)

case result do
  {:ok, ingress} ->
    Process.put(:created_ingress, ingress)

    # Print the summary line by line, in the order listed.
    Enum.each(
      [
        "✅ Successfully created RTMP ingress!",
        "📡 Ingress Details:",
        " ID: #{ingress.ingress_id}",
        " Name: #{ingress.name}",
        " RTMP URL: #{ingress.url}",
        " Stream Key: #{ingress.stream_key}",
        " Room: #{ingress.room_name}",
        " Participant: #{ingress.participant_identity}"
      ],
      &IO.puts/1
    )

  {:error, reason} ->
    IO.puts("❌ Failed to create ingress: #{inspect(reason)}")
end
3. Test the Created Ingress
Let’s verify our newly created ingress by listing all ingress endpoints again:
# List all ingress endpoints again and mark the one created earlier in this
# notebook (stored under :created_ingress by the creation cell).
client = Process.get(:ingress_client)

# Look the stored ingress up once and keep only its ID. Comparing IDs rather
# than whole structs still matches when other fields of the listed entry
# differ from the creation response. nil when nothing has been created yet.
created_id =
  case Process.get(:created_ingress) do
    %{ingress_id: id} -> id
    _ -> nil
  end

case Livekit.IngressServiceClient.list_ingress(client) do
  {:ok, response} ->
    IO.puts("📡 Current ingress endpoints (#{length(response.items)} total):")

    # Enum.each: printing is a side effect, so no result list is built.
    Enum.each(response.items, fn ingress ->
      status =
        if not is_nil(created_id) and ingress.ingress_id == created_id do
          "🆕 NEWLY CREATED"
        else
          "📍 Existing"
        end

      IO.puts(" #{status}")
      IO.puts(" - ID: #{ingress.ingress_id}")
      IO.puts(" Name: #{ingress.name}")
      IO.puts(" Type: #{ingress.input_type}")
      IO.puts(" Room: #{ingress.room_name}")
      IO.puts("")
    end)

  {:error, reason} ->
    IO.puts("❌ Failed to list ingress endpoints: #{inspect(reason)}")
end
Understanding Ingress Types
Let’s explore the different ingress types available:
# Print a static overview of the three ingress input types, one line at a time.
ingress_type_lines = [
  "🔄 LiveKit Ingress Types:",
  "",
  "1️⃣ RTMP_INPUT",
  " - Traditional streaming protocol",
  " - Compatible with OBS, FFmpeg, and most streaming software",
  " - Provides RTMP URL + Stream Key",
  "",
  "2️⃣ WHIP_INPUT",
  " - WebRTC-HTTP Ingress Protocol",
  " - Low-latency streaming over WebRTC",
  " - Modern alternative to RTMP",
  "",
  "3️⃣ URL_INPUT",
  " - Direct URL-based inputs",
  " - Supports HLS, DASH, and other URL streams",
  " - Good for re-streaming existing content"
]

Enum.each(ingress_type_lines, &IO.puts/1)
Interactive Testing
Let’s create an interactive section where you can test different ingress operations:
# Interactive operation selector.
#
# Kino.Input.select/3 takes its options as {value, label} tuples: the first
# element is the term the input yields, the second is the text shown in the
# dropdown. The values are the atoms the execution cell matches on
# (:list, :create_whip, :create_url, :delete), so the atom must come first.
operation_form = Kino.Control.form(
  [
    operation: Kino.Input.select("Choose Operation", [
      {:list, "List all ingress endpoints"},
      {:create_whip, "Create WHIP ingress"},
      {:create_url, "Create URL ingress"},
      {:delete, "Delete an ingress"}
    ]),
    room_name: Kino.Input.text("Room Name (for create operations)", default: "test-room"),
    identity: Kino.Input.text("Participant Identity (for create)", default: "test-participant"),
    url_input: Kino.Input.text("URL (for URL ingress)", default: "https://example.com/stream.m3u8"),
    ingress_id: Kino.Input.text("Ingress ID (for delete)", default: "")
  ],
  submit: "Execute Operation"
)
# Execute the operation chosen in the form above: list all ingress endpoints,
# create a WHIP or URL ingress, or delete an ingress by ID.
# NOTE(review): Kino.Control.read/1 does not appear in the Kino.Control API as
# far as I can tell — confirm against the pinned Kino version.
params = Kino.Control.read(operation_form)
client = Process.get(:ingress_client)

case params.operation do
  :list ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        IO.puts("📡 Ingress Endpoints (#{length(response.items)} total):")

        # Enum.each: printing is a side effect, so no result list is built.
        Enum.each(response.items, fn ingress ->
          IO.puts(" - #{ingress.name} (#{ingress.ingress_id}) - #{ingress.input_type}")
        end)

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :create_whip ->
    request = %Livekit.CreateIngressRequest{
      input_type: :WHIP_INPUT,
      # Timestamp suffix gives each test ingress a distinct name.
      name: "whip-#{System.system_time(:second)}",
      room_name: params.room_name,
      participant_identity: params.identity
    }

    case Livekit.IngressServiceClient.create_ingress(client, request) do
      {:ok, ingress} ->
        IO.puts("✅ Created WHIP ingress: #{ingress.ingress_id}")
        IO.puts(" WHIP URL: #{ingress.url}")

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :create_url ->
    request = %Livekit.CreateIngressRequest{
      input_type: :URL_INPUT,
      # Timestamp suffix gives each test ingress a distinct name.
      name: "url-#{System.system_time(:second)}",
      room_name: params.room_name,
      participant_identity: params.identity,
      url: params.url_input
    }

    case Livekit.IngressServiceClient.create_ingress(client, request) do
      {:ok, ingress} ->
        IO.puts("✅ Created URL ingress: #{ingress.ingress_id}")
        IO.puts(" Source URL: #{ingress.url}")

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :delete ->
    # Trim first: a pasted ID with stray whitespace should still be usable,
    # and a whitespace-only value must count as "no ID given" rather than
    # being sent to the server.
    case String.trim(params.ingress_id) do
      "" ->
        IO.puts("⚠️ Please provide an ingress ID to delete")

      ingress_id ->
        request = %Livekit.DeleteIngressRequest{ingress_id: ingress_id}

        case Livekit.IngressServiceClient.delete_ingress(client, request) do
          {:ok, deleted_ingress} ->
            IO.puts("✅ Deleted ingress: #{deleted_ingress.name} (#{deleted_ingress.ingress_id})")

          {:error, reason} ->
            IO.puts("❌ Error: #{inspect(reason)}")
        end
    end

  _ ->
    IO.puts("⚠️ Unknown operation")
end
Next Steps
Congratulations! You’ve successfully:
- ✅ Connected to the LiveKit Ingress Service
- ✅ Listed existing ingress endpoints
- ✅ Created a new RTMP ingress endpoint
- ✅ Learned about different ingress types
- ✅ Interactively tested various operations
Recommended Next Steps:
- Try the RTMP Streaming Livebook - Learn how to set up OBS or FFmpeg to stream to your RTMP endpoint
- Explore WebRTC Ingress - Set up low-latency streaming with WHIP
- Test File Processing - Learn how to ingest video files and URLs
- Set up Management Workflows - Automate ingress lifecycle management
Useful Resources: see the LiveKit documentation at https://docs.livekit.io for the Ingress guides and API reference.
Cleanup
Don’t forget to clean up any test ingress endpoints you created:
# Cleanup reminder: fetches the stored client and prints instructions that
# point back to the interactive delete operation. Nothing is deleted here.
client = Process.get(:ingress_client)

Enum.each(
  [
    "🧹 Cleanup Helper",
    "If you created test ingress endpoints, consider deleting them to avoid clutter.",
    "Use the interactive section above with the 'Delete an ingress' operation."
  ],
  &IO.puts/1
)