Powered by AppSignal & Oban Pro

Sensocto Resilience Assessment

livebooks/resilience-assessment.livemd

Sensocto Resilience Assessment

# Notebook dependencies: Kino provides the Markdown, Mermaid, DataTable,
# Frame and Control widgets used throughout this notebook.
# NOTE(review): no VegaLite chart is built below; :kino_vega_lite may be removable.
Mix.install([{:kino, "~> 0.14"}, {:kino_vega_lite, "~> 0.1"}])

Overview

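This Livebook provides an interactive exploration of Sensocto’s resilience architecture. Run it attached to a running Sensocto node to see live metrics. When a component is unreachable, the live checks are written to report it as dead or unavailable rather than failing.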

1. Supervision Tree Visualization

# Static picture of Sensocto's supervision tree (hand-maintained).
# Mermaid separates statements by newline (or `;`), so every edge and every
# `style` directive sits on its own line; line breaks inside a node label are
# written as `<br/>` within a quoted label.
supervision_tree = """
graph TD
    A["Sensocto.Supervisor<br/>:rest_for_one<br/>5 restarts/10s"] --> B["Infrastructure.Supervisor<br/>:one_for_one<br/>3 restarts/5s"]
    A --> C["Registry.Supervisor<br/>:one_for_one<br/>5 restarts/5s"]
    A --> D["Storage.Supervisor<br/>:rest_for_one<br/>3 restarts/5s"]
    A --> E["Bio.Supervisor<br/>:one_for_one"]
    A --> F["Domain.Supervisor<br/>:one_for_one<br/>5 restarts/10s"]
    A --> G[GuestUserStore]
    A --> H[ChatStore]
    A --> I[Endpoint]
    A --> J[AshAuthentication]

    B --> B1[Telemetry]
    B --> B2[TaskSupervisor]
    B --> B3[Repo]
    B --> B4[Repo.Replica]
    B --> B5[DNSCluster]
    B --> B6["PubSub<br/>pool: 16"]
    B --> B7[Presence]
    B --> B8[Finch]

    C --> C1["14 Registries<br/>Horde + Local"]

    D --> D1[Iroh.RoomStore]
    D --> D2[HydrationManager]
    D --> D3[RoomStore]
    D --> D4[RoomSync]
    D --> D5[RoomStateCRDT]

    E --> E1[NoveltyDetector]
    E --> E2[PredictiveLoadBalancer]
    E --> E3[HomeostaticTuner]
    E --> E4[ResourceArbiter]
    E --> E5[CircadianScheduler]

    F --> F1[AttentionTracker]
    F --> F2[SystemLoadMonitor]
    F --> F3[Lenses.Supervisor]
    F --> F4[TableOwner]
    F --> F5[SensorsDynamicSupervisor]
    F --> F6["RoomsDynamicSupervisor<br/>Horde"]
    F --> F7[CallSupervisor]
    F --> F8["..."]

    style A fill:#e74c3c,color:#fff
    style B fill:#3498db,color:#fff
    style C fill:#3498db,color:#fff
    style D fill:#9b59b6,color:#fff
    style E fill:#27ae60,color:#fff
    style F fill:#f39c12,color:#fff
"""

Kino.Mermaid.new(supervision_tree)

2. Live System Health Check

defmodule ResilienceCheck do
  @moduledoc """
  Point-in-time resilience probes against the running Sensocto node.

  Checks that call into other processes or modules (PubSub, Repo,
  SystemLoadMonitor) rescue failures and return them as data. A missing
  component therefore shows up as unhealthy instead of crashing the notebook.
  """

  # Top-level supervisors, looked up by their registered (module) names.
  @supervisors [
    Sensocto.Infrastructure.Supervisor,
    Sensocto.Registry.Supervisor,
    Sensocto.Storage.Supervisor,
    Sensocto.Bio.Supervisor,
    Sensocto.Domain.Supervisor
  ]

  # Singleton GenServers whose liveness and mailbox length are reported.
  @critical_genservers [
    Sensocto.AttentionTracker,
    Sensocto.SystemLoadMonitor,
    Sensocto.Lenses.Router,
    Sensocto.Iroh.RoomStore,
    Sensocto.RoomStore
  ]

  # Named ETS tables to inspect.
  @ets_tables [
    :attribute_store_hot,
    :attribute_store_warm,
    :attention_levels_cache,
    :sensor_attention_cache,
    :system_load_cache,
    :bio_novelty_scores
  ]

  @pubsub_timeout_ms 1_000
  @db_timeout_ms 5_000

  @doc """
  Performs comprehensive resilience checks on the running system.

  Returns a map keyed by check name: `:supervisors`, `:genservers`,
  `:ets_tables`, `:pubsub`, `:database`, `:memory` and `:load`.
  """
  def run_all_checks do
    %{
      supervisors: check_supervisors(),
      genservers: check_critical_genservers(),
      ets_tables: check_ets_tables(),
      pubsub: check_pubsub(),
      database: check_database(),
      memory: check_memory(),
      load: check_system_load()
    }
  end

  @doc """
  Returns `{supervisor, :alive | :dead}` for each top-level supervisor,
  depending on whether its name is registered on this node.
  """
  def check_supervisors do
    Enum.map(@supervisors, fn sup ->
      status =
        case Process.whereis(sup) do
          nil -> :dead
          pid when is_pid(pid) -> :alive
        end

      {sup, status}
    end)
  end

  @doc """
  Returns `%{name: module, status: :alive | :dead, queue_length: integer}`
  for each critical GenServer.

  A process that is not registered, or that exits between the lookup and
  `Process.info/2`, is reported as `:dead` with a queue length of 0.
  """
  def check_critical_genservers do
    Enum.map(@critical_genservers, fn gs ->
      {status, queue_len} =
        with pid when is_pid(pid) <- Process.whereis(gs),
             {:message_queue_len, len} <- Process.info(pid, :message_queue_len) do
          {:alive, len}
        else
          _ -> {:dead, 0}
        end

      %{name: gs, status: status, queue_length: queue_len}
    end)
  end

  @doc """
  Returns `%{table:, exists:, size:, memory_kb:}` for each named ETS table.

  Size and memory are read through the table id returned by `:ets.whereis/1`.
  If the table is deleted between lookup and inspection, `:ets.info/2`
  returns `:undefined`; the table is then reported as missing instead of
  raising on `:undefined * wordsize`.
  """
  def check_ets_tables do
    word_size = :erlang.system_info(:wordsize)

    Enum.map(@ets_tables, fn table ->
      with tid when tid != :undefined <- :ets.whereis(table),
           size when is_integer(size) <- :ets.info(tid, :size),
           # :ets.info(_, :memory) is measured in machine words.
           words when is_integer(words) <- :ets.info(tid, :memory) do
        %{table: table, exists: true, size: size, memory_kb: div(words * word_size, 1024)}
      else
        _ -> %{table: table, exists: false, size: 0, memory_kb: 0}
      end
    end)
  end

  @doc """
  Round-trips a uniquely tagged message through `Sensocto.PubSub`.

  Returns `%{healthy: true, latency_ms: float}` with the measured
  broadcast-to-receive time. Returns `%{healthy: false, error: reason}` on
  timeout or when PubSub raises.
  """
  def check_pubsub do
    ref = make_ref()
    topic = "health_check:#{inspect(ref)}"

    try do
      Phoenix.PubSub.subscribe(Sensocto.PubSub, topic)
      sent_at = System.monotonic_time(:microsecond)
      Phoenix.PubSub.broadcast(Sensocto.PubSub, topic, {:ping, ref})

      result =
        receive do
          {:ping, ^ref} ->
            elapsed_us = System.monotonic_time(:microsecond) - sent_at
            %{healthy: true, latency_ms: Float.round(elapsed_us / 1000, 3)}
        after
          @pubsub_timeout_ms -> %{healthy: false, error: :timeout}
        end

      Phoenix.PubSub.unsubscribe(Sensocto.PubSub, topic)
      result
    rescue
      e -> %{healthy: false, error: Exception.message(e)}
    end
  end

  @doc """
  Runs `SELECT 1` against `Sensocto.Repo`.

  Returns `%{healthy: true, latency_ms: integer}` or
  `%{healthy: false, error: message}`. Exits from the call (e.g. a pool
  checkout timeout) are reported as `"connection_timeout"`.
  """
  def check_database do
    start_time = System.monotonic_time(:millisecond)

    try do
      Sensocto.Repo.query!("SELECT 1", [], timeout: @db_timeout_ms)
      %{healthy: true, latency_ms: System.monotonic_time(:millisecond) - start_time}
    rescue
      e -> %{healthy: false, error: Exception.message(e)}
    catch
      :exit, _ -> %{healthy: false, error: "connection_timeout"}
    end
  end

  @doc """
  Returns BEAM memory usage from `:erlang.memory/0` in whole megabytes
  (truncated): total, processes, ETS and binaries.
  """
  def check_memory do
    mem = :erlang.memory()
    to_mb = fn key -> div(Keyword.get(mem, key, 0), 1024 * 1024) end

    %{
      total_mb: to_mb.(:total),
      processes_mb: to_mb.(:processes),
      ets_mb: to_mb.(:ets),
      binary_mb: to_mb.(:binary)
    }
  end

  @doc """
  Returns whatever `Sensocto.SystemLoadMonitor.get_metrics/0` returns.
  If it raises or exits, returns `%{error: "SystemLoadMonitor not available"}`.
  """
  def check_system_load do
    try do
      Sensocto.SystemLoadMonitor.get_metrics()
    rescue
      _ -> %{error: "SystemLoadMonitor not available"}
    catch
      :exit, _ -> %{error: "SystemLoadMonitor not available"}
    end
  end
end
# Run every resilience probe once; the cells below read from `results`.
results = ResilienceCheck.run_all_checks()

# One line per supervisor: a check/cross mark, then the module name.
supervisor_status =
  Enum.map_join(results.supervisors, "\n", fn {name, status} ->
    mark =
      case status do
        :alive -> "✅"
        _ -> "❌"
      end

    "#{mark} #{inspect(name)}"
  end)

Kino.Markdown.new("""
## Supervisor Status

#{supervisor_status}
""")
# Display GenServer health.
# "Health" is green only for a live process with fewer than 100 queued
# messages. A dead process reports a queue length of 0, so checking the
# queue alone would paint it green. This mirrors the GenServer rule used by
# the dashboard's overall health check.
genserver_data =
  Enum.map(results.genservers, fn gs ->
    alive? = gs.status == :alive
    healthy? = alive? and gs.queue_length < 100

    %{
      "Name" => gs.name |> Module.split() |> Enum.take(-2) |> Enum.join("."),
      "Status" => if(alive?, do: "✅ Alive", else: "❌ Dead"),
      "Queue" => gs.queue_length,
      "Health" => if(healthy?, do: "🟢", else: "🔴")
    }
  end)

Kino.DataTable.new(genserver_data, name: "Critical GenServers")
# Tabulate the named ETS tables: presence, entry count and memory footprint.
ets_data =
  for table_info <- results.ets_tables do
    exists_mark = if table_info.exists, do: "✅", else: "❌"

    %{
      "Table" => table_info.table,
      "Exists" => exists_mark,
      "Entries" => table_info.size,
      "Memory (KB)" => table_info.memory_kb
    }
  end

Kino.DataTable.new(ets_data, name: "ETS Table Memory")

3. Backpressure System Visualization

# Static overview of the backpressure layers (hand-maintained).
# Mermaid needs `subgraph`, `end`, edges and `style` directives on separate
# lines; label line breaks are written as `<br/>`.
backpressure_diagram = """
flowchart TB
    subgraph Layer1["Layer 1: Attention-Based"]
        A1[":high = 0.2x<br/>User focused"]
        A2[":medium = 0.4x<br/>User viewing"]
        A3[":low = 4.0x<br/>No viewers"]
        A4[":none = 10.0x<br/>No connections"]
    end

    subgraph Layer2["Layer 2: System Load"]
        L1[":normal = 1.0x<br/>CPU < 50%"]
        L2[":elevated = 1.5x<br/>CPU < 70%"]
        L3[":high = 3.0x<br/>CPU < 85%"]
        L4[":critical = 5.0x<br/>CPU >= 85%"]
    end

    subgraph Layer3["Layer 3: Biomimetic"]
        B1["NoveltyDetector<br/>0.5x for anomalies"]
        B2["PredictiveLoadBalancer<br/>0.75x - 1.2x"]
        B3["ResourceArbiter<br/>0.5x - 5.0x"]
        B4["CircadianScheduler<br/>0.85x - 1.2x"]
    end

    subgraph Layer4["Layer 4: ETS Limits"]
        E1["Hot: 1000 → 200"]
        E2["Warm: 60000 → 3000"]
    end

    Layer1 --> Layer2
    Layer2 --> Layer3
    Layer3 --> Layer4
    Layer4 --> Final["Final Window<br/>= base × all multipliers"]

    style Layer1 fill:#3498db,color:#fff
    style Layer2 fill:#e74c3c,color:#fff
    style Layer3 fill:#27ae60,color:#fff
    style Layer4 fill:#9b59b6,color:#fff
"""

Kino.Mermaid.new(backpressure_diagram)

4. Live Attention Metrics

# Snapshot the AttentionTracker state. If the call raises or exits (e.g. the
# tracker is not running), fall back to empty maps so the table still renders.
empty_attention = %{attention_state: %{}, pinned_sensors: %{}, battery_states: %{}}

attention_state =
  try do
    Sensocto.AttentionTracker.get_state()
  rescue
    _ -> empty_attention
  catch
    :exit, _ -> empty_attention
  end

# Number of entries in one category of the state. A missing key counts as 0.
# NOTE(review): assumes each category is a map, as the fallback above is.
count_entries = fn key -> attention_state |> Map.get(key, %{}) |> map_size() end

sensor_count = count_entries.(:attention_state)
pinned_count = count_entries.(:pinned_sensors)
battery_count = count_entries.(:battery_states)

Kino.Markdown.new("""
## Attention Tracker State

| Metric | Value |
|--------|-------|
| Sensors with attention | #{sensor_count} |
| Pinned sensors | #{pinned_count} |
| Battery states tracked | #{battery_count} |
""")

5. System Load Metrics

# Render the SystemLoadMonitor metrics captured by the health check above.
load_metrics = results.load

# Convert a 0.0–1.0 ratio into a percentage with one decimal place.
# Multiplying by 100.0 forces a float: Float.round/2 only accepts floats, so
# a nil metric (treated as 0) or an integer ratio would otherwise raise.
to_pct = fn ratio -> Float.round((ratio || 0) * 100.0, 1) end

case load_metrics do
  %{error: _} ->
    Kino.Markdown.new("⚠️ SystemLoadMonitor not available")

  metrics when is_map(metrics) ->
    # Map.get tolerates metrics that omit a field (dot access would raise).
    cpu_pct = to_pct.(Map.get(metrics, :scheduler_utilization))
    mem_pct = to_pct.(Map.get(metrics, :memory_pressure))
    pubsub_pct = to_pct.(Map.get(metrics, :pubsub_pressure))
    queue_pct = to_pct.(Map.get(metrics, :message_queue_pressure))

    load_level = Map.get(metrics, :load_level) || :unknown
    load_multiplier = Map.get(metrics, :load_multiplier) || 1.0

    level_emoji =
      case load_level do
        :normal -> "🟢"
        :elevated -> "🟡"
        :high -> "🟠"
        :critical -> "🔴"
        _ -> "⚪"
      end

    Kino.Markdown.new("""
    ## System Load: #{level_emoji} #{load_level}

    | Metric | Value | Threshold |
    |--------|-------|-----------|
    | CPU (Scheduler) | #{cpu_pct}% | 50% / 70% / 85% |
    | Memory Pressure | #{mem_pct}% | 70% / 80% / 90% |
    | PubSub Pressure | #{pubsub_pct}% | N/A |
    | Queue Pressure | #{queue_pct}% | N/A |
    | Load Multiplier | #{load_multiplier}x | 1.0x - 5.0x |
    """)

  other ->
    Kino.Markdown.new("⚠️ Unexpected SystemLoadMonitor response: `#{inspect(other)}`")
end

6. Memory Budget Calculator

defmodule MemoryCalculator do
  @moduledoc """
  Back-of-the-envelope ETS memory budget for the hot/warm attribute store.

  Every sensor is assumed to keep up to `limit` entries for each of its
  attributes in both tiers, and every entry is assumed to cost a fixed number
  of bytes.
  """

  # NOTE(review): 200 bytes per entry is a fixed estimate, not a measurement.
  @hot_entry_size 200
  @warm_entry_size 200
  @default_hot_limit 1000
  @default_warm_limit 60_000
  @bytes_per_mb 1024 * 1024

  # Fraction of the default per-attribute limits kept at each load level.
  @load_multipliers %{
    normal: %{hot: 1.0, warm: 1.0},
    elevated: %{hot: 0.8, warm: 0.5},
    high: %{hot: 0.4, warm: 0.2},
    critical: %{hot: 0.2, warm: 0.05}
  }

  @doc """
  Memory budget at the default (normal-load) limits.

  Returns `%{hot_mb:, warm_mb:, total_mb:}`, each rounded to 2 decimals.
  """
  def calculate(sensor_count, attrs_per_sensor) do
    sensor_count
    |> budget(attrs_per_sensor, @default_hot_limit, @default_warm_limit)
    |> Map.take([:hot_mb, :warm_mb, :total_mb])
  end

  @doc """
  Memory budget after scaling the limits for `load_level`
  (`:normal | :elevated | :high | :critical`; unknown levels use `:normal`).

  Returns the same keys as `calculate/2` plus the effective per-attribute
  `:hot_limit` and `:warm_limit`.
  """
  def calculate_with_load(sensor_count, attrs_per_sensor, load_level) do
    mult = Map.get(@load_multipliers, load_level, @load_multipliers[:normal])

    hot_limit = round(@default_hot_limit * mult.hot)
    warm_limit = round(@default_warm_limit * mult.warm)

    budget(sensor_count, attrs_per_sensor, hot_limit, warm_limit)
  end

  # Shared arithmetic for both public entry points: bytes per tier, then MB.
  defp budget(sensor_count, attrs_per_sensor, hot_limit, warm_limit) do
    total_hot = sensor_count * (hot_limit * attrs_per_sensor * @hot_entry_size)
    total_warm = sensor_count * (warm_limit * attrs_per_sensor * @warm_entry_size)

    %{
      hot_mb: to_mb(total_hot),
      warm_mb: to_mb(total_warm),
      total_mb: to_mb(total_hot + total_warm),
      hot_limit: hot_limit,
      warm_limit: warm_limit
    }
  end

  # Bytes to megabytes, rounded to two decimals (`/` always yields a float).
  defp to_mb(bytes), do: Float.round(bytes / @bytes_per_mb, 2)
end

# Compare memory budgets for 100, 500 and 1000 sensors (5 attributes each),
# each at :normal and then :critical load.
scenarios =
  for sensors <- [100, 500, 1000], load <- [:normal, :critical] do
    {sensors, 5, load}
  end

scenario_data =
  for {sensors, attrs, load} <- scenarios do
    budget = MemoryCalculator.calculate_with_load(sensors, attrs, load)

    %{
      "Sensors" => sensors,
      "Attrs" => attrs,
      "Load" => load,
      "Hot Limit" => budget.hot_limit,
      "Warm Limit" => budget.warm_limit,
      "Hot (MB)" => budget.hot_mb,
      "Warm (MB)" => budget.warm_mb,
      "Total (MB)" => budget.total_mb
    }
  end

Kino.DataTable.new(scenario_data, name: "Memory Budget by Scenario")

7. Circuit Breaker Simulation

defmodule CircuitBreakerDemo do
  @moduledoc """
  Demonstrates how a circuit breaker would work in Sensocto.
  This is NOT the actual implementation - it's a demonstration.

  The breaker is a plain struct threaded through `call/2`. `call/2` takes the
  outcome of one attempted operation and returns `{result_tag, breaker}`:

    * `:closed` - consecutive failures are counted; reaching
      `failure_threshold` trips the breaker (`:tripped`) into `:open`.
    * `:open` - calls are `:blocked` until more than `timeout_ms` has passed
      since the last failure. The next call is a probe. A successful probe
      moves to `:half_open`, or straight to `:closed` when
      `success_threshold` is 1. A failed probe restarts the timeout.
    * `:half_open` - `success_threshold` successes (probe included) close
      the breaker; any failure reopens it.
  """

  defstruct state: :closed,
            failure_count: 0,
            success_count: 0,
            last_failure_time: nil,
            failure_threshold: 5,
            success_threshold: 3,
            timeout_ms: 30_000

  # Settings used by simulate_sequence/2 unless overridden; the thresholds are
  # small so a short event list can trip the breaker.
  @demo_opts [failure_threshold: 3, success_threshold: 2, timeout_ms: 1000]

  @doc """
  Builds a closed breaker.

  Options: `:failure_threshold` (default 5), `:success_threshold` (default 3)
  and `:timeout_ms` (default 30_000).
  """
  def new(opts \\ []) do
    %__MODULE__{
      failure_threshold: Keyword.get(opts, :failure_threshold, 5),
      success_threshold: Keyword.get(opts, :success_threshold, 3),
      timeout_ms: Keyword.get(opts, :timeout_ms, 30_000)
    }
  end

  @doc """
  Applies one call outcome (`success?` is truthy for success) to the breaker.

  Returns `{tag, breaker}`, where `tag` is one of `:ok`, `:failed`, `:tripped`,
  `:blocked`, `:still_failing`, `:recovering`, `:improving`, `:recovered` or
  `:back_to_open`.
  """
  def call(cb, success?) do
    case cb.state do
      :closed ->
        if success? do
          {:ok, %{cb | failure_count: 0}}
        else
          failures = cb.failure_count + 1

          if failures >= cb.failure_threshold do
            {:tripped,
             %{cb | state: :open, failure_count: failures, last_failure_time: now_ms()}}
          else
            {:failed, %{cb | failure_count: failures}}
          end
        end

      :open ->
        now = now_ms()

        cond do
          now - cb.last_failure_time <= cb.timeout_ms ->
            {:blocked, cb}

          !success? ->
            # Failed probe: stay open and restart the timeout window.
            {:still_failing, %{cb | last_failure_time: now}}

          cb.success_threshold <= 1 ->
            # The successful probe alone meets the threshold, so close now
            # instead of demanding a second success in :half_open.
            {:recovered, close(cb)}

          true ->
            {:recovering, %{cb | state: :half_open, success_count: 1}}
        end

      :half_open ->
        if success? do
          successes = cb.success_count + 1

          if successes >= cb.success_threshold do
            {:recovered, close(cb)}
          else
            {:improving, %{cb | success_count: successes}}
          end
        else
          {:back_to_open,
           %{
             cb
             | state: :open,
               failure_count: cb.failure_threshold,
               success_count: 0,
               last_failure_time: now_ms()
           }}
        end
    end
  end

  @doc """
  Feeds `sequence` (`:success` counts as success, any other atom as failure)
  through a fresh breaker and returns one map per step with the event, result
  tag, resulting state and counters.

  `opts` override the demo settings
  (`failure_threshold: 3, success_threshold: 2, timeout_ms: 1000`).
  """
  def simulate_sequence(sequence, opts \\ []) do
    initial = new(Keyword.merge(@demo_opts, opts))

    {history, _final} =
      Enum.map_reduce(sequence, initial, fn event, cb ->
        {result, next} = call(cb, event == :success)

        entry = %{
          event: event,
          result: result,
          state: next.state,
          failures: next.failure_count,
          successes: next.success_count
        }

        {entry, next}
      end)

    history
  end

  # Back to normal operation with all counters cleared.
  defp close(cb), do: %{cb | state: :closed, success_count: 0, failure_count: 0}

  defp now_ms, do: System.monotonic_time(:millisecond)
end

# Intended narrative: trip the breaker (3 failures), see calls blocked, then
# recover after the timeout. Recovery needs real waiting, so this list is kept
# for reference and is not fed to the simulation below.
sequence =
  # trip: success success failure failure failure
  # blocked: blocked blocked
  # (wait for timeout) recover: success success
  # back to normal: success
  ~w(success success failure failure failure blocked blocked success success success)a

# The part that can run instantly: two successes, then enough failures to trip.
demo_sequence = ~w(success success failure failure failure)a
results = CircuitBreakerDemo.simulate_sequence(demo_sequence)

circuit_data =
  for {step, index} <- Enum.with_index(results, 1) do
    %{
      "Step" => index,
      "Event" => step.event,
      "Result" => step.result,
      "State" => step.state,
      "Failures" => step.failures
    }
  end

Kino.DataTable.new(circuit_data, name: "Circuit Breaker Simulation")
# State machine for the breaker demonstrated above. The ~S sigil disables
# interpolation and escapes, so the Mermaid text is taken literally.
circuit_diagram = ~S"""
stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failures >= threshold
    Open --> HalfOpen: timeout elapsed
    HalfOpen --> Closed: successes >= threshold
    HalfOpen --> Open: any failure
    Closed --> Closed: success

    note right of Closed
        Normal operation
        Tracking failures
    end note

    note right of Open
        All calls blocked
        Wait for timeout
    end note

    note right of HalfOpen
        Testing recovery
        Limited calls allowed
    end note
"""

Kino.Mermaid.new(circuit_diagram)

8. Blast Radius Analysis

# Expected effect of one component crashing and being restarted.
# These are hand-maintained values, not computed from the live system.
# The emoji follows the label: 🟢 Minimal, 🟡 Low, 🟠 Moderate, 🔴 High/Critical.
blast_radius_data =
  [
    {"Single Sensor", "Single sensor only", "< 1s (auto-restart)", "🟢 Minimal"},
    {"AttentionTracker", "All backpressure", "< 5s", "🟡 Low - defaults to :none"},
    {"SystemLoadMonitor", "Load detection", "< 5s", "🟡 Low - defaults to 1.0x"},
    {"LensRouter", "Lens data routing", "< 5s", "🟠 Moderate - sensors unaffected"},
    {"TableOwner", "All ETS tables lost", "< 10s", "🔴 High - sensors re-register"},
    # Labelled "Moderate", so it gets the 🟠 used for the other Moderate row.
    {"Iroh.RoomStore", "Storage cascade", "10-30s", "🟠 Moderate - in-memory preserved"},
    {"Phoenix.PubSub", "All real-time updates", "10-30s", "🔴 High"},
    {"Sensocto.Repo", "All database ops", "10-60s", "🔴 Critical"}
  ]
  |> Enum.map(fn {component, radius, recovery, impact} ->
    %{
      "Component" => component,
      "Blast Radius" => radius,
      "Recovery" => recovery,
      "Impact" => impact
    }
  end)

Kino.DataTable.new(blast_radius_data, name: "Blast Radius Analysis")

9. Recommendations Summary

# Prioritized follow-ups from the assessment. This is static text and must be
# updated by hand.
# Item 2 notes that GenServer.call/3 already defaults to a 5000ms timeout, so
# the risk lies in `:infinity` timeouts and unhandled timeout exits.
recommendations = """
## Priority Actions

### P0 - Critical (Do Immediately)

1. **Add Circuit Breakers**
   - Target: `Iroh.RoomStore`, `Sensocto.Repo`
   - Library: Consider `fuse` or custom GenServer
   - Impact: Prevents cascade failures

2. **Audit GenServer Call Timeouts**
   - Target: All cross-process calls
   - `GenServer.call/3` already defaults to 5000ms; find calls that pass `:infinity`
     and set explicit per-call timeouts
   - Handle the `:exit` raised on timeout at call sites that must degrade gracefully
   - Impact: Prevents hanging processes

### P1 - High (Next Sprint)

3. **Clean NoveltyDetector State**
   - Add periodic cleanup of deleted sensors
   - Bound `sensor_stats` map growth
   - Impact: Memory stability

4. **Move GuestUserStore Under Supervisor**
   - Create `Session.Supervisor`
   - Add `ChatStore` as sibling
   - Impact: Proper failure isolation

### P2 - Medium (Future)

5. **Add Iroh to Health Check**
   - Check `Iroh.RoomStore.ready?()`
   - Include in `/health/ready`
   - Impact: Better observability

6. **Consider LensRouter Sharding**
   - At 1000+ sensors
   - Partition by sensor_id hash
   - Impact: Horizontal scalability
"""

Kino.Markdown.new(recommendations)

10. Interactive Health Dashboard

# A frame that is re-rendered in place each time the dashboard refreshes.
frame = Kino.Frame.new()

# Re-run every check, reduce the results to an overall verdict, and redraw
# the frame.
update_dashboard = fn ->
  checks = ResilienceCheck.run_all_checks()
  mark = fn ok? -> if ok?, do: "✅", else: "❌" end

  sup_health = Enum.all?(checks.supervisors, fn {_name, status} -> status == :alive end)

  # GenServers count as healthy only when alive with fewer than 100 queued messages.
  gs_health =
    Enum.all?(checks.genservers, fn %{status: status, queue_length: queue} ->
      status == :alive and queue < 100
    end)

  pubsub_health = checks.pubsub.healthy
  db_health = checks.database.healthy

  # The first matching row wins, from fully healthy down to critical.
  overall =
    cond do
      sup_health and gs_health and pubsub_health and db_health -> "🟢 Healthy"
      db_health and sup_health -> "🟡 Degraded"
      sup_health -> "🟠 Impaired"
      true -> "🔴 Critical"
    end

  %{total_mb: total, processes_mb: procs, ets_mb: ets, binary_mb: bin} = checks.memory
  updated_at = DateTime.to_string(DateTime.utc_now())

  markdown = """
  ## System Health: #{overall}

  | Component | Status |
  |-----------|--------|
  | Supervisors | #{mark.(sup_health)} |
  | GenServers | #{mark.(gs_health)} |
  | PubSub | #{mark.(pubsub_health)} |
  | Database | #{mark.(db_health)} |

  ### Memory Usage
  - Total: #{total} MB
  - Processes: #{procs} MB
  - ETS: #{ets} MB
  - Binary: #{bin} MB

  _Last updated: #{updated_at}_
  """

  Kino.Frame.render(frame, Kino.Markdown.new(markdown))
end

# Draw once immediately, then show the frame as this cell's output.
update_dashboard.()

frame
# Each click re-runs the checks and redraws the dashboard frame above.
button = Kino.Control.button("Refresh Health Check")
Kino.listen(button, fn _click -> update_dashboard.() end)

# Show the button as this cell's output.
button

Conclusion

This Livebook provides interactive tools for assessing Sensocto’s resilience:

  1. Supervision Tree Visualization - Understand failure isolation domains
  2. Live Health Checks - Verify system components are running
  3. Backpressure Analysis - Understand the multi-layer throttling system
  4. Live Attention and System Load Metrics - See the current inputs to backpressure
  5. Memory Budget Calculator - Plan for scaling
  6. Circuit Breaker Demo - See how circuit breakers would protect the system
  7. Blast Radius Analysis - Understand failure impact
  8. Recommendations and Interactive Dashboard - Review prioritized follow-ups and refresh a live health summary

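Run this regularly to monitor system health and identify potential issues before they become critical. Note that the diagrams, the blast-radius table, and the recommendations are hard-coded in this notebook; update them by hand whenever the supervision tree or backpressure settings change.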