
OpenAI Realtime Integration with Membrane WebRTC


File.cd(__DIR__)
Logger.configure(level: :info)

Mix.install([
  {:membrane_core, "~> 1.1"},
  {:membrane_webrtc_plugin, "~> 0.22.0"},
  {:membrane_opus_plugin, "~> 0.20.4"},
  {:membrane_raw_audio_parser_plugin, "~> 0.4.0"},
  {:membrane_realtimer_plugin, "~> 0.10.0"},
  {:kino_membrane, "~> 0.3.0"},
  {:websockex, "~> 0.4.3"},
  {:jason, "~> 1.4"}
])

Introduction

This demo shows how to use the Membrane Framework to create a simple WebRTC-based app that lets you have a voice conversation with ChatGPT through the OpenAI Realtime API.

WebSocket handler

The OpenAI Realtime API requires sending and receiving audio over a WebSocket. Let's create a module responsible for handling that connection with the WebSockex library.

defmodule OpenAIWebSocket do
  use WebSockex
  require Logger

  def start_link(opts) do
    WebSockex.start_link(
      "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
      __MODULE__,
      %{parent: self()},
      opts
    )
  end

  @impl true
  def handle_frame(frame, state) do
    # forward every incoming frame to the process that started the socket
    send(state.parent, {:websocket_frame, frame})
    {:ok, state}
  end

  def send_frame(ws, frame), do: WebSockex.send_frame(ws, {:text, frame})
end
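
Besides raw audio, the same socket carries JSON control events. For example, the Realtime API accepts a session.update event that lets you adjust the assistant's instructions or voice. Below is a minimal, hypothetical sketch of sending one with the module above; it assumes a ws handle returned by OpenAIWebSocket.start_link/1 with the authorization options defined later in this notebook, and the session fields used here are an assumption based on the Realtime API documentation.

# Hypothetical usage sketch: configure the session right after connecting
{:ok, ws} = OpenAIWebSocket.start_link(openai_ws_opts)

session_update =
  %{
    type: "session.update",
    session: %{
      instructions: "You are a friendly voice assistant. Keep your answers short.",
      voice: "alloy"
    }
  }
  |> Jason.encode!()

:ok = OpenAIWebSocket.send_frame(ws, session_update)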

Membrane Components

Next, we will create a Membrane element that sends raw audio to the WebSocket and receives raw audio from it.

defmodule OpenAIEndpoint do
  use Membrane.Endpoint
  require Membrane.Logger

  def_input_pad(:input, accepted_format: _any)
  def_output_pad(:output, accepted_format: _any, flow_control: :push)

  def_options(websocket_opts: [])

  @impl true
  def handle_init(_ctx, opts) do
    {:ok, ws} = OpenAIWebSocket.start_link(opts.websocket_opts)
    {[], %{ws: ws}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    # format of the raw audio received from OpenAI and sent on the :output pad in handle_info below
    format = %Membrane.RawAudio{channels: 1, sample_rate: 24_000, sample_format: :s16le}
    {[stream_format: {:output, format}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    audio = Base.encode64(buffer.payload)
    frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
    :ok = OpenAIWebSocket.send_frame(state.ws, frame)
    {[], state}
  end

  @impl true
  def handle_info({:websocket_frame, {:text, frame}}, _ctx, state) do
    case Jason.decode!(frame) do
      # a chunk of the AI answer: base64-encoded raw audio, forwarded downstream
      %{"type" => "response.audio.delta", "delta" => delta} ->
        audio_payload = Base.decode64!(delta)
        {[buffer: {:output, %Membrane.Buffer{payload: audio_payload}}], state}

      # the AI answer is complete, so we reset the realtimer's timing
      %{"type" => "response.audio.done"} ->
        {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

      # text transcription of the AI answer, logged for reference
      %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
        Membrane.Logger.info("AI transcription: #{transcript}")
        {[], state}

      # ignore all other events
      %{} ->
        {[], state}
    end
  end
end
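
The catch-all clause above silently ignores every other event type. The Realtime API also reports failures (for example an invalid request) as "error" events, so it can be worth matching on them explicitly and logging them. Here is a rough sketch of such a clause, shown as standalone code that can be run on an example frame; the exact shape of the error object is an assumption based on the API's error events.

# Sketch of an extra clause you could add to handle_info/3 above,
# demonstrated here on a made-up frame
error_frame = ~s({"type": "error", "error": {"message": "example failure"}})

case Jason.decode!(error_frame) do
  %{"type" => "error", "error" => error} ->
    require Logger
    Logger.warning("OpenAI Realtime API error: #{inspect(error)}")

  _other ->
    :ok
end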

Now, let's write a pipeline module that exchanges media with the browser using Membrane.WebRTC.Source and Membrane.WebRTC.Sink, and with the OpenAI server using our OpenAIEndpoint.

Because WebRTC provides and expects audio in the Opus format, while the OpenAI Realtime API works on raw audio, we have to place a decoder and an encoder between the WebRTC and OpenAI elements. We also insert Membrane.Realtimer on the way back, since OpenAI delivers response audio faster than real time and it has to be paced before being sent to the browser.

defmodule OpenAIPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    spec =
      child(:webrtc_source, %Membrane.WebRTC.Source{
        signaling: {:websocket, port: opts[:webrtc_source_ws_port]}
      })
      |> via_out(:output, options: [kind: :audio])
      |> child(:input_opus_parser, Membrane.Opus.Parser)
      |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 24_000})
      |> child(:open_ai, %OpenAIEndpoint{websocket_opts: opts[:openai_ws_opts]})
      |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
      |> via_in(:input, target_queue_size: 1_000_000_000, toilet_capacity: 1_000_000_000)
      |> child(:realtimer, Membrane.Realtimer)
      |> child(:opus_encoder, Membrane.Opus.Encoder)
      |> via_in(:input, options: [kind: :audio])
      |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
        tracks: [:audio],
        signaling: {:websocket, port: opts[:webrtc_sink_ws_port]}
      })

    {[spec: spec], %{}}
  end
end

Getting the OpenAI API key from the env

Let's set the WebSocket options (remember to set the OPENAI_API_KEY env variable).

openai_api_key = System.get_env("OPENAI_API_KEY")

if openai_api_key == nil do
  raise "You have to set OPENAI_API_KEY env"
end

openai_ws_opts = [
  extra_headers: [
    {"Authorization", "Bearer " <> openai_api_key},
    {"OpenAI-Beta", "realtime=v1"}
  ]
]

:ok

Running the server

Now, let’s start the pipeline.

{:ok, _supervisor, pipeline} =
  Membrane.Pipeline.start_link(OpenAIPipeline,
    openai_ws_opts: openai_ws_opts,
    webrtc_source_ws_port: 8829,
    webrtc_sink_ws_port: 8831
  )
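
Since kino_membrane is among the installed dependencies, you can also render an interactive dashboard of the running pipeline in Livebook. This cell is optional; KinoMembrane.pipeline_dashboard/1 takes the pipeline pid obtained above.

# Optional: inspect the running pipeline in a Livebook dashboard
KinoMembrane.pipeline_dashboard(pipeline)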

:inets.start()

:inets.start(:httpd,
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/assets",
  server_name: ~c"webrtc",
  server_root: ~c"/tmp"
)

# Keep this cell running (and its logs visible) until the pipeline terminates
Process.monitor(pipeline)

receive do
  {:DOWN, _ref, :process, ^pipeline, _reason} -> :ok
end

Open http://localhost:8000 (the page served from the assets directory) in a new Google Chrome tab and start your conversation with the AI!

Transcriptions of the AI's answers will appear in the logs of the cell above, which keeps the pipeline running.