# OpenAI Realtime Integration with Membrane WebRTC
```elixir
File.cd(__DIR__)
Logger.configure(level: :info)

Mix.install([
  {:membrane_core, "~> 1.1"},
  {:membrane_webrtc_plugin, "~> 0.22.0"},
  {:membrane_opus_plugin, "~> 0.20.4"},
  {:membrane_raw_audio_parser_plugin, "~> 0.4.0"},
  {:membrane_realtimer_plugin, "~> 0.10.0"},
  {:kino_membrane, "~> 0.3.0"},
  {:websockex, "~> 0.4.3"},
  {:jason, "~> 1.4"}
])
```
## Introduction
This demo shows how to use the Membrane Framework to create a simple WebRTC-based app that allows you to have a conversation with ChatGPT using the OpenAI Realtime API.
## WebSocket handler
The OpenAI Realtime API requires sending and receiving audio via a WebSocket. Let's create a module responsible for handling it with the WebSockex library.
```elixir
defmodule OpenAIWebSocket do
  use WebSockex
  require Logger

  def start_link(opts) do
    WebSockex.start_link(
      "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
      __MODULE__,
      %{parent: self()},
      opts
    )
  end

  @impl true
  def handle_frame(frame, state) do
    # Forward each frame received from the API to the process
    # that started this WebSocket.
    send(state.parent, {:websocket_frame, frame})
    {:ok, state}
  end

  def send_frame(ws, frame), do: WebSockex.send_frame(ws, {:text, frame})
end
```
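For illustration, this is how the same socket could be used to configure the session, e.g. to set custom instructions and a voice. This is only a sketch: `ws` stands for the pid returned by `OpenAIWebSocket.start_link/1`, and the `session.update` event with its fields is defined by the OpenAI Realtime API, so it may change between API versions.

```elixir
# A sketch, not used later in this demo. `ws` is assumed to be the pid
# returned by OpenAIWebSocket.start_link/1; the `session.update` event is
# defined by the OpenAI Realtime API and may change between API versions.
session_update =
  %{
    type: "session.update",
    session: %{instructions: "You are a helpful, concise assistant.", voice: "alloy"}
  }
  |> Jason.encode!()

:ok = OpenAIWebSocket.send_frame(ws, session_update)
```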
## Membrane Components
Next, we will create a Membrane endpoint that will receive and send raw audio via the WebSocket.
```elixir
defmodule OpenAIEndpoint do
  use Membrane.Endpoint
  require Membrane.Logger

  def_input_pad(:input, accepted_format: _any)
  def_output_pad(:output, accepted_format: _any, flow_control: :push)

  def_options(websocket_opts: [])

  @impl true
  def handle_init(_ctx, opts) do
    {:ok, ws} = OpenAIWebSocket.start_link(opts.websocket_opts)
    {[], %{ws: ws}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    # Format of the audio received from the API and forwarded to the output pad.
    format = %Membrane.RawAudio{channels: 1, sample_rate: 24_000, sample_format: :s16le}
    {[stream_format: {:output, format}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    # Send the user's audio to the API, base64-encoded in a JSON event.
    audio = Base.encode64(buffer.payload)
    frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
    :ok = OpenAIWebSocket.send_frame(state.ws, frame)
    {[], state}
  end

  @impl true
  def handle_info({:websocket_frame, {:text, frame}}, _ctx, state) do
    case Jason.decode!(frame) do
      %{"type" => "response.audio.delta", "delta" => delta} ->
        # A chunk of the AI's audio response.
        audio_payload = Base.decode64!(delta)
        {[buffer: {:output, %Membrane.Buffer{payload: audio_payload}}], state}

      %{"type" => "response.audio.done"} ->
        # The AI finished speaking; reset the realtimer.
        {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

      %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
        Membrane.Logger.info("AI transcription: #{transcript}")
        {[], state}

      %{} ->
        # Ignore other events.
        {[], state}
    end
  end
end
```
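The `case` above handles only the events this demo needs and ignores the rest. If you'd like to react to more of them, you could add further clauses, as in the sketch below; the event shapes come from the OpenAI Realtime API docs and are not verified here.

```elixir
# Extra clauses that could be added to the `case` in handle_info/3 above
# (a sketch, not part of the demo; event shapes per the OpenAI Realtime API docs).
%{"type" => "input_audio_buffer.speech_started"} ->
  Membrane.Logger.info("User started speaking")
  {[], state}

%{"type" => "error", "error" => error} ->
  Membrane.Logger.warning("OpenAI API error: #{inspect(error)}")
  {[], state}
```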
Now, let's write a pipeline module that exchanges media with the browser using `Membrane.WebRTC.Source` and `Membrane.WebRTC.Sink`, and with the OpenAI server using the `OpenAIEndpoint` created above.

Because WebRTC requires and provides audio in the Opus format, while the OpenAI Realtime API uses raw audio, we have to spawn the proper decoder and encoder between the WebRTC and OpenAI elements.
```elixir
defmodule OpenAIPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    spec =
      child(:webrtc_source, %Membrane.WebRTC.Source{
        signaling: {:websocket, port: opts[:webrtc_source_ws_port]}
      })
      |> via_out(:output, options: [kind: :audio])
      |> child(:input_opus_parser, Membrane.Opus.Parser)
      |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 24_000})
      |> child(:open_ai, %OpenAIEndpoint{websocket_opts: opts[:openai_ws_opts]})
      |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
      # The API sends the response audio faster than real time, so we buffer
      # it in a large queue and let the realtimer forward it to the browser
      # at the real-time pace.
      |> via_in(:input, target_queue_size: 1_000_000_000, toilet_capacity: 1_000_000_000)
      |> child(:realtimer, Membrane.Realtimer)
      |> child(:opus_encoder, Membrane.Opus.Encoder)
      |> via_in(:input, options: [kind: :audio])
      |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
        tracks: [:audio],
        signaling: {:websocket, port: opts[:webrtc_sink_ws_port]}
      })

    {[spec: spec], %{}}
  end
end
```
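To sum up the spec above, the audio takes the following path through the pipeline:

```
browser ──▶ webrtc_source ──▶ input_opus_parser ──▶ opus_decoder ──▶ open_ai
open_ai ──▶ raw_audio_parser ──▶ realtimer ──▶ opus_encoder ──▶ webrtc_sink ──▶ browser
```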
## Getting the OpenAI API key from the env
Let's set the WebSocket options (remember to set the `OPENAI_API_KEY` env variable).
```elixir
openai_api_key = System.get_env("OPENAI_API_KEY")

if openai_api_key == nil do
  raise "You have to set the OPENAI_API_KEY env variable"
end

openai_ws_opts = [
  extra_headers: [
    {"Authorization", "Bearer " <> openai_api_key},
    {"OpenAI-Beta", "realtime=v1"}
  ]
]

:ok
```
## Running the server
Now, let's start the pipeline, along with a simple HTTP server that serves the browser assets.
```elixir
{:ok, _supervisor, pipeline} =
  Membrane.Pipeline.start_link(OpenAIPipeline,
    openai_ws_opts: openai_ws_opts,
    webrtc_source_ws_port: 8829,
    webrtc_sink_ws_port: 8831
  )

:inets.start()

:inets.start(:httpd,
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/assets",
  server_name: ~c"webrtc",
  server_root: "/tmp"
)
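
# Optionally, you can render a dashboard of the running pipeline in Livebook.
# A sketch assuming the KinoMembrane.pipeline_dashboard/1 helper from the
# kino_membrane package installed at the top of this notebook.
KinoMembrane.pipeline_dashboard(pipeline) |> Kino.render()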

# Block until the pipeline terminates, so that the cell keeps running
# for the whole conversation.
Process.monitor(pipeline)

receive do
  {:DOWN, _ref, :process, ^pipeline, _reason} -> :ok
end
```
Open http://localhost:8000 in a new tab of Google Chrome and start your conversation with the AI!

Transcriptions of the AI's answers will be available in the logs of the cell below.