Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

LiveKit Ingress Service - Basic Setup

ingress_basic_setup.livemd

LiveKit Ingress Service - Basic Setup

Mix.install([
  {:livekit, path: "../.."},
  {:kino, "~> 0.12"}
])

Overview

This Livebook demonstrates the basic usage of LiveKit’s Ingress Service, which allows you to bring external streams into LiveKit rooms. The Ingress Service supports multiple input types:

  • RTMP streams: Traditional streaming protocol
  • WHIP streams: WebRTC-HTTP Ingress Protocol
  • URL streams: Direct URL-based inputs (HLS, etc.)

Configuration Setup

Let’s start by setting up our LiveKit configuration. You’ll need your LiveKit server credentials.

# Credentials form: API key and secret are collected through password inputs,
# the server URL through a text input pre-filled with a placeholder address.
config_form =
  Kino.Control.form(
    [
      api_key: Kino.Input.password("API Key"),
      api_secret: Kino.Input.password("API Secret"),
      url: Kino.Input.text("LiveKit Server URL", default: "wss://your-server.livekit.cloud")
    ],
    submit: "Save Configuration"
  )
# Read the values entered in the configuration form.
# NOTE(review): confirm that Kino.Control.read/1 exists in the pinned Kino
# version; form controls may only be consumable through events
# (Kino.listen/2 / Kino.Control.subscribe/2).
config = Kino.Control.read(config_form)

# Keep the values in the process dictionary under :livekit_config so that
# other cells can retrieve them with Process.get/1.
Process.put(:livekit_config, config)

IO.puts("✅ Configuration saved!")

# Echo the configuration back with the secret replaced by a placeholder.
config
|> Map.put(:api_secret, "[HIDDEN]")
|> IO.inspect()

Creating an Ingress Service Client

Let’s create our ingress service client using the configured credentials:

# Build the Ingress Service client from the configuration stored under
# :livekit_config and keep it under :ingress_client for the cells that follow.
config = Process.get(:livekit_config)

case Livekit.IngressServiceClient.new(config.url, config.api_key, config.api_secret) do
  {:ok, client} ->
    Process.put(:ingress_client, client)
    IO.puts("✅ Successfully connected to LiveKit Ingress Service!")

  {:error, reason} ->
    # inspect/1 keeps this branch from raising Protocol.UndefinedError when the
    # reason is a tuple, map or struct instead of a string; it also matches the
    # error reporting used by the other cells in this notebook.
    IO.puts("❌ Failed to connect: #{inspect(reason)}")
    IO.puts("Please check your configuration and try again.")
end

Basic Ingress Operations

1. List Existing Ingress Endpoints

Let’s start by checking what ingress endpoints already exist:

# List every ingress endpoint known to the server and print a short summary
# of each one, using the client stored under :ingress_client.
client = Process.get(:ingress_client)

case Livekit.IngressServiceClient.list_ingress(client) do
  {:ok, %{items: []}} ->
    IO.puts("📭 No ingress endpoints found.")

  {:ok, %{items: items}} ->
    IO.puts("📡 Found #{length(items)} ingress endpoint(s):")

    for ingress <- items do
      IO.puts("  - ID: #{ingress.ingress_id}")
      IO.puts("    Name: #{ingress.name}")
      IO.puts("    Type: #{ingress.input_type}")
      IO.puts("    Room: #{ingress.room_name}")
      # `state` may be nil; `&&` short-circuits so the status is only read
      # when a state is present (nil interpolates as an empty string).
      IO.puts("    Status: #{ingress.state && ingress.state.status}")
      IO.puts("")
    end

  {:error, reason} ->
    IO.puts("❌ Failed to list ingress endpoints: #{inspect(reason)}")
end

2. Create a Simple RTMP Ingress

Let’s create a basic RTMP ingress endpoint:

# Inputs describing the RTMP ingress to create: its display name, the target
# room, and the identity the stream will join the room with.
ingress_form =
  Kino.Control.form(
    [
      name: Kino.Input.text("Ingress Name", default: "livebook-demo"),
      room_name: Kino.Input.text("Room Name", default: "demo-room"),
      participant_identity: Kino.Input.text("Participant Identity", default: "rtmp-streamer")
    ],
    submit: "Create RTMP Ingress"
  )
