# Protobuf over WebSocket
Mix.install([
{:mint_web_socket, "~> 1.0"},
{:phoenix_playground, "~> 0.1.7"},
{:kino, "~> 0.15.3"},
{:protobuf, "~> 0.14.1"},
{:websockex, "~> 0.5.0", hex: :websockex_wt}
])
require Logger
## Protobuf messages
defmodule LivebookProto.Event do
  @moduledoc """
  Envelope message exchanged over the WebSocket.

  The payload lives in the `:type` oneof and is represented as a
  `{variant, payload}` tuple, for example
  `{:client_connected, %LivebookProto.ClientConnected{}}`.
  """

  # The fields below are declared in proto3 style (no `optional`/`required`
  # labels), so state the syntax explicitly instead of relying on the
  # library's default.
  use Protobuf, syntax: :proto3

  oneof :type, 0

  field :client_connected, 1, type: LivebookProto.ClientConnected, oneof: 0
end
defmodule LivebookProto.ClientConnected do
  @moduledoc """
  Payload of the `:client_connected` event.

  Carries a list of `LivebookProto.EnvironmentVariable` entries in
  `:env_vars`.
  """

  # The field is declared in proto3 style (no label besides `repeated`), so
  # state the syntax explicitly instead of relying on the library's default.
  use Protobuf, syntax: :proto3

  field :env_vars, 1, repeated: true, type: LivebookProto.EnvironmentVariable
end
defmodule LivebookProto.EnvironmentVariable do
  @moduledoc """
  A single environment variable, as a `:name` / `:value` pair of strings.
  """

  # Both fields are label-less scalars, which is proto3 style. Declaring the
  # syntax explicitly gives them proto3 semantics (an implicit default for
  # unset strings) instead of whatever the library assumes when no syntax is
  # given.
  use Protobuf, syntax: :proto3

  field :name, 1, type: :string
  field :value, 2, type: :string
end
# Round-trip demo: build a sample event, encode it to its Protobuf binary
# form, then decode that binary back into a struct.

# A `ClientConnected` payload holding a single environment variable, wrapped
# in the `Event` envelope through the `:type` oneof.
env_var = %LivebookProto.EnvironmentVariable{name: "foo", value: "some foo"}
client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
event = %LivebookProto.Event{type: {:client_connected, client_connected}}
# Encode the event; `dbg/0` prints the pipeline while passing its value on,
# so `binary` ends up bound to the encoded message.
binary =
event
|> LivebookProto.Event.encode()
|> dbg()
# Presumably here to keep the notebook cell from rendering a result of its
# own, leaving only the `dbg/0` output visible.
Kino.nothing()
# Decode the binary produced above and print the resulting struct.
binary
|> LivebookProto.Event.decode()
|> dbg()
Kino.nothing()
## Server
defmodule Teams.ClientsTracker do
  @moduledoc """
  `Phoenix.Tracker` that records connected clients.

  Every tracked process is registered on a single topic under a freshly
  generated random key, with the organization name stored in its metadata.
  Joins and leaves are only logged.
  """

  use Phoenix.Tracker

  @topic "teams-clients"

  @default_options [
    name: __MODULE__,
    pubsub_server: PhoenixPlayground.PubSub
  ]

  @doc """
  Starts the tracker.

  `opts` are merged over the defaults (`:name` and `:pubsub_server`); the
  merged list is used both as the `init/1` argument and as the tracker
  options.
  """
  def start_link(opts \\ []) do
    merged_opts = Keyword.merge(@default_options, opts)
    Phoenix.Tracker.start_link(__MODULE__, merged_opts, merged_opts)
  end

  @doc """
  Tracks the calling process under a random key, with `org_name` as metadata.
  """
  def track(org_name) do
    presence_key = random_string()
    metadata = %{org_name: org_name}

    Phoenix.Tracker.track(__MODULE__, self(), @topic, presence_key, metadata)
  end

  @doc """
  Lists the presences currently tracked on the clients topic.
  """
  def list, do: Phoenix.Tracker.list(__MODULE__, @topic)

  @impl true
  def init(opts) do
    pubsub = Keyword.fetch!(opts, :pubsub_server)
    node_name = Phoenix.PubSub.node_name(pubsub)

    {:ok, %{pubsub_server: pubsub, node_name: node_name}}
  end

  @impl true
  def handle_diff(diff, state) do
    # Per topic: report every join first, then every leave.
    for {_topic, {joins, leaves}} <- diff do
      log_joins(joins)
      log_leaves(leaves)
    end

    {:ok, state}
  end

  # Logs one debug line per joined presence.
  defp log_joins(joins) do
    for {key, meta} <- joins do
      Logger.debug("[SERVER] presence join: key '#{key}' with meta #{inspect(meta)}")
    end
  end

  # Logs one debug line per departed presence.
  defp log_leaves(leaves) do
    for {key, meta} <- leaves do
      Logger.debug("[SERVER] presence leave: key '#{key}' with meta #{inspect(meta)}")
    end
  end

  # 8 random bytes, URL-safe Base64 without padding.
  defp random_string do
    Base.url_encode64(:crypto.strong_rand_bytes(8), padding: false)
  end
