USP Transports: WebSocket and MQTT
Overview
USP (User Services Platform, TR-369) can run over several message transfer protocols. Caretaker implements two:
- WebSocket - Direct connections using binary frames
- MQTT - Message broker-based communication
This livebook covers both transport options and their topic/path structures.
Setup
Mix.install([
{:caretaker, path: "."}
])
alias Caretaker.USP.Transport.WebSocket.Paths, as: WSPaths
alias Caretaker.USP.Transport.MQTT.Topics, as: MQTTTopics
alias Caretaker.USP.{Proto, Record}
require Logger
WebSocket Transport
Path Structure
WebSocket connections use URL paths:
controller_id = "self::acs.example.com"
agent_id = "os::ACME-Router-123"
# Controller path - where agents connect
controller_path = WSPaths.controller_path(controller_id)
Logger.info("Controller path: #{controller_path}")
# Agent path - where controllers connect (if needed)
agent_path = WSPaths.agent_path(agent_id)
Logger.info("Agent path: #{agent_path}")
%{controller: controller_path, agent: agent_path}
Building WebSocket URLs
# Build full WebSocket URLs
ws_url = WSPaths.controller_url("localhost", 8080, controller_id)
wss_url = WSPaths.controller_url("acs.example.com", 443, controller_id, secure: true)
Logger.info("WS URL: #{ws_url}")
Logger.info("WSS URL: #{wss_url}")
%{insecure: ws_url, secure: wss_url}
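As a rough illustration of what such a URL contains, the sketch below assembles one by hand. The module name UrlSketch is hypothetical, and the /usp/controller/ prefix is assumed from the example URL that appears in the WebSocket server section of this livebook. Caretaker's own controller_url may format things differently, for instance by omitting default ports.

```elixir
defmodule UrlSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Caretaker."

  # Assumes the "/usp/controller/<endpoint_id>" layout used in this livebook.
  # The :secure option switches the scheme from ws to wss.
  def controller_url(host, port, controller_id, opts \\ []) do
    scheme = if Keyword.get(opts, :secure, false), do: "wss", else: "ws"
    "#{scheme}://#{host}:#{port}/usp/controller/#{controller_id}"
  end
end

UrlSketch.controller_url("localhost", 8080, "self::acs.example.com")
# => "ws://localhost:8080/usp/controller/self::acs.example.com"
```

The port is always written out here to keep the sketch simple; a production helper would probably drop 80 and 443 when they match the scheme.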
WebSocket Subprotocol
USP WebSocket connections negotiate a dedicated subprotocol, v1.usp, during the handshake:
subprotocol = WSPaths.subprotocol()
Logger.info("Subprotocol: #{subprotocol}")
# Check whether a client offers the USP subprotocol
client_protocols = ["mqtt", "v1.usp", "custom"]
has_usp = WSPaths.has_usp_subprotocol?(client_protocols)
Logger.info("Client supports USP: #{has_usp}")
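On the wire, a client offers its subprotocols as a comma-separated list in the Sec-WebSocket-Protocol handshake header. The sketch below shows one way a server could pick v1.usp out of that header. SubprotocolSketch and its functions are hypothetical names used only for illustration; they are not Caretaker's API.

```elixir
defmodule SubprotocolSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Caretaker."

  @usp "v1.usp"

  # Split the raw header value into a list of trimmed subprotocol names.
  def offered(header_value) do
    header_value
    |> String.split(",", trim: true)
    |> Enum.map(&String.trim/1)
  end

  # Return the subprotocol to echo back, or :none if USP was never offered.
  def select(header_value) do
    if @usp in offered(header_value), do: {:ok, @usp}, else: :none
  end
end

SubprotocolSketch.select("mqtt, v1.usp, custom")
# => {:ok, "v1.usp"}
SubprotocolSketch.select("mqtt")
# => :none
```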
Parsing Paths
Extract endpoint information from paths:
# Parse controller path
{:ok, {:controller, parsed_ctrl}} = WSPaths.parse_endpoint_from_path("/usp/controller/self::acs")
Logger.info("Parsed controller: #{parsed_ctrl}")
# Parse agent path
{:ok, {:agent, parsed_agent}} = WSPaths.parse_endpoint_from_path("/usp/agent/os::device-123")
Logger.info("Parsed agent: #{parsed_agent}")
# Invalid path
{:error, :invalid_path} = WSPaths.parse_endpoint_from_path("/invalid/path")
Logger.info("Invalid path rejected")
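The results above suggest a fixed usp/role/endpoint-id shape. A minimal sketch of a parser with the same return values follows, assuming that shape holds. PathSketch is a hypothetical module and may differ from how Caretaker parses paths internally.

```elixir
defmodule PathSketch do
  @moduledoc "Hypothetical parser for illustration; not Caretaker's implementation."

  # Split on "/" (dropping the empty leading segment) and match the
  # assumed "usp/<role>/<endpoint_id>" shape.
  def parse(path) do
    case String.split(path, "/", trim: true) do
      ["usp", "controller", id] -> {:ok, {:controller, id}}
      ["usp", "agent", id] -> {:ok, {:agent, id}}
      _other -> {:error, :invalid_path}
    end
  end
end

PathSketch.parse("/usp/agent/os::device-123")
# => {:ok, {:agent, "os::device-123"}}
```

Matching on the split list keeps the colons inside endpoint IDs intact, since only "/" is treated as a separator.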
MQTT Transport
Topic Structure
MQTT addresses each endpoint through a topic hierarchy under usp/:
controller_id = "self::acs.example.com"
agent_id = "os::ACME-Router-123"
# Agent topics
agent_request_topic = MQTTTopics.agent_request(agent_id)
agent_notify_topic = MQTTTopics.agent_notify(agent_id)
# Controller topic
controller_topic = MQTTTopics.controller_request(controller_id)
# Wildcard for notifications
notify_subscription = MQTTTopics.controller_notify_subscription()
Logger.info("Agent request: #{agent_request_topic}")
Logger.info("Agent notify: #{agent_notify_topic}")
Logger.info("Controller: #{controller_topic}")
Logger.info("Notify wildcard: #{notify_subscription}")
%{
agent_request: agent_request_topic,
agent_notify: agent_notify_topic,
controller: controller_topic,
notify_sub: notify_subscription
}
Message Flow over MQTT
# Controller → Agent flow:
# 1. Controller publishes to: usp/agent/<agent_id>/request
# 2. Agent subscribes to: usp/agent/<agent_id>/request
# 3. Agent responds to: usp/controller/<controller_id>/request
# Agent → Controller flow:
# 1. Agent publishes to: usp/controller/<controller_id>/request
# 2. Controller subscribes to: usp/controller/<controller_id>/request
# Notifications:
# 1. Agent publishes to: usp/agent/<agent_id>/notify
# 2. Controller subscribes to: usp/agent/+/notify (wildcard)
Logger.info("MQTT message flow configured via topics")
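The notification flow relies on the MQTT single-level wildcard: + matches exactly one topic level, while # matches all remaining levels. The sketch below illustrates those matching rules on the broker side. TopicFilterSketch is a hypothetical module written for this livebook, and it ignores special cases such as topics beginning with $.

```elixir
defmodule TopicFilterSketch do
  @moduledoc "Hypothetical matcher illustrating MQTT wildcard semantics."

  # Compare a subscription filter with a concrete topic, level by level.
  def matches?(filter, topic) do
    do_match(String.split(filter, "/"), String.split(topic, "/"))
  end

  # "#" matches everything that remains, including zero levels.
  defp do_match(["#"], _rest), do: true
  # "+" matches exactly one level, whatever its content.
  defp do_match(["+" | filter_rest], [_level | topic_rest]), do: do_match(filter_rest, topic_rest)
  # Literal levels must be identical.
  defp do_match([same | filter_rest], [same | topic_rest]), do: do_match(filter_rest, topic_rest)
  # Both exhausted at the same time: full match.
  defp do_match([], []), do: true
  defp do_match(_filter, _topic), do: false
end

TopicFilterSketch.matches?("usp/agent/+/notify", "usp/agent/os::device-123/notify")
# => true
TopicFilterSketch.matches?("usp/agent/+/notify", "usp/agent/os::device-123/request")
# => false
```

With a single subscription to usp/agent/+/notify, a controller receives notifications from every agent without knowing their endpoint IDs in advance.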
Parsing Topics
# Parse agent topic
{:ok, {:agent, parsed_agent}} =
MQTTTopics.parse_endpoint_from_topic("usp/agent/os::device-123/request")
Logger.info("Parsed agent from topic: #{parsed_agent}")
# Parse controller topic
{:ok, {:controller, parsed_ctrl}} =
MQTTTopics.parse_endpoint_from_topic("usp/controller/self::acs/request")
Logger.info("Parsed controller from topic: #{parsed_ctrl}")
Response Topics
Determine where to send responses:
# If agent receives on controller topic, respond to agent topic
received_topic = "usp/controller/self::acs/request"
responder_id = "os::device-123"
response_topic = MQTTTopics.response_topic_for(received_topic, responder_id)
Logger.info("Response topic: #{response_topic}")
Comparing Transports
agent_id = "os::ACME-Router-123"
controller_id = "self::acs.example.com"
comparison = %{
websocket: %{
controller_path: WSPaths.controller_path(controller_id),
agent_path: WSPaths.agent_path(agent_id),
connection: "Direct TCP/TLS",
framing: "Binary WebSocket frames"
},
mqtt: %{
controller_topic: MQTTTopics.controller_request(controller_id),
agent_topic: MQTTTopics.agent_request(agent_id),
connection: "Via MQTT broker",
framing: "MQTT publish/subscribe"
}
}
comparison
Message Encoding for Transport
Both transports carry the same payload, a USP Record serialized with Protocol Buffers:
# Build a message
get_msg = Proto.build_get(["Device.DeviceInfo."])
# Wrap in a Record
record = Record.new(get_msg,
to_id: "os::agent-001",
from_id: "self::controller"
)
# Encode to binary (same for both transports)
{:ok, encoded} = Record.encode(record)
Logger.info("Encoded message: #{byte_size(encoded)} bytes")
# Preview at most the first 20 bytes as lowercase hex
preview = binary_part(encoded, 0, min(20, byte_size(encoded)))
Logger.info("First 20 bytes (hex): #{Base.encode16(preview, case: :lower)}")
# This binary is sent as:
# - WebSocket: Binary frame
# - MQTT: Message payload
%{size: byte_size(encoded), format: "Protocol Buffers"}
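To make the last point concrete, the sketch below wraps one payload for each transport. EnvelopeSketch, its function names, and the shapes it returns are assumptions made for illustration: the {:binary, payload} tuple follows a convention common in Elixir WebSocket libraries, and the map stands in for whatever publish call an MQTT client exposes. Neither is taken from Caretaker.

```elixir
defmodule EnvelopeSketch do
  @moduledoc "Hypothetical wrappers for illustration; not part of Caretaker."

  # WebSocket: the encoded Record travels as a single binary frame.
  def websocket_frame(payload) when is_binary(payload) do
    {:binary, payload}
  end

  # MQTT: the same bytes become the payload of a publish to a topic.
  def mqtt_publish(topic, payload) when is_binary(topic) and is_binary(payload) do
    %{topic: topic, payload: payload}
  end
end

# Placeholder bytes standing in for an encoded USP Record.
payload = <<1, 2, 3>>

EnvelopeSketch.websocket_frame(payload)
EnvelopeSketch.mqtt_publish("usp/agent/os::agent-001/request", payload)
```

Only the envelope changes between transports; the bytes inside it are identical.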
Transport Selection
Choose based on your architecture:
guidelines = %{
websocket: [
"Direct device-to-controller connections",
"Low latency requirements",
"Simpler network topology",
"TLS for security"
],
mqtt: [
"Devices behind NAT/firewall",
"Existing MQTT infrastructure",
"Fan-out notifications",
"Broker handles reliability"
]
}
guidelines
Using the Transport Modules
WebSocket Server (Controller side)
# Start a WebSocket server for Controller
# This requires a running Controller process
# {:ok, controller} = Caretaker.USP.Controller.start_link(
# endpoint_id: "self::acs.example.com"
# )
#
# {:ok, ws_server} = Caretaker.USP.Transport.WebSocket.Server.start_link(
# controller: controller,
# port: 8080
# )
#
# # Agents connect to: ws://localhost:8080/usp/controller/self::acs.example.com
Logger.info("WebSocket server would listen on configured port")
WebSocket Client (Agent side)
# Start a WebSocket client for Agent
# This requires a running Agent process
# {:ok, agent} = Caretaker.USP.Agent.start_link(
# endpoint_id: "os::my-device"
# )
#
# {:ok, ws_client} = Caretaker.USP.Transport.WebSocket.Client.start_link(
# agent: agent,
# controller_host: "acs.example.com",
# controller_port: 8080,
# controller_id: "self::acs.example.com"
# )
Logger.info("WebSocket client would connect to controller")
MQTT Transport (both sides)
# MQTT requires a broker (e.g., Mosquitto, EMQX, or Nipper for testing)
# Controller side:
# {:ok, mqtt_ctrl} = Caretaker.USP.Transport.MQTT.Controller.start_link(
# controller: controller,
# broker_host: "localhost",
# broker_port: 1883
# )
# Agent side:
# {:ok, mqtt_agent} = Caretaker.USP.Transport.MQTT.Agent.start_link(
# agent: agent,
# broker_host: "localhost",
# broker_port: 1883,
# controller_id: "self::acs.example.com"
# )
Logger.info("MQTT transport connects via broker")
Summary
USP Transport Options:
| Feature | WebSocket | MQTT |
|---|---|---|
| Connection | Direct | Via broker |
| Addressing | URL paths | Topic hierarchy |
| Subprotocol | v1.usp | N/A |
| NAT traversal | Works when the agent dials out; inbound connections need port forwarding | Handled by the broker (both sides dial out) |
| QoS | Application level | MQTT QoS levels (0, 1, 2) |
Both transports:
- Use Protocol Buffers encoding
- Support the same message types
- Wrap messages in USP Records
- Use endpoint IDs for addressing