Sensocto Resilience Assessment
Mix.install([
{:kino, "~> 0.14"},
{:kino_vega_lite, "~> 0.1"}
])
Overview
This Livebook provides an interactive exploration of Sensocto’s resilience architecture. Run it with the notebook runtime attached to a running Sensocto node to see live metrics.
1. Supervision Tree Visualization
# Mermaid source for the supervision tree diagram rendered below.
#
# Each supervisor node label lists, as drawn here, the supervisor name, its
# restart strategy and (where given) its restart limit. Multi-line labels use
# explicit `<br/>` tags: a raw newline inside a node label is not a documented
# way to break lines in a Mermaid flowchart, whereas `<br/>` is.
supervision_tree = """
graph TD
  A[Sensocto.Supervisor<br/>:rest_for_one<br/>5 restarts/10s] --> B[Infrastructure.Supervisor<br/>:one_for_one<br/>3 restarts/5s]
  A --> C[Registry.Supervisor<br/>:one_for_one<br/>5 restarts/5s]
  A --> D[Storage.Supervisor<br/>:rest_for_one<br/>3 restarts/5s]
  A --> E[Bio.Supervisor<br/>:one_for_one]
  A --> F[Domain.Supervisor<br/>:one_for_one<br/>5 restarts/10s]
  A --> G[GuestUserStore]
  A --> H[ChatStore]
  A --> I[Endpoint]
  A --> J[AshAuthentication]
  B --> B1[Telemetry]
  B --> B2[TaskSupervisor]
  B --> B3[Repo]
  B --> B4[Repo.Replica]
  B --> B5[DNSCluster]
  B --> B6[PubSub<br/>pool: 16]
  B --> B7[Presence]
  B --> B8[Finch]
  C --> C1[14 Registries<br/>Horde + Local]
  D --> D1[Iroh.RoomStore]
  D --> D2[HydrationManager]
  D --> D3[RoomStore]
  D --> D4[RoomSync]
  D --> D5[RoomStateCRDT]
  E --> E1[NoveltyDetector]
  E --> E2[PredictiveLoadBalancer]
  E --> E3[HomeostaticTuner]
  E --> E4[ResourceArbiter]
  E --> E5[CircadianScheduler]
  F --> F1[AttentionTracker]
  F --> F2[SystemLoadMonitor]
  F --> F3[Lenses.Supervisor]
  F --> F4[TableOwner]
  F --> F5[SensorsDynamicSupervisor]
  F --> F6[RoomsDynamicSupervisor<br/>Horde]
  F --> F7[CallSupervisor]
  F --> F8[...]
  style A fill:#e74c3c,color:#fff
  style B fill:#3498db,color:#fff
  style C fill:#3498db,color:#fff
  style D fill:#9b59b6,color:#fff
  style E fill:#27ae60,color:#fff
  style F fill:#f39c12,color:#fff
"""

