Protobuf over WebSocket
Mix.install([
{:phoenix_playground, "~> 0.1.7"},
{:kino, "~> 0.16.1"},
{:protobuf, "~> 0.14.1"},
{:websockex, "~> 0.5.0", hex: :websockex_wt}
])
require Logger
Protobuf messages
defmodule LivebookProto.Event do
  # Envelope message: the `:type` oneof holds the concrete payload.
  # Its only member is `:client_connected` (field number 1).
  use Protobuf

  oneof(:type, 0)

  field(:client_connected, 1, type: LivebookProto.ClientConnected, oneof: 0)
end
defmodule LivebookProto.ClientConnected do
  # Payload carrying a repeated list of `LivebookProto.EnvironmentVariable`
  # entries in field number 1.
  use Protobuf

  field :env_vars, 1, repeated: true, type: LivebookProto.EnvironmentVariable
end
defmodule LivebookProto.EnvironmentVariable do
  # A single name/value pair; both fields are strings.
  use Protobuf

  field(:name, 1, type: :string)
  field(:value, 2, type: :string)
end
# Build a sample event by hand: one environment variable wrapped in a
# ClientConnected payload, placed in the Event's `:type` oneof.
env_var = %LivebookProto.EnvironmentVariable{
name: "SOME_API_TOKEN",
value: "my api token value"
}
client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
event = %LivebookProto.Event{type: {:client_connected, client_connected}}
# Encode the event to its Protobuf binary form; dbg/1 prints the pipeline.
binary =
event
|> LivebookProto.Event.encode()
|> dbg()
Kino.nothing()
# Decode the binary back into an Event struct to show the round trip.
binary
|> LivebookProto.Event.decode()
|> dbg()
Kino.nothing()
Server
defmodule UserSocket do
  @moduledoc """
  Raw WebSocket transport used as the demo server.

  On connect it reads the `x-org-name` header from the connection info and
  keeps it in the socket state. Right after initialization it pushes a single
  Protobuf-encoded `LivebookProto.Event` (a `client_connected` payload) to the
  client as a binary frame. Incoming frames are ignored.
  """

  @behaviour Phoenix.Socket.Transport

  @impl true
  def child_spec(_opts) do
    # We won't spawn any process, so let's ignore the child spec
    :ignore
  end

  @impl true
  def connect(%{connect_info: %{x_headers: x_headers}}) do
    # `org_name` is the header value itself, or `nil` when the header is absent.
    org_name = get_header(x_headers, "x-org-name")
    Logger.debug("[SERVER] Received WebSocket connection with org name: #{org_name}")
    {:ok, %{org_name: org_name}}
  end

  @impl true
  def init(state) do
    # Defer the push to handle_info/2 by messaging ourselves; init/1 itself
    # only returns the state.
    message = build_client_connected_message(state.org_name)
    send(self(), {:message, message})
    {:ok, state}
  end

  @impl true
  def handle_info({:message, message}, state) do
    Logger.debug("[SERVER] Sending WebSocket message to client: #{inspect(message)}")
    {:push, {:binary, message}, state}
  end

  @impl true
  def handle_in({_message, _opts}, state) do
    # The demo server does not react to anything the client sends.
    {:ok, state}
  end

  @impl true
  def terminate(_reason, _state) do
    :ok
  end

  # Builds the encoded `client_connected` event. The org name is currently
  # unused: every client receives the same hard-coded environment variable.
  defp build_client_connected_message(_org_name) do
    env_var = %LivebookProto.EnvironmentVariable{name: "API_TOKEN", value: "some api token"}
    client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
    event = %LivebookProto.Event{type: {:client_connected, client_connected}}
    LivebookProto.Event.encode(event)
  end

  # Returns the value of the first header named `key`, or `nil` if there is
  # none. (Previously this returned a list of all matching values, so the
  # stored org name was `["dashbit"]` instead of `"dashbit"`.)
  defp get_header(headers, key) do
    case List.keyfind(headers, key, 0) do
      {^key, value} -> value
      nil -> nil
    end
  end
end
defmodule Teams.Endpoint do
  use Phoenix.Endpoint, otp_app: :phoenix_playground

  # Mount the raw transport at /user and expose the `x-` request headers to
  # `UserSocket.connect/1` through the connection info.
  socket "/user", UserSocket, websocket: [connect_info: [:x_headers]]
