Iroh Graphviz Observer
Mix.install([
{:kino, "~> 0.15.3"},
# {:iroh_ex, "~> 0.0.10"},
{:iroh_ex, path: "/Users/adrianibanez/Documents/projects/2024_sensor-platform/checkouts/iroh_ex"},
# {:iroh_ex, path: "/home/adrianibanez/projects/2025_sensocto/checkouts/iroh_ex"},
{:rustler, "~> 0.36.1", override: true},
{:rustler_precompiled, "~> 0.7"}
])
Livebook Sigma
defmodule SigmaGraph do
use Kino.JS
use Kino.JS.Live
@doc "Starts a new live Sigma graph widget with no nodes or edges."
def new, do: Kino.JS.Live.new(__MODULE__, %{})

@doc """
Asks the widget to add `node` (a map with at least `:id`).

When `replace` is true an existing node with the same id is replaced.
"""
def add_node(kino, node, replace \\ false),
  do: Kino.JS.Live.cast(kino, {:add_node, node, replace})

@doc "Asks the widget to merge the attributes of `node` into the node with the same `:id`."
def merge_node(kino, node), do: Kino.JS.Live.cast(kino, {:merge_node, node})

@doc "Asks the widget to remove the node `node_id`."
def remove_node(kino, node_id), do: Kino.JS.Live.cast(kino, {:remove_node, node_id})

@doc "Asks the widget to add a single `edge` (`%{source: _, target: _, props: _}`)."
def add_edge(kino, edge), do: Kino.JS.Live.cast(kino, {:add_edge, edge})

@doc "Asks the widget to add a list of `edges` in one event."
def add_edges_batch(kino, edges), do: Kino.JS.Live.cast(kino, {:add_edges_batch, edges})

@doc "Asks the widget to remove the edge identified by `edge.source` / `edge.target`."
def remove_edge(kino, edge), do: Kino.JS.Live.cast(kino, {:remove_edge, edge})

@doc "Asks the widget to export the current graph as an image."
def export_image(kino, config), do: Kino.JS.Live.cast(kino, {:export_image, config})

@doc "Asks the widget to refresh its rendering."
def render(kino), do: Kino.JS.Live.cast(kino, :render)

@doc "Asks the widget to drop every node and edge."
def clear(kino), do: Kino.JS.Live.cast(kino, :clear)

@doc "Forwards an arbitrary `payload` to the widget as a \"test\" event."
def test(kino, payload), do: Kino.JS.Live.cast(kino, {:test, payload})
# Server state mirrors the graph: `nodes` is a map keyed by node id, `edges` a list.
@impl true
def init(_arg, ctx) do
  initial_assigns = [nodes: %{}, edges: []]
  {:ok, assign(ctx, initial_assigns)}
end

# A connecting client receives the current assigns as its initial payload.
@impl true
def handle_connect(ctx) do
  payload = ctx.assigns
  {:ok, payload, ctx}
end
# Pass-through used for debugging: the payload is broadcast unchanged as a "test" event.
@impl true
def handle_cast({:test, payload}, ctx) do
  {:noreply, tap(ctx, &broadcast_event(&1, "test", payload))}
end
# Clears the graph in every connected client AND in the server-side mirror.
# Without resetting the assigns, `handle_connect/1` would keep handing out the
# nodes and edges of the previous run after a clear.
@impl true
def handle_cast(:clear, ctx) do
  ctx = assign(ctx, nodes: %{}, edges: [])
  broadcast_event(ctx, "clear", [])
  {:noreply, ctx}
end
# Stores `node` (after filling in display defaults) under its id and broadcasts
# it to the clients together with the `replace` flag.
@impl true
def handle_cast({:add_node, node, replace}, ctx) do
  # Caller-supplied attributes win over the defaults; the position defaults to
  # a random integer coordinate in 1..10.
  node =
    Map.merge(
      %{
        x: :rand.uniform(10),
        y: :rand.uniform(10),
        size: 10,
        color: "#1f77b4",
        type: "border",
        borderColor: "#ddd"
      },
      node
    )

  ctx = put_in(ctx.assigns.nodes[node.id], node)
  broadcast_event(ctx, "add_node", Map.put(node, :replace, replace))
  {:noreply, ctx}
end
# Merges the attributes of `node` into the stored node with the same id (or
# stores it as-is when unknown) and broadcasts the partial node to the clients.
@impl true
def handle_cast({:merge_node, node}, ctx) do
  merged =
    case Map.fetch(ctx.assigns.nodes, node.id) do
      {:ok, existing} -> Map.merge(existing, node)
      :error -> node
    end

  ctx = put_in(ctx.assigns.nodes[node.id], merged)
  broadcast_event(ctx, "merge_node", node)
  {:noreply, ctx}
end
# Records `edge` at the head of the stored edge list and broadcasts it.
@impl true
def handle_cast({:add_edge, edge}, ctx) do
  with_new_edge = [edge | ctx.assigns.edges]
  ctx = put_in(ctx.assigns.edges, with_new_edge)
  broadcast_event(ctx, "add_edge", edge)
  {:noreply, ctx}
end
# Appends a list of edges to the stored edges and broadcasts the batch in a
# single "add_edges_batch" event.
@impl true
def handle_cast({:add_edges_batch, edges}, ctx) do
  combined =
    case ctx.assigns.edges do
      nil -> edges
      current -> current ++ edges
    end

  ctx = put_in(ctx.assigns.edges, combined)
  broadcast_event(ctx, "add_edges_batch", %{edges: edges})
  {:noreply, ctx}
end
# Asks every connected client to refresh its rendering; state is untouched.
@impl true
def handle_cast(:render, ctx) do
  {:noreply, tap(ctx, &broadcast_event(&1, "render", []))}
end
# Forgets the node `node_id` and every stored edge that starts or ends at it,
# then tells the clients to remove the node.
@impl true
def handle_cast({:remove_node, node_id}, ctx) do
  remaining_nodes = Map.delete(ctx.assigns.nodes, node_id)

  remaining_edges =
    Enum.filter(ctx.assigns.edges, fn edge ->
      edge.source != node_id and edge.target != node_id
    end)

  ctx = assign(ctx, nodes: remaining_nodes, edges: remaining_edges)
  broadcast_event(ctx, "remove_node", node_id)
  {:noreply, ctx}