Kino.Mermaid.new(supervision_tree)
2. Live System Health Check
defmodule ResilienceCheck do
  @moduledoc """
  Point-in-time health probes against the node this notebook is attached to.

  Every probe returns plain data (maps / tuples) and is written to degrade to an
  "unhealthy" or "dead" result rather than raise when the probed component is
  missing, so the notebook cells that render the results keep working.
  """

  @doc """
  Performs comprehensive resilience checks on the running system.

  Returns a map with one entry per probe: `:supervisors`, `:genservers`,
  `:ets_tables`, `:pubsub`, `:database`, `:memory` and `:load`.
  """
  def run_all_checks do
    %{
      supervisors: check_supervisors(),
      genservers: check_critical_genservers(),
      ets_tables: check_ets_tables(),
      pubsub: check_pubsub(),
      database: check_database(),
      memory: check_memory(),
      load: check_system_load()
    }
  end

  @doc """
  Checks whether each top-level supervisor name is registered to a live pid.

  Returns a list of `{supervisor_module, :alive | :dead}` tuples.
  """
  def check_supervisors do
    supervisors = [
      Sensocto.Infrastructure.Supervisor,
      Sensocto.Registry.Supervisor,
      Sensocto.Storage.Supervisor,
      Sensocto.Bio.Supervisor,
      Sensocto.Domain.Supervisor
    ]

    Enum.map(supervisors, fn sup ->
      # Anything other than a pid (nil, or a name registered to a port) counts
      # as dead instead of crashing the whole check.
      status = if is_pid(Process.whereis(sup)), do: :alive, else: :dead
      {sup, status}
    end)
  end

  @doc """
  Reports liveness and mailbox length for the critical named GenServers.

  Returns a list of `%{name: module, status: :alive | :dead, queue_length: integer}`.
  A process that exits between the name lookup and the info call is reported as
  `:dead` with a queue length of `0`.
  """
  def check_critical_genservers do
    genservers = [
      Sensocto.AttentionTracker,
      Sensocto.SystemLoadMonitor,
      Sensocto.Lenses.Router,
      Sensocto.Iroh.RoomStore,
      Sensocto.RoomStore
    ]

    Enum.map(genservers, fn gs ->
      {status, queue_len} =
        with pid when is_pid(pid) <- Process.whereis(gs),
             {:message_queue_len, len} <- Process.info(pid, :message_queue_len) do
          {:alive, len}
        else
          _ -> {:dead, 0}
        end

      %{name: gs, status: status, queue_length: queue_len}
    end)
  end

  @doc """
  Reports existence, entry count and memory use of the known named ETS tables.

  Returns a list of `%{table: atom, exists: boolean, size: integer, memory_kb: integer}`.
  Memory is converted from words to kilobytes using the VM word size.
  """
  def check_ets_tables do
    tables = [
      :attribute_store_hot,
      :attribute_store_warm,
      :attention_levels_cache,
      :sensor_attention_cache,
      :system_load_cache,
      :bio_novelty_scores
    ]

    wordsize = :erlang.system_info(:wordsize)

    Enum.map(tables, fn table ->
      # :ets.info/2 returns :undefined for a missing table, so matching on
      # integers covers both "never existed" and "deleted while we were looking".
      with size when is_integer(size) <- :ets.info(table, :size),
           words when is_integer(words) <- :ets.info(table, :memory) do
        %{table: table, exists: true, size: size, memory_kb: div(words * wordsize, 1024)}
      else
        _ -> %{table: table, exists: false, size: 0, memory_kb: 0}
      end
    end)
  end

  @doc """
  Round-trips a message through `Sensocto.PubSub` on a unique topic.

  Returns `%{healthy: true, latency_ms: float}` with the measured round-trip
  time in milliseconds, `%{healthy: false, error: :timeout}` when the message
  does not come back within one second, or `%{healthy: false, error: message}`
  when subscribing or broadcasting fails.
  """
  def check_pubsub do
    ref = make_ref()
    topic = "health_check:#{inspect(ref)}"

    try do
      Phoenix.PubSub.subscribe(Sensocto.PubSub, topic)

      try do
        started_at = System.monotonic_time(:microsecond)
        Phoenix.PubSub.broadcast(Sensocto.PubSub, topic, {:ping, ref})

        receive do
          {:ping, ^ref} ->
            elapsed_us = System.monotonic_time(:microsecond) - started_at
            %{healthy: true, latency_ms: Float.round(elapsed_us / 1000, 3)}
        after
          1000 ->
            %{healthy: false, error: :timeout}
        end
      after
        # Runs on success, timeout and failure alike, so the calling process
        # never stays subscribed to the throwaway topic.
        Phoenix.PubSub.unsubscribe(Sensocto.PubSub, topic)
      end
    rescue
      e -> %{healthy: false, error: Exception.message(e)}
    catch
      :exit, _ -> %{healthy: false, error: "pubsub_unavailable"}
    end
  end

  @doc """
  Runs `SELECT 1` through `Sensocto.Repo` with a 5 second timeout.

  Returns `%{healthy: true, latency_ms: integer}` or `%{healthy: false, error: message}`.
  """
  def check_database do
    start_time = System.monotonic_time(:millisecond)

    try do
      Sensocto.Repo.query!("SELECT 1", [], timeout: 5000)
      latency = System.monotonic_time(:millisecond) - start_time
      %{healthy: true, latency_ms: latency}
    rescue
      e -> %{healthy: false, error: Exception.message(e)}
    catch
      :exit, _ -> %{healthy: false, error: "connection_timeout"}
    end
  end

  @doc """
  Summarises VM memory (total, processes, ETS, binaries) in whole megabytes.
  """
  def check_memory do
    mem = :erlang.memory()
    to_mb = fn key -> div(Keyword.get(mem, key, 0), 1024 * 1024) end

    %{
      total_mb: to_mb.(:total),
      processes_mb: to_mb.(:processes),
      ets_mb: to_mb.(:ets),
      binary_mb: to_mb.(:binary)
    }
  end

  @doc """
  Fetches metrics from `Sensocto.SystemLoadMonitor.get_metrics/0`.

  Returns whatever that call returns, or `%{error: "SystemLoadMonitor not available"}`
  when the call raises or exits.
  """
  def check_system_load do
    try do
      Sensocto.SystemLoadMonitor.get_metrics()
    rescue
      _ -> %{error: "SystemLoadMonitor not available"}
    catch
      :exit, _ -> %{error: "SystemLoadMonitor not available"}
    end
  end