# Create an RTMP ingress from the values entered in `ingress_form`.
# NOTE(review): confirm that Kino.Control.read/1 exists in the pinned Kino
# version; form controls may only be consumable through events.
ingress_params = Kino.Control.read(ingress_form)
client = Process.get(:ingress_client)

request = %Livekit.CreateIngressRequest{
  input_type: :RTMP_INPUT,
  name: ingress_params.name,
  room_name: ingress_params.room_name,
  participant_identity: ingress_params.participant_identity
}

case Livekit.IngressServiceClient.create_ingress(client, request) do
  {:ok, ingress} ->
    # Remember the new ingress so the verification cell can highlight it.
    Process.put(:created_ingress, ingress)

    report = [
      "✅ Successfully created RTMP ingress!",
      "📡 Ingress Details:",
      "   ID: #{ingress.ingress_id}",
      "   Name: #{ingress.name}",
      "   RTMP URL: #{ingress.url}",
      "   Stream Key: #{ingress.stream_key}",
      "   Room: #{ingress.room_name}",
      "   Participant: #{ingress.participant_identity}"
    ]

    Enum.each(report, &IO.puts/1)

  {:error, reason} ->
    IO.puts("❌ Failed to create ingress: #{inspect(reason)}")
end

3. Test the Created Ingress

Let’s verify our newly created ingress by listing all ingress endpoints again:

# List all ingress endpoints again and flag the one created by this notebook.
client = Process.get(:ingress_client)

# ID of the ingress stored under :created_ingress, or nil when nothing has
# been stored yet. Looked up once instead of once per listed item.
created_id =
  case Process.get(:created_ingress) do
    %{ingress_id: id} -> id
    _ -> nil
  end

case Livekit.IngressServiceClient.list_ingress(client) do
  {:ok, response} ->
    IO.puts("📡 Current ingress endpoints (#{length(response.items)} total):")

    for ingress <- response.items do
      # Match on the ID rather than on whole-struct equality: any field that
      # differs between the create response and the list response would make
      # a full comparison fail even for the same ingress.
      status =
        if created_id != nil and ingress.ingress_id == created_id do
          "🆕 NEWLY CREATED"
        else
          "📍 Existing"
        end

      IO.puts("  #{status}")
      IO.puts("  - ID: #{ingress.ingress_id}")
      IO.puts("    Name: #{ingress.name}")
      IO.puts("    Type: #{ingress.input_type}")
      IO.puts("    Room: #{ingress.room_name}")
      IO.puts("")
    end

  {:error, reason} ->
    IO.puts("❌ Failed to list ingress endpoints: #{inspect(reason)}")
end

Understanding Ingress Types

Let’s explore the different ingress types available:

# Static overview of the three ingress input types, printed one line at a time.
overview_lines = [
  "🔄 LiveKit Ingress Types:",
  "",
  "1️⃣ RTMP_INPUT",
  "   - Traditional streaming protocol",
  "   - Compatible with OBS, FFmpeg, and most streaming software",
  "   - Provides RTMP URL + Stream Key",
  "",
  "2️⃣ WHIP_INPUT",
  "   - WebRTC-HTTP Ingress Protocol",
  "   - Low-latency streaming over WebRTC",
  "   - Modern alternative to RTMP",
  "",
  "3️⃣ URL_INPUT",
  "   - Direct URL-based inputs",
  "   - Supports HLS, DASH, and other URL streams",
  "   - Good for re-streaming existing content"
]

Enum.each(overview_lines, &IO.puts/1)

Interactive Testing

Let’s create an interactive section where you can test different ingress operations:

# Form driving the interactive cell: a selector for the operation plus the
# inputs each operation may need (unused inputs are simply ignored).
operation_form =
  Kino.Control.form(
    [
      operation:
        Kino.Input.select("Choose Operation", [
          {"List all ingress endpoints", :list},
          {"Create WHIP ingress", :create_whip},
          {"Create URL ingress", :create_url},
          {"Delete an ingress", :delete}
        ]),
      room_name: Kino.Input.text("Room Name (for create operations)", default: "test-room"),
      identity: Kino.Input.text("Participant Identity (for create)", default: "test-participant"),
      url_input:
        Kino.Input.text("URL (for URL ingress)", default: "https://example.com/stream.m3u8"),
      ingress_id: Kino.Input.text("Ingress ID (for delete)", default: "")
    ],
    submit: "Execute Operation"
  )