end
# Drops every stored edge with the same source and target as `edge` and
# broadcasts the removal. The direction matters: a reverse edge is kept.
@impl true
def handle_cast({:remove_edge, edge}, ctx) do
  kept =
    Enum.filter(ctx.assigns.edges, fn stored ->
      edge.source != stored.source or edge.target != stored.target
    end)

  ctx = put_in(ctx.assigns.edges, kept)
  broadcast_event(ctx, "remove_edge", edge)
  {:noreply, ctx}
end
# Asks the clients to export the graph as an image.
#
# `config` may be a map or keyword list with `:width` / `:height`; missing values
# (or any other kind of `config`) fall back to the previous fixed 3000x3000.
@impl true
def handle_cast({:export_image, config}, ctx) do
  opts = if is_map(config) or Keyword.keyword?(config), do: config, else: []

  broadcast_event(ctx, "export_image", %{
    width: opts[:width] || 3000,
    height: opts[:height] || 3000
  })

  {:noreply, ctx}
end
# Logs any unexpected process message instead of crashing the widget server.
@impl true
def handle_info(msg, ctx) do
  line = "#{__MODULE__} Catchall #{inspect(msg)}"
  IO.puts(line)
  {:noreply, ctx}
end
# Stylesheet asset shipped with the widget.
# NOTE(review): nothing in main.js references this file, and the div created in
# the JS init function is styled inline and gets no id, so the "#container"
# rules below look unused — confirm before relying on them.
asset "main.css" do
"""
#container {
min-height: 800px;
min-width: 1500px;
}
"""
end
asset "main.js" do
"""
import Sigma from 'https://cdn.jsdelivr.net/npm/sigma@3.0.1/+esm';
import Graph from 'https://cdn.jsdelivr.net/npm/graphology@0.26.0/+esm';
import graphologyLayout from 'https://cdn.jsdelivr.net/npm/graphology-layout@0.6.1/+esm';
import FA2Layout from 'https://cdn.jsdelivr.net/npm/graphology-layout-forceatlas2@0.10.1/+esm';
import graphOlogyLayoutForce from 'https://cdn.jsdelivr.net/npm/graphology-layout-force@0.2.4/+esm';
import graphologyLayoutNoverlap from 'https://cdn.jsdelivr.net/npm/graphology-layout-noverlap@0.4.2/+esm';
import sigmaEdgeCurve from 'https://cdn.jsdelivr.net/npm/@sigma/edge-curve@3.1.0/+esm';
import debounce from 'https://cdn.jsdelivr.net/npm/debounce@2.2.0/+esm';
import { downloadAsImage } from 'https://cdn.jsdelivr.net/npm/@sigma/export-image@3.0.0/+esm';
import { NodeBorderProgram } from 'https://cdn.jsdelivr.net/npm/@sigma/node-border@3.0.0/+esm';
import sigmanodeImage from 'https://cdn.jsdelivr.net/npm/@sigma/node-image@3.0.0/+esm';
const { circular } = graphologyLayout;
const gridDefaultOffset = 30;
var gridOffset = gridDefaultOffset;
var gridYOffset = 0;
// --- Configuration ---
const DIMMED_COLOR = "rgba(50, 50, 50, 0.7)";
// const DIMMED_COLOR = "#ddd";
export function init(ctx, payload) {
const container = document.createElement("div");
container.style.width = "100%";
container.style.height = "800px";
container.style.resize = "both";
container.style.overflow = "auto";
container.style.border = "1px solid #ccc";
container.style.backgroundColor = "#000";
ctx.root.appendChild(container);
// --- Reducers ---
let exportMode = false;
/**
 * Sigma node reducer: derives the display attributes of a node from its stored
 * attributes plus the current hover / export state.
 *
 * - export mode: size is tripled
 * - a node is hovered: that node and its neighbours get zIndex 1, every other
 *   node is painted in DIMMED_COLOR with zIndex 0
 * - nothing hovered: zIndex 0, and a colour equal to DIMMED_COLOR is removed
 */
const nodeReducer = (nodeId, attributes) => {
  const hovering = Boolean(hoveredNodeId);
  const inFocus = hovering && (nodeId === hoveredNodeId || hoveredNeighbors.has(nodeId));

  const display = { ...attributes };
  if (exportMode) {
    display.size = attributes.size * 3;
  }
  display.zIndex = inFocus ? 1 : 0;

  if (hovering && !inFocus) {
    // Everything outside the hovered neighbourhood fades into the background.
    display.color = DIMMED_COLOR;
  } else if (display.color === DIMMED_COLOR) {
    // Never let the dim colour stick to a node that should show its own colour.
    delete display.color;
  }
  return display;
};
/**
 * Sigma edge reducer: derives the display attributes of an edge from the hover
 * state.
 *
 * Sigma invokes edge reducers as (edgeId, attributes) only, so the extremities
 * are looked up on the graph. The optional sourceId / targetId parameters are
 * kept for callers that pass them explicitly.
 *
 * While a node is hovered, an edge is highlighted when either end is the
 * hovered node or one of its neighbours; all other edges are dimmed and pushed
 * back. With no hover, every edge shows its original colour.
 */
const edgeReducer = (edgeId, attributes, sourceId, targetId) => {
  const baseColor = attributes.originalColor || attributes.color;

  if (!hoveredNodeId) {
    return { ...attributes, zIndex: 0, color: baseColor };
  }

  // Previously both ids were always undefined here, which dimmed every edge.
  const source = sourceId !== undefined ? sourceId : graph.source(edgeId);
  const target = targetId !== undefined ? targetId : graph.target(edgeId);

  const touchesFocus = [source, target].some(
    (id) => id === hoveredNodeId || hoveredNeighbors.has(id)
  );

  return touchesFocus
    ? { ...attributes, zIndex: 1, color: baseColor }
    : { ...attributes, zIndex: -2, color: DIMMED_COLOR };
};
const graph = new Graph();
const renderer = new Sigma(graph, container, {
labelDensity: 0.2,
labelRenderedSizeThreshold: 10,
defaultLabelSize: 14,
nodeReducer: nodeReducer,
edgeReducer: edgeReducer,
nodeProgramClasses: {
border: NodeBorderProgram,
},
edgeProgramClasses: {
curved: sigmaEdgeCurve,
},
});
graphologyLayoutNoverlap
// const positions = graphologyLayoutNoverlap(graph, {maxIterations: 50});
// With settings:
/*const positions = graphologyLayoutNoverlap(graph, {
maxIterations: 50,
settings: {
gridSize: 1,
ratio: 2
}
});*/
//graphologyLayoutNoverlap.assign(graph);
const sensibleSettings = FA2Layout.inferSettings(graph);
/*FA2Layout.assign(graph, {
iterations: 200, // adjust based on node count
settings: {
gravity: 0.05, // Lower gravity to avoid clustering
scalingRatio: 4.0, // Prevent nodes from getting too close
strongGravityMode: false,
barnesHutOptimize: true, // Faster computation for large graphs
barnesHutTheta: 0.6,
adjustSizes: true, // Keeps nodes from overlapping
edgeWeightInfluence: 0.1, // Less influence from edges
slowDown: 10, // Ensures more gradual movement
}
});*/
/* settings: {
gravity: 0.1, // Weak gravity to allow movement
scalingRatio: 5.0, // Increase spacing between nodes
strongGravityMode: false,
barnesHutOptimize: true, // Improve speed
barnesHutTheta: 0.5,
adjustSizes: true, // Prevents overlap by using node size
edgeWeightInfluence: 0.2, // Reduce edge attraction force
slowDown: 5, // Increase layout stability
} */
// Store full labels separately
const fullLabels = {};
/**
 * Lays out every node whose "layout" attribute is "ring" on concentric circles
 * around (centerX, centerY).
 *
 * The first ring has radius baseRadius; each further ring is
 * baseRadius + 2 * minSpacing wider. A ring holds as many nodes as fit on its
 * circumference (average node size + minSpacing per node), but never fewer
 * than 20. Afterwards gridOffset is moved outwards by one
 * (baseRadius + minSpacing) step per ring.
 */
function placeNodesInMultiRings(centerX, centerY, baseRadius, minSpacing) {
  const ringNodes = graph.nodes().filter((id) => graph.getNodeAttribute(id, "layout") === "ring");
  if (ringNodes.length === 0) return;

  // Average node size is the same for every ring, so compute it once.
  let totalSize = 0;
  for (const id of ringNodes) {
    totalSize = totalSize + graph.getNodeAttribute(id, "size");
  }
  const slotWidth = totalSize / ringNodes.length + minSpacing;
  const capacityAt = (radius) => Math.max(20, Math.floor((2 * Math.PI * radius) / slotWidth));

  // Cut the node list into consecutive slices, one slice per ring.
  const rings = [];
  for (
    let placed = 0, radius = baseRadius;
    placed < ringNodes.length;
    radius += baseRadius + minSpacing * 2
  ) {
    const take = Math.min(capacityAt(radius), ringNodes.length - placed);
    rings.push({ nodes: ringNodes.slice(placed, placed + take), radius: radius });
    placed += take;
  }

  for (const ring of rings) {
    placeNodesInRing(centerX, centerY, ring.radius, ring.nodes);
  }

  gridOffset = gridDefaultOffset + (baseRadius + minSpacing) * rings.length;
}
/**
 * Spreads `nodes` evenly on a circle of the given radius around
 * (centerX, centerY). Each node gets a small random radial and tangential
 * offset (at most jitterStrength in either direction).
 */
function placeNodesInRing(centerX, centerY, radius, nodes) {
  const count = nodes.length;
  if (count === 0) return;

  const jitterStrength = 1;
  const jitter = () => (Math.random() - 0.5) * 2 * jitterStrength;
  const angleStep = (2 * Math.PI) / count;

  for (let i = 0; i < count; i++) {
    const angle = i * angleStep;
    const radialOffset = jitter(); // push/pull along the radius
    const tangentialOffset = jitter(); // slide along the circle

    const jitteredRadius = radius + radialOffset;
    const baseX = centerX + jitteredRadius * Math.cos(angle);
    const baseY = centerY + jitteredRadius * Math.sin(angle);

    // (-sin, cos) is the unit tangent of the circle at this angle.
    const x = baseX + -Math.sin(angle) * tangentialOffset;
    const y = baseY + Math.cos(angle) * tangentialOffset;

    graph.mergeNodeAttributes(nodes[i], { x, y });
  }
}
/*function placeNodesInRing(centerX, centerY, radius) {
const ringNodes = graph.nodes().filter(n => graph.getNodeAttribute(n, "layout") === "ring");
const angleStep = (2 * Math.PI) / ringNodes.length; // Evenly distribute
ringNodes.forEach((node, i) => {
const angle = i * angleStep;
graph.mergeNodeAttributes(node, {
x: centerX + radius * Math.cos(angle),
y: centerY + radius * Math.sin(angle),
});
});
}*/
/**
 * Lays out every node whose "layout" attribute is "grid" row by row, `cols`
 * nodes per row and `spacing` apart. Column 0 sits at gridStartX and the grid
 * is centred vertically on gridStartY.
 */
function placeNodesInGrid(gridStartX, gridStartY, cols, spacing) {
  const gridNodes = graph.nodes().filter(n => graph.getNodeAttribute(n, "layout") === "grid");
  const rows = Math.ceil(gridNodes.length / cols);
  const totalHeight = (rows - 1) * spacing;
  const yOffset = -totalHeight / 2; // centre the grid vertically around gridStartY

  gridNodes.forEach((node, i) => {
    const row = Math.floor(i / cols);
    const col = i % cols;
    graph.mergeNodeAttributes(node, {
      x: gridStartX + col * spacing,
      y: gridStartY + yOffset + row * spacing,
    });
  });

  // Record the offset in the module-level variable declared at the top of this
  // script. Assigning window.gridYOffset created an unrelated global instead.
  gridYOffset = yOffset;
}
// Recomputes the label offsets and redraws the graph.
function render() {
  adjustLabelPositions();
  renderer.refresh();
}

// Shortens labels longer than 20 characters to 20 characters plus "...".
function formatLabel(label) {
  if (label.length <= 20) {
    return label;
  }
  return label.substring(0, 20) + "...";
}

// Stores labelY = y + 0.2 on every node (label slightly outside the node).
function adjustLabelPositions() {
  graph.updateEachNodeAttributes((_node, attrs) => {
    return Object.assign({}, attrs, { labelY: attrs.y + 0.2 });
  });
}
// Debounced render entry points. debounce(fn, wait) only RETURNS a wrapped
// function; calling debounce(...) on every event and discarding the result
// never ran render at all. The wrappers are created once and invoked per event.
const renderSoon = debounce(render, 100);
const renderLater = debounce(render, 500);

// Schedules a debounced render. Edge events pass sizeAware = true so that a
// graph with graph.size > 100 waits for the longer quiet period.
function scheduleRender(sizeAware = false) {
  if (sizeAware && graph.size > 100) {
    renderLater();
  } else {
    renderSoon();
  }
}

// "clear": drop the whole graph and return the layout bookkeeping to its
// initial state (gridOffset starts at gridDefaultOffset, labels start empty).
ctx.handleEvent("clear", () => {
  gridOffset = gridDefaultOffset;
  Object.keys(fullLabels).forEach((id) => delete fullLabels[id]);
  graph.clear();
  renderer.refresh();
});

// "add_node": add the node unless it already exists; with node.replace set an
// existing node (and its edges) is dropped first. Re-runs the layout that the
// node belongs to.
ctx.handleEvent("add_node", (node) => {
  const exists = graph.hasNode(node.id);
  if (node.replace || !exists) {
    if (exists) {
      graph.dropNode(node.id);
    }
    fullLabels[node.id] = node.label;
    graph.addNode(node.id, node);
    if (node.layout == 'ring') {
      placeNodesInMultiRings(0, 0, 20, 3);
    }
    if (node.layout == 'grid') {
      placeNodesInGrid(-1 * gridOffset, 0, 4, 5);
    }
  }
  scheduleRender();
});

// "merge_node": update the attributes of a node that is already displayed;
// unknown nodes are ignored.
ctx.handleEvent("merge_node", (node) => {
  if (!graph.hasNode(node.id)) return;
  graph.mergeNode(node.id, node);
  // A partial update without a label must not erase the stored label.
  if (node.label !== undefined) {
    fullLabels[node.id] = node.label;
  }
  adjustLabelPositions();
  scheduleRender();
});

// "remove_node": drop the node (graphology drops its edges with it).
ctx.handleEvent("remove_node", (nodeId) => {
  if (graph.hasNode(nodeId)) {
    graph.dropNode(nodeId);
    delete fullLabels[nodeId];
  }
  scheduleRender();
});

// "add_edge": (re)create the edge when both endpoints are displayed.
ctx.handleEvent("add_edge", ({ source, target, props }) => {
  if (graph.hasNode(source) && graph.hasNode(target)) {
    if (graph.hasEdge(source, target)) {
      graph.dropEdge(source, target);
    }
    graph.addEdge(source, target, props);
  }
  scheduleRender(true);
});

// "add_edges_batch": add every edge whose endpoints are displayed and that
// does not exist yet.
ctx.handleEvent("add_edges_batch", ({ edges }) => {
  edges.forEach((edge) => {
    const drawable = graph.hasNode(edge.source) && graph.hasNode(edge.target);
    if (drawable && !graph.hasEdge(edge.source, edge.target)) {
      graph.addEdge(edge.source, edge.target, edge.props);
    }
  });
  scheduleRender(true);
});

// "render": immediate redraw requested by the server.
ctx.handleEvent("render", () => {
  renderer.refresh();
});

// "remove_edge": drop the edge if it is displayed.
ctx.handleEvent("remove_edge", ({ source, target }) => {
  if (graph.hasEdge(source, target)) {
    graph.dropEdge(source, target);
  }
  scheduleRender(true);
});
// "export_image": download the graph as iroh_swarm.png.
// The payload may carry { width, height }; missing values fall back to 3000.
// Export mode (which triples node sizes in the node reducer) stays on until
// the export has finished or failed, instead of being switched off right
// after the export was merely started.
ctx.handleEvent("export_image", (payload) => {
  const { width, height } = payload || {};

  exportMode = true;
  renderer.refresh();

  const leaveExportMode = () => {
    exportMode = false;
    renderer.refresh();
  };

  // Small delay so the enlarged nodes are rendered before the export starts.
  setTimeout(() => {
    Promise.resolve()
      .then(() =>
        downloadAsImage(renderer, {
          layers: ["edges", "nodes"],
          imgFormat: "png",
          fileName: "iroh_swarm.png",
          backgroundColor: "#000",
          width: width || 3000,
          height: height || 3000,
          cameraState: { x: 0.5, y: 0.5, angle: 0, ratio: 1.2 }
        })
      )
      .catch((error) => console.error("export_image failed", error))
      .finally(leaveExportMode);
  }, 200);
});
// --- External State ---
// Holds the ID of the node currently being hovered, or null if none.
let hoveredNodeId = null;
// Optional: Store neighbors temporarily for performance if graph is large
let hoveredNeighbors = new Set();
// --- Event Handlers (Now much simpler!) ---
// Hover start: remember the hovered node and its neighbours, show the stored
// label on it, then redraw so the reducers pick up the new hover state.
renderer.on("enterNode", (event) => {
  const hovered = event.node;
  hoveredNodeId = hovered;
  hoveredNeighbors = new Set(graph.neighbors(hovered));

  const storedLabel = fullLabels[hovered];
  if (storedLabel) {
    graph.setNodeAttribute(hovered, "label", storedLabel);
  }

  renderer.refresh();
});
// Hover end: clear the hover state and redraw.
renderer.on("leaveNode", ({ node }) => {
  // Only react when leaving the node that is currently hovered; a late event
  // for a previously hovered node must not cancel the new hover.
  if (hoveredNodeId !== node) return;

  hoveredNodeId = null;
  hoveredNeighbors = new Set();

  // The "label" attribute is the node's only label: removing it here erased
  // the label for good (there is no default to fall back to). Restore the
  // stored label instead, and skip nodes that were dropped while hovered,
  // because attribute access on a missing node throws.
  if (graph.hasNode(node) && fullLabels[node] !== undefined) {
    graph.setNodeAttribute(node, "label", fullLabels[node]);
  }

  renderer.refresh();
});
};
"""
end
end
defmodule IrohLogger do
use GenServer
@base_colors %{
"red" => "#cc4d4d",
"orange" => "#cc944d",
"yellow" => "#cccc66",
"green" => "#3a6b3a",
"blue" => "#3a5f7a"
}
@brightness_levels [
"33", # 20%
"4D", # 30%
"66", # 40%
"80", # 50%
"99", # 60%
"B3", # 70%
"CC", # 80%
"E6", # 90%
]
@border_color "#CCCCB3"
# `config` must provide the `:graph` widget and the `:datatable`; all counters
# and collections start empty.
@impl true
def init(config) do
  state = %{
    graph: config.graph,
    datatable: config.datatable,
    nodes: [],
    relays: [],
    peers: %{},
    msg_received_cnt: 0,
    messages: []
  }

  {:ok, state}