end
# Run all checks
results = ResilienceCheck.run_all_checks()

# Display supervisor status.
# Each supervisor is emitted as its own Markdown list item: lines separated by a
# single newline would otherwise be merged into one paragraph when rendered.
supervisor_status =
  Enum.map_join(results.supervisors, "\n", fn {name, status} ->
    status_emoji = if status == :alive, do: "✅", else: "❌"
    "- #{status_emoji} #{inspect(name)}"
  end)

Kino.Markdown.new("""
## Supervisor Status

#{supervisor_status}
""")

# Display GenServer health.
# "Health" is green only for a live process with a short mailbox; a dead process
# reports a queue length of 0 and must not be shown as healthy.
genserver_data =
  Enum.map(results.genservers, fn gs ->
    healthy? = gs.status == :alive and gs.queue_length < 100

    %{
      "Name" => gs.name |> Module.split() |> Enum.take(-2) |> Enum.join("."),
      "Status" => if(gs.status == :alive, do: "✅ Alive", else: "❌ Dead"),
      "Queue" => gs.queue_length,
      "Health" => if(healthy?, do: "🟢", else: "🔴")
    }
  end)

Kino.DataTable.new(genserver_data, name: "Critical GenServers")

# Display ETS table memory usage
ets_data =
  Enum.map(results.ets_tables, fn t ->
    %{
      "Table" => t.table,
      "Exists" => if(t.exists, do: "✅", else: "❌"),
      "Entries" => t.size,
      "Memory (KB)" => t.memory_kb
    }
  end)

Kino.DataTable.new(ets_data, name: "ETS Table Memory")
3. Backpressure System Visualization
# Mermaid source for the four-layer backpressure diagram rendered below.
#
# Each layer is a subgraph whose nodes show a level (or component) and the
# window multiplier drawn for it; the layers are chained into a final node.
# Two-line labels use explicit `<br/>` tags, the documented way to break a line
# inside a Mermaid flowchart label, instead of raw newlines.
backpressure_diagram = """
flowchart TB
  subgraph Layer1["Layer 1: Attention-Based"]
    A1[":high = 0.2x<br/>User focused"]
    A2[":medium = 0.4x<br/>User viewing"]
    A3[":low = 4.0x<br/>No viewers"]
    A4[":none = 10.0x<br/>No connections"]
  end
  subgraph Layer2["Layer 2: System Load"]
    L1[":normal = 1.0x<br/>CPU < 50%"]
    L2[":elevated = 1.5x<br/>CPU < 70%"]
    L3[":high = 3.0x<br/>CPU < 85%"]
    L4[":critical = 5.0x<br/>CPU >= 85%"]
  end
  subgraph Layer3["Layer 3: Biomimetic"]
    B1["NoveltyDetector<br/>0.5x for anomalies"]
    B2["PredictiveLoadBalancer<br/>0.75x - 1.2x"]
    B3["ResourceArbiter<br/>0.5x - 5.0x"]
    B4["CircadianScheduler<br/>0.85x - 1.2x"]
  end
  subgraph Layer4["Layer 4: ETS Limits"]
    E1["Hot: 1000 → 200"]
    E2["Warm: 60000 → 3000"]
  end
  Layer1 --> Layer2
  Layer2 --> Layer3
  Layer3 --> Layer4
  Layer4 --> Final["Final Window<br/>= base × all multipliers"]
  style Layer1 fill:#3498db,color:#fff
  style Layer2 fill:#e74c3c,color:#fff
  style Layer3 fill:#27ae60,color:#fff
  style Layer4 fill:#9b59b6,color:#fff
"""