end
# Start the endpoint on port 4600 without opening a browser; the match
# asserts that startup succeeded and keeps the returned pid.
{:ok, phx_playground_pid} =
PhoenixPlayground.start(endpoint: Teams.Endpoint, port: 4600, open_browser: false)
Client
defmodule Teams.Connection do
  @moduledoc """
  WebSockex client process that talks to the demo server.

  It reports to a `connection_handler` process by sending it plain messages:

    * `{:connection_state, :connected}` once the socket is connected
    * `{:event, type, message}` for every decoded `LivebookProto.Event`
      whose oneof payload is set

  While connected it sends a ping frame every `@loop_ping_delay` milliseconds.
  """

  use WebSockex

  @loop_ping_delay 5_000
  @teams_ws_url "http://localhost:4600/user/websocket"

  @doc """
  Starts the connection.

  `connection_handler` is the pid that receives the messages listed in the
  module documentation, `headers` are sent as extra request headers, and
  `url` defaults to the local demo server.
  """
  def start_link(connection_handler, headers, url \\ @teams_ws_url) do
    WebSockex.start_link(url, __MODULE__, %{connection_handler: connection_handler},
      extra_headers: headers
    )
  end

  @impl true
  def handle_connect(_conn, state) do
    send(state.connection_handler, {:connection_state, :connected})
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
    {:ok, state}
  end

  # Received a binary message
  @impl true
  def handle_frame({:binary, msg}, state) do
    Logger.debug("[CLIENT] Received WebSocket binary message: #{inspect(msg)}")
    event = LivebookProto.Event.decode(msg)
    Logger.debug("[CLIENT] Decoded Protobuf from WebSocket message: #{inspect(event)}")

    # The oneof is `nil` when the frame carried no payload (for example an
    # empty binary). Skip such events instead of crashing on a failed match.
    case event.type do
      {type, message} ->
        send(state.connection_handler, {:event, type, message})

      nil ->
        Logger.warning("[CLIENT] Ignoring Protobuf event without a payload")
    end

    {:ok, state}
  end

  # Received a message
  @impl true
  def handle_frame({type, msg}, state) do
    # Logged through Logger with the same prefix as every other client message.
    Logger.debug(
      "[CLIENT] Received WebSocket message - Type: #{inspect(type)} -- Message: #{inspect(msg)}"
    )

    {:ok, state}
  end

  @impl true
  def handle_info(:loop_ping, state) do
    # Re-arm the timer before replying so pings keep going at a fixed delay.
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
    {:reply, :ping, state}
  end
end
defmodule Teams.Client do
  @moduledoc """
  GenServer that owns a `Teams.Connection` and mirrors what it reports.

  The state tracks the org name, the connection pid, the connection state
  (`:disconnected` until the connection reports `:connected`) and the
  environment variables received in a `client_connected` event.
  """

  use GenServer

  defstruct org_name: nil,
            env_vars: nil,
            connection_pid: nil,
            connection_state: :disconnected

  @doc """
  Starts the client for `org_name`, which is sent to the server in the
  `x-org-name` header.
  """
  def start_link(org_name), do: GenServer.start_link(__MODULE__, org_name)

  @impl true
  def init(org_name) do
    # The connection is started (and linked) from here, with this process
    # registered as its handler.
    {:ok, pid} = Teams.Connection.start_link(self(), [{"x-org-name", org_name}])
    {:ok, %__MODULE__{org_name: org_name, connection_pid: pid}}
  end

  @impl true
  def handle_info({:connection_state, :connected}, state) do
    {:noreply, %{state | connection_state: :connected}}
  end

  def handle_info(
        {:event, :client_connected, %LivebookProto.ClientConnected{env_vars: received}},
        state
      ) do
    # Keep plain maps in the state rather than the Protobuf structs.
    plain_vars = Enum.map(received, fn var -> %{name: var.name, value: var.value} end)
    {:noreply, %{state | env_vars: plain_vars}}
  end
end
# Start a Teams.Client for the "dashbit" org via Kino.start_child!/1.
teams_client = Kino.start_child!({Teams.Client, "dashbit"})
Kino.nothing()
# Inspect the client's current state. The server's event arrives
# asynchronously, so `env_vars` may still be nil if this runs immediately
# after the client starts.
:sys.get_state(teams_client)