end
# Base display size of a node for a swarm of `nodes_cnt` nodes.
# Both tiers (below 100 nodes / 100 and more) currently yield the same size.
def get_node_size(nodes_cnt) do
  if nodes_cnt < 100, do: 2, else: 2
end
# A node was set up: draw it (replacing any previous node with the same id) as
# a grey ring node and remember its address.
@impl true
def handle_info({:iroh_node_setup, node_addr}, %{graph: graph, nodes: nodes} = state) do
  node_id = "#{node_addr}"

  ring_node = %{
    id: node_id,
    layout: "ring",
    size: 4,
    color: "grey",
    label: node_id,
    type: "border",
    borderColor: @border_color
  }

  SigmaGraph.add_node(graph, ring_node, true)

  {:noreply, %{state | nodes: Enum.uniq([node_addr | nodes])}}
end
# A node connected to its relay: update the node, merge the relay node, ask for
# an edge between the two, and make sure the node has a peer-list entry.
@impl true
def handle_info(
      {:iroh_node_connected, node_addr, node_relay},
      %{graph: graph, nodes: nodes, relays: relays} = state
    ) do
  IO.puts("Node connected: #{node_addr} #{node_relay}")

  size = get_node_size(Enum.count(nodes))

  peer_node = %{
    id: "#{node_addr}",
    layout: "ring",
    size: size,
    color: "grey",
    label: "#{node_addr}",
    type: "border",
    borderColor: @border_color
  }

  # The relay has no layout; it is pinned to the left of the rings.
  relay_node = %{id: node_relay, size: size, color: "blue", label: node_relay, x: -15, y: 0}

  relay_edge = %{
    source: node_addr,
    target: node_relay,
    props: %{type: "curved", color: "grey"}
  }

  SigmaGraph.merge_node(graph, peer_node)
  SigmaGraph.merge_node(graph, relay_node)
  SigmaGraph.add_edge(graph, relay_edge)

  new_state =
    %{
      state
      | nodes: Enum.uniq([node_addr | nodes]),
        relays: Enum.uniq([node_relay | relays])
    }
    |> update_in([:peers, node_addr], fn
      nil -> []
      known -> known
    end)

  {:noreply, new_state}
