Firmware Upgrade Simulation
> This notebook demonstrates the FirmwareSimulator module for simulating TR-069 firmware upgrade workflows.
Setup
Mix.install([
{:caretaker, path: "."}
])
alias Caretaker.CPE.{FirmwareSimulator, DeviceState, Client}
alias Caretaker.ACS.{Server, Session}
require Logger
Understanding the Firmware Upgrade State Machine
The FirmwareSimulator follows a standard TR-069 firmware upgrade lifecycle:
idle -> downloading -> downloaded -> applying -> rebooting -> upgraded
Key concepts:
- idle: Device is ready to receive a Download RPC
- downloading: Firmware download in progress (simulated or real fetch)
- downloaded: Download complete, ready for TransferComplete notification
- applying: ACS acknowledged TransferComplete, firmware being applied
- rebooting: Device rebooting to apply new firmware
- upgraded: Firmware upgrade complete, device running new version
Creating a FirmwareSimulator
# Boot a simulator that reports firmware 1.0.0, with short fixed timings so
# the notebook stays quick to run.
{:ok, sim} =
  FirmwareSimulator.start_link(
    current_version: "1.0.0",
    # :mock or :fetch (validates URL exists)
    download_behavior: :mock,
    # simulated download lasts 2 seconds
    download_duration: 2_000,
    # simulated reboot lasts 1 second
    reboot_delay: 1_000
  )

# Snapshot the simulator before any download has been requested
{state, info} = FirmwareSimulator.status(sim)

Logger.info("Initial state: #{state}")
Logger.info("Current version: #{info.current_version}")
Logger.info("Target version: #{inspect(info.target_version)}")

# Cell output: the raw status tuple
{state, info}
Starting a Download (Mock Mode)
When the ACS sends a Download RPC, we start the firmware download simulation.
# Firmware metadata, shaped like the arguments of an ACS Download RPC
download_params = %{
  url: "http://firmware.example.com/firmware_v2.0.0.bin",
  command_key: "upgrade-001",
  file_type: "1 Firmware Upgrade Image",
  # Optional: auto-extracted from URL if not provided
  target_version: "2.0.0"
}

result = FirmwareSimulator.start_download(sim, download_params)
Logger.info("Download started: #{inspect(result)}")

# Read the status back right away, while the simulated download is running
{state, info} = FirmwareSimulator.status(sim)

Logger.info("State after start: #{state}")
Logger.info("Command key: #{info.command_key}")
Logger.info("Target version: #{info.target_version}")

{state, info}
Checking Download Status and Completion
# Helper that logs the simulator state plus both download timestamps and
# returns the status tuple. Polling is for demonstration only; in practice
# this would be event-driven.
check_status = fn ->
  {current, details} = snapshot = FirmwareSimulator.status(sim)

  Logger.info("Current state: #{current}")
  Logger.info("Download started: #{details.download_started_at}")
  Logger.info("Download completed: #{details.download_completed_at}")

  snapshot
end

# First reading, taken before waiting
check_status.()

# download_duration was configured as 2000ms, so 2500ms leaves some headroom
Logger.info("Waiting for download to complete...")
Process.sleep(2500)

# Ask the simulator directly whether the download finished
is_complete = FirmwareSimulator.download_complete?(sim)
Logger.info("Download complete?: #{is_complete}")

check_status.()
Getting Transfer Times for TransferComplete
After download completes, we need timestamps for the TransferComplete RPC.
# Timestamps for the TR-069 TransferComplete message (ISO8601 format)
{start_time, complete_time} = FirmwareSimulator.transfer_times(sim)
Logger.info("Start time: #{start_time}")
Logger.info("Complete time: #{complete_time}")

# Fault details; a fault code of 0 means success
{fault_code, fault_string} = FirmwareSimulator.fault_info(sim)
Logger.info("Fault code: #{fault_code}")
Logger.info("Fault string: #{inspect(fault_string)}")

# The command key ties the result back to the request that started it
command_key = FirmwareSimulator.command_key(sim)
Logger.info("Command key: #{command_key}")

# Cell output: the fields gathered above
%{
  start_time: start_time,
  complete_time: complete_time,
  fault_code: fault_code,
  command_key: command_key
}
Acknowledging Transfer and Starting Reboot
After the ACS receives TransferComplete, it acknowledges and may send a Reboot RPC.
# Tell the simulator that the ACS has received TransferComplete
:ok = FirmwareSimulator.transfer_acknowledged(sim)