# Run the operation selected in `operation_form` against the Ingress Service.
# NOTE(review): confirm that Kino.Control.read/1 exists in the pinned Kino
# version; form controls may only be consumable through events.
params = Kino.Control.read(operation_form)
client = Process.get(:ingress_client)

case params.operation do
  :list ->
    case Livekit.IngressServiceClient.list_ingress(client) do
      {:ok, response} ->
        IO.puts("📡 Ingress Endpoints (#{length(response.items)} total):")

        for ingress <- response.items do
          IO.puts("  - #{ingress.name} (#{ingress.ingress_id}) - #{ingress.input_type}")
        end

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :create_whip ->
    # The current Unix time (seconds) is appended to the ingress name.
    request = %Livekit.CreateIngressRequest{
      input_type: :WHIP_INPUT,
      name: "whip-#{System.system_time(:second)}",
      room_name: params.room_name,
      participant_identity: params.identity
    }

    case Livekit.IngressServiceClient.create_ingress(client, request) do
      {:ok, ingress} ->
        IO.puts("✅ Created WHIP ingress: #{ingress.ingress_id}")
        IO.puts("   WHIP URL: #{ingress.url}")

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :create_url ->
    request = %Livekit.CreateIngressRequest{
      input_type: :URL_INPUT,
      name: "url-#{System.system_time(:second)}",
      room_name: params.room_name,
      participant_identity: params.identity,
      url: params.url_input
    }

    case Livekit.IngressServiceClient.create_ingress(client, request) do
      {:ok, ingress} ->
        IO.puts("✅ Created URL ingress: #{ingress.ingress_id}")
        IO.puts("   Source URL: #{ingress.url}")

      {:error, reason} ->
        IO.puts("❌ Error: #{inspect(reason)}")
    end

  :delete ->
    # Trim first so a whitespace-only entry counts as "no ID" instead of being
    # sent to the server, and so stray spaces from copy/paste are dropped.
    case String.trim(params.ingress_id) do
      "" ->
        IO.puts("⚠️  Please provide an ingress ID to delete")

      ingress_id ->
        request = %Livekit.DeleteIngressRequest{ingress_id: ingress_id}

        case Livekit.IngressServiceClient.delete_ingress(client, request) do
          {:ok, deleted_ingress} ->
            IO.puts("✅ Deleted ingress: #{deleted_ingress.name} (#{deleted_ingress.ingress_id})")

          {:error, reason} ->
            IO.puts("❌ Error: #{inspect(reason)}")
        end
    end

  _ ->
    IO.puts("⚠️  Unknown operation")
end

Next Steps

Congratulations! You’ve successfully:

  • ✅ Connected to the LiveKit Ingress Service
  • ✅ Listed existing ingress endpoints
  • ✅ Created a new RTMP ingress endpoint
  • ✅ Learned about different ingress types
  • ✅ Interactively tested various operations

Recommended Next Steps:

  1. Try the RTMP Streaming Livebook - Learn how to set up OBS or FFmpeg to stream to your RTMP endpoint
  2. Explore WebRTC Ingress - Set up low-latency streaming with WHIP
  3. Test File Processing - Learn how to ingest video files and URLs
  4. Set up Management Workflows - Automate ingress lifecycle management

Useful Resources:

Cleanup

Don’t forget to clean up any test ingress endpoints you created:

# Cleanup reminder: nothing is deleted here, the cell only prints guidance
# pointing back at the interactive delete operation.
client = Process.get(:ingress_client)

Enum.each(
  [
    "🧹 Cleanup Helper",
    "If you created test ingress endpoints, consider deleting them to avoid clutter.",
    "Use the interactive section above with the 'Delete an ingress' operation."
  ],
  &IO.puts/1
)