end
# Gossip reported `target` as a new neighbour of `source`.
#
# Events whose connection type is "None" are only logged. Otherwise the peer is
# appended to the source's peer list, the source node is resized and recoloured
# by its peer count, and an edge is drawn unless the reverse direction
# (target -> source) is already known.
@impl true
def handle_info(
      {:iroh_gossip_neighbor_up, source, target, remote_info},
      %{graph: graph, peers: peers, nodes: nodes} = state
    ) do
  if remote_info["conn_type"] == "None" do
    IO.puts("Neighbor up: #{source} #{target} None connection type!")
    {:noreply, state}
  else
    IO.puts("Neighbor up: #{source} #{target} #{inspect(remote_info)}")

    known_peers = peers[source]
    new_peers = if is_nil(known_peers), do: [target], else: known_peers ++ [target]

    peers_cnt = Enum.count(new_peers)
    nodes_cnt = Enum.count(nodes)

    # Large swarms grow nodes at half the rate to keep the picture readable.
    grow_size = if nodes_cnt < 100, do: peers_cnt, else: peers_cnt / 2

    # Colour ramp by peer count; six or more peers marks a "whale" node.
    {base, alpha} =
      cond do
        peers_cnt == 0 -> {"red", "E6"}
        peers_cnt == 1 -> {"orange", "E6"}
        peers_cnt < 3 -> {"yellow", "E6"}
        peers_cnt < 6 -> {"green", "E6"}
        true -> {"blue", "CC"}
      end

    SigmaGraph.merge_node(graph, %{
      id: source,
      layout: "ring",
      size: get_node_size(nodes_cnt) + grow_size,
      color: vary_color(base, alpha),
      label: "#{source} P:#{peers_cnt}",
      type: "border",
      borderColor: @border_color
    })

    reverse_known? = peers |> Map.get(target, []) |> Enum.member?(source)

    if not reverse_known? do
      edge_color =
        case extract_latency_ms(remote_info) do
          nil -> grey_tone()
          ms -> latency_tinted_color(ms)
        end

      SigmaGraph.add_edge(graph, %{
        source: source,
        target: target,
        props: %{type: "curved", curvature: 0.03, color: edge_color, size: 1}
      })
    end

    {:noreply, put_in(state, [:peers, source], new_peers)}
  end
