Choreo ThreatModel: Comprehensive Walkthrough
# For Hex publication/readers:
# Mix.install([{:choreo, "~> 0.9"}, {:kino_vizjs, "~> 0.8.0"}])
# For local development:
Mix.install([
{:choreo, path: Path.expand("~/repos/elixir/choreo")},
{:kino_vizjs, "~> 0.8.0"}
])
Section
Rendering diagrams: This livebook uses
Kino.VizJSto render DOT diagrams inline. It also supports rendering to native Mermaid.js syntax usingto_mermaid/2viaKino.Mermaid, which is supported natively in Livebook!
What is Threat Modeling?
Threat modeling is a structured way to identify, quantify, and address security risks in a system. Instead of waiting for a penetration test to reveal vulnerabilities, you analyze the architecture before writing code and ask: “What could go wrong?”
STRIDE is a popular mnemonic for threat categories:
| Category | Question | Example |
|---|---|---|
| Spoofing | Can an attacker pretend to be someone else? | Stolen credentials, forged tokens |
| Tampering | Can data be modified in transit or at rest? | Man-in-the-middle, SQL injection |
| Repudiation | Can actions be denied? | Missing audit logs |
| Information Disclosure | Is sensitive data exposed? | Unencrypted backups, verbose errors |
| Denial of Service | Can the system be overloaded? | DDoS, resource exhaustion |
| Elevation of Privilege | Can a user gain more access? | Horizontal/vertical privilege escalation |
Choreo.ThreatModel lets you describe your architecture as code, define trust boundaries, and automatically generate STRIDE threats with severity scoring.
Example 1: Classic Three-Tier Web Application
Let’s start with the simplest useful model: a user, a web API, and a database. This mirrors the classic pytm getting-started example.
alias Choreo.ThreatModel
alias Choreo.ThreatModel.Analysis
web_app =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("internet", level: 0, label: "Internet")
|> ThreatModel.add_trust_boundary("app", level: 2, label: "Application Zone")
|> ThreatModel.add_trust_boundary("data", level: 3, label: "Data Layer")
|> ThreatModel.add_external_entity(:user,
label: "Customer",
boundary: "internet",
description: "End user browsing the site"
)
|> ThreatModel.add_process(:web_api,
label: "Web API",
boundary: "app",
privilege: :user,
description: "Handles HTTP requests"
)
|> ThreatModel.add_data_store(:postgres,
label: "Postgres",
boundary: "data",
sensitivity: :confidential,
description: "Primary application database"
)
|> ThreatModel.data_flow(:user, :web_api,
label: "HTTPS login",
encrypted: true
)
|> ThreatModel.data_flow(:web_api, :postgres,
label: "SQL query"
)
tabs = [
{"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(web_app))},
{"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(web_app))}
]
Kino.Layout.tabs(tabs)
Notice the color coding:
- Red dashed edges cross trust boundaries unencrypted.
- Orange edges cross boundaries but are encrypted.
- Green nodes live in higher-trust zones.
Generating STRIDE Threats
threats = Analysis.stride_threats(web_app)
threats
|> Enum.sort_by(& &1.severity, :desc)
|> Enum.each(fn t ->
target = if is_tuple(t.target), do: "#{elem(t.target, 0)} → #{elem(t.target, 1)}", else: t.target
IO.puts("[#{String.upcase(to_string(t.severity))}] #{t.id} — #{t.category} @ #{target}")
IO.puts(" #{t.description}")
IO.puts(" Mitigation: #{t.mitigation}\n")
end)
The model automatically flagged the unencrypted web_api → postgres flow as high-risk information disclosure because it crosses from the application zone into the data layer without encryption.
Validation
Analysis.validate(web_app)
|> Enum.each(fn {sev, msg} ->
icon = if sev == :error, do: "❌", else: "⚠️"
IO.puts("#{icon} #{msg}")
end)
Example 2: Microservices with an API Gateway
A more realistic architecture: an API Gateway fronts multiple services. Some flows are internal; some cross from the public internet into the VPC.
microservices =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("internet", level: 0)
|> ThreatModel.add_trust_boundary("dmz", level: 1)
|> ThreatModel.add_trust_boundary("vpc", level: 2)
|> ThreatModel.add_external_entity(:mobile_app, label: "Mobile App", boundary: "internet")
|> ThreatModel.add_external_entity(:spa, label: "Web SPA", boundary: "internet")
|> ThreatModel.add_process(:api_gateway,
label: "API Gateway",
boundary: "dmz",
privilege: :none,
description: "Rate limiting, routing, WAF"
)
|> ThreatModel.add_process(:auth_service,
label: "Auth Service",
boundary: "vpc",
privilege: :admin,
description: "Issues and validates JWTs"
)
|> ThreatModel.add_process(:order_service,
label: "Order Service",
boundary: "vpc",
privilege: :user
)
|> ThreatModel.add_process(:payment_webhook,
label: "Payment Webhook",
boundary: "vpc",
privilege: :none,
description: "Receives async callbacks from Stripe"
)
|> ThreatModel.add_data_store(:orders_db,
label: "Orders DB",
boundary: "vpc",
sensitivity: :confidential
)
|> ThreatModel.add_data_store(:redis_cache,
label: "Redis",
boundary: "vpc",
sensitivity: :internal
)
|> ThreatModel.data_flow(:mobile_app, :api_gateway, label: "REST + mTLS", encrypted: true)
|> ThreatModel.data_flow(:spa, :api_gateway, label: "HTTPS", encrypted: true)
|> ThreatModel.data_flow(:api_gateway, :auth_service, label: "gRPC", encrypted: true)
|> ThreatModel.data_flow(:api_gateway, :order_service, label: "gRPC", encrypted: true)
|> ThreatModel.data_flow(:order_service, :orders_db, label: "SQL/TLS", encrypted: true)
|> ThreatModel.data_flow(:order_service, :redis_cache, label: "Redis protocol")
|> ThreatModel.data_flow(:payment_webhook, :order_service, label: "Event bus")
tabs = [
{"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(microservices))},
{"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(microservices))}
]
Kino.Layout.tabs(tabs)
Cross-Boundary Analysis
Which flows cross a trust boundary without encryption?
Analysis.unencrypted_boundary_flows(microservices)
|> Enum.each(fn {from, to} ->
IO.puts("🔓 Unencrypted boundary crossing: #{from} → #{to}")
end)
Exposed Data Stores
Which databases are reachable from an external entity?
Analysis.exposed_data_stores(microservices)
|> IO.inspect(label: "Exposed data stores")
High-Risk Processes
Which processes touch sensitive data?
Analysis.high_risk_processes(microservices)
|> IO.inspect(label: "High-risk processes")
Attack Paths
What are the complete paths from an external entry point to a data store?
Analysis.attack_paths(microservices)
|> Enum.each(fn path ->
IO.puts("🎯 #{Enum.join(path, " → ")}")
end)
Threat Summary
summary = Analysis.threat_summary(microservices)
IO.puts("Total threats: #{summary.total}")
IO.puts("\nBy severity:")
Enum.each(summary.by_severity, fn {sev, count} ->
IO.puts(" #{sev}: #{count}")
end)
Example 3: Multi-Tenant SaaS with Custom Compliance Rules
Real organizations have compliance requirements beyond STRIDE. The :rules option lets you inject custom threat generators via the Choreo.ThreatModel.Analysis.Rule behaviour.
defmodule GDPRRule do
@behaviour Analysis.Rule
@impl true
def threats_for_element(_model, id, data) do
sensitivity = data[:sensitivity]
if sensitivity in [:confidential, :restricted] do
[
%{
id: "GDPR-#{id}-1",
category: :compliance,
target: id,
description: "#{data.label} stores personal data without documented retention policy.",
severity: :high,
mitigation: "Implement automated data purging and Right-to-be-Forgotten endpoints."
}
]
else
[]
end
end
@impl true
def threats_for_flow(_model, from, to, meta) do
# Cross-boundary flows with personal data that are unencrypted need DPA review
if meta[:encrypted] do
[]
else
[
%{
id: "GDPR-FLOW-#{from}-#{to}",
category: :compliance,
target: {from, to},
description: "Unencrypted data flow #{from} → #{to} may require Data Processing Agreement.",
severity: :high,
mitigation: "Review DPA coverage and SCCs for third-party processors."
}
]
end
end
end
saas =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("public", level: 0)
|> ThreatModel.add_trust_boundary("tenant_isolation", level: 2)
|> ThreatModel.add_external_entity(:tenant_admin, label: "Tenant Admin", boundary: "public")
|> ThreatModel.add_process(:app_server,
label: "App Server",
boundary: "tenant_isolation",
privilege: :user
)
|> ThreatModel.add_data_store(:tenant_db,
label: "Tenant DB",
boundary: "tenant_isolation",
sensitivity: :confidential,
retention: "90d"
)
|> ThreatModel.data_flow(:tenant_admin, :app_server, encrypted: true)
|> ThreatModel.data_flow(:app_server, :tenant_db, encrypted: true)
# Base STRIDE only
base_threats = Analysis.stride_threats(saas)
IO.puts("Base STRIDE threats: #{length(base_threats)}")
# STRIDE + GDPR custom rules
all_threats = Analysis.stride_threats(saas, rules: [GDPRRule])
IO.puts("With GDPR rules: #{length(all_threats)}")
gdpr_only = Enum.filter(all_threats, &(&1.category == :compliance))
IO.puts("GDPR-specific threats:")
Enum.each(gdpr_only, fn t ->
target = if is_tuple(t.target), do: "flow", else: t.target
IO.puts(" #{t.id} @ #{target} — #{t.description}")
end)
Kino.VizJS.render(ThreatModel.to_dot(saas))
Example 4: Sequence Diagrams
While Data Flow Diagrams illustrate security boundaries, Sequence Diagrams visualize interactions chronologically. Choreo supports both Mermaid.js and PlantUML formats.
tabs = [
{"Mermaid", Kino.Mermaid.new(ThreatModel.to_sequence(microservices))},
{"PlantUML", Kino.Markdown.new("```plantuml\n#{ThreatModel.to_plantuml(microservices)}\n```")}
]
Kino.Layout.tabs(tabs)
You can copy these payloads directly into PlantText, Mermaid Live Editor, or your Confluence macro blocks securely.
Deep Dive: Understanding Severity Scoring
Choreo assigns severity based on element properties and flow context. You don’t have to guess — the scoring is derived from the model itself.
Severity rules of thumb:
| Factor | Impact on Severity |
|---|---|
| Crosses a trust boundary | Bumps up by one level |
| Unencrypted | Bumps up by one level |
sensitivity: :restricted |
Critical |
sensitivity: :confidential |
High |
sensitivity: :internal |
Medium |
privilege: :admin |
Higher impact if compromised |
privilege: :user |
Standard impact |
privilege: :none |
Lower impact |
# Experiment: change sensitivity and watch severity shift
restricted_model =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("app", level: 2)
|> ThreatModel.add_process(:api, boundary: "app", privilege: :admin)
|> ThreatModel.add_data_store(:vault, boundary: "app", sensitivity: :restricted)
|> ThreatModel.data_flow(:api, :vault)
Analysis.stride_threats(restricted_model)
|> Enum.filter(&(&1.target == :vault))
|> Enum.each(fn t ->
IO.puts("#{t.category} → #{t.severity}")
end)
Fixing Issues: Validation-Driven Hardening
Start with a sloppy model, then use validate/1 as a checklist to harden it.
sloppy =
ThreatModel.new()
|> ThreatModel.add_process(:api, label: "API")
|> ThreatModel.add_data_store(:db, label: "DB")
|> ThreatModel.data_flow(:api, :db)
IO.puts("Issues found:")
Analysis.validate(sloppy) |> Enum.each(fn {sev, msg} -> IO.puts(" <#{sev}> #{msg}") end)
Now fix each issue step by step:
hardened =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("app", level: 2)
|> ThreatModel.add_process(:api, label: "API", boundary: "app", privilege: :user)
|> ThreatModel.add_data_store(:db, label: "DB", boundary: "app", sensitivity: :confidential)
|> ThreatModel.data_flow(:api, :db, encrypted: true)
IO.puts("After hardening:")
Analysis.validate(hardened) |> IO.inspect()
Kino.VizJS.render(ThreatModel.to_dot(hardened))
Rendering for Stakeholders
Dark theme for presentations
Kino.VizJS.render(ThreatModel.to_dot(microservices, theme: :dark))
Example 5: Multigraph (Parallel Data Flows)
Sometimes multiple distinct data payloads cross boundaries between the same entities (e.g., an HTTPS request and a WebSocket notification channel).
parallel_flows =
ThreatModel.new()
|> ThreatModel.add_trust_boundary("internet", level: 0)
|> ThreatModel.add_trust_boundary("app", level: 2)
|> ThreatModel.add_external_entity(:user, boundary: "internet")
|> ThreatModel.add_process(:api, boundary: "app")
|> ThreatModel.data_flow(:user, :api, label: "Payload (HTTPS)", encrypted: true)
|> ThreatModel.data_flow(:user, :api, label: "Signal (WS)", encrypted: false)
IO.puts("Number of data flows: #{length(ThreatModel.flows(parallel_flows))}")
Kino.VizJS.render(ThreatModel.to_dot(parallel_flows))
Summary
| Task | Function |
|---|---|
| Model architecture |
ThreatModel.new/1, add_trust_boundary/3, add_external_entity/3, add_process/3, add_data_store/3, data_flow/4 |
| Auto-generate threats |
Analysis.stride_threats/2 |
| Custom rules |
Implement Analysis.Rule behaviour, pass via rules: [MyRule] |
| Find unencrypted flows |
Analysis.unencrypted_boundary_flows/1 |
| Find exposed databases |
Analysis.exposed_data_stores/1 |
| Find risky processes |
Analysis.high_risk_processes/1 |
| Attack paths |
Analysis.attack_paths/2 |
| Validate model |
Analysis.validate/1 |
| Summarise threats |
Analysis.threat_summary/1 |
| Render DFD |
ThreatModel.to_dot/2 (themes: :default, :dark, :warm, :forest, :ocean) |
| Render sequence |
ThreatModel.to_sequence/2 (Mermaid), ThreatModel.to_plantuml/2 |
Threat modeling as code means your security review is version-controlled, diffable, and repeatable. Every pull request can include an updated threat model alongside the code changes.