Powered by AppSignal & Oban Pro

Firmware Upgrade Simulation

livebook/09_firmware_upgrade.livemd

Firmware Upgrade Simulation

> This notebook demonstrates the FirmwareSimulator module for simulating TR-069 firmware upgrade workflows.

Setup

Mix.install([
  {:caretaker, path: "."}
])

alias Caretaker.CPE.{FirmwareSimulator, DeviceState, Client}
alias Caretaker.ACS.{Server, Session}
require Logger

Understanding the Firmware Upgrade State Machine

The FirmwareSimulator follows a standard TR-069 firmware upgrade lifecycle:

idle -> downloading -> downloaded -> applying -> rebooting -> upgraded

Key concepts:

  • idle: Device is ready to receive a Download RPC
  • downloading: Firmware download in progress (simulated or real fetch)
  • downloaded: Download complete, ready for TransferComplete notification
  • applying: ACS acknowledged TransferComplete, firmware being applied
  • rebooting: Device rebooting to apply new firmware
  • upgraded: Firmware upgrade complete, device running new version

Creating a FirmwareSimulator

# Create a simulator with a specific current version and timing options
{:ok, sim} = FirmwareSimulator.start_link(
  current_version: "1.0.0",
  download_behavior: :mock,      # :mock or :fetch (validates URL exists)
  download_duration: 2_000,      # 2 second simulated download
  reboot_delay: 1_000            # 1 second simulated reboot
)

# Check initial status
{state, info} = FirmwareSimulator.status(sim)
Logger.info("Initial state: #{state}")
Logger.info("Current version: #{info.current_version}")
Logger.info("Target version: #{inspect(info.target_version)}")

{state, info}

Starting a Download (Mock Mode)

When the ACS sends a Download RPC, we start the firmware download simulation.

# Start download with firmware metadata
result = FirmwareSimulator.start_download(sim, %{
  url: "http://firmware.example.com/firmware_v2.0.0.bin",
  command_key: "upgrade-001",
  file_type: "1 Firmware Upgrade Image",
  target_version: "2.0.0"  # Optional: auto-extracted from URL if not provided
})

Logger.info("Download started: #{inspect(result)}")

# Check status immediately after starting
{state, info} = FirmwareSimulator.status(sim)
Logger.info("State after start: #{state}")
Logger.info("Command key: #{info.command_key}")
Logger.info("Target version: #{info.target_version}")

{state, info}

Checking Download Status and Completion

# Poll for download completion (in practice, this would be event-driven)
check_status = fn ->
  {state, info} = FirmwareSimulator.status(sim)
  Logger.info("Current state: #{state}")
  Logger.info("Download started: #{info.download_started_at}")
  Logger.info("Download completed: #{info.download_completed_at}")
  {state, info}
end

# Initial check
check_status.()

# Wait for download to complete (download_duration was 2000ms)
Logger.info("Waiting for download to complete...")
Process.sleep(2500)

# Check if download is complete
is_complete = FirmwareSimulator.download_complete?(sim)
Logger.info("Download complete?: #{is_complete}")

check_status.()

Getting Transfer Times for TransferComplete

After download completes, we need timestamps for the TransferComplete RPC.

# Get transfer times in ISO8601 format for TR-069 TransferComplete
{start_time, complete_time} = FirmwareSimulator.transfer_times(sim)

Logger.info("Start time: #{start_time}")
Logger.info("Complete time: #{complete_time}")

# Get fault information (0 = success)
{fault_code, fault_string} = FirmwareSimulator.fault_info(sim)
Logger.info("Fault code: #{fault_code}")
Logger.info("Fault string: #{inspect(fault_string)}")

# Get command key for correlation
command_key = FirmwareSimulator.command_key(sim)
Logger.info("Command key: #{command_key}")

%{
  start_time: start_time,
  complete_time: complete_time,
  fault_code: fault_code,
  command_key: command_key
}

Acknowledging Transfer and Starting Reboot

After the ACS receives TransferComplete, it acknowledges and may send a Reboot RPC.

# Acknowledge that ACS received TransferComplete
:ok = FirmwareSimulator.transfer_acknowledged(sim)

{state, _} = FirmwareSimulator.status(sim)
Logger.info("State after acknowledgment: #{state}")

# Start the reboot simulation (returns delay in ms)
{:ok, reboot_delay} = FirmwareSimulator.start_reboot(sim)
Logger.info("Reboot started, delay: #{reboot_delay}ms")

{state, _} = FirmwareSimulator.status(sim)
Logger.info("State during reboot: #{state}")

state

Verifying Version Update After Reboot

# Wait for reboot to complete
Logger.info("Waiting for reboot to complete...")
Process.sleep(1500)

# Check if reboot is complete
is_complete = FirmwareSimulator.reboot_complete?(sim)
Logger.info("Reboot complete?: #{is_complete}")

# Get the new current version
new_version = FirmwareSimulator.current_version(sim)
Logger.info("New firmware version: #{new_version}")

{state, info} = FirmwareSimulator.status(sim)
Logger.info("Final state: #{state}")