end
# Gossip reported that `target` is no longer a neighbour of `source`: remove
# the edge and one occurrence of `target` from the source's peer list.
@impl true
def handle_info({:iroh_gossip_neighbor_down, source, target}, %{graph: graph} = state) do
  IO.puts("Neighbor down: #{source} #{target}")

  SigmaGraph.remove_edge(graph, %{source: source, target: target})

  remaining =
    case get_in(state, [:peers, source]) do
      nil -> []
      known -> List.delete(known, target)
    end

  {:noreply, put_in(state, [:peers, source], remaining)}
end
# Gossip discovered `target` from `source`: ask for a light "#ddd" edge.
# State is not changed.
@impl true
def handle_info({:iroh_gossip_node_discovered, source, target}, %{graph: graph} = state) do
  discovery_edge = %{
    source: source,
    target: target,
    props: %{color: "#ddd"},
    type: "border",
    borderColor: @border_color
  }

  SigmaGraph.add_edge(graph, discovery_edge)
  {:noreply, state}
end
# A node is being killed: paint it red with an explanatory label, then remove
# it from the graph three seconds later via `:cleanup_node`.
@impl true
def handle_info({:iroh_node_terminating, node_addr}, %{graph: graph} = state) do
  IO.puts("Node terminating: #{node_addr}")

  tombstone = %{
    id: node_addr,
    layout: "ring",
    size: 4,
    color: "red",
    label: "Killed by chaos monkey",
    type: "border",
    borderColor: @border_color
  }

  SigmaGraph.merge_node(graph, tombstone)
  Process.send_after(self(), {:cleanup_node, node_addr}, 3000)

  {:noreply, state}