{state, _info} = FirmwareSimulator.status(sim)
Logger.info("State after acknowledgment: #{state}")

# Kick off the simulated reboot; the call hands back the delay in ms
{:ok, reboot_delay} = FirmwareSimulator.start_reboot(sim)
Logger.info("Reboot started, delay: #{reboot_delay}ms")

{state, _info} = FirmwareSimulator.status(sim)
Logger.info("State during reboot: #{state}")

# Cell output: the state observed while rebooting
state
Verifying Version Update After Reboot
# Give the simulated reboot time to finish before inspecting the result
Logger.info("Waiting for reboot to complete...")
Process.sleep(1500)

# Ask the simulator whether the reboot finished
is_complete = FirmwareSimulator.reboot_complete?(sim)
Logger.info("Reboot complete?: #{is_complete}")

# Read back the version the simulator now reports
new_version = FirmwareSimulator.current_version(sim)
Logger.info("New firmware version: #{new_version}")

{state, info} = FirmwareSimulator.status(sim)
Logger.info("Final state: #{state}")

# Cell output: final state, reported version, and whether target_version is nil
%{
  state: state,
  version: new_version,
  target_cleared: is_nil(info.target_version)
}
Telemetry Events During Firmware Upgrade
The FirmwareSimulator emits telemetry events at key lifecycle points.
# Forward firmware telemetry events to this process (and log them) so that
# later cells can collect them from the mailbox.
test_pid = self()
handler_id = "livebook-firmware-telemetry"

events = [
  [:caretaker, :firmware, :download, :start],
  [:caretaker, :firmware, :download, :complete],
  [:caretaker, :firmware, :download, :failed],
  [:caretaker, :firmware, :transfer, :acknowledged],
  [:caretaker, :firmware, :reboot, :start],
  [:caretaker, :firmware, :reboot, :complete]
]

# Keep the cell safe to re-evaluate: attach_many/4 returns
# {:error, :already_exists} when the id is already registered, and that
# result is not checked below. Detaching an unknown id is harmless.
:telemetry.detach(handler_id)

:telemetry.attach_many(
  handler_id,
  events,
  fn name, measurements, metadata, _config ->
    # Event names arrive as lists of atoms; join them into a dotted string
    event_name = Enum.join(name, ".")
    send(test_pid, {:telemetry_event, event_name, measurements, metadata})
    Logger.info("Telemetry: #{event_name} - #{inspect(metadata)}")
  end,
  nil
)

Logger.info("Telemetry handler attached for events:")
Enum.each(events, fn e -> Logger.info(" - #{Enum.join(e, ".")}") end)

:ok
# Second simulator with short timings, driven through one whole upgrade cycle
# so the attached telemetry handler has events to observe.
{:ok, sim2} =
  FirmwareSimulator.start_link(
    current_version: "1.0.0",
    download_duration: 500,
    reboot_delay: 500
  )

# Starting the download triggers the download.start event
{:ok, :downloading} =
  FirmwareSimulator.start_download(sim2, %{
    url: "http://example.com/fw_v3.0.0.bin",
    command_key: "telemetry-demo",
    target_version: "3.0.0"
  })

# Wait out the 500ms download; triggers the download.complete event
Process.sleep(700)

# Triggers the transfer.acknowledged event
:ok = FirmwareSimulator.transfer_acknowledged(sim2)

# Triggers the reboot.start event
{:ok, _delay} = FirmwareSimulator.start_reboot(sim2)

# Wait out the 500ms reboot; triggers the reboot.complete event
Process.sleep(700)

Logger.info("Final version: #{FirmwareSimulator.current_version(sim2)}")
# Take one forwarded telemetry message from the mailbox, or give up with nil
# after 100ms of silence
receive_events = fn ->
  receive do
    {:telemetry_event, name, measurements, metadata} ->
      %{event: name, measurements: measurements, metadata: metadata}
  after
    100 -> nil
  end
end

# Keep pulling messages until the first nil (i.e. the first timeout)
events_received =
  receive_events
  |> Stream.repeatedly()
  |> Enum.take_while(fn event -> event != nil end)

Logger.info("Captured #{length(events_received)} telemetry events")