Kino.Mermaid.new(backpressure_diagram)
4. Live Attention Metrics
# Get current attention state.
# When the tracker cannot be reached (the call raises or exits) an empty state
# is used so the summary table below still renders with zero counts.
empty_attention_state = %{attention_state: %{}, pinned_sensors: %{}, battery_states: %{}}

attention_state =
  try do
    Sensocto.AttentionTracker.get_state()
  rescue
    _ -> empty_attention_state
  catch
    :exit, _ -> empty_attention_state
  end

# map_size/1 is O(1); there is no need to build a key list just to count it.
sensor_count = map_size(attention_state.attention_state)
pinned_count = map_size(attention_state.pinned_sensors)
battery_count = map_size(attention_state.battery_states)

Kino.Markdown.new("""
## Attention Tracker State
| Metric | Value |
|--------|-------|
| Sensors with attention | #{sensor_count} |
| Pinned sensors | #{pinned_count} |
| Battery states tracked | #{battery_count} |
""")
5. System Load Metrics
# Render the system load probe collected by ResilienceCheck.run_all_checks/0.
load_metrics = results.load

case load_metrics do
  %{error: _} ->
    Kino.Markdown.new("⚠️ SystemLoadMonitor not available")

  metrics ->
    # Converts a 0..1 ratio stored under `key` into a percentage with one decimal.
    # Map.get/2 lets a missing key fall back to 0 (plain `metrics.key` would
    # raise), and multiplying by 100.0 guarantees a float: Float.round/2 only
    # accepts floats, so a nil or integer metric used to crash this cell.
    to_pct = fn key -> Float.round((Map.get(metrics, key) || 0) * 100.0, 1) end

    cpu_pct = to_pct.(:scheduler_utilization)
    mem_pct = to_pct.(:memory_pressure)
    pubsub_pct = to_pct.(:pubsub_pressure)
    queue_pct = to_pct.(:message_queue_pressure)
    load_level = Map.get(metrics, :load_level) || :unknown
    load_multiplier = Map.get(metrics, :load_multiplier) || 1.0

    level_emoji =
      case load_level do
        :normal -> "🟢"
        :elevated -> "🟡"
        :high -> "🟠"
        :critical -> "🔴"
        _ -> "⚪"
      end

    Kino.Markdown.new("""
    ## System Load: #{level_emoji} #{load_level}
    | Metric | Value | Threshold |
    |--------|-------|-----------|
    | CPU (Scheduler) | #{cpu_pct}% | 50% / 70% / 85% |
    | Memory Pressure | #{mem_pct}% | 70% / 80% / 90% |
    | PubSub Pressure | #{pubsub_pct}% | N/A |
    | Queue Pressure | #{queue_pct}% | N/A |
    | Load Multiplier | #{load_multiplier}x | 1.0x - 5.0x |
    """)