end

# Delayed follow-up of `:iroh_node_terminating`: drop the node from the graph.
@impl true
def handle_info({:cleanup_node, node_addr}, %{graph: graph} = state) do
  IO.puts("Node cleanup: #{node_addr}")
  SigmaGraph.remove_node(graph, node_addr)
  {:noreply, state}
end
# A node (`source`) received the gossip message `payload`: remove the in-flight
# edge from the message node to that node, append the delivery to the message
# log and push the log to the data table.
#
# `SigmaGraph.remove_edge/2` is a cast, so it needs no task. The table update
# runs in `Task.start/1`: `Task.async/1` results were never awaited, so each
# task sent a reply into this server's mailbox (ending up in the catch-all
# clause) and was linked to this server.
@impl true
def handle_info(
      {:iroh_gossip_message_received, source, payload},
      %{msg_received_cnt: msg_received_cnt, graph: graph, messages: messages, datatable: datatable} =
        state
    ) do
  SigmaGraph.remove_edge(graph, %{source: payload, target: source})

  # Appending keeps the table in arrival order.
  new_messages = messages ++ [%{source: source, payload: payload}]
  {:ok, _pid} = Task.start(fn -> Kino.DataTable.update(datatable, new_messages) end)

  {:noreply, %{state | msg_received_cnt: msg_received_cnt + 1, messages: new_messages}}
end
# Any other message is ignored.
@impl true
def handle_info(_msg, state), do: {:noreply, state}

# `:reset` keeps the graph and data table handles and clears everything else.
@impl true
def handle_call(:reset, _from, %{graph: graph, datatable: datatable}) do
  fresh_state = %{
    datatable: datatable,
    graph: graph,
    messages: [],
    msg_received_cnt: 0,
    nodes: [],
    peers: %{},
    relays: []
  }

  {:reply, [], fresh_state}
end
# Returns the first number found in `remote_info["latency"]` as a float, or
# `nil` when the value is missing, not a string, or contains no parsable
# number. The unit suffix, if any, is ignored.
defp extract_latency_ms(remote_info) do
  case Map.get(remote_info, "latency") do
    latency when is_binary(latency) ->
      case Regex.run(~r/[\d.]+/, latency) do
        [digits] ->
          case Float.parse(digits) do
            {ms, _rest} -> ms
            _ -> nil
          end

        _ ->
          nil
      end

    _ ->
      nil
  end
end
# Random dark grey (channel value 50..100) with a fixed 0.5 alpha.
defp grey_tone do
  50..100
  |> Enum.random()
  |> then(fn shade -> "rgb(#{shade},#{shade},#{shade}, 0.5)" end)
end

# Appends a two-digit hex alpha to the named base colour. Without an explicit
# `brightness` a random level from @brightness_levels is used.
defp vary_color(base, brightness) do
  hex = @base_colors[base]
  suffix = brightness || Enum.random(@brightness_levels)
  hex <> suffix
end
# Grey edge colour tinted by the measured latency in milliseconds:
# green below 10 ms, yellow below 100 ms, orange otherwise.
#
# The argument used to be overwritten with `:rand.uniform(300)`, so the tint
# was random and never reflected the latency that was passed in.
defp latency_tinted_color(latency_ms) when is_number(latency_ms) do
  amount = 0.1

  cond do
    latency_ms < 10 -> tint_grey(:green, amount)
    latency_ms < 100 -> tint_grey(:yellow, amount)
    true -> tint_grey(:orange, amount)
  end
end
# Blends the base grey with the named colour. `amount` is the share of the
# colour (0 = pure grey, 1 = pure colour). Returns an "rgb(r,g,b)" string.
defp tint_grey(color, amount) do
  grey = grey_rgb()
  tint = color_rgb(color)

  channels =
    for {g, c} <- Enum.zip(grey, tint) do
      round(g * (1 - amount) + c * amount)
    end

  "rgb(#{Enum.join(channels, ",")})"
end

# Base grey as [r, g, b].
defp grey_rgb do
  [120, 120, 120]
end

# Tint colours as [r, g, b].
defp color_rgb(:green) do
  [0, 200, 0]
end

defp color_rgb(:yellow) do
  [255, 255, 0]
end

defp color_rgb(:orange) do
  [255, 165, 0]
end
end
defmodule UIGenServer do
use GenServer
require Logger
# Starts the event logger and the node manager (both linked to this server)
# and extends the initial configuration with their pids and the counters.
@impl true
def init(init_state) do
  graph = init_state.graph
  datatable = init_state.datatable

  {:ok, logger_pid} = GenServer.start_link(IrohLogger, %{graph: graph, datatable: datatable})
  {:ok, manager_pid} = IrohEx.NodeManager.start_link(logger_pid)

  runtime = %{
    graph: graph,
    datatable: datatable,
    logger_pid: logger_pid,
    manager_pid: manager_pid,
    msg_counter: 1,
    chaos_monkey: 0
  }

  {:ok, Map.merge(init_state, runtime)}
end
# Stores a single configuration value under `key` and echoes it back.
def handle_call({:config, key, value}, _from, state),
  do: {:reply, value, Map.put(state, key, value)}

# Empties the table and the graph, then resets the node manager and the logger.
def handle_call(
      :reset,
      _from,
      %{datatable: datatable, graph: graph, manager_pid: manager_pid, logger_pid: logger_pid} =
        state
    ) do
  Kino.DataTable.update(datatable, [])
  SigmaGraph.clear(graph)

  Enum.each([manager_pid, logger_pid], fn pid -> GenServer.call(pid, :reset) end)

  :erlang.garbage_collect()
  {:reply, [], state}
end
# Clears the UI and asks the node manager to create `num_nodes` nodes
# (allowing up to 30 s), then waits `delay_ms` before replying.
def handle_call(
      :start_sim,
      _from,
      %{
        datatable: datatable,
        graph: graph,
        num_nodes: num_nodes,
        delay_ms: delay_ms,
        manager_pid: manager_pid,
        logger_pid: logger_pid,
        whale_node_prob: whale_node_prob
      } = state
    ) do
  Kino.DataTable.update(datatable, [])
  SigmaGraph.clear(graph)

  request = {:create_nodes, whale_node_prob, num_nodes, logger_pid, delay_ms}
  GenServer.call(manager_pid, request, 30_000)

  # NOTE(review): the purpose of this extra pause is undocumented — confirm it
  # is still needed.
  Process.sleep(delay_ms)

  {:reply, [], state}