%{
  state: state,
  version: new_version,
  target_cleared: info.target_version == nil
}

Telemetry Events During Firmware Upgrade

The FirmwareSimulator emits telemetry events at key lifecycle points.

# Set up telemetry handler to capture events
test_pid = self()
handler_id = "livebook-firmware-telemetry"

events = [
  [:caretaker, :firmware, :download, :start],
  [:caretaker, :firmware, :download, :complete],
  [:caretaker, :firmware, :download, :failed],
  [:caretaker, :firmware, :transfer, :acknowledged],
  [:caretaker, :firmware, :reboot, :start],
  [:caretaker, :firmware, :reboot, :complete]
]

:telemetry.attach_many(
  handler_id,
  events,
  fn name, measurements, metadata, _config ->
    event_name = name |> Enum.join(".")
    send(test_pid, {:telemetry_event, event_name, measurements, metadata})
    Logger.info("Telemetry: #{event_name} - #{inspect(metadata)}")
  end,
  nil
)

Logger.info("Telemetry handler attached for events:")
Enum.each(events, fn e -> Logger.info("  - #{Enum.join(e, ".")}")end)

:ok
# Create a new simulator and run through the full upgrade cycle
{:ok, sim2} = FirmwareSimulator.start_link(
  current_version: "1.0.0",
  download_duration: 500,
  reboot_delay: 500
)

# Start download - triggers download.start event
{:ok, :downloading} = FirmwareSimulator.start_download(sim2, %{
  url: "http://example.com/fw_v3.0.0.bin",
  command_key: "telemetry-demo",
  target_version: "3.0.0"
})

# Wait for download - triggers download.complete event
Process.sleep(700)

# Acknowledge transfer - triggers transfer.acknowledged event
:ok = FirmwareSimulator.transfer_acknowledged(sim2)

# Start reboot - triggers reboot.start event
{:ok, _} = FirmwareSimulator.start_reboot(sim2)

# Wait for reboot - triggers reboot.complete event
Process.sleep(700)

Logger.info("Final version: #{FirmwareSimulator.current_version(sim2)}")

# Receive telemetry events
receive_events = fn ->
  receive do
    {:telemetry_event, name, measurements, metadata} ->
      %{event: name, measurements: measurements, metadata: metadata}
  after
    100 -> nil
  end
end

# Collect all received events
events_received = Stream.repeatedly(receive_events)
|> Enum.take_while(&(&1 != nil))

Logger.info("Captured #{length(events_received)} telemetry events")
events_received
# Clean up telemetry handler
:telemetry.detach(handler_id)
Logger.info("Telemetry handler detached")
:ok

Integration with DeviceState

The FirmwareSimulator integrates with DeviceState through the firmware_simulator option.

# Create a firmware simulator
{:ok, sim3} = FirmwareSimulator.start_link(
  current_version: "1.0.0",
  download_duration: 100,
  reboot_delay: 100
)

# Create DeviceState with firmware_simulator option
{:ok, device_state} = DeviceState.start_link(
  device_id: %{oui: "DEMO01", product_class: "Router", serial_number: "FW001"},
  params: %{
    "Device.DeviceInfo.Manufacturer" => "Acme Corp",
    "Device.DeviceInfo.SoftwareVersion" => "1.0.0",
    "Device.DeviceInfo.SerialNumber" => "FW001"
  },
  firmware_simulator: sim3
)

# Retrieve the firmware simulator from DeviceState
{:ok, retrieved_sim} = DeviceState.get_option(device_state, :firmware_simulator)
Logger.info("Retrieved simulator: #{inspect(retrieved_sim)}")
Logger.info("Same as original: #{retrieved_sim == sim3}")

# Can also set the option dynamically
{:ok, new_sim} = FirmwareSimulator.start_link(current_version: "2.0.0")
:ok = DeviceState.set_option(device_state, :firmware_simulator, new_sim)

{:ok, updated_sim} = DeviceState.get_option(device_state, :firmware_simulator)
Logger.info("Updated simulator version: #{FirmwareSimulator.current_version(updated_sim)}")

%{
  original_sim: sim3,
  new_sim: new_sim,
  option_works: retrieved_sim == sim3
}

Full End-to-End Example with CPE Client Session

This demonstrates the complete firmware upgrade flow with the ACS.

# Start the ACS server
port = 4090

{:ok, acs_sup} = Supervisor.start_link(
  [Server.child_spec(port: port)],
  strategy: :one_for_one
)

# Start the session queue
{:ok, session_pid} = Session.start_link()

# Ensure Finch is started
case Process.whereis(Caretaker.Finch) do
  nil -> {:ok, _} = Finch.start_link(name: Caretaker.Finch)
  _ -> :ok
end

Logger.info("ACS server started on port #{port}")

%{acs_url: "http://localhost:#{port}/cwmp", port: port}
# Create firmware simulator for the device
{:ok, fw_sim} = FirmwareSimulator.start_link(
  current_version: "1.0.0",
  download_duration: 500,
  reboot_delay: 500
)