end
6. Memory Budget Calculator
defmodule MemoryCalculator do
  @moduledoc """
  Back-of-the-envelope estimate of the memory held by the hot and warm
  attribute stores, assuming a fixed number of bytes per stored entry.
  """

  # Assumed bytes per stored entry in each tier.
  @hot_entry_size 200
  @warm_entry_size 200

  # Per-attribute entry limits when no load reduction applies.
  @default_hot_limit 1000
  @default_warm_limit 60_000

  @doc """
  Estimates memory with the default entry limits.

  Returns `%{hot_mb: float, warm_mb: float, total_mb: float}`, each rounded to
  two decimals.
  """
  def calculate(sensor_count, attrs_per_sensor) do
    sensor_count
    |> tier_bytes(attrs_per_sensor, @default_hot_limit, @default_warm_limit)
    |> summarize()
  end

  @doc """
  Estimates memory with the entry limits scaled down for `load_level`.

  Known levels are `:normal`, `:elevated`, `:high` and `:critical`; any other
  value is treated like `:normal`. The result has the same keys as
  `calculate/2` plus the effective `:hot_limit` and `:warm_limit`.
  """
  def calculate_with_load(sensor_count, attrs_per_sensor, load_level) do
    {hot_factor, warm_factor} = limit_factors(load_level)
    hot_limit = round(@default_hot_limit * hot_factor)
    warm_limit = round(@default_warm_limit * warm_factor)

    sensor_count
    |> tier_bytes(attrs_per_sensor, hot_limit, warm_limit)
    |> summarize()
    |> Map.merge(%{hot_limit: hot_limit, warm_limit: warm_limit})
  end

  # {hot, warm} scaling factors applied to the default limits per load level.
  defp limit_factors(:elevated), do: {0.8, 0.5}
  defp limit_factors(:high), do: {0.4, 0.2}
  defp limit_factors(:critical), do: {0.2, 0.05}
  defp limit_factors(_normal_or_unknown), do: {1.0, 1.0}

  # Total bytes per tier: sensors * (limit * attributes * bytes-per-entry).
  defp tier_bytes(sensor_count, attrs_per_sensor, hot_limit, warm_limit) do
    hot_bytes = sensor_count * (hot_limit * attrs_per_sensor * @hot_entry_size)
    warm_bytes = sensor_count * (warm_limit * attrs_per_sensor * @warm_entry_size)
    {hot_bytes, warm_bytes}
  end

  # Converts the per-tier byte totals into the megabyte summary map.
  defp summarize({hot_bytes, warm_bytes}) do
    %{
      hot_mb: to_mb(hot_bytes),
      warm_mb: to_mb(warm_bytes),
      total_mb: to_mb(hot_bytes + warm_bytes)
    }
  end

  defp to_mb(bytes), do: Float.round(bytes / (1024 * 1024), 2)