end
# Lets one random node send the message "MSG:<msg_counter>" after a random
# delay of up to `delay_ms`, and immediately draws the in-flight edges to all
# other nodes.
#
# Robust against two UI states that used to crash the server:
#   * no nodes have been set up yet (`Enum.random/1` raises on `[]`)
#   * `delay_ms` is 0 or empty (`:rand.uniform/1` needs a positive integer)
@impl true
def handle_call(
      :send_single_message,
      _from,
      %{
        datatable: _datatable,
        graph: _graph,
        use_random: _use_random,
        delay_ms: delay_ms,
        msg_counter: msg_counter
      } = state
    ) do
  case IrohEx.NodeSupervisor.get_children() do
    [] ->
      IO.puts("#{__MODULE__} send_single_message ignored: no nodes running")
      {:reply, [], state}

    nodes ->
      node = Enum.random(nodes)
      msg = "MSG:#{msg_counter}"

      msg_delay =
        if is_number(delay_ms) and delay_ms >= 1,
          do: :rand.uniform(trunc(delay_ms)),
          else: 0

      recipient_nodes = nodes -- [node]

      Process.send_after(self(), {:add_inflight_messages, recipient_nodes, msg}, 0)
      Process.send_after(node, {:send_message, msg}, msg_delay)

      # The counter is intentionally left as-is here (the increment was already
      # disabled), so repeated clicks reuse the same message node.
      {:reply, [], state}
  end
end
# Schedules the first chaos-monkey round within the next two seconds.
# Each round re-schedules itself, so every call starts another loop.
def handle_call(:start_chaos_monkey, _from, state) do
  IO.puts("Start chaos monkey")
  first_round_in = :rand.uniform(2000)
  Process.send_after(self(), :chaos_monkey_round, first_round_in)
  {:reply, [], state}
end
# Sends `num_messages` messages through the running nodes, waits `delay_ms`,
# then collects and prints a `:report` from every node.
@impl true
def handle_call(
      :send_messages,
      _from,
      %{
        datatable: _datatable,
        graph: graph,
        num_messages: num_messages,
        use_random: use_random,
        delay_ms: delay_ms
      } = state
    ) do
  nodes = IrohEx.NodeSupervisor.get_children()

  send_many_messages(self(), graph, nodes, num_messages, use_random, 100)
  Process.sleep(delay_ms)

  reports = for n <- nodes, do: GenServer.call(n, :report)
  IO.puts("Node reports #{inspect(reports)}" )

  {:reply, [], state}
end
# Sends the messages "MSG:1" .. "MSG:<msg_cnt>" through `nodes`.
#
#   * `pid`             - process that receives `{:add_inflight_messages, recipients, msg}`
#   * `nodes`           - node pids; one is picked per message when
#                         `use_random_node` is truthy, otherwise one random node
#                         sends every message
#   * `rand_msg_delay`  - upper bound (ms) of the random send delay per message
#
# Returns `:ok`. With an empty node list nothing is sent (this used to raise in
# `Enum.random/1` and with `max_concurrency: 0`); `msg_cnt` below 1 sends
# nothing either, because the range is forced to count upwards.
def send_many_messages(pid, graph, nodes, msg_cnt, use_random_node, rand_msg_delay \\ 100)

def send_many_messages(_pid, _graph, [], _msg_cnt, _use_random_node, _rand_msg_delay), do: :ok

def send_many_messages(pid, _graph, nodes, msg_cnt, use_random_node, rand_msg_delay) do
  fixed_sender = Enum.random(nodes)

  1..msg_cnt//1
  |> Task.async_stream(
    fn seq ->
      node = if use_random_node, do: Enum.random(nodes), else: fixed_sender

      # Result unused. NOTE(review): presumably kept as a liveness check of the
      # sender — confirm, otherwise this round trip can go.
      _node_addr = GenServer.call(node, :get_node_addr)

      msg = "MSG:#{seq}"
      msg_delay = :rand.uniform(rand_msg_delay)
      recipient_nodes = nodes -- [node]

      Process.send_after(pid, {:add_inflight_messages, recipient_nodes, msg}, 0)
      Process.send_after(node, {:send_message, msg}, msg_delay)
    end,
    max_concurrency: Enum.count(nodes)
  )
  |> Stream.run()

  Process.sleep(1000)
end
# One chaos-monkey round: when more than 20 nodes are running, kill one to
# three random nodes (announcing each to the logger first), then schedule the
# next round 1-4 seconds later.
@impl true
def handle_info(:chaos_monkey_round, %{logger_pid: logger_pid} = state) do
  children = IrohEx.NodeSupervisor.get_children()
  Logger.debug("Current children count: #{length(children)}")

  doomed_nodes =
    case length(children) > 20 do
      true ->
        # Between one and three victims per round.
        nodes_to_kill = :rand.uniform(3)
        doomed = Enum.take_random(children, nodes_to_kill)
        Logger.debug("Will kill #{nodes_to_kill} nodes")
        doomed

      false ->
        Logger.debug("Not enough children to kill (need > 20)")
        []
    end

  for pid <- doomed_nodes do
    if Process.alive?(pid) do
      node_addr = GenServer.call(pid, :get_node_addr)
      Process.send_after(logger_pid, {:iroh_node_terminating, node_addr}, 0)
      Logger.debug("Killing node #{inspect(pid)}")
      Process.exit(pid, :kill)
      Logger.debug("Crashed node #{inspect(pid)}")
      # Two-second pause between kills.
      Process.sleep(2000)
    else
      Logger.debug("Node #{inspect(pid)} already dead")
    end
  end

  next_round_in = :rand.uniform(3000) + 1000
  Process.send_after(self(), :chaos_monkey_round, next_round_in)
  {:noreply, state}
end
# Draws a message as a grid node plus one "in flight" edge to every recipient.
# Each recipient is asked for its address with a synchronous call.
@impl true
def handle_info({:add_inflight_messages, recipient_nodes, msg}, %{graph: graph} = state) do
  IO.puts("#{__MODULE__} handle_info :add_inflight_messages #{Enum.count(recipient_nodes)} #{inspect(recipient_nodes)} #{inspect(msg)}")

  message_node = %{id: msg, layout: "grid", size: 5, label: msg, color: "grey"}
  SigmaGraph.add_node(graph, message_node, true)

  # Edges are emitted in reverse recipient order.
  edges =
    recipient_nodes
    |> Enum.map(fn recipient ->
      node_id = GenServer.call(recipient, :get_node_addr)
      %{source: msg, target: node_id, props: %{type: "curved", color: "#E2B12D4A"}}
    end)
    |> Enum.reverse()

  SigmaGraph.add_edges_batch(graph, edges)
  {:noreply, state}
