Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Protobuf over WebSocket

protobuf_websocket.livemd

Protobuf over WebSocket

Mix.install([
  {:phoenix_playground, "~> 0.1.7"},
  {:kino, "~> 0.16.1"},
  {:protobuf, "~> 0.14.1"},
  {:websockex, "~> 0.5.0", hex: :websockex_wt}
])

require Logger

Protobuf messages

defmodule LivebookProto.Event do
  @moduledoc """
  Envelope message exchanged over the WebSocket.

  The payload is carried in the `:type` oneof; `:client_connected` is the only
  variant declared here.
  """

  use Protobuf

  # Oneof group 0; a field joins it by passing `oneof: 0`.
  oneof(:type, 0)

  field(:client_connected, 1, type: LivebookProto.ClientConnected, oneof: 0)
end
defmodule LivebookProto.ClientConnected do
  @moduledoc """
  Message carrying a repeated list of `LivebookProto.EnvironmentVariable`
  entries in its `:env_vars` field.
  """

  use Protobuf

  field :env_vars, 1,
    repeated: true,
    type: LivebookProto.EnvironmentVariable
end
defmodule LivebookProto.EnvironmentVariable do
  @moduledoc """
  A single environment variable expressed as a `:name` / `:value` pair of
  strings.
  """

  use Protobuf

  field(:name, 1, type: :string)
  field(:value, 2, type: :string)
end
# Build a sample event from the inside out: variable -> payload -> envelope.
env_var = %LivebookProto.EnvironmentVariable{name: "SOME_API_TOKEN", value: "my api token value"}

client_connected = %LivebookProto.ClientConnected{
  env_vars: [env_var]
}

# The oneof is populated with a `{field_name, value}` tuple.
event = %LivebookProto.Event{
  type: {:client_connected, client_connected}
}

# Encode the event and show each pipeline step with `dbg/1`.
binary = event |> LivebookProto.Event.encode() |> dbg()

Kino.nothing()
# Round-trip: decode the binary produced by the encode step back into an event.
binary |> LivebookProto.Event.decode() |> dbg()

Kino.nothing()

Server

defmodule UserSocket do
  @moduledoc """
  Minimal `Phoenix.Socket.Transport` implementation for the server side.

  On connect it reads the `x-org-name` request header and keeps it in the
  socket state as `:org_name` (`nil` when the header is absent). Right after
  initialization it pushes a single Protobuf-encoded
  `LivebookProto.Event` (`:client_connected`) to the client as a binary frame.
  Incoming frames are ignored.
  """

  @behaviour Phoenix.Socket.Transport

  @impl true
  def child_spec(_opts) do
    # We won't spawn any process, so let's ignore the child spec
    :ignore
  end

  @impl true
  def connect(%{connect_info: %{x_headers: x_headers}}) do
    org_name = get_header(x_headers, "x-org-name")
    Logger.debug("[SERVER] Received WebSocket connection with org name: #{org_name}")

    {:ok, %{org_name: org_name}}
  end

  @impl true
  def init(state) do
    message = build_client_connected_message(state.org_name)

    # Defer the push to `handle_info/2`, which is where a frame can be returned.
    send(self(), {:message, message})

    {:ok, state}
  end

  @impl true
  def handle_info({:message, message}, state) do
    Logger.debug("[SERVER] Sending WebSocket message to client: #{inspect(message)}")

    {:push, {:binary, message}, state}
  end

  @impl true
  def handle_in({_message, _opts}, state) do
    {:ok, state}
  end

  @impl true
  def terminate(_reason, _state) do
    :ok
  end

  # Builds the encoded `:client_connected` event. The organization name is
  # currently unused; the payload is a fixed sample environment variable.
  defp build_client_connected_message(_org_name) do
    env_var = %LivebookProto.EnvironmentVariable{name: "API_TOKEN", value: "some api token"}
    client_connected = %LivebookProto.ClientConnected{env_vars: [env_var]}
    event = %LivebookProto.Event{type: {:client_connected, client_connected}}

    LivebookProto.Event.encode(event)
  end

  # Returns the value of the first header named `key`, or `nil` when there is
  # no such header. (Previously this returned a list of all matching values,
  # so `:org_name` ended up as a list instead of a string.)
  defp get_header(headers, key) do
    case List.keyfind(headers, key, 0) do
      {_name, value} -> value
      nil -> nil
    end
  end
end
defmodule Teams.Endpoint do
  @moduledoc """
  Endpoint that mounts `UserSocket` at `/user` over WebSocket and requests the
  `:x_headers` connect info for it.
  """

  use Phoenix.Endpoint, otp_app: :phoenix_playground

  socket "/user", UserSocket,
    websocket: [
      connect_info: [:x_headers]
    ]
end
# Start the endpoint on port 4600 without opening a browser.
{:ok, phx_playground_pid} =
  PhoenixPlayground.start(
    endpoint: Teams.Endpoint,
    port: 4600,
    open_browser: false
  )

Client

defmodule Teams.Connection do
  @moduledoc """
  WebSockex client that receives Protobuf-encoded `LivebookProto.Event`
  binary frames and forwards the decoded payload to a handler process.

  The handler process receives:

    * `{:connection_state, :connected}` when the socket connects
    * `{:event, type, message}` for each decoded event whose oneof is set

  While connected, a ping frame is sent every `@loop_ping_delay` milliseconds.
  """

  use WebSockex

  @loop_ping_delay 5_000
  @teams_ws_url "http://localhost:4600/user/websocket"

  @doc """
  Starts the WebSocket connection.

  `connection_handler` is the process that receives connection and event
  messages, `headers` are sent as extra request headers, and `url` defaults to
  the local endpoint.
  """
  @spec start_link(pid(), [{String.t(), String.t()}], String.t()) ::
          {:ok, pid()} | {:error, term()}
  def start_link(connection_handler, headers, url \\ @teams_ws_url) do
    WebSockex.start_link(url, __MODULE__, %{connection_handler: connection_handler},
      extra_headers: headers
    )
  end

  @impl true
  def handle_connect(_conn, state) do
    send(state.connection_handler, {:connection_state, :connected})

    Process.send_after(self(), :loop_ping, @loop_ping_delay)

    {:ok, state}
  end

  # Received a binary message
  @impl true
  def handle_frame({:binary, msg}, state) do
    Logger.debug("[CLIENT] Received WebSocket binary message: #{inspect(msg)}")

    event = LivebookProto.Event.decode(msg)
    Logger.debug("[CLIENT] Decoded Protobuf from WebSocket message: #{inspect(event)}")

    case event do
      %{type: {type, message}} ->
        send(state.connection_handler, {:event, type, message})

      %{type: nil} ->
        # The oneof is unset (e.g. an empty frame, or a variant this schema
        # does not declare). There is nothing to forward, and crashing the
        # connection over it would also take down the linked handler.
        Logger.warning("[CLIENT] Ignoring event without a type: #{inspect(event)}")
    end

    {:ok, state}
  end

  # Received any other kind of frame
  @impl true
  def handle_frame({type, msg}, state) do
    Logger.debug(
      "[CLIENT] Received Message - Type: #{inspect(type)} -- Message: #{inspect(msg)}"
    )

    {:ok, state}
  end

  @impl true
  def handle_info(:loop_ping, state) do
    # Re-arm the timer before replying so pings keep flowing.
    Process.send_after(self(), :loop_ping, @loop_ping_delay)
    {:reply, :ping, state}
  end
end
defmodule Teams.Client do
  @moduledoc """
  GenServer that owns a `Teams.Connection` and mirrors what it reports.

  The state tracks the organization name, the connection pid, whether the
  connection has been established, and the environment variables received in
  the `:client_connected` event (as plain `%{name: _, value: _}` maps).
  """

  use GenServer

  defstruct org_name: nil,
            env_vars: nil,
            connection_pid: nil,
            connection_state: :disconnected

  @doc """
  Starts the client for the given organization name.
  """
  def start_link(org_name), do: GenServer.start_link(__MODULE__, org_name)

  @impl true
  def init(org_name) do
    # The organization name travels to the server as a request header.
    {:ok, pid} = Teams.Connection.start_link(self(), [{"x-org-name", org_name}])
    initial_state = %__MODULE__{org_name: org_name, connection_pid: pid}

    {:ok, initial_state}
  end

  @impl true
  def handle_info({:connection_state, :connected}, state) do
    connected = %{state | connection_state: :connected}
    {:noreply, connected}
  end

  @impl true
  def handle_info(
        {:event, :client_connected, %LivebookProto.ClientConnected{env_vars: received}},
        state
      ) do
    # Keep only plain name/value maps rather than the Protobuf structs.
    plain_vars = Enum.map(received, fn var -> %{name: var.name, value: var.value} end)

    {:noreply, %{state | env_vars: plain_vars}}
  end
end
# Start a `Teams.Client` for the "dashbit" organization via `Kino.start_child!/1`.
teams_client = Kino.start_child!({Teams.Client, "dashbit"})
Kino.nothing()
# Inspect the client's current state. `connection_state` and `env_vars` are only
# updated once the corresponding messages from `Teams.Connection` were handled.
:sys.get_state(teams_client)