end
# Calculate for different scenarios
scenarios = [
{100, 5, :normal},
{100, 5, :critical},
{500, 5, :normal},
{500, 5, :critical},
{1000, 5, :normal},
{1000, 5, :critical}
]
scenario_data =
Enum.map(scenarios, fn {sensors, attrs, load} ->
result = MemoryCalculator.calculate_with_load(sensors, attrs, load)
%{
"Sensors" => sensors,
"Attrs" => attrs,
"Load" => load,
"Hot Limit" => result.hot_limit,
"Warm Limit" => result.warm_limit,
"Hot (MB)" => result.hot_mb,
"Warm (MB)" => result.warm_mb,
"Total (MB)" => result.total_mb
}
end)
Kino.DataTable.new(scenario_data, name: "Memory Budget by Scenario")
7. Circuit Breaker Simulation
defmodule CircuitBreakerDemo do
  @moduledoc """
  Demonstrates how a circuit breaker would work in Sensocto.
  This is NOT the actual implementation - it's a demonstration.

  The breaker is a plain struct that moves between three states:

    * `:closed` - calls pass through and consecutive failures are counted
    * `:open` - calls are blocked until `timeout_ms` has elapsed since the
      last recorded failure
    * `:half_open` - calls pass through; `success_threshold` successes close
      the breaker again, any failure re-opens it
  """

  defstruct state: :closed,
            failure_count: 0,
            success_count: 0,
            last_failure_time: nil,
            failure_threshold: 5,
            success_threshold: 3,
            timeout_ms: 30_000

  @doc """
  Builds a closed breaker.

  Options: `:failure_threshold` (default 5), `:success_threshold` (default 3)
  and `:timeout_ms` (default 30_000).
  """
  def new(opts \\ []) do
    %__MODULE__{
      failure_threshold: Keyword.get(opts, :failure_threshold, 5),
      success_threshold: Keyword.get(opts, :success_threshold, 3),
      timeout_ms: Keyword.get(opts, :timeout_ms, 30_000)
    }
  end

  @doc """
  Feeds the outcome of one call into the breaker.

  `success?` says whether the protected call succeeded. `now` is the current
  time in milliseconds; it defaults to the monotonic clock and can be passed
  explicitly to drive the breaker with a virtual clock.

  Returns `{result, breaker}` where `result` is one of `:ok`, `:failed`,
  `:tripped`, `:blocked`, `:recovering`, `:still_failing`, `:improving`,
  `:recovered` or `:back_to_open`.
  """
  def call(cb, success?, now \\ System.monotonic_time(:millisecond)) do
    case cb.state do
      :closed -> call_closed(cb, success?, now)
      :open -> call_open(cb, success?, now)
      :half_open -> call_half_open(cb, success?, now)
    end
  end

  @doc """
  Runs a list of events through a breaker configured with
  `failure_threshold: 3`, `success_threshold: 2` and `timeout_ms: 1000`.

  Events:

    * `:success` - a successful call
    * `{:wait, ms}` - advances the simulation clock by `ms` milliseconds
      without calling the breaker (recorded with result `:waited`)
    * anything else - a failed call

  The simulation uses its own clock that only moves on `{:wait, ms}` events, so
  the outcome does not depend on how fast the sequence is processed.

  Returns one map per event with `:event`, `:result`, `:state`, `:failures`
  and `:successes`.
  """
  def simulate_sequence(sequence) do
    initial = new(failure_threshold: 3, success_threshold: 2, timeout_ms: 1000)

    {history, _cb, _clock} =
      Enum.reduce(sequence, {[], initial, 0}, fn
        {:wait, ms} = event, {hist, cb, clock} when is_integer(ms) and ms >= 0 ->
          {[history_entry(event, :waited, cb) | hist], cb, clock + ms}

        event, {hist, cb, clock} ->
          {result, new_cb} = call(cb, event == :success, clock)
          {[history_entry(event, result, new_cb) | hist], new_cb, clock}
      end)

    Enum.reverse(history)
  end

  # Closed: a success clears the failure streak, a failure extends it and trips
  # the breaker once the threshold is reached.
  defp call_closed(cb, success?, now) do
    if success? do
      {:ok, %{cb | failure_count: 0}}
    else
      new_count = cb.failure_count + 1

      if new_count >= cb.failure_threshold do
        {:tripped, %{cb | state: :open, failure_count: new_count, last_failure_time: now}}
      else
        {:failed, %{cb | failure_count: new_count}}
      end
    end
  end

  # Open: block until the timeout has elapsed, then let one probe call through.
  defp call_open(cb, success?, now) do
    if now - cb.last_failure_time > cb.timeout_ms do
      probe(cb, success?, now)
    else
      {:blocked, cb}
    end
  end

  # First call after the timeout. A successful probe already counts as one
  # success, so with a success threshold of 1 the breaker closes right away
  # instead of demanding a second success in half-open.
  defp probe(cb, success?, now) do
    cond do
      !success? ->
        {:still_failing, %{cb | last_failure_time: now}}

      cb.success_threshold <= 1 ->
        {:recovered, %{cb | state: :closed, success_count: 0, failure_count: 0}}

      true ->
        {:recovering, %{cb | state: :half_open, success_count: 1}}
    end
  end

  # Half-open: count successes towards closing; any failure re-opens the breaker.
  defp call_half_open(cb, success?, now) do
    if success? do
      new_count = cb.success_count + 1

      if new_count >= cb.success_threshold do
        {:recovered, %{cb | state: :closed, success_count: 0, failure_count: 0}}
      else
        {:improving, %{cb | success_count: new_count}}
      end
    else
      {:back_to_open,
       %{
         cb
         | state: :open,
           failure_count: cb.failure_threshold,
           success_count: 0,
           last_failure_time: now
       }}
    end
  end

  # One row of simulation history, taken from the breaker after the event.
  defp history_entry(event, result, cb) do
    %{
      event: event,
      result: result,
      state: cb.state,
      failures: cb.failure_count,
      successes: cb.success_count
    }
  end