end

# Anything else is logged and ignored.
@impl true
def handle_info(msg, state) do
  text = "#{__MODULE__} catchall handle_info #{inspect(msg)}"
  IO.puts(text)
  {:noreply, state}
end
end
# Create the live graph widget once; later cells place it in the layout and
# hand it to the UI server.
graph = SigmaGraph.new()
# do not show graph yet
# (the string below is the cell's last expression, so it is shown instead of the widget)
"Graph initialized"
Run the simulation
# UI Controls
# Inputs and buttons of the notebook UI. The bindings are reused below for the
# layout and for the event stream.
num_nodes = Kino.Input.number("Number of Nodes", default: 50, min: 3)
delay_ms = Kino.Input.number("Delay after connecting a batch of nodes", default: 300, min: 0)
delay_msg_ms = Kino.Input.number("Delay msg random", default: 500, min: 0)
whale_node_prob = Kino.Input.range("Whale Node Ratio (%)", default: 10, min: 0)
num_messages = Kino.Input.number("Number of Messages", default: 3, min: 1)
use_random = Kino.Input.checkbox("Use Random Sender", default: false)
chaos_monkey_button = Kino.Control.button("Chaos monkey")
chaos_monkey = Kino.Input.checkbox("Enable chaos monkey", default: false)
chaos_monkey_batchsize = Kino.Input.range("Nodes Killed per batch", default: 5, min: 0, max: 10)
setup_button = Kino.Control.button("Setup nodes")
send_button = Kino.Control.button("Send messages")
send_single_button = Kino.Control.button("Send message")
send_continuous_button = Kino.Input.checkbox("Continuous")
reset_button = Kino.Control.button("Reset")
export_button = Kino.Control.button("Export png")
# Table of received messages, one row per %{source: _, payload: _}.
datatable =
Kino.DataTable.new(
[],
keys: [:source, :payload]
)
# render but hide
# The controls that are not part of the visible layout are rendered into a
# frame that is then emptied again.
# NOTE(review): presumably this is needed so the hidden controls can still be
# used in the event stream — confirm.
frame_hidden = Kino.Frame.new()
Kino.Frame.render(frame_hidden, Kino.Layout.grid([send_button, send_continuous_button, num_messages, delay_msg_ms ]))
Kino.Frame.render(frame_hidden, nil)
# Visible button row (five columns).
buttons = Kino.Layout.grid([setup_button,
# send_button,
send_single_button,
# send_continuous_button,
chaos_monkey_button,
export_button,
reset_button
], columns: 5)
# Display Controls
# Page layout from top to bottom: graph, buttons, settings (two columns), table.
grid =
Kino.Layout.grid([
graph,
buttons,
Kino.Layout.grid([num_nodes,
delay_ms,
#delay_msg_ms,
# num_messages,
use_random,
whale_node_prob,
chaos_monkey,
chaos_monkey_batchsize
],
columns: 2
),
datatable
])
# Last expression of the cell: renders the layout.
grid
# Initial configuration of the UI server. Only num_nodes, delay_ms and
# whale_node_prob are read from their inputs here; the other three values are
# fixed (their input reads are commented out).
num_nodes_value = Kino.Input.read(num_nodes)
num_messages_value = 1 # Kino.Input.read(num_messages) #Kino.Input.read(num_messages)
use_random_value = true # Kino.Input.read(use_random)
delay_ms_value = Kino.Input.read(delay_ms)
delay_msg_ms_value = 100 # Kino.Input.read(delay_msg_ms)
whale_node_prob_value = Kino.Input.read(whale_node_prob)
# Start the UI server linked to the evaluating process; ui_pid is used by the
# event listener below.
{:ok, ui_pid} = GenServer.start_link(UIGenServer, %{
graph: graph,
datatable: datatable,
num_nodes: num_nodes_value, #Kino.Input.read(num_nodes),
num_messages: num_messages_value, #Kino.Input.read(num_messages),
use_random: use_random_value, #Kino.Input.read(use_random),
delay_ms: delay_ms_value, # Kino.Input.read(delay_ms),
delay_msg_ms: delay_msg_ms_value, # Kino.Input.read(delay_msg_ms),
whale_node_prob: whale_node_prob_value
})
# Routes every UI event to the UI server. The tag of an input doubles as the
# configuration key that a change event stores.
[
  num_nodes: num_nodes,
  num_messages: num_messages,
  use_random: use_random,
  delay_ms: delay_ms,
  delay_msg_ms: delay_msg_ms,
  whale_node_prob: whale_node_prob,
  setup_button: setup_button,
  send_button: send_button,
  send_single_button: send_single_button,
  reset_button: reset_button,
  export_button: export_button,
  chaos_monkey: chaos_monkey,
  chaos_monkey_batchsize: chaos_monkey_batchsize,
  chaos_monkey_button: chaos_monkey_button
]
|> Kino.Control.tagged_stream()
|> Kino.listen(fn {key, event} ->
  case event.type do
    :change ->
      # Input changed: store the new value under the input's tag.
      case event do
        %{value: value} ->
          IO.puts(":config #{key} #{value}")
          GenServer.call(ui_pid, {:config, key, value})

        _ ->
          nil
      end

    :click ->
      case key do
        :setup_button ->
          # Always start from a clean slate.
          GenServer.call(ui_pid, :reset)
          GenServer.call(ui_pid, :start_sim, 30_000)

        :send_button ->
          GenServer.call(ui_pid, :send_messages, 30_000)

        :send_single_button ->
          GenServer.call(ui_pid, :send_single_message, 30_000)

        :reset_button ->
          GenServer.call(ui_pid, :reset)

        :export_button ->
          SigmaGraph.export_image(graph, %{width: 3000, height: 3000})

        :chaos_monkey_button ->
          GenServer.call(ui_pid, :start_chaos_monkey)
      end
  end

  IO.puts("#{key}, #{inspect(event)}")
end)