end
defmodule UserSocket do
  @moduledoc """
  Raw `Phoenix.Socket.Transport` implementation for the `/user` socket.

  A client identifies itself with the `x-org-name` request header. Once the
  socket is initialised, the client is registered with
  `Teams.ClientsTracker` and receives one Protobuf-encoded
  `LivebookProto.Event` carrying a `ClientConnected` payload as a binary
  frame.
  """

  @behaviour Phoenix.Socket.Transport

  @org_name_header "x-org-name"

  @impl true
  def child_spec(_opts) do
    # We won't spawn any process, so let's ignore the child spec
    :ignore
  end

  @doc """
  Authorizes the connection based on the `x-org-name` header.

  Returns `{:ok, %{org_name: org_name}}` when the header is present and
  `:error` otherwise.
  """
  @impl true
  def connect(%{connect_info: %{x_headers: x_headers}}) do
    case get_header(x_headers, @org_name_header) do
      nil ->
        Logger.debug("[SERVER] Rejected WebSocket connection without #{@org_name_header} header")
        :error

      org_name ->
        Logger.debug("[SERVER] Received WebSocket connection with org name: #{org_name}")
        {:ok, %{org_name: org_name}}
    end
  end

  @impl true
  def init(state) do
    # Track from here rather than from connect/1: the tracker ties the
    # presence to the calling pid, and only init/1 is guaranteed to run in
    # the process that operates the socket. The result is deliberately not
    # matched, as before.
    Teams.ClientsTracker.track(state.org_name)

    message = build_client_connected_message(state.org_name)
    # Deliver the greeting through handle_info/2, which pushes it to the client.
    send(self(), {:message, message})
    {:ok, state}
  end

  # Builds the encoded greeting event. The organization name is currently
  # unused; the payload is a fixed sample environment variable.
  defp build_client_connected_message(_org_name) do
    env_var = %LivebookProto.EnvironmentVariable{name: "foo", value: "some value"}
    client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
    event = %LivebookProto.Event{type: {:client_connected, client_connected}}
    LivebookProto.Event.encode(event)
  end

  @impl true
  def handle_info({:message, message}, state) do
    Logger.debug("[SERVER] Sending WebSocket message to client: #{inspect(message)}")
    {:push, {:binary, message}, state}
  end

  @impl true
  def handle_in({_message, _opts}, state) do
    # Incoming frames are ignored.
    {:ok, state}
  end

  @impl true
  def terminate(_reason, _state) do
    :ok
  end

  # Returns the value of the first header named `key`, or nil when absent.
  defp get_header(headers, key) do
    case List.keyfind(headers, key, 0) do
      {^key, value} -> value
      nil -> nil
    end
  end
end
defmodule Teams.Endpoint do
  @moduledoc """
  Endpoint that mounts `UserSocket` at `/user` over WebSocket.

  The `x-` prefixed request headers are passed to the socket as connect info.
  """

  use Phoenix.Endpoint, otp_app: :phoenix_playground

  socket "/user", UserSocket, websocket: [connect_info: [:x_headers]]