# Create device with firmware simulator
device_id = %{
  manufacturer: "Acme Corp",
  oui: "E2E001",
  product_class: "Router",
  serial_number: "E2E-FW-001"
}

{:ok, device} = DeviceState.start_link(
  device_id: %{oui: "E2E001", product_class: "Router", serial_number: "E2E-FW-001"},
  params: %{
    "Device.DeviceInfo.Manufacturer" => "Acme Corp",
    "Device.DeviceInfo.SoftwareVersion" => "1.0.0",
    "Device.DeviceInfo.SerialNumber" => "E2E-FW-001"
  },
  firmware_simulator: fw_sim
)

Logger.info("Device created with firmware version: #{FirmwareSimulator.current_version(fw_sim)}")

%{device: device, firmware_sim: fw_sim, device_id: device_id}
# Attach telemetry to observe RPC responses
test_pid = self()
rpc_handler_id = "livebook-rpc-handler"

:telemetry.attach(
  rpc_handler_id,
  [:caretaker, :cpe_client, :rpc, :responded],
  fn _name, _measurements, metadata, _config ->
    send(test_pid, {:rpc_response, metadata.rpc, metadata})
  end,
  nil
)

Logger.info("RPC telemetry handler attached")
:ok
# Create and queue a Download RPC
download = Caretaker.TR069.RPC.Download.new(
  command_key: "upgrade-e2e-001",
  file_type: "1 Firmware Upgrade Image",
  url: "http://firmware.example.com/firmware_v2.0.0.bin",
  file_size: 1024000,
  delay_seconds: 0
)

{:ok, download_body} = Caretaker.TR069.RPC.Download.encode(download)

# Start session in a background task
acs_url = "http://localhost:4090/cwmp"

session_task = Task.async(fn ->
  Client.run_session(acs_url,
    device_id: device_id,
    device_state: device
  )
end)

# Wait for initial GetParameterValues response (auto-queued by ACS)
receive do
  {:rpc_response, "GetParameterValues", _meta} ->
    Logger.info("Received GetParameterValues response")
after
  3000 -> Logger.warn("Timeout waiting for GetParameterValues")
end

# Queue the Download RPC
:ok = Session.queue_for_ip({127, 0, 0, 1}, download_body)
Logger.info("Download RPC queued")

# Wait for Download response
receive do
  {:rpc_response, "Download", meta} ->
    Logger.info("Download response received!")
    Logger.info("  Command key: #{meta.command_key}")
    Logger.info("  Status: #{meta.status} (1 = async download started)")
after
  3000 -> Logger.warn("Timeout waiting for Download response")
end

# Wait for session to complete
{:ok, result} = Task.await(session_task, 5000)
Logger.info("Session completed, last RPC: #{result.rpc}")

result
# Check firmware simulator state after Download RPC
Process.sleep(1000)  # Wait for download to complete

{state, info} = FirmwareSimulator.status(fw_sim)
Logger.info("Firmware state after session: #{state}")
Logger.info("Download complete?: #{FirmwareSimulator.download_complete?(fw_sim)}")

# Get transfer times for TransferComplete (would be sent in next session)
{start_time, complete_time} = FirmwareSimulator.transfer_times(fw_sim)
Logger.info("Transfer start: #{start_time}")
Logger.info("Transfer complete: #{complete_time}")

%{state: state, info: info}
# Simulate reboot phase (would normally be triggered by Reboot RPC)
Logger.info("Simulating reboot phase...")

# Acknowledge transfer
:ok = FirmwareSimulator.transfer_acknowledged(fw_sim)

# Start reboot
{:ok, delay} = FirmwareSimulator.start_reboot(fw_sim)
Logger.info("Reboot started, delay: #{delay}ms")

# Wait for reboot
Process.sleep(700)

# Check final state
final_version = FirmwareSimulator.current_version(fw_sim)
Logger.info("Firmware upgrade complete!")
Logger.info("New version: #{final_version}")

{state, _} = FirmwareSimulator.status(fw_sim)
Logger.info("Final state: #{state}")

%{
  final_version: final_version,
  state: state,
  upgrade_successful: final_version == "2.0.0"
}

Resetting the Simulator

You can reset the simulator to test multiple upgrade cycles.

# Reset to idle state (version is preserved)
:ok = FirmwareSimulator.reset(fw_sim)

{state, info} = FirmwareSimulator.status(fw_sim)
Logger.info("After reset - State: #{state}")
Logger.info("After reset - Version: #{info.current_version}")
Logger.info("After reset - Command key: #{inspect(info.command_key)}")

%{state: state, version_preserved: info.current_version == "2.0.0"}

Cleanup

# Detach telemetry handler
:telemetry.detach(rpc_handler_id)

# Stop processes
GenServer.stop(session_pid)
Supervisor.stop(acs_sup)

# Stop simulators and device states
Agent.stop(sim)
Agent.stop(sim2)
Agent.stop(sim3)
Agent.stop(fw_sim)
Agent.stop(device_state)
Agent.stop(device)
Agent.stop(new_sim)

Logger.info("Cleanup complete")
:ok