Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Protobuf over WebSocket

protobuf_websocket.livemd

Protobuf over WebSocket

Mix.install([
  {:mint_web_socket, "~> 1.0"},
  {:phoenix_playground, "~> 0.1.7"},
  {:kino, "~> 0.15.3"},
  {:protobuf, "~> 0.14.1"},
  {:websockex, "~> 0.5.0", hex: :websockex_wt}
])

require Logger

Protobuf messages

defmodule LivebookProto.Event do
  use Protobuf

  oneof :type, 0

  field :client_connected, 1,
    type: LivebookProto.ClientConnected,
    oneof: 0
end

defmodule LivebookProto.ClientConnected do
  use Protobuf

  field :env_vars, 1, repeated: true, type: LivebookProto.EnvironmentVariable
end

defmodule LivebookProto.EnvironmentVariable do
  use Protobuf

  field :name, 1, type: :string
  field :value, 2, type: :string
end
env_var = %LivebookProto.EnvironmentVariable{name: "foo", value: "some foo"}
client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
event = %LivebookProto.Event{type: {:client_connected, client_connected}}

binary =
  event
  |> LivebookProto.Event.encode()
  |> dbg()

Kino.nothing()
binary
|> LivebookProto.Event.decode()
|> dbg()

Kino.nothing()

Server

defmodule Teams.ClientsTracker do
  use Phoenix.Tracker

  @default_options [
    name: __MODULE__,
    pubsub_server: PhoenixPlayground.PubSub
  ]

  @topic "teams-clients"

  def start_link(opts \\ []) do
    opts = Keyword.merge(@default_options, opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def track(org_name) do
    key = random_string()
    Phoenix.Tracker.track(__MODULE__, self(), @topic, key, %{org_name: org_name})
  end

  def list do
    Phoenix.Tracker.list(__MODULE__, @topic)
  end

  @impl true
  def init(opts) do
    server = Keyword.fetch!(opts, :pubsub_server)
    {:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
  end

  @impl true
  def handle_diff(diff, state) do
    for {_topic, {joins, leaves}} <- diff do
      for {key, meta} <- joins do
        Logger.debug("[SERVER] presence join: key '#{key}' with meta #{inspect(meta)}")
      end

      for {key, meta} <- leaves do
        Logger.debug("[SERVER] presence leave: key '#{key}' with meta #{inspect(meta)}")
      end
    end

    {:ok, state}
  end

  defp random_string() do
    :crypto.strong_rand_bytes(8)
    |> Base.url_encode64(padding: false)
  end
end
defmodule UserSocket do
  @behaviour Phoenix.Socket.Transport

  @impl true
  def child_spec(_opts) do
    # We won't spawn any process, so let's ignore the child spec
    :ignore
  end

  @impl true
  def connect(%{connect_info: %{x_headers: x_headers}}) do
    org_name = get_header(x_headers, "x-org-name")
    Logger.debug("[SERVER] Received WebSocket connection with org name: #{org_name}")
    Teams.ClientsTracker.track(org_name)

    {:ok, %{org_name: org_name}}
  end

  @impl true
  def init(state) do
    message = build_client_connected_message(state.org_name)
    send(self(), {:message, message})

    {:ok, state}
  end

  defp build_client_connected_message(_org_name) do
    env_var = %LivebookProto.EnvironmentVariable{name: "foo", value: "some value"}
    client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
    event = %LivebookProto.Event{type: {:client_connected, client_connected}}

    LivebookProto.Event.encode(event)
  end

  @impl true
  def handle_info({:message, message}, state) do
    Logger.debug("[SERVER] Sending WebSocket message to client: #{inspect(message)}")
    {:push, {:binary, message}, state}
  end

  @impl true
  def handle_in({_message, _opts}, state) do
    {:ok, state}
  end

  @impl true
  def terminate(_reason, _state) do
    :ok
  end

  defp get_header(headers, key) do
    for {^key, value} <- headers, do: value
  end
end
defmodule Teams.Endpoint do
  use Phoenix.Endpoint, otp_app: :phoenix_playground

  socket("/user", UserSocket, websocket: [connect_info: [:x_headers]])
end
{:ok, phx_playground_pid} =
  PhoenixPlayground.start(endpoint: Teams.Endpoint, port: 4600, open_browser: false)

Kino.start_child!(Teams.ClientsTracker)

Client

defmodule Teams.Connection do
  use WebSockex

  @loop_ping_delay 5_000
  @teams_ws_url "http://localhost:4600/user/websocket"

  def start_link(listener, headers, url \\ @teams_ws_url) do
    WebSockex.start_link(url, __MODULE__, %{listener: listener}, extra_headers: headers)
  end

  def handle_connect(_conn, state) do
    send(state.listener, {:connection_state, :connected})
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
    {:ok, state}
  end

  def handle_frame({:binary, msg}, state) do
    Logger.debug("[CLIENT] Received WebSocket binary message: #{inspect(msg)}")

    event = LivebookProto.Event.decode(msg)
    Logger.debug("[CLIENT] Decoded Protobuf from WebSocket message: #{inspect(event)}")

    %{type: {type, message}} = event
    send(state.listener, {:event, type, message})

    {:ok, state}
  end

  def handle_frame({type, msg}, state) do
    IO.puts("Received Message - Type: #{inspect(type)} -- Message: #{inspect(msg)}")
    {:ok, state}
  end

  def handle_info(:loop_ping, state) do
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
    {:reply, :ping, state}
  end
end
defmodule Teams.Client do
  use GenServer

  defstruct [
    :org_name,
    :env_vars,
    :connection_pid,
    connection_state: :disconnected
  ]

  def start_link(org_name) do
    GenServer.start_link(__MODULE__, org_name)
  end

  @impl true
  def init(org_name) do
    headers = [{"x-org-name", org_name}]
    {:ok, connection_pid} = Teams.Connection.start_link(self(), headers)
    {:ok, %__MODULE__{org_name: org_name, connection_pid: connection_pid}}
  end

  @impl true
  def handle_info({:connection_state, :connected}, state) do
    {:noreply, %{state | connection_state: :connected}}
  end

  @impl true
  def handle_info({:event, :client_connected, %LivebookProto.ClientConnected{} = message}, state) do
    %LivebookProto.ClientConnected{env_vars: env_vars} = message

    env_vars =
      for env_var <- env_vars do
        %{name: env_var.name, value: env_var.value}
      end

    {:noreply, %{state | env_vars: env_vars}}
  end
end
teams_client = Kino.start_child!({Teams.Client, "dashbit"})
Kino.nothing()
:sys.get_state(teams_client)
Teams.ClientsTracker.list()
Kino.terminate_child(teams_client)
Teams.ClientsTracker.list()