USP Controller: Server-Side Implementation
Overview
The USP Controller manages USP Agents (devices). It sends commands, receives notifications, and tracks agent sessions. This is the server-side equivalent of a TR-069 ACS.
Setup
Mix.install([
{:caretaker, path: "."}
])
alias Caretaker.USP.{Agent, Controller, Proto, Record}
require Logger
Starting a Controller
# Start a Controller with an endpoint ID
{:ok, controller} = Controller.start_link(
endpoint_id: "self::acs.example.com"
)
controller_id = Controller.endpoint_id(controller)
Logger.info("Controller started: #{controller_id}")
Starting Test Agents
For demonstration, let’s start some agents:
# Start multiple agents
{:ok, agent1} = Agent.start_link(
endpoint_id: "os::Router-001",
initial_data: %{
"Device" => %{
"DeviceInfo" => %{
"Manufacturer" => "ACME",
"ModelName" => "Router-X100",
"SerialNumber" => "SN001"
}
}
}
)
{:ok, agent2} = Agent.start_link(
endpoint_id: "os::Router-002",
initial_data: %{
"Device" => %{
"DeviceInfo" => %{
"Manufacturer" => "ACME",
"ModelName" => "Router-X200",
"SerialNumber" => "SN002"
}
}
}
)
Logger.info("Agents started: #{Agent.endpoint_id(agent1)}, #{Agent.endpoint_id(agent2)}")
Agent Registration
Agents register with the Controller:
agent1_id = Agent.endpoint_id(agent1)
agent2_id = Agent.endpoint_id(agent2)
# Build and send Register messages
register1 = Agent.build_register_message(agent1)
register2 = Agent.build_register_message(agent2)
# Controller handles registrations
Controller.handle_agent_message(controller, agent1_id, register1)
Controller.handle_agent_message(controller, agent2_id, register2)
# Check registered agents
registered = Controller.list_agents(controller)
Logger.info("Registered agents: #{inspect(registered)}")
Sending Commands to Agents
Queue commands for agents:
# Build a Get command
get_cmd = Proto.build_get(["Device.DeviceInfo.Manufacturer"])
# Queue command for agent1
:ok = Controller.queue_command(controller, agent1_id, get_cmd)
# Check pending commands
pending = Controller.pending_commands(controller, agent1_id)
Logger.info("Pending commands for #{agent1_id}: #{length(pending)}")
Complete Controller ↔ Agent Flow
Demonstrate full message exchange:
# 1. Controller sends Get request
get_msg = Proto.build_get([
"Device.DeviceInfo.Manufacturer",
"Device.DeviceInfo.ModelName"
])
# Create record from Controller to Agent
request_record = Record.new(get_msg,
to_id: agent1_id,
from_id: controller_id
)
# 2. Encode for transport
{:ok, encoded} = Record.encode(request_record)
Logger.info("Sending #{byte_size(encoded)} bytes to agent")
# 3. Agent receives and decodes
{:ok, received_record} = Record.decode(encoded)
{:ok, received_msg} = Record.extract_message(received_record)
# 4. Agent processes message
{:ok, response_msg} = Agent.handle_message(agent1, received_msg)
# 5. Agent sends response
response_record = Record.response_for(received_record, response_msg)
{:ok, response_encoded} = Record.encode(response_record)
Logger.info("Agent responds with #{byte_size(response_encoded)} bytes")
# 6. Controller receives response
{:ok, resp_record} = Record.decode(response_encoded)
{:ok, resp_msg} = Record.extract_message(resp_record)
# 7. Controller handles response
Controller.handle_agent_message(controller, agent1_id, resp_msg)
%{
request_id: Proto.message_id(get_msg),
response_id: Proto.message_id(resp_msg),
flow: "complete"
}
Handling Notify Messages
Agents send notifications for events and value changes:
# Simulate agent sending a value change notification
notify_msg = Proto.build_notify_value_change(
"sub-123",
"Device.DeviceInfo.SoftwareVersion",
"2.0.0"
)
# Controller receives notification
Controller.handle_agent_message(controller, agent1_id, notify_msg)
Logger.info("Notification received from #{agent1_id}")
Agent Session Management
Controller tracks agent sessions:
# Get session info for an agent
session = Controller.get_session(controller, agent1_id)
case session do
nil -> Logger.info("No session for #{agent1_id}")
info -> Logger.info("Session: #{inspect(info)}")
end
# List all agent sessions
all_sessions = Controller.list_agents(controller)
Logger.info("Active agents: #{inspect(all_sessions)}")
Deregistration
Agents can deregister:
# Build deregister message
deregister_msg = Proto.build_deregister()
# Agent sends deregister
Controller.handle_agent_message(controller, agent2_id, deregister_msg)
# Check remaining agents
remaining = Controller.list_agents(controller)
Logger.info("Remaining agents: #{inspect(remaining)}")
Batch Operations
Send commands to multiple agents:
# Start a third agent
{:ok, agent3} = Agent.start_link(endpoint_id: "os::Router-003")
agent3_id = Agent.endpoint_id(agent3)
# Register it
register3 = Agent.build_register_message(agent3)
Controller.handle_agent_message(controller, agent3_id, register3)
# Queue same command for multiple agents
reboot_cmd = Proto.build_operate("Device.Reboot()", [])
all_agents = Controller.list_agents(controller)
for agent_id <- all_agents do
Controller.queue_command(controller, agent_id, reboot_cmd)
Logger.info("Queued reboot for #{agent_id}")
end
length(all_agents)
Controller Telemetry
The Controller emits telemetry events:
# Telemetry events emitted:
# - [:caretaker, :usp, :controller, :agent_registered]
# - [:caretaker, :usp, :controller, :agent_deregistered]
# - [:caretaker, :usp, :controller, :message_received]
# - [:caretaker, :usp, :controller, :command_queued]
Logger.info("Telemetry events are emitted for monitoring")
Cleanup
GenServer.stop(agent1)
GenServer.stop(agent2)
GenServer.stop(agent3)
GenServer.stop(controller)
Logger.info("All processes stopped")
Controller Design Notes
The USP Controller:
- Session Management - Tracks connected agents with timestamps
- Command Queue - Queues commands for agents
- Message Routing - Routes incoming messages to handlers
- Notification Handling - Processes Notify messages from agents
- Telemetry - Emits events for monitoring
Summary
The USP Controller provides server-side device management:
-
Start with
Controller.start_link/1 -
Register agents via
handle_agent_message/3 -
Queue commands with
queue_command/3 -
Track agents with
list_agents/1andget_session/2
Next: See 17_usp_transports.livemd for WebSocket and MQTT transport options.