events_received
# Clean up telemetry handler
# Detaches whatever handler is registered under handler_id; the return value
# of detach/1 is not checked here.
:telemetry.detach(handler_id)
Logger.info("Telemetry handler detached")
# Cell output
:ok
Integration with DeviceState
The FirmwareSimulator integrates with DeviceState through the firmware_simulator option.
# Dedicated simulator for this section, with very short timings
{:ok, sim3} =
  FirmwareSimulator.start_link(
    current_version: "1.0.0",
    download_duration: 100,
    reboot_delay: 100
  )

# Hand the simulator to DeviceState through the :firmware_simulator option
{:ok, device_state} =
  DeviceState.start_link(
    device_id: %{oui: "DEMO01", product_class: "Router", serial_number: "FW001"},
    params: %{
      "Device.DeviceInfo.Manufacturer" => "Acme Corp",
      "Device.DeviceInfo.SoftwareVersion" => "1.0.0",
      "Device.DeviceInfo.SerialNumber" => "FW001"
    },
    firmware_simulator: sim3
  )

# Read the option back and compare it with the simulator passed in above
{:ok, retrieved_sim} = DeviceState.get_option(device_state, :firmware_simulator)
Logger.info("Retrieved simulator: #{inspect(retrieved_sim)}")
Logger.info("Same as original: #{retrieved_sim == sim3}")

# The option can also be replaced after the DeviceState has started
{:ok, new_sim} = FirmwareSimulator.start_link(current_version: "2.0.0")
:ok = DeviceState.set_option(device_state, :firmware_simulator, new_sim)

{:ok, updated_sim} = DeviceState.get_option(device_state, :firmware_simulator)
Logger.info("Updated simulator version: #{FirmwareSimulator.current_version(updated_sim)}")

%{
  original_sim: sim3,
  new_sim: new_sim,
  option_works: retrieved_sim == sim3
}
Full End-to-End Example with CPE Client Session
This demonstrates the complete firmware upgrade flow with the ACS.
# Run the ACS server under its own one_for_one supervisor
port = 4090

{:ok, acs_sup} =
  Supervisor.start_link(
    [Server.child_spec(port: port)],
    strategy: :one_for_one
  )

# Start the session queue
{:ok, session_pid} = Session.start_link()

# Start Finch under the Caretaker.Finch name only if nothing is registered there yet
if is_nil(Process.whereis(Caretaker.Finch)) do
  {:ok, _} = Finch.start_link(name: Caretaker.Finch)
end

Logger.info("ACS server started on port #{port}")

# Cell output: where the ACS can be reached
%{acs_url: "http://localhost:#{port}/cwmp", port: port}
# Simulator backing the end-to-end device
{:ok, fw_sim} =
  FirmwareSimulator.start_link(
    current_version: "1.0.0",
    download_duration: 500,
    reboot_delay: 500
  )

# Full device identity, including the manufacturer
device_id = %{
  manufacturer: "Acme Corp",
  oui: "E2E001",
  product_class: "Router",
  serial_number: "E2E-FW-001"
}

# DeviceState is given only the oui / product_class / serial_number fields
{:ok, device} =
  DeviceState.start_link(
    device_id: Map.take(device_id, [:oui, :product_class, :serial_number]),
    params: %{
      "Device.DeviceInfo.Manufacturer" => "Acme Corp",
      "Device.DeviceInfo.SoftwareVersion" => "1.0.0",
      "Device.DeviceInfo.SerialNumber" => "E2E-FW-001"
    },
    firmware_simulator: fw_sim
  )

Logger.info("Device created with firmware version: #{FirmwareSimulator.current_version(fw_sim)}")

%{device: device, firmware_sim: fw_sim, device_id: device_id}
# Forward every [:caretaker, :cpe_client, :rpc, :responded] event to this
# process as {:rpc_response, rpc, metadata} so later cells can `receive` it.
test_pid = self()
rpc_handler_id = "livebook-rpc-handler"

# Keep the cell safe to re-evaluate: attach/4 returns
# {:error, :already_exists} when the id is already registered, and that
# result is not checked below. Detaching an unknown id is harmless.
:telemetry.detach(rpc_handler_id)

:telemetry.attach(
  rpc_handler_id,
  [:caretaker, :cpe_client, :rpc, :responded],
  fn _name, _measurements, metadata, _config ->
    send(test_pid, {:rpc_response, metadata.rpc, metadata})
  end,
  nil
)

Logger.info("RPC telemetry handler attached")

:ok
# Build and encode the Download RPC that will be queued for the device
download =
  Caretaker.TR069.RPC.Download.new(
    command_key: "upgrade-e2e-001",
    file_type: "1 Firmware Upgrade Image",
    url: "http://firmware.example.com/firmware_v2.0.0.bin",
    file_size: 1_024_000,
    delay_seconds: 0
  )

{:ok, download_body} = Caretaker.TR069.RPC.Download.encode(download)

# Build the URL from the `port` the ACS was started on, rather than repeating
# the literal, so the two cannot drift apart
acs_url = "http://localhost:#{port}/cwmp"

# Run the CPE session in a background task so this cell can queue RPCs
# while the session is in progress
session_task =
  Task.async(fn ->
    Client.run_session(acs_url,
      device_id: device_id,
      device_state: device
    )
  end)

# Wait for initial GetParameterValues response (auto-queued by ACS)
receive do
  {:rpc_response, "GetParameterValues", _meta} ->
    Logger.info("Received GetParameterValues response")
after
  # Logger.warning/1 replaces the deprecated Logger.warn/1
  3000 -> Logger.warning("Timeout waiting for GetParameterValues")
end

# Queue the Download RPC for the local CPE address
:ok = Session.queue_for_ip({127, 0, 0, 1}, download_body)
Logger.info("Download RPC queued")

# Wait for Download response
receive do
  {:rpc_response, "Download", meta} ->
    Logger.info("Download response received!")
    Logger.info(" Command key: #{meta.command_key}")
    Logger.info(" Status: #{meta.status} (1 = async download started)")
after
  3000 -> Logger.warning("Timeout waiting for Download response")
end

# Wait (up to 5 seconds) for the session task to finish
{:ok, result} = Task.await(session_task, 5000)
Logger.info("Session completed, last RPC: #{result.rpc}")

result
# Let the simulated download (500ms) finish before inspecting the simulator
Process.sleep(1000)

{state, info} = FirmwareSimulator.status(fw_sim)
Logger.info("Firmware state after session: #{state}")
Logger.info("Download complete?: #{FirmwareSimulator.download_complete?(fw_sim)}")

# Timestamps for TransferComplete (would be sent in the next session)
{start_time, complete_time} = FirmwareSimulator.transfer_times(fw_sim)
Logger.info("Transfer start: #{start_time}")
Logger.info("Transfer complete: #{complete_time}")

%{state: state, info: info}
# Drive the reboot phase by hand (normally a Reboot RPC would trigger it)
Logger.info("Simulating reboot phase...")

# Acknowledge the transfer, then start the reboot
:ok = FirmwareSimulator.transfer_acknowledged(fw_sim)

{:ok, delay} = FirmwareSimulator.start_reboot(fw_sim)
Logger.info("Reboot started, delay: #{delay}ms")

# The simulator was configured with a 500ms reboot_delay
Process.sleep(700)

# Read back the version and state once the wait is over
final_version = FirmwareSimulator.current_version(fw_sim)
Logger.info("Firmware upgrade complete!")
Logger.info("New version: #{final_version}")

{state, _info} = FirmwareSimulator.status(fw_sim)
Logger.info("Final state: #{state}")

%{
  final_version: final_version,
  state: state,
  upgrade_successful: final_version == "2.0.0"
}
Resetting the Simulator
You can reset the simulator to test multiple upgrade cycles.
# Put the simulator back into idle; the firmware version is preserved
:ok = FirmwareSimulator.reset(fw_sim)

{state, info} = FirmwareSimulator.status(fw_sim)

Logger.info("After reset - State: #{state}")
Logger.info("After reset - Version: #{info.current_version}")
Logger.info("After reset - Command key: #{inspect(info.command_key)}")

# Cell output: state after reset and whether the version is still "2.0.0"
%{state: state, version_preserved: info.current_version == "2.0.0"}
Cleanup
# Remove the RPC telemetry handler
:telemetry.detach(rpc_handler_id)

# Shut down the session queue and the ACS supervisor
GenServer.stop(session_pid)
Supervisor.stop(acs_sup)

# Stop every simulator and device state started in this notebook, in the
# order listed
Enum.each(
  [sim, sim2, sim3, fw_sim, device_state, device, new_sim],
  fn pid -> Agent.stop(pid) end
)

Logger.info("Cleanup complete")

:ok