end
# Boot the playground with `Teams.Endpoint` on port 4600, without opening a
# browser; the match fails if it does not return `{:ok, pid}`.
{:ok, phx_playground_pid} =
PhoenixPlayground.start(endpoint: Teams.Endpoint, port: 4600, open_browser: false)
# Start the clients tracker through Kino so its lifetime is managed by the
# notebook (presumably it is stopped when the cell is re-evaluated).
Kino.start_child!(Teams.ClientsTracker)
## Client
defmodule Teams.Connection do
  @moduledoc """
  WebSockex client for the Teams WebSocket endpoint.

  Binary frames are decoded as `LivebookProto.Event` and forwarded to the
  listener process as `{:event, type, message}`. The listener is also sent
  `{:connection_state, :connected}` once the connection is established. A
  ping frame is sent at a fixed interval while the connection is up.
  """

  use WebSockex

  @loop_ping_delay 5_000
  @teams_ws_url "http://localhost:4600/user/websocket"

  @doc """
  Connects to `url`, sending `headers` as extra request headers.

  `listener` is the pid that receives connection state changes and decoded
  events.
  """
  def start_link(listener, headers, url \\ @teams_ws_url) do
    WebSockex.start_link(url, __MODULE__, %{listener: listener}, extra_headers: headers)
  end

  @impl true
  def handle_connect(_conn, state) do
    send(state.listener, {:connection_state, :connected})
    schedule_ping()
    {:ok, state}
  end

  @impl true
  def handle_frame({:binary, msg}, state) do
    Logger.debug("[CLIENT] Received WebSocket binary message: #{inspect(msg)}")
    event = LivebookProto.Event.decode(msg)
    Logger.debug("[CLIENT] Decoded Protobuf from WebSocket message: #{inspect(event)}")

    case event.type do
      {type, message} ->
        send(state.listener, {:event, type, message})

      nil ->
        # An event whose oneof is unset decodes with `type: nil`; skip it
        # instead of crashing the connection on a failed match.
        Logger.debug("[CLIENT] Ignoring event without a type")
    end

    {:ok, state}
  end

  def handle_frame({type, msg}, state) do
    # Logged like every other client message instead of printed to stdout.
    Logger.debug(
      "[CLIENT] Received Message - Type: #{inspect(type)} -- Message: #{inspect(msg)}"
    )

    {:ok, state}
  end

  @impl true
  def handle_info(:loop_ping, state) do
    schedule_ping()
    {:reply, :ping, state}
  end

  # Arms the timer for the next ping.
  defp schedule_ping do
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
  end
end
defmodule Teams.Client do
  @moduledoc """
  GenServer that owns a `Teams.Connection` and mirrors what it reports.

  The state keeps the organization name, the connection pid, the connection
  state, and the environment variables received in the `:client_connected`
  event.
  """

  use GenServer

  defstruct org_name: nil,
            env_vars: nil,
            connection_pid: nil,
            connection_state: :disconnected

  @doc """
  Starts a client for `org_name`.
  """
  def start_link(org_name), do: GenServer.start_link(__MODULE__, org_name)

  @impl true
  def init(org_name) do
    # The connection reports back to this process.
    {:ok, pid} = Teams.Connection.start_link(self(), [{"x-org-name", org_name}])

    {:ok, %__MODULE__{org_name: org_name, connection_pid: pid}}
  end

  @impl true
  def handle_info({:connection_state, :connected}, state) do
    {:noreply, %{state | connection_state: :connected}}
  end

  def handle_info(
        {:event, :client_connected, %LivebookProto.ClientConnected{env_vars: received}},
        state
      ) do
    # Keep plain maps in the state rather than the Protobuf structs.
    env_vars = Enum.map(received, fn var -> %{name: var.name, value: var.value} end)

    {:noreply, %{state | env_vars: env_vars}}
  end
end
# Start a client for the "dashbit" organization through Kino.
teams_client = Kino.start_child!({Teams.Client, "dashbit"})
Kino.nothing()
# Inspect the client's state. NOTE(review): the connection and event messages
# arrive asynchronously, so the fields may not be populated yet if this runs
# immediately after start.
:sys.get_state(teams_client)
# Presences as seen by the server-side tracker while the client is connected.
Teams.ClientsTracker.list()
# Stop the client, then list again to observe the presence being removed.
# NOTE(review): removal is presumably asynchronous; the entry may linger
# briefly.
Kino.terminate_child(teams_client)
Teams.ClientsTracker.list()