System Design: Real-Time Fleet Vehicle Tracking System

Mix.install([
  {:choreo, github: "code-shoily/choreo", branch: "main"},
  {:kino, "~> 0.19"}
])

🚗 1. Business Context & Requirements

This system design details an enterprise-grade, real-time vehicle tracking platform that manages a heterogeneous fleet of 10,000 vehicles (office vehicles, trucks, and motorbikes) operating across 3 distinct timezones. Each vehicle reports telemetry once per minute, from either dedicated IoT hardware or a mobile app. We require a robust, scalable architecture with a strict access control model, real-time command dispatch, and historical data retention for up to 3 years.

🔒 Access Control Model (ACL)

  1. Departmental Isolation: Vehicles are partitioned by departments (e.g., Logistics, Corporate Office, Field Operations). Users and vehicles belong to specific departments.
  2. Role-Based Visibility: Telemetry data, real-time tracking, and historical logs are strictly visible only to users with the Supervisor role within their corresponding department. A supervisor from “Field Operations” cannot view vehicles or reports from “Logistics”.
  3. Admin Overrides: Global administrators have cross-departmental access for configuration and hardware provisioning.
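
The three rules above can be read as a pair of pure predicates. The following is a minimal sketch, not the platform's actual authorization code: the module name `AclSketch` and both function names are hypothetical, and it assumes one reading of the rules, namely that supervisors see telemetry only within their own department while admins hold configuration rights across departments. The map keys mirror the `role` and `department_id` columns used in the schema in section 2.

```elixir
defmodule AclSketch do
  # Hypothetical module: one possible reading of the ACL rules.

  # Telemetry is visible only to a supervisor whose department matches
  # the vehicle's department (the repeated `dept` variable enforces equality).
  def can_view_telemetry?(%{role: "supervisor", department_id: dept}, %{department_id: dept})
      when not is_nil(dept),
      do: true

  def can_view_telemetry?(_user, _vehicle), do: false

  # Admins may configure and provision across departments.
  def can_configure?(%{role: "admin"}, _vehicle), do: true
  def can_configure?(_user, _vehicle), do: false
end

field_ops_supervisor = %{role: "supervisor", department_id: "field-ops"}
logistics_truck = %{department_id: "logistics"}

# A Field Operations supervisor cannot see a Logistics vehicle.
AclSketch.can_view_telemetry?(field_ops_supervisor, logistics_truck)
```

Keeping the check as a pure function makes it easy to reuse from both LiveView event handlers and report queries.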

📥 Telemetry Ingestion Constraints

  • IoT Devices: Send telemetry payloads formatted as raw NMEA 0183 sentences (e.g., $GPRMC) over lightweight protocols (UDP or raw TCP).
  • Mobile Applications: Send structured JSON payloads containing latitude, longitude, speed, battery levels, and timestamp details over HTTPS or secure WebSockets.
  • Frequency: Each vehicle emits 1 telemetry point per minute.
  • Volume Projection:
    • 10,000 vehicles × 1 message/min = 10,000 writes/min (~167 writes/second).
    • Daily Telemetry Points: 14.4 Million records.
    • Yearly Telemetry Points: ~5.26 Billion records.
    • 3-Year Telemetry Points: ~15.77 Billion records (requiring a tiered warm/cold storage strategy).
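
The projection can be reproduced with a few lines of arithmetic. This is a small sketch for checking the figures; the module name `VolumeProjection` is hypothetical, and it assumes a 365-day year and exactly one message per vehicle per minute.

```elixir
defmodule VolumeProjection do
  # Hypothetical helper: assumes 1 message per vehicle per minute.
  @minutes_per_day 24 * 60

  # Average write rate across the fleet (writes per second).
  def per_second(vehicles), do: vehicles / 60

  # Telemetry points per day, per 365-day year, and over a retention window.
  def per_day(vehicles), do: vehicles * @minutes_per_day
  def per_year(vehicles), do: per_day(vehicles) * 365
  def over_years(vehicles, years), do: per_year(vehicles) * years
end

VolumeProjection.per_day(10_000)        # 14_400_000
VolumeProjection.per_year(10_000)       # 5_256_000_000
VolumeProjection.over_years(10_000, 3)  # 15_768_000_000
```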

📊 Reporting & Direct Command Requirements

  • Batch/On-Demand Reports: Daily start/stop times, max/average speed per trip, stoppage analysis (idling duration), and geofence entry/exit violations.
  • Interactive Command Dispatch: Ability to send real-time commands from the supervisor’s browser down to the active vehicle device or application (e.g., immobilize_engine, get_instant_location, ping_diagnostics).
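
As an illustration of the per-trip metrics, the sketch below summarizes a list of telemetry points. It is an assumption-laden example rather than the reporting engine itself: `TripStatsSketch` is a hypothetical name, a point counts as idling when its speed is below an assumed threshold of 1.0 km/h, and each point is treated as one minute because vehicles report once per minute.

```elixir
defmodule TripStatsSketch do
  # Hypothetical threshold: below this speed a point counts as idling.
  @idle_threshold_kmh 1.0

  def summarize([]), do: %{max_speed_kmh: nil, avg_speed_kmh: nil, idle_minutes: 0}

  def summarize(points) do
    speeds = Enum.map(points, & &1.speed_kmh)

    %{
      max_speed_kmh: Enum.max(speeds),
      avg_speed_kmh: Enum.sum(speeds) / length(speeds),
      # One point per minute, so idle points approximate idle minutes.
      idle_minutes: Enum.count(speeds, &(&1 < @idle_threshold_kmh))
    }
  end
end

TripStatsSketch.summarize([
  %{speed_kmh: 0.0},
  %{speed_kmh: 40.0},
  %{speed_kmh: 80.0}
])
```

In practice these aggregates would more likely be computed in SQL over the warm telemetry table; the in-memory version only shows the definitions.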

🗄️ 2. Telemetry Data Taxonomy & ERD

To support strict ACL boundaries and high-throughput telemetry, the database schema is divided into relational metadata (users, roles, departments, geofences) and time-series data (telemetry logs, geofence violations, command history).

alias Choreo.ERD
alias Choreo.Lab.{Siren, Sketch}

db_schema =
  ERD.new()
  # Organization & ACL
  |> ERD.add_table(:departments, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :name, type: :varchar},
    %{name: :timezone, type: :varchar, comment: "One of the 3 supported timezone regions"}
  ])
  |> ERD.add_table(:users, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :email, type: :varchar},
    %{name: :password_hash, type: :varchar},
    %{name: :role, type: :varchar, comment: "supervisor, driver, admin"},
    %{name: :department_id, type: :uuid, key: :fk}
  ])
  
  # Vehicles
  |> ERD.add_table(:vehicles, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :license_plate, type: :varchar},
    %{name: :type, type: :varchar, comment: "office_vehicle, truck, motorbike"},
    %{name: :device_uid, type: :varchar, comment: "Hardware IMEI or App client token"},
    %{name: :device_type, type: :varchar, comment: "iot_nmea, app_json"},
    %{name: :status, type: :varchar, comment: "active, maintenance, retired"},
    %{name: :department_id, type: :uuid, key: :fk}
  ])

  # Geofencing
  |> ERD.add_table(:geofences, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :name, type: :varchar},
    %{name: :polygon_coordinates, type: :geometry, comment: "Spatial boundary data"},
    %{name: :department_id, type: :uuid, key: :fk}
  ])

  # High-throughput Telemetry (Partitioned by Month / Year)
  |> ERD.add_table(:telemetry_logs, columns: [
    %{name: :id, type: :bigint, key: :pk},
    %{name: :vehicle_id, type: :uuid, key: :fk},
    %{name: :timestamp, type: :timestamp_tz},
    %{name: :latitude, type: :double},
    %{name: :longitude, type: :double},
    %{name: :speed_kmh, type: :double},
    %{name: :heading, type: :int, comment: "0-359 degrees"},
    %{name: :raw_payload, type: :text, comment: "Original NMEA or JSON string"}
  ])

  # Alerts & Violations
  |> ERD.add_table(:geofence_violations, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :vehicle_id, type: :uuid, key: :fk},
    %{name: :geofence_id, type: :uuid, key: :fk},
    %{name: :violation_type, type: :varchar, comment: "entry, exit"},
    %{name: :timestamp, type: :timestamp_tz}
  ])

  # Device Downlink Commands
  |> ERD.add_table(:device_commands, columns: [
    %{name: :id, type: :uuid, key: :pk},
    %{name: :vehicle_id, type: :uuid, key: :fk},
    %{name: :command_type, type: :varchar, comment: "ping, engine_lock, reset"},
    %{name: :payload, type: :jsonb},
    %{name: :status, type: :varchar, comment: "pending, sent, acknowledged, failed"},
    %{name: :issued_by_user_id, type: :uuid, key: :fk},
    %{name: :created_at, type: :timestamp_tz},
    %{name: :updated_at, type: :timestamp_tz}
  ])

  # Relationships
  |> ERD.add_relationship(:departments, :users, cardinality: :exactly_one_to_many, label: "employs")
  |> ERD.add_relationship(:departments, :vehicles, cardinality: :exactly_one_to_many, label: "allocates")
  |> ERD.add_relationship(:departments, :geofences, cardinality: :exactly_one_to_many, label: "monitors")
  |> ERD.add_relationship(:vehicles, :telemetry_logs, cardinality: :exactly_one_to_many, label: "emits")
  |> ERD.add_relationship(:vehicles, :geofence_violations, cardinality: :exactly_one_to_many, label: "triggers")
  |> ERD.add_relationship(:geofences, :geofence_violations, cardinality: :exactly_one_to_many, label: "references")
  |> ERD.add_relationship(:vehicles, :device_commands, cardinality: :exactly_one_to_many, label: "receives")
  |> ERD.add_relationship(:users, :device_commands, cardinality: :exactly_one_to_many, label: "commands")
mermaid_erd = ERD.to_mermaid(db_schema)

Kino.Layout.tabs(
  "Entity Relationship Model (Siren)": Siren.new(mermaid_erd, height: "650px", theme: "neutral"),
  "Interactive Sketch Canvas": Sketch.new(mermaid_erd, height: "1000px")
)

🏛️ 3. High-Level System Architecture (C4 Model)

The ingestion and execution flows utilize Elixir’s concurrency model. Specifically, each active vehicle (out of 10,000) is backed by a dedicated, dynamic GenServer process. These GenServers are distributed across the Elixir cluster using a cluster registry (e.g., Horde or DynamicSupervisor with pg).
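
A minimal sketch of the one-process-per-vehicle idea follows. It uses Elixir's built-in local `Registry` as a stand-in for a distributed registry such as Horde, so it runs on a single node only. The names `VehicleSketch` and `VehicleSketch.Registry`, and the shape of the state map, are assumptions made for illustration.

```elixir
defmodule VehicleSketch do
  use GenServer

  # Each vehicle process registers itself under its vehicle ID.
  def start_link(vehicle_id) do
    GenServer.start_link(__MODULE__, vehicle_id, name: via(vehicle_id))
  end

  # Gateways send telemetry asynchronously; dashboards read synchronously.
  def report(vehicle_id, fix), do: GenServer.cast(via(vehicle_id), {:telemetry, fix})
  def latest(vehicle_id), do: GenServer.call(via(vehicle_id), :latest)

  defp via(vehicle_id), do: {:via, Registry, {VehicleSketch.Registry, vehicle_id}}

  @impl true
  def init(vehicle_id), do: {:ok, %{vehicle_id: vehicle_id, latest: nil, buffer: []}}

  @impl true
  def handle_cast({:telemetry, fix}, state) do
    # Keep the latest fix in memory and buffer it for a later batched write.
    {:noreply, %{state | latest: fix, buffer: [fix | state.buffer]}}
  end

  @impl true
  def handle_call(:latest, _from, state), do: {:reply, state.latest, state}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: VehicleSketch.Registry)
{:ok, _pid} = VehicleSketch.start_link("vehicle_uuid_1")

VehicleSketch.report("vehicle_uuid_1", %{latitude: 48.1173, longitude: 11.5167, speed_kmh: 41.5})
VehicleSketch.latest("vehicle_uuid_1")
```

Because messages between two processes are delivered in order, the `latest/1` call issued after `report/2` from the same caller sees the new fix.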

Architecture Decomposition

  1. Ingestion Gateways:
    • TCP/UDP Ingestion Gateway: Custom Elixir/Erlang listeners (using ranch or :gen_tcp/:gen_udp) that accept connections from IoT hardware, parse raw NMEA sentences, and dispatch updates to the matching vehicle GenServer.
    • HTTP/WS Ingestion Gateway: Phoenix channels and HTTP controllers that authenticate client apps and parse JSON telemetry.
  2. Elixir Vehicle GenServers (Active Tracking Layer):
    • Keeps the latest state in memory (last location, active geofences, speed, connection health).
    • Validates coordinates against active geofences in-process (or via spatial indexing).
    • Buffers telemetry writes to prevent database throttling.
    • Maintains active TCP socket/WS connection refs to send direct commands down to the hardware immediately.
  3. Tiered Storage Layer:
    • Hot State (Redis): Fast lookup of the latest coordinates for dashboard rendering.
    • Warm Storage (TimescaleDB / Postgres): Retains 3 to 6 months of detailed telemetry for real-time reporting and alerts.
    • Cold Storage (S3 / Parquet): Raw/compressed telemetry partition files. An ETL process runs daily to batch-migrate warm telemetry to Parquet format in S3, allowing cost-effective storage for 3 years, queried on-demand using AWS Athena.
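
The in-process geofence validation mentioned for the Vehicle GenServers could be done with a ray-casting point-in-polygon test. The sketch below is one possible approach, not the system's confirmed method: `GeofenceSketch` is a hypothetical name, polygons are assumed to be lists of `{lat, lon}` tuples, and coordinates are treated as planar, which is a reasonable approximation only for small fences.

```elixir
defmodule GeofenceSketch do
  # Ray casting: count how many polygon edges a ray from the point crosses.
  # An odd number of crossings means the point is inside.
  def inside?({lat, lon}, polygon) when length(polygon) >= 3 do
    # Pair each vertex with the next one, wrapping around to close the ring.
    edges = Enum.zip(polygon, tl(polygon) ++ [hd(polygon)])

    crossings =
      Enum.count(edges, fn {{lat1, lon1}, {lat2, lon2}} ->
        # The edge straddles the point's latitude, and the crossing lies
        # beyond the point's longitude. `and` short-circuits, so the
        # division is never reached when lat1 == lat2.
        (lat1 > lat) != (lat2 > lat) and
          lon < (lon2 - lon1) * (lat - lat1) / (lat2 - lat1) + lon1
      end)

    rem(crossings, 2) == 1
  end
end

depot = [{0, 0}, {0, 10}, {10, 10}, {10, 0}]

GeofenceSketch.inside?({5, 5}, depot)   # true
GeofenceSketch.inside?({15, 5}, depot)  # false
```

An entry or exit violation is then a change in the result of `inside?/2` between two consecutive telemetry points.
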
alias Choreo.C4

c4_system =
  C4.new()
  # Actors
  |> C4.add_person(:supervisor, label: "Department Supervisor", description: "Monitors department vehicle coordinates, views reports, and dispatches commands.")
  |> C4.add_person(:driver, label: "Vehicle Driver", description: "Drives vehicles, broadcasts telemetry from mobile applications.")

  # System Boundaries
  |> C4.add_software_system(:vehicle_fleet, label: "Vehicle Fleet System", scope: :in)
  
  # External telemetry units
  |> C4.add_container(:iot_hardware, label: "IoT GPS Device", technology: "Embedded SIM / NMEA 0183", description: "Broadcasts GPS sentences over UDP every minute.")
  |> C4.add_container(:driver_app, label: "Driver Mobile App", technology: "iOS / Android (JSON)", description: "Sends locations over JSON WebSockets / HTTPS.")

  # Web Interfaces
  |> C4.add_container(:supervisor_web_portal, label: "Supervisor Dashboard", technology: "Phoenix LiveView", description: "Real-time tracking UI, command console, and reporting views.", parent: :vehicle_fleet)

  # Gateway Layers
  |> C4.add_container(:load_balancer, label: "Load Balancer", technology: "HAProxy / AWS ALB", description: "Routes traffic to gateways and handles TCP/UDP/HTTPS load distribution.", parent: :vehicle_fleet)
  |> C4.add_container(:telemetry_gateway, label: "Elixir Ingestion Gateways", technology: "Elixir / Phoenix / Ranch", description: "Decodes NMEA sentences and JSON. Routes inputs to Vehicle GenServers.", parent: :vehicle_fleet)

  # Core Compute / Stateful Layer
  |> C4.add_container(:genserver_registry, label: "Horde / Dynamic Registry", technology: "Elixir Registry", description: "Maps 10,000 Vehicle IDs to active GenServer PIDs.", parent: :vehicle_fleet)
  |> C4.add_container(:vehicle_genserver, label: "Vehicle GenServer Instance", technology: "Elixir GenServer", description: "One per vehicle. Manages current state, local geofence rules, command delivery.", parent: :vehicle_fleet)

  # Storage Tiers
  |> C4.add_container(:redis_cache, label: "Latest Location Cache", technology: "Redis Cluster", description: "Caches current coordinates and supervisor sessions.", parent: :vehicle_fleet)
  |> C4.add_container(:timescaledb, label: "Warm Database", technology: "TimescaleDB (Postgres)", description: "Stores relational metadata (ACL) and warm telemetry logs.", parent: :vehicle_fleet)
  |> C4.add_container(:s3_cold, label: "Cold Archive", technology: "S3 Object Store (Parquet)", description: "Holds up to 3 years of telemetry logs for long-term reporting.", parent: :vehicle_fleet)

  # Worker Pipeline
  |> C4.add_container(:oban_workers, label: "Report Processing Engine", technology: "Oban / Elixir Task", description: "Executes geofence violation detection and daily report generations.", parent: :vehicle_fleet)

  # Connections — Drivers & Hardware
  |> C4.add_relationship(:driver, :driver_app, label: "Uses")
  |> C4.add_relationship(:driver_app, :load_balancer, label: "JSON Telemetry", technology: "WebSockets / HTTPS")
  |> C4.add_relationship(:iot_hardware, :load_balancer, label: "NMEA Sentences", technology: "UDP / TCP")
  |> C4.add_relationship(:load_balancer, :telemetry_gateway, label: "Proxies traffic", technology: "TCP/UDP/HTTP")

  # Connections — Gateway to Elixir Compute
  |> C4.add_relationship(:telemetry_gateway, :genserver_registry, label: "Resolves PID", technology: "Local/Distributed Registry Call")
  |> C4.add_relationship(:telemetry_gateway, :vehicle_genserver, label: "Dispatches telemetry data", technology: "Erlang message send")

  # Compute details
  |> C4.add_relationship(:vehicle_genserver, :redis_cache, label: "Updates latest location", technology: "Redix")
  |> C4.add_relationship(:vehicle_genserver, :timescaledb, label: "Buffers and writes telemetry logs", technology: "Ecto pool")
  |> C4.add_relationship(:vehicle_genserver, :oban_workers, label: "Enqueues geofence events", technology: "Oban enqueuer")

  # Supervisor Web Flow
  |> C4.add_relationship(:supervisor, :supervisor_web_portal, label: "Accesses reports & tracks vehicles", technology: "Web Browser")
  |> C4.add_relationship(:supervisor_web_portal, :timescaledb, label: "Queries metadata & ACL definitions", technology: "Ecto SQL")
  |> C4.add_relationship(:supervisor_web_portal, :redis_cache, label: "Reads real-time states", technology: "Redix")
  |> C4.add_relationship(:supervisor_web_portal, :vehicle_genserver, label: "Sends direct downlink command", technology: "GenServer.call")

  # Cold Storage Pipeline
  |> C4.add_relationship(:oban_workers, :timescaledb, label: "Aggregates daily reports", technology: "SQL query")
  |> C4.add_relationship(:oban_workers, :s3_cold, label: "Flushes consolidated data to Parquet", technology: "AWS SDK")
container_view = Choreo.View.zoom(c4_system, level: 1)
mermaid_c4 = C4.to_mermaid(container_view)

Kino.Layout.tabs(
  "C4 Container Architecture (Siren)": Siren.new(mermaid_c4, height: "750px", theme: :forest),
  "Interactive Sketch Canvas": Sketch.new(mermaid_c4, height: "1200px")
)

🔄 4. Dynamic Telemetry & Command Sequences

Below are the sequence diagrams depicting two vital operations in the system:

  1. Telemetry Ingestion & Geofence Checks for an IoT NMEA sentence.
  2. Direct Downlink Command execution from a supervisor down to a vehicle’s mobile app.

4.1 Ingestion & Real-Time Alert Flow (NMEA over UDP)

alias Choreo.Sequence

ingestion_flow =
  Sequence.new()
  |> Sequence.add_participant(:iot, label: "IoT GPS Hardware")
  |> Sequence.add_participant(:gw, label: "Ingestion Gateway")
  |> Sequence.add_participant(:registry, label: "Horde Registry")
  |> Sequence.add_participant(:genserver, label: "Vehicle GenServer")
  |> Sequence.add_participant(:timescale, label: "TimescaleDB")
  |> Sequence.add_participant(:liveview, label: "Phoenix LiveView")
  |> Sequence.add_actor(:supervisor, label: "Department Supervisor")

  |> Sequence.message(:iot, :gw, label: "Send: $GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A")
  |> Sequence.activate(:gw)
  |> Sequence.message(:gw, :registry, label: "lookup(\"vehicle_uuid_1\")")
  |> Sequence.activate(:registry)
  |> Sequence.return(:registry, :gw, label: "PID: 0.450.0")
  |> Sequence.deactivate(:registry)
  
  # Decode NMEA coordinates
  |> Sequence.message(:gw, :genserver, label: "cast(PID, telemetry_coordinates: {48.1173, 11.5167, speed: 41.5})")
  |> Sequence.deactivate(:gw)
  |> Sequence.activate(:genserver)
  
  # GenServer checks internal geofences and schedules DB writes
  |> Sequence.message(:genserver, :timescale, label: "INSERT INTO telemetry_logs (vehicle_uuid_1, speed, point)")
  
  # Alert evaluation
  |> Sequence.message(:genserver, :liveview, label: "PubSub.broadcast(\"dept:logistics\", location_updated)")
  |> Sequence.deactivate(:genserver)
  |> Sequence.activate(:liveview)
  |> Sequence.message(:liveview, :supervisor, label: "Push web socket: Move vehicle on map UI")
  |> Sequence.deactivate(:liveview)
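
The decode step in the gateway converts NMEA's `ddmm.mmm` coordinates into decimal degrees and knots into km/h. The following is a minimal sketch of that conversion for `$GPRMC` sentences only. The module name `NmeaSketch` is hypothetical, and checksum validation (the `*6A` suffix) is deliberately omitted, so a production parser would need more than this.

```elixir
defmodule NmeaSketch do
  @knots_to_kmh 1.852

  # Parses a $GPRMC sentence into decimal degrees and km/h.
  def parse_gprmc("$GPRMC," <> rest) do
    # Drop the checksum suffix; it is not verified in this sketch.
    [body | _checksum] = String.split(rest, "*")

    case String.split(body, ",") do
      # Status "A" means the receiver has a valid fix.
      [_time, "A", lat, ns, lon, ew, knots, _course | _] ->
        with {:ok, latitude} <- to_decimal(lat, ns),
             {:ok, longitude} <- to_decimal(lon, ew),
             {speed, ""} <- Float.parse(knots) do
          {:ok, %{latitude: latitude, longitude: longitude, speed_kmh: speed * @knots_to_kmh}}
        else
          _ -> {:error, :malformed}
        end

      # Status "V" means the receiver reports no valid fix.
      [_time, "V" | _] ->
        {:error, :no_fix}

      _ ->
        {:error, :malformed}
    end
  end

  def parse_gprmc(_other), do: {:error, :unsupported_sentence}

  # "4807.038" means 48 degrees and 07.038 minutes.
  defp to_decimal(raw, hemisphere) do
    case Float.parse(raw) do
      {value, ""} ->
        degrees = trunc(value / 100)
        minutes = value - degrees * 100
        sign = if hemisphere in ["S", "W"], do: -1, else: 1
        {:ok, sign * (degrees + minutes / 60)}

      _ ->
        {:error, :bad_coordinate}
    end
  end
end

NmeaSketch.parse_gprmc("$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A")
```

For the sentence used in the diagram, this yields roughly 48.1173° N, 11.5167° E, and 41.5 km/h (22.4 knots × 1.852).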

4.2 Supervisor Command Downlink Flow (JSON WebSockets)

This sequence shows a downlink command (e.g., engine immobilization) initiated by an authorized supervisor, validated against the department ACL, and dispatched down the vehicle's active socket.

command_flow =
  Sequence.new()
  |> Sequence.add_actor(:supervisor, label: "Supervisor User")
  |> Sequence.add_participant(:liveview, label: "LiveView Dashboard")
  |> Sequence.add_participant(:genserver, label: "Vehicle GenServer")
  |> Sequence.add_participant(:timescale, label: "TimescaleDB")
  |> Sequence.add_participant(:gw, label: "WebSocket Gateway")
  |> Sequence.add_participant(:app, label: "Driver Mobile App")

  |> Sequence.message(:supervisor, :liveview, label: "Click 'Immobilize Engine'")
  |> Sequence.activate(:liveview)
  
  # Check ACL (ensure user's department_id matches vehicle's department_id)
  |> Sequence.message(:liveview, :timescale, label: "SELECT 1 FROM users u JOIN vehicles v ON u.department_id = v.department_id WHERE ...")
  |> Sequence.activate(:timescale)
  |> Sequence.return(:timescale, :liveview, label: "Authorized (Is Supervisor & Same Dept)")
  |> Sequence.deactivate(:timescale)
  
  # Dispatch command to GenServer
  |> Sequence.message(:liveview, :genserver, label: "GenServer.call(PID, {:send_command, :immobilize})")
  |> Sequence.activate(:genserver)
  
  # Log command record
  |> Sequence.message(:genserver, :timescale, label: "INSERT INTO device_commands (status: :pending)")
  
  # Find connection ref and push to WS
  |> Sequence.message(:genserver, :gw, label: "WS.push(connection_ref, {\"action\": \"immobilize\"})")
  |> Sequence.activate(:gw)
  |> Sequence.message(:gw, :app, label: "JSON Frame: {\"command\": \"immobilize\", \"id\": \"cmd_101\"}")
  |> Sequence.deactivate(:gw)
  
  # App execution response
  |> Sequence.message(:app, :gw, label: "ACK payload (command successful)")
  |> Sequence.activate(:gw)
  |> Sequence.message(:gw, :genserver, label: "ACK payload received")
  |> Sequence.deactivate(:gw)
  
  # Update state
  |> Sequence.message(:genserver, :timescale, label: "UPDATE device_commands SET status = :acknowledged")
  |> Sequence.return(:genserver, :liveview, label: "{:ok, :immobilized}")
  |> Sequence.deactivate(:genserver)
  
  |> Sequence.return(:liveview, :supervisor, label: "Update Dashboard UI: 'Engine immobilized successfully'")
  |> Sequence.deactivate(:liveview)
mermaid_ingest = Sequence.to_mermaid(ingestion_flow)
mermaid_command = Sequence.to_mermaid(command_flow)

Kino.Layout.tabs(
  "1. Ingestion Flow": Siren.new(mermaid_ingest, height: "650px", theme: :forest),
  "2. Command Downlink Flow": Siren.new(mermaid_command, height: "650px", theme: :forest)
)

⚙️ 5. Vehicle Connection & Session Lifecycle (FSM)

Each vehicle’s telemetry stream and command delivery path depends heavily on its connection state. We manage this status using a Finite State Machine model inside each Elixir GenServer.

alias Choreo.FSM

connection_fsm =
  FSM.new()
  |> FSM.add_initial_state(:offline, label: "No device session established")
  |> FSM.add_state(:connecting, label: "Establishing Socket / Handshaking client tokens")
  |> FSM.add_state(:active_tracking, label: "Sending telemetry every minute, connection healthy")
  |> FSM.add_state(:idle, label: "Vehicle parked (no movement detected, sending light keep-alives)")
  |> FSM.add_state(:stale_connection, label: "Telemetry missing for > 3 minutes, awaiting timeout")
  
  # Transitions
  |> FSM.add_transition(:offline, :connecting, label: "device_heartbeat")
  |> FSM.add_transition(:connecting, :active_tracking, label: "handshake_success")
  |> FSM.add_transition(:connecting, :offline, label: "handshake_timeout_or_failed")
  
  |> FSM.add_transition(:active_tracking, :idle, label: "ignition_off")
  |> FSM.add_transition(:idle, :active_tracking, label: "ignition_on")
  
  |> FSM.add_transition(:active_tracking, :stale_connection, label: "missed_telemetry_interval")
  |> FSM.add_transition(:idle, :stale_connection, label: "missed_keepalive")
  
  |> FSM.add_transition(:stale_connection, :active_tracking, label: "telemetry_received")
  |> FSM.add_transition(:stale_connection, :offline, label: "session_timeout_disconnect")
mermaid_fsm = FSM.to_mermaid(connection_fsm)
Siren.new(mermaid_fsm, height: "600px", theme: :forest)
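
Inside a GenServer, the same state machine can be expressed as a lookup table of `{state, event}` pairs. The sketch below mirrors the transitions declared in the diagram above. The module name `ConnectionFsmSketch` and the error tuple shape are assumptions, and timers for detecting missed intervals are left out.

```elixir
defmodule ConnectionFsmSketch do
  # {current_state, event} => next_state, mirroring the diagram's transitions.
  @transitions %{
    {:offline, :device_heartbeat} => :connecting,
    {:connecting, :handshake_success} => :active_tracking,
    {:connecting, :handshake_timeout_or_failed} => :offline,
    {:active_tracking, :ignition_off} => :idle,
    {:idle, :ignition_on} => :active_tracking,
    {:active_tracking, :missed_telemetry_interval} => :stale_connection,
    {:idle, :missed_keepalive} => :stale_connection,
    {:stale_connection, :telemetry_received} => :active_tracking,
    {:stale_connection, :session_timeout_disconnect} => :offline
  }

  # Returns the next state, or an error for events not allowed in this state.
  def transition(state, event) do
    case Map.fetch(@transitions, {state, event}) do
      {:ok, next} -> {:ok, next}
      :error -> {:error, {:invalid_transition, state, event}}
    end
  end
end

ConnectionFsmSketch.transition(:offline, :device_heartbeat)
ConnectionFsmSketch.transition(:offline, :ignition_on)
```

Rejecting undeclared transitions explicitly makes unexpected device behavior visible in logs instead of silently corrupting the session state.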

🔍 6. System Resiliency & Outage Analysis

We simulate a component failure within this architecture to analyze cascade effects and evaluate single points of failure.

alias Choreo.Analysis

infra =
  Choreo.new()
  |> Choreo.add_user(:supervisor, label: "Department Supervisor")
  |> Choreo.add_user(:iot, label: "IoT GPS Device")
  
  # Load Balancing & Gateway
  |> Choreo.add_load_balancer(:lb, label: "Ingestion Load Balancer")
  |> Choreo.add_service(:gateway, label: "Ingestion Gateway")
  
  # Elixir Stateful Core
  |> Choreo.add_service(:genservers, label: "Vehicle GenServers")
  
  # Storage
  |> Choreo.add_cache(:redis, label: "Redis Hot Cache")
  |> Choreo.add_database(:timescaledb, label: "TimescaleDB")
  |> Choreo.add_database(:s3, label: "S3 Cold Storage Archive")
  
  # Workers
  |> Choreo.add_service(:oban, label: "Oban Report Workers")

  # Define Edges
  |> Choreo.connect(:iot, :lb)
  |> Choreo.connect(:lb, :gateway)
  |> Choreo.connect(:gateway, :genservers)
  
  # GenServer persistence & reads
  |> Choreo.connect(:genservers, :redis)
  |> Choreo.connect(:genservers, :timescaledb)
  
  # Supervisor read path
  |> Choreo.connect(:supervisor, :redis)
  |> Choreo.connect(:supervisor, :timescaledb)
  
  # Report Worker dependencies
  |> Choreo.connect(:oban, :timescaledb)
  |> Choreo.connect(:oban, :s3)

Simulation 1: TimescaleDB Database Failure

We evaluate system impact if the main database (TimescaleDB) suffers a localized crash:

affected = Analysis.impact_analysis(infra, :timescaledb)
IO.inspect(affected, label: "Affected services on TimescaleDB outage")
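
To make the idea behind impact analysis concrete, the sketch below walks the dependency edges in reverse to find everything that transitively depends on a failed node. It illustrates the general technique only and makes no claim about how `Choreo.Analysis` is implemented or what shape its result takes. `ImpactSketch` and the `edges` list are hypothetical, with the edges copied from the `Choreo.connect/3` calls above.

```elixir
defmodule ImpactSketch do
  # Returns every node that directly or transitively depends on `failed`.
  def affected(edges, failed) do
    edges
    |> walk([failed], MapSet.new())
    |> MapSet.delete(failed)
  end

  defp walk(_edges, [], seen), do: seen

  defp walk(edges, [node | rest], seen) do
    if MapSet.member?(seen, node) do
      walk(edges, rest, seen)
    else
      # Dependents are the sources of edges that point at this node.
      dependents = for {from, ^node} <- edges, do: from
      walk(edges, dependents ++ rest, MapSet.put(seen, node))
    end
  end
end

edges = [
  {:iot, :lb}, {:lb, :gateway}, {:gateway, :genservers},
  {:genservers, :redis}, {:genservers, :timescaledb},
  {:supervisor, :redis}, {:supervisor, :timescaledb},
  {:oban, :timescaledb}, {:oban, :s3}
]

ImpactSketch.affected(edges, :timescaledb)
```

Under this model, a TimescaleDB outage reaches the GenServers, the supervisor read path, and the report workers directly, and the gateway, load balancer, and IoT devices indirectly through the GenServers.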

Architecture Mitigation Strategy

  • In-Memory Buffer Resiliency: If TimescaleDB crashes, the Elixir Vehicle GenServers can transition to caching incoming coordinates in-process (or in an ETS backup table) up to a configurable size limit (e.g., buffering 60 minutes of messages). Once the database recovers, the GenServers drain their internal buffer back into the database asynchronously.
  • Live Tracking Continuity: Because live coordinates reach the web interface via Phoenix PubSub and the Redis hot cache, supervisors can still view current vehicle positions while the historical reporting database is offline.
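
The size-limited buffer described above can be sketched as a small data structure that drops the oldest point once the limit is reached. This is an illustrative sketch with assumed names (`BoundedBuffer`, `push/2`, `drain/1`) and an assumed drop-oldest policy. The default limit of 60 corresponds to 60 minutes at one point per minute per vehicle.

```elixir
defmodule BoundedBuffer do
  # FIFO buffer that keeps at most `limit` items, dropping the oldest first.
  defstruct queue: :queue.new(), size: 0, limit: 60

  def new(limit) when is_integer(limit) and limit > 0, do: %__MODULE__{limit: limit}

  # At capacity: discard the oldest item, then enqueue the new one.
  def push(%__MODULE__{size: size, limit: limit} = buf, item) when size >= limit do
    {_dropped, queue} = :queue.out(buf.queue)
    %{buf | queue: :queue.in(item, queue)}
  end

  def push(%__MODULE__{} = buf, item) do
    %{buf | queue: :queue.in(item, buf.queue), size: buf.size + 1}
  end

  # Returns buffered items oldest-first, plus an emptied buffer.
  def drain(%__MODULE__{} = buf) do
    {:queue.to_list(buf.queue), %{buf | queue: :queue.new(), size: 0}}
  end
end

buffer = Enum.reduce(1..5, BoundedBuffer.new(3), fn point, buf -> BoundedBuffer.push(buf, point) end)
{points, _empty} = BoundedBuffer.drain(buffer)
points  # [3, 4, 5]
```

Dropping the oldest points favors recent positions during a long outage; a deployment that must not lose history would need to spill to disk or ETS instead.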

Simulation 2: Single Points of Failure Identification

spof = Analysis.single_points_of_failure(infra)
IO.inspect(spof.nodes, label: "Critical single points of failure")

  • Load Balancer & Gateways: Because the load balancer is the entry point for all GPS devices, it is a single point of failure. In production, we deploy redundant GeoDNS servers that route to multiple ingestion gateways in distinct regions (US East, EU West, APAC) to ensure high availability.