end
# Simulate a failure sequence.
#
# A complete trip-and-recover story would be:
#   :success, :success, :failure, :failure, :failure   -> trips the breaker
#   further calls while open                           -> blocked
#   ... wait for the timeout ...
#   :success, :success                                 -> recover
#   :success                                           -> back to normal
#
# Only the tripping part is run here: simulate_sequence/1 treats every event
# other than :success as a failure and the breaker times its timeout with the
# real clock, so blocked calls and the wait cannot be written as list entries.
demo_sequence = [:success, :success, :failure, :failure, :failure]

# Bound to its own name so the health-check `results` from the earlier cell is
# not shadowed for the cells that follow.
simulation = CircuitBreakerDemo.simulate_sequence(demo_sequence)

circuit_data =
  simulation
  |> Enum.with_index(1)
  |> Enum.map(fn {step, index} ->
    %{
      "Step" => index,
      "Event" => step.event,
      "Result" => step.result,
      "State" => step.state,
      "Failures" => step.failures
    }
  end)

Kino.DataTable.new(circuit_data, name: "Circuit Breaker Simulation")
# Mermaid state diagram of the breaker's three states (Closed, Open, HalfOpen),
# the transitions between them and a short note per state. The string is handed
# unchanged to Kino.Mermaid for rendering.
circuit_diagram = """
stateDiagram-v2
[*] --> Closed
Closed --> Open: failures >= threshold
Open --> HalfOpen: timeout elapsed
HalfOpen --> Closed: successes >= threshold
HalfOpen --> Open: any failure
Closed --> Closed: success
note right of Closed
Normal operation
Tracking failures
end note
note right of Open
All calls blocked
Wait for timeout
end note
note right of HalfOpen
Testing recovery
Limited calls allowed
end note
"""
Kino.Mermaid.new(circuit_diagram)
8. Blast Radius Analysis
# Static, hand-written table of failure scenarios: one map per component with
# the affected scope ("Blast Radius"), an expected recovery time and an impact
# rating. Nothing here is measured from the running system.
#
# NOTE(review): the ratings appear to follow 🟢 Minimal / 🟡 Low / 🟠 Moderate /
# 🔴 High-Critical, but the "Iroh.RoomStore" row pairs 🔴 with "Moderate" -
# confirm which of the two is intended.
blast_radius_data = [
%{
"Component" => "Single Sensor",
"Blast Radius" => "Single sensor only",
"Recovery" => "< 1s (auto-restart)",
"Impact" => "🟢 Minimal"
},
%{
"Component" => "AttentionTracker",
"Blast Radius" => "All backpressure",
"Recovery" => "< 5s",
"Impact" => "🟡 Low - defaults to :none"
},
%{
"Component" => "SystemLoadMonitor",
"Blast Radius" => "Load detection",
"Recovery" => "< 5s",
"Impact" => "🟡 Low - defaults to 1.0x"
},
%{
"Component" => "LensRouter",
"Blast Radius" => "Lens data routing",
"Recovery" => "< 5s",
"Impact" => "🟠 Moderate - sensors unaffected"
},
%{
"Component" => "TableOwner",
"Blast Radius" => "All ETS tables lost",
"Recovery" => "< 10s",
"Impact" => "🔴 High - sensors re-register"
},
%{
"Component" => "Iroh.RoomStore",
"Blast Radius" => "Storage cascade",
"Recovery" => "10-30s",
"Impact" => "🔴 Moderate - in-memory preserved"
},
%{
"Component" => "Phoenix.PubSub",
"Blast Radius" => "All real-time updates",
"Recovery" => "10-30s",
"Impact" => "🔴 High"
},
%{
"Component" => "Sensocto.Repo",
"Blast Radius" => "All database ops",
"Recovery" => "10-60s",
"Impact" => "🔴 Critical"
}
]
# Render the scenarios as an interactive table.
Kino.DataTable.new(blast_radius_data, name: "Blast Radius Analysis")
9. Recommendations Summary
# Static Markdown text listing the follow-up actions, grouped by priority
# (P0 / P1 / P2). It is rendered as-is; nothing in it is computed from the
# checks above.
recommendations = """
## Priority Actions
### P0 - Critical (Do Immediately)
1. **Add Circuit Breakers**
- Target: `Iroh.RoomStore`, `Sensocto.Repo`
- Library: Consider `fuse` or custom GenServer
- Impact: Prevents cascade failures
2. **Add GenServer Call Timeouts**
- Target: All cross-process calls
- Default: 5000ms
- Impact: Prevents hanging processes
### P1 - High (Next Sprint)
3. **Clean NoveltyDetector State**
- Add periodic cleanup of deleted sensors
- Bound `sensor_stats` map growth
- Impact: Memory stability
4. **Move GuestUserStore Under Supervisor**
- Create `Session.Supervisor`
- Add `ChatStore` as sibling
- Impact: Proper failure isolation
### P2 - Medium (Future)
5. **Add Iroh to Health Check**
- Check `Iroh.RoomStore.ready?()`
- Include in `/health/ready`
- Impact: Better observability
6. **Consider LensRouter Sharding**
- At 1000+ sensors
- Partition by sensor_id hash
- Impact: Horizontal scalability
"""
Kino.Markdown.new(recommendations)
10. Interactive Health Dashboard
# Frame that holds the dashboard; every refresh replaces its content.
frame = Kino.Frame.new()

# Re-runs all health checks and renders a fresh summary into `frame`.
update_dashboard = fn ->
  checks = ResilienceCheck.run_all_checks()

  # Every listed supervisor must be alive.
  sup_health = Enum.all?(checks.supervisors, fn {_sup, status} -> status == :alive end)

  # Every critical GenServer must be alive with a mailbox shorter than 100.
  gs_health = Enum.all?(checks.genservers, &(&1.status == :alive and &1.queue_length < 100))

  pubsub_health = checks.pubsub.healthy
  db_health = checks.database.healthy

  # Overall verdict, from best to worst: everything up; supervisors and
  # database up; supervisors up only; otherwise critical.
  overall =
    case {sup_health, gs_health, pubsub_health, db_health} do
      {true, true, true, true} -> "🟢 Healthy"
      {true, _, _, true} -> "🟡 Degraded"
      {true, _, _, _} -> "🟠 Impaired"
      _ -> "🔴 Critical"
    end

  mark = fn ok? -> if ok?, do: "✅", else: "❌" end
  memory = checks.memory
  timestamp = DateTime.to_string(DateTime.utc_now())

  content =
    Kino.Markdown.new("""
    ## System Health: #{overall}
    | Component | Status |
    |-----------|--------|
    | Supervisors | #{mark.(sup_health)} |
    | GenServers | #{mark.(gs_health)} |
    | PubSub | #{mark.(pubsub_health)} |
    | Database | #{mark.(db_health)} |
    ### Memory Usage
    - Total: #{memory.total_mb} MB
    - Processes: #{memory.processes_mb} MB
    - ETS: #{memory.ets_mb} MB
    - Binary: #{memory.binary_mb} MB
    _Last updated: #{timestamp}_
    """)

  Kino.Frame.render(frame, content)
end

# Render once up front so the frame is not empty.
update_dashboard.()

# Show the frame.
frame

# Button that re-runs the checks on every click.
button = Kino.Control.button("Refresh Health Check")
Kino.listen(button, fn _click -> update_dashboard.() end)
button
Conclusion
This Livebook provides interactive tools for assessing Sensocto’s resilience:
- Supervision Tree Visualization - Understand failure isolation domains
- Live Health Checks - Verify system components are running
- Backpressure Analysis - Understand the multi-layer throttling system
- Memory Budget Calculator - Plan for scaling
- Circuit Breaker Demo - See how circuit breakers would protect the system
- Blast Radius Analysis - Understand failure impact
Run this regularly to monitor system health and identify potential issues before they become critical.