Powered by AppSignal & Oban Pro

USP Transports: WebSocket and MQTT

livebook/17_usp_transports.livemd

USP Transports: WebSocket and MQTT

Overview

USP supports multiple transport protocols. Caretaker implements:

  • WebSocket - Direct connections using binary frames
  • MQTT - Message broker-based communication

This livebook covers both transport options and their topic/path structures.

Setup

Mix.install([
  {:caretaker, path: "."}
])

alias Caretaker.USP.Transport.WebSocket.Paths, as: WSPaths
alias Caretaker.USP.Transport.MQTT.Topics, as: MQTTTopics
alias Caretaker.USP.{Proto, Record}
require Logger

WebSocket Transport

Path Structure

WebSocket connections use URL paths:

controller_id = "self::acs.example.com"
agent_id = "os::ACME-Router-123"

# Controller path - where agents connect
controller_path = WSPaths.controller_path(controller_id)
Logger.info("Controller path: #{controller_path}")

# Agent path - where controllers connect (if needed)
agent_path = WSPaths.agent_path(agent_id)
Logger.info("Agent path: #{agent_path}")

%{controller: controller_path, agent: agent_path}

Building WebSocket URLs

# Build full WebSocket URLs
ws_url = WSPaths.controller_url("localhost", 8080, controller_id)
wss_url = WSPaths.controller_url("acs.example.com", 443, controller_id, secure: true)

Logger.info("WS URL: #{ws_url}")
Logger.info("WSS URL: #{wss_url}")

%{insecure: ws_url, secure: wss_url}

WebSocket Subprotocol

USP WebSocket connections use a specific subprotocol:

subprotocol = WSPaths.subprotocol()
Logger.info("Subprotocol: #{subprotocol}")

# Check if a connection offers USP subprotocol
client_protocols = ["mqtt", "v1.usp", "custom"]
has_usp = WSPaths.has_usp_subprotocol?(client_protocols)
Logger.info("Client supports USP: #{has_usp}")

Parsing Paths

Extract endpoint information from paths:

# Parse controller path
{:ok, {:controller, parsed_ctrl}} = WSPaths.parse_endpoint_from_path("/usp/controller/self::acs")
Logger.info("Parsed controller: #{parsed_ctrl}")

# Parse agent path
{:ok, {:agent, parsed_agent}} = WSPaths.parse_endpoint_from_path("/usp/agent/os::device-123")
Logger.info("Parsed agent: #{parsed_agent}")

# Invalid path
{:error, :invalid_path} = WSPaths.parse_endpoint_from_path("/invalid/path")
Logger.info("Invalid path rejected")

MQTT Transport

Topic Structure

MQTT uses topic hierarchies:

controller_id = "self::acs.example.com"
agent_id = "os::ACME-Router-123"

# Agent topics
agent_request_topic = MQTTTopics.agent_request(agent_id)
agent_notify_topic = MQTTTopics.agent_notify(agent_id)

# Controller topic
controller_topic = MQTTTopics.controller_request(controller_id)

# Wildcard for notifications
notify_subscription = MQTTTopics.controller_notify_subscription()

Logger.info("Agent request: #{agent_request_topic}")
Logger.info("Agent notify: #{agent_notify_topic}")
Logger.info("Controller: #{controller_topic}")
Logger.info("Notify wildcard: #{notify_subscription}")

%{
  agent_request: agent_request_topic,
  agent_notify: agent_notify_topic,
  controller: controller_topic,
  notify_sub: notify_subscription
}

Message Flow over MQTT

# Controller → Agent flow:
# 1. Controller publishes to: usp/agent//request
# 2. Agent subscribes to: usp/agent//request
# 3. Agent responds to: usp/controller//request

# Agent → Controller flow:
# 1. Agent publishes to: usp/controller//request
# 2. Controller subscribes to: usp/controller//request

# Notifications:
# 1. Agent publishes to: usp/agent//notify
# 2. Controller subscribes to: usp/agent/+/notify (wildcard)

Logger.info("MQTT message flow configured via topics")

Parsing Topics

# Parse agent topic
{:ok, {:agent, parsed_agent}} =
  MQTTTopics.parse_endpoint_from_topic("usp/agent/os::device-123/request")
Logger.info("Parsed agent from topic: #{parsed_agent}")

# Parse controller topic
{:ok, {:controller, parsed_ctrl}} =
  MQTTTopics.parse_endpoint_from_topic("usp/controller/self::acs/request")
Logger.info("Parsed controller from topic: #{parsed_ctrl}")

Response Topics

Determine where to send responses:

# If agent receives on controller topic, respond to agent topic
received_topic = "usp/controller/self::acs/request"
responder_id = "os::device-123"

response_topic = MQTTTopics.response_topic_for(received_topic, responder_id)
Logger.info("Response topic: #{response_topic}")

Comparing Transports

agent_id = "os::ACME-Router-123"
controller_id = "self::acs.example.com"

comparison = %{
  websocket: %{
    controller_path: WSPaths.controller_path(controller_id),
    agent_path: WSPaths.agent_path(agent_id),
    connection: "Direct TCP/TLS",
    framing: "Binary WebSocket frames"
  },
  mqtt: %{
    controller_topic: MQTTTopics.controller_request(controller_id),
    agent_topic: MQTTTopics.agent_request(agent_id),
    connection: "Via MQTT broker",
    framing: "MQTT publish/subscribe"
  }
}

comparison

Message Encoding for Transport

Both transports use the same encoding:

# Build a message
get_msg = Proto.build_get(["Device.DeviceInfo."])

# Wrap in a Record
record = Record.new(get_msg,
  to_id: "os::agent-001",
  from_id: "self::controller"
)

# Encode to binary (same for both transports)
{:ok, encoded} = Record.encode(record)

Logger.info("Encoded message: #{byte_size(encoded)} bytes")
Logger.info("First 20 bytes (hex): #{Base.encode16(binary_part(encoded, 0, min(20, byte_size(encoded))), case: :lower)}")

# This binary is sent as:
# - WebSocket: Binary frame
# - MQTT: Message payload

%{size: byte_size(encoded), format: "Protocol Buffers"}

Transport Selection

Choose based on your architecture:

guidelines = %{
  websocket: [
    "Direct device-to-controller connections",
    "Low latency requirements",
    "Simpler network topology",
    "TLS for security"
  ],
  mqtt: [
    "Devices behind NAT/firewall",
    "Existing MQTT infrastructure",
    "Fan-out notifications",
    "Broker handles reliability"
  ]
}

guidelines

Using the Transport Modules

WebSocket Server (Controller side)

# Start a WebSocket server for Controller
# This requires a running Controller process

# {:ok, controller} = Caretaker.USP.Controller.start_link(
#   endpoint_id: "self::acs.example.com"
# )
#
# {:ok, ws_server} = Caretaker.USP.Transport.WebSocket.Server.start_link(
#   controller: controller,
#   port: 8080
# )
#
# # Agents connect to: ws://localhost:8080/usp/controller/self::acs.example.com

Logger.info("WebSocket server would listen on configured port")

WebSocket Client (Agent side)

# Start a WebSocket client for Agent
# This requires a running Agent process

# {:ok, agent} = Caretaker.USP.Agent.start_link(
#   endpoint_id: "os::my-device"
# )
#
# {:ok, ws_client} = Caretaker.USP.Transport.WebSocket.Client.start_link(
#   agent: agent,
#   controller_host: "acs.example.com",
#   controller_port: 8080,
#   controller_id: "self::acs.example.com"
# )

Logger.info("WebSocket client would connect to controller")

MQTT Transport (both sides)

# MQTT requires a broker (e.g., Mosquitto, EMQX, or Nipper for testing)

# Controller side:
# {:ok, mqtt_ctrl} = Caretaker.USP.Transport.MQTT.Controller.start_link(
#   controller: controller,
#   broker_host: "localhost",
#   broker_port: 1883
# )

# Agent side:
# {:ok, mqtt_agent} = Caretaker.USP.Transport.MQTT.Agent.start_link(
#   agent: agent,
#   broker_host: "localhost",
#   broker_port: 1883,
#   controller_id: "self::acs.example.com"
# )

Logger.info("MQTT transport connects via broker")

Summary

USP Transport Options:

Feature WebSocket MQTT
Connection Direct Via broker
Addressing URL paths Topic hierarchy
Subprotocol v1.usp N/A
NAT traversal Requires port forwarding Broker handles
QoS Application level MQTT QoS levels

Both transports:

  • Use Protocol Buffers encoding
  • Support the same message types
  • Wrap messages in USP Records
  • Use endpoint IDs for addressing