USP Controller: Server-Side Implementation

livebook/16_usp_controller.livemd

Overview

The USP Controller manages USP Agents (devices): it sends commands, receives notifications, and tracks agent sessions. It is the USP (TR-369) counterpart of a TR-069 ACS.

Setup

Mix.install([
  {:caretaker, path: "."}
])

alias Caretaker.USP.{Agent, Controller, Proto, Record}
require Logger

Starting a Controller

# Start a Controller with an endpoint ID
{:ok, controller} = Controller.start_link(
  endpoint_id: "self::acs.example.com"
)

controller_id = Controller.endpoint_id(controller)
Logger.info("Controller started: #{controller_id}")
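The endpoint IDs used in this notebook (self::acs.example.com, os::Router-001) follow the USP shape of authority scheme, optional authority ID, and instance ID, separated by colons. As a minimal sketch of how such an ID could be taken apart, assuming a hypothetical EndpointIdSketch module that is not part of Caretaker:

```elixir
defmodule EndpointIdSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Caretaker."

  # Splits "scheme:authority:instance" into its three parts.
  # parts: 3 keeps any further colons inside the instance ID.
  def parse(endpoint_id) when is_binary(endpoint_id) do
    case String.split(endpoint_id, ":", parts: 3) do
      [scheme, authority, instance] when scheme != "" and instance != "" ->
        {:ok, %{scheme: scheme, authority: authority, instance: instance}}

      _ ->
        {:error, :invalid_endpoint_id}
    end
  end
end

EndpointIdSketch.parse("self::acs.example.com")
# => {:ok, %{scheme: "self", authority: "", instance: "acs.example.com"}}
```

The sketch only checks the overall shape; it does not validate scheme names or the characters allowed in each part.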

Starting Test Agents

For demonstration, start two agents, each with a small initial data model:

# Start multiple agents
{:ok, agent1} = Agent.start_link(
  endpoint_id: "os::Router-001",
  initial_data: %{
    "Device" => %{
      "DeviceInfo" => %{
        "Manufacturer" => "ACME",
        "ModelName" => "Router-X100",
        "SerialNumber" => "SN001"
      }
    }
  }
)

{:ok, agent2} = Agent.start_link(
  endpoint_id: "os::Router-002",
  initial_data: %{
    "Device" => %{
      "DeviceInfo" => %{
        "Manufacturer" => "ACME",
        "ModelName" => "Router-X200",
        "SerialNumber" => "SN002"
      }
    }
  }
)

Logger.info("Agents started: #{Agent.endpoint_id(agent1)}, #{Agent.endpoint_id(agent2)}")

Agent Registration

Each agent builds a Register message, and the Controller processes it through handle_agent_message/3:

agent1_id = Agent.endpoint_id(agent1)
agent2_id = Agent.endpoint_id(agent2)

# Build a Register message for each agent
register1 = Agent.build_register_message(agent1)
register2 = Agent.build_register_message(agent2)

# Controller handles registrations
Controller.handle_agent_message(controller, agent1_id, register1)
Controller.handle_agent_message(controller, agent2_id, register2)

# Check registered agents
registered = Controller.list_agents(controller)
Logger.info("Registered agents: #{inspect(registered)}")

Sending Commands to Agents

Commands are queued per agent with queue_command/3 and inspected with pending_commands/2:

# Build a Get command
get_cmd = Proto.build_get(["Device.DeviceInfo.Manufacturer"])

# Queue command for agent1
:ok = Controller.queue_command(controller, agent1_id, get_cmd)

# Check pending commands
pending = Controller.pending_commands(controller, agent1_id)
Logger.info("Pending commands for #{agent1_id}: #{length(pending)}")
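To make the queueing idea concrete, here is a minimal sketch of a per-agent FIFO queue. CommandQueueSketch and its functions are hypothetical and say nothing about how Caretaker stores pending commands internally; the commands are plain atoms standing in for real USP messages.

```elixir
defmodule CommandQueueSketch do
  @moduledoc "Hypothetical per-agent FIFO queue; not Caretaker's implementation."

  # State is a map of agent_id => Erlang :queue.
  def new, do: %{}

  # Appends a command to the back of the agent's queue.
  def enqueue(state, agent_id, command) do
    Map.update(state, agent_id, :queue.from_list([command]), &:queue.in(command, &1))
  end

  # Returns pending commands, oldest first.
  def pending(state, agent_id) do
    state |> Map.get(agent_id, :queue.new()) |> :queue.to_list()
  end

  # Pops the oldest command, or returns :empty if nothing is queued.
  def dequeue(state, agent_id) do
    case :queue.out(Map.get(state, agent_id, :queue.new())) do
      {{:value, command}, rest} -> {{:ok, command}, Map.put(state, agent_id, rest)}
      {:empty, _} -> {:empty, state}
    end
  end
end

state =
  CommandQueueSketch.new()
  |> CommandQueueSketch.enqueue("os::Router-001", :get_manufacturer)
  |> CommandQueueSketch.enqueue("os::Router-001", :reboot)

CommandQueueSketch.pending(state, "os::Router-001")
# => [:get_manufacturer, :reboot]
```

Keeping one queue per agent ID means a slow or offline agent does not hold up commands destined for others.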

Complete Controller ↔ Agent Flow

The following cell walks through a full request/response exchange, with Record encoding and decoding standing in for the transport:

# 1. Controller sends Get request
get_msg = Proto.build_get([
  "Device.DeviceInfo.Manufacturer",
  "Device.DeviceInfo.ModelName"
])

# Create record from Controller to Agent
request_record = Record.new(get_msg,
  to_id: agent1_id,
  from_id: controller_id
)

# 2. Encode for transport
{:ok, encoded} = Record.encode(request_record)
Logger.info("Sending #{byte_size(encoded)} bytes to agent")

# 3. Agent receives and decodes
{:ok, received_record} = Record.decode(encoded)
{:ok, received_msg} = Record.extract_message(received_record)

# 4. Agent processes message
{:ok, response_msg} = Agent.handle_message(agent1, received_msg)

# 5. Agent sends response
response_record = Record.response_for(received_record, response_msg)
{:ok, response_encoded} = Record.encode(response_record)
Logger.info("Agent responds with #{byte_size(response_encoded)} bytes")

# 6. Controller receives response
{:ok, resp_record} = Record.decode(response_encoded)
{:ok, resp_msg} = Record.extract_message(resp_record)

# 7. Controller handles response
Controller.handle_agent_message(controller, agent1_id, resp_msg)

%{
  request_id: Proto.message_id(get_msg),
  response_id: Proto.message_id(resp_msg),
  flow: "complete"
}
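The request and response IDs returned above are what lets a Controller pair a response with the request that caused it. A minimal sketch of that bookkeeping follows; CorrelationSketch is a hypothetical module, and whether Caretaker tracks outstanding requests this way is an assumption.

```elixir
defmodule CorrelationSketch do
  @moduledoc "Hypothetical pending-request tracker; not part of Caretaker."

  # Remembers which agent a request with this message ID was sent to.
  def track(pending, msg_id, agent_id), do: Map.put(pending, msg_id, agent_id)

  # Accepts a response only if its message ID is known and it comes
  # from the agent the request was sent to; the entry is then dropped.
  def resolve(pending, msg_id, from_agent_id) when is_binary(from_agent_id) do
    case Map.pop(pending, msg_id) do
      {^from_agent_id, rest} -> {:ok, rest}
      {nil, _} -> {:error, :unknown_msg_id}
      {_other_agent, _} -> {:error, :agent_mismatch}
    end
  end
end

pending = CorrelationSketch.track(%{}, "msg-1", "os::Router-001")
CorrelationSketch.resolve(pending, "msg-1", "os::Router-001")
# => {:ok, %{}}
```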

Handling Notify Messages

Agents send notifications for events and value changes:

# Simulate agent sending a value change notification
notify_msg = Proto.build_notify_value_change(
  "sub-123",
  "Device.DeviceInfo.SoftwareVersion",
  "2.0.0"
)

# Controller receives notification
Controller.handle_agent_message(controller, agent1_id, notify_msg)
Logger.info("Notification received from #{agent1_id}")

Agent Session Management

The Controller tracks a session for each registered agent:

# Get session info for an agent
session = Controller.get_session(controller, agent1_id)

case session do
  nil -> Logger.info("No session for #{agent1_id}")
  info -> Logger.info("Session: #{inspect(info)}")
end

# List all registered agents
active_agents = Controller.list_agents(controller)
Logger.info("Active agents: #{inspect(active_agents)}")
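As an illustration of what timestamped session tracking can look like, the sketch below keeps a registered_at and last_seen value per agent and reports agents that have gone quiet. SessionSketch and its field names are assumptions, not the shape returned by Controller.get_session/2.

```elixir
defmodule SessionSketch do
  @moduledoc "Hypothetical session table; field names are assumptions."

  # Creates (or resets) the session for an agent.
  def register(sessions, agent_id, now \\ DateTime.utc_now()) do
    Map.put(sessions, agent_id, %{registered_at: now, last_seen: now})
  end

  # Refreshes last_seen for a known agent; unknown agents are left untouched.
  def touch(sessions, agent_id, now \\ DateTime.utc_now()) do
    case sessions do
      %{^agent_id => session} -> Map.put(sessions, agent_id, %{session | last_seen: now})
      _ -> sessions
    end
  end

  # Returns the IDs of agents not seen for more than max_idle_seconds.
  def stale(sessions, max_idle_seconds, now \\ DateTime.utc_now()) do
    for {agent_id, %{last_seen: seen}} <- sessions,
        DateTime.diff(now, seen) > max_idle_seconds,
        do: agent_id
  end
end

t0 = ~U[2024-01-01 00:00:00Z]
t1 = ~U[2024-01-01 00:10:00Z]

%{}
|> SessionSketch.register("os::Router-001", t0)
|> SessionSketch.register("os::Router-002", t0)
|> SessionSketch.touch("os::Router-002", t1)
|> SessionSketch.stale(300, t1)
# => ["os::Router-001"]
```

Passing the clock in as an argument keeps the functions pure and easy to test; a GenServer would call them with the default DateTime.utc_now().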

Deregistration

An agent leaves the Controller by sending a Deregister message:

# Build deregister message
deregister_msg = Proto.build_deregister()

# Controller handles the Deregister message from agent2
Controller.handle_agent_message(controller, agent2_id, deregister_msg)

# Check remaining agents
remaining = Controller.list_agents(controller)
Logger.info("Remaining agents: #{inspect(remaining)}")

Batch Operations

The same command can be queued for every registered agent:

# Start a third agent
{:ok, agent3} = Agent.start_link(endpoint_id: "os::Router-003")
agent3_id = Agent.endpoint_id(agent3)

# Register it
register3 = Agent.build_register_message(agent3)
Controller.handle_agent_message(controller, agent3_id, register3)

# Queue same command for multiple agents
reboot_cmd = Proto.build_operate("Device.Reboot()", [])

all_agents = Controller.list_agents(controller)

for agent_id <- all_agents do
  # Match on :ok so a failed queue attempt is not silently ignored
  :ok = Controller.queue_command(controller, agent_id, reboot_cmd)
  Logger.info("Queued reboot for #{agent_id}")
end

length(all_agents)
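For larger fleets it can help to collect per-agent outcomes rather than stop at the first failure. The sketch below takes the queueing function as an argument so it runs on its own; BatchSketch, fake_queue, and the {:error, :not_registered} return shape are all assumptions, since the notebook only shows queue_command/3 returning :ok.

```elixir
defmodule BatchSketch do
  @moduledoc "Hypothetical batch helper; not part of Caretaker."

  # Applies queue_fun to each agent ID and splits the IDs by outcome.
  def queue_all(agent_ids, queue_fun) when is_function(queue_fun, 1) do
    {queued, failed} = Enum.split_with(agent_ids, fn id -> queue_fun.(id) == :ok end)
    %{queued: queued, failed: failed}
  end
end

# Stand-in for: fn id -> Controller.queue_command(controller, id, reboot_cmd) end
fake_queue = fn
  "os::Router-003" -> {:error, :not_registered}
  _id -> :ok
end

BatchSketch.queue_all(["os::Router-001", "os::Router-003"], fake_queue)
# => %{queued: ["os::Router-001"], failed: ["os::Router-003"]}
```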

Controller Telemetry

The Controller emits telemetry events:

# Telemetry events emitted:
# - [:caretaker, :usp, :controller, :agent_registered]
# - [:caretaker, :usp, :controller, :agent_deregistered]
# - [:caretaker, :usp, :controller, :message_received]
# - [:caretaker, :usp, :controller, :command_queued]

Logger.info("Telemetry events are emitted for monitoring")
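One way to consume these events is to attach a logging handler with :telemetry.attach_many/4. The sketch below assumes the :telemetry library is loaded (it is not installed explicitly in this notebook) and does not assume anything about the measurement or metadata keys, which it simply inspects. TelemetryLogSketch and the handler ID are hypothetical names.

```elixir
defmodule TelemetryLogSketch do
  @moduledoc "Hypothetical logging handler for the Controller's telemetry events."
  require Logger

  @events [
    [:caretaker, :usp, :controller, :agent_registered],
    [:caretaker, :usp, :controller, :agent_deregistered],
    [:caretaker, :usp, :controller, :message_received],
    [:caretaker, :usp, :controller, :command_queued]
  ]

  def events, do: @events

  # Handler with the four-argument shape that :telemetry expects.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info("#{Enum.join(event, ".")} #{inspect(measurements)} #{inspect(metadata)}")
  end

  # Attaches the handler only if the :telemetry module can be loaded.
  # apply/3 avoids a compile-time warning when :telemetry is absent.
  def attach do
    if Code.ensure_loaded?(:telemetry) do
      handler = &__MODULE__.handle_event/4
      apply(:telemetry, :attach_many, ["usp-controller-logger", @events, handler, nil])
    else
      {:error, :telemetry_not_available}
    end
  end
end

TelemetryLogSketch.attach()
```

Attach the handler before registering agents, otherwise the early events are emitted with nobody listening.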

Cleanup

GenServer.stop(agent1)
GenServer.stop(agent2)
GenServer.stop(agent3)
GenServer.stop(controller)
Logger.info("All processes stopped")

Controller Design Notes

The USP Controller is responsible for:

  1. Session Management - Tracks connected agents with timestamps
  2. Command Queue - Queues commands for agents
  3. Message Routing - Routes incoming messages to handlers
  4. Notification Handling - Processes Notify messages from agents
  5. Telemetry - Emits events for monitoring
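The message routing responsibility can be pictured as pattern matching on the message type. The sketch below uses plain maps with a hypothetical :type key in place of real USP messages; RoutingSketch is illustrative only and does not reflect Caretaker's internal message structs or handler names.

```elixir
defmodule RoutingSketch do
  @moduledoc "Hypothetical router; message shapes are assumptions."

  def new, do: %{agents: MapSet.new(), notifications: []}

  # Register adds the agent to the known set.
  def route(state, agent_id, %{type: :register}),
    do: %{state | agents: MapSet.put(state.agents, agent_id)}

  # Deregister removes it again.
  def route(state, agent_id, %{type: :deregister}),
    do: %{state | agents: MapSet.delete(state.agents, agent_id)}

  # Notify messages are recorded, newest first.
  def route(state, agent_id, %{type: :notify} = msg),
    do: %{state | notifications: [{agent_id, msg} | state.notifications]}

  # Anything else is ignored in this sketch.
  def route(state, _agent_id, _msg), do: state
end

state =
  RoutingSketch.new()
  |> RoutingSketch.route("os::Router-001", %{type: :register})
  |> RoutingSketch.route("os::Router-002", %{type: :register})
  |> RoutingSketch.route("os::Router-002", %{type: :deregister})

MapSet.to_list(state.agents)
# => ["os::Router-001"]
```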

Summary

The USP Controller provides server-side device management:

  • Start with Controller.start_link/1
  • Register agents via Controller.handle_agent_message/3
  • Queue commands with Controller.queue_command/3
  • Track agents with Controller.list_agents/1 and Controller.get_session/2

Next: See 17_usp_transports.livemd for WebSocket and MQTT transport options.