Choreo ThreatModel: Comprehensive Walkthrough

# For Hex publication/readers:
# Mix.install([{:choreo, "~> 0.9"}, {:kino_vizjs, "~> 0.8.0"}])

# For local development:
Mix.install([
  {:choreo, path: Path.expand("~/repos/elixir/choreo")},
  {:kino_vizjs, "~> 0.8.0"}
])

Rendering diagrams: This livebook uses Kino.VizJS to render DOT diagrams inline. Choreo can also emit Mermaid.js syntax via to_mermaid/2, which Livebook renders natively through Kino.Mermaid.


What is Threat Modeling?

Threat modeling is a structured way to identify, quantify, and address security risks in a system. Instead of waiting for a penetration test to reveal vulnerabilities, you analyze the architecture before writing code and ask: “What could go wrong?”

STRIDE is a popular mnemonic for threat categories:

| Category | Question | Example |
| --- | --- | --- |
| Spoofing | Can an attacker pretend to be someone else? | Stolen credentials, forged tokens |
| Tampering | Can data be modified in transit or at rest? | Man-in-the-middle, SQL injection |
| Repudiation | Can actions be denied? | Missing audit logs |
| Information Disclosure | Is sensitive data exposed? | Unencrypted backups, verbose errors |
| Denial of Service | Can the system be overloaded? | DDoS, resource exhaustion |
| Elevation of Privilege | Can a user gain more access? | Horizontal/vertical privilege escalation |

Choreo.ThreatModel lets you describe your architecture as code, define trust boundaries, and automatically generate STRIDE threats with severity scoring.


Example 1: Classic Three-Tier Web Application

Let’s start with the simplest useful model: a user, a web API, and a database. This mirrors the classic pytm getting-started example.

alias Choreo.ThreatModel
alias Choreo.ThreatModel.Analysis

web_app =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0, label: "Internet")
  |> ThreatModel.add_trust_boundary("app", level: 2, label: "Application Zone")
  |> ThreatModel.add_trust_boundary("data", level: 3, label: "Data Layer")
  |> ThreatModel.add_external_entity(:user,
    label: "Customer",
    boundary: "internet",
    description: "End user browsing the site"
  )
  |> ThreatModel.add_process(:web_api,
    label: "Web API",
    boundary: "app",
    privilege: :user,
    description: "Handles HTTP requests"
  )
  |> ThreatModel.add_data_store(:postgres,
    label: "Postgres",
    boundary: "data",
    sensitivity: :confidential,
    description: "Primary application database"
  )
  |> ThreatModel.data_flow(:user, :web_api,
    label: "HTTPS login",
    encrypted: true
  )
  |> ThreatModel.data_flow(:web_api, :postgres,
    label: "SQL query"
  )

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(web_app))},
  {"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(web_app))}
]

Kino.Layout.tabs(tabs)

Notice the color coding:

  • Red dashed edges cross trust boundaries unencrypted.
  • Orange edges cross boundaries but are encrypted.
  • Green nodes live in higher-trust zones.
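
The edge rules in the list above can be sketched as a small decision function. This is a minimal illustration, not Choreo's renderer: the module name `EdgeStyleSketch`, the returned map shape, and the default black/solid style for flows that stay inside one boundary are all assumptions.

```elixir
defmodule EdgeStyleSketch do
  # Hypothetical helper, not part of Choreo: picks an edge style from the
  # boundary names at each end of a flow and the flow's encryption flag.
  def style(from_boundary, to_boundary, encrypted?) do
    crosses? = from_boundary != to_boundary

    cond do
      # Crossing a boundary in the clear: red dashed edge.
      crosses? and not encrypted? -> %{color: "red", style: "dashed"}
      # Crossing a boundary with encryption: orange edge.
      crosses? -> %{color: "orange", style: "solid"}
      # Same boundary on both ends: assumed default style.
      true -> %{color: "black", style: "solid"}
    end
  end
end

EdgeStyleSketch.style("app", "data", false)
# => %{color: "red", style: "dashed"}
```

In Example 1, the web_api to postgres flow corresponds to the first branch, and the encrypted user to web_api flow to the second.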

Generating STRIDE Threats

threats = Analysis.stride_threats(web_app)

threats
|> Enum.sort_by(& &1.severity, :desc)
|> Enum.each(fn t ->
  target = if is_tuple(t.target), do: "#{elem(t.target, 0)} → #{elem(t.target, 1)}", else: t.target
  IO.puts("[#{String.upcase(to_string(t.severity))}] #{t.id}: #{t.category} @ #{target}")
  IO.puts("  #{t.description}")
  IO.puts("  Mitigation: #{t.mitigation}\n")
end)

The model automatically flagged the unencrypted web_api → postgres flow as high-risk information disclosure because it crosses from the application zone into the data layer without encryption.

Validation

Analysis.validate(web_app)
|> Enum.each(fn {sev, msg} ->
  icon = if sev == :error, do: "❌", else: "⚠️"
  IO.puts("#{icon} #{msg}")
end)

Example 2: Microservices with an API Gateway

A more realistic architecture: an API Gateway fronts multiple services. Some flows are internal; some cross from the public internet into the VPC.

microservices =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0)
  |> ThreatModel.add_trust_boundary("dmz", level: 1)
  |> ThreatModel.add_trust_boundary("vpc", level: 2)
  |> ThreatModel.add_external_entity(:mobile_app, label: "Mobile App", boundary: "internet")
  |> ThreatModel.add_external_entity(:spa, label: "Web SPA", boundary: "internet")
  |> ThreatModel.add_process(:api_gateway,
    label: "API Gateway",
    boundary: "dmz",
    privilege: :none,
    description: "Rate limiting, routing, WAF"
  )
  |> ThreatModel.add_process(:auth_service,
    label: "Auth Service",
    boundary: "vpc",
    privilege: :admin,
    description: "Issues and validates JWTs"
  )
  |> ThreatModel.add_process(:order_service,
    label: "Order Service",
    boundary: "vpc",
    privilege: :user
  )
  |> ThreatModel.add_process(:payment_webhook,
    label: "Payment Webhook",
    boundary: "vpc",
    privilege: :none,
    description: "Receives async callbacks from Stripe"
  )
  |> ThreatModel.add_data_store(:orders_db,
    label: "Orders DB",
    boundary: "vpc",
    sensitivity: :confidential
  )
  |> ThreatModel.add_data_store(:redis_cache,
    label: "Redis",
    boundary: "vpc",
    sensitivity: :internal
  )
  |> ThreatModel.data_flow(:mobile_app, :api_gateway, label: "REST + mTLS", encrypted: true)
  |> ThreatModel.data_flow(:spa, :api_gateway, label: "HTTPS", encrypted: true)
  |> ThreatModel.data_flow(:api_gateway, :auth_service, label: "gRPC", encrypted: true)
  |> ThreatModel.data_flow(:api_gateway, :order_service, label: "gRPC", encrypted: true)
  |> ThreatModel.data_flow(:order_service, :orders_db, label: "SQL/TLS", encrypted: true)
  |> ThreatModel.data_flow(:order_service, :redis_cache, label: "Redis protocol")
  |> ThreatModel.data_flow(:payment_webhook, :order_service, label: "Event bus")

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_mermaid(microservices))},
  {"Graphviz", Kino.VizJS.render(ThreatModel.to_dot(microservices))}
]

Kino.Layout.tabs(tabs)

Cross-Boundary Analysis

Which flows cross a trust boundary without encryption?

Analysis.unencrypted_boundary_flows(microservices)
|> Enum.each(fn {from, to} ->
  IO.puts("🔓 Unencrypted boundary crossing: #{from} → #{to}")
end)
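
To make the idea concrete, here is a minimal sketch of the same check over plain data. It does not use Choreo; the module `BoundaryCheckSketch`, the `{from, to, encrypted?}` tuple shape, and the sample `api_gateway` to `redis_cache` flow are assumptions made for illustration.

```elixir
defmodule BoundaryCheckSketch do
  # nodes: %{id => boundary_name}; flows: list of {from, to, encrypted?}.
  # Returns the {from, to} pairs that change boundary without encryption.
  def unencrypted_crossings(nodes, flows) do
    for {from, to, encrypted?} <- flows,
        not encrypted?,
        Map.fetch!(nodes, from) != Map.fetch!(nodes, to),
        do: {from, to}
  end
end

nodes = %{api_gateway: "dmz", order_service: "vpc", redis_cache: "vpc"}

flows = [
  {:api_gateway, :order_service, true},
  # Made-up flow, added only so the check has something to report.
  {:api_gateway, :redis_cache, false},
  {:order_service, :redis_cache, false}
]

crossings = BoundaryCheckSketch.unencrypted_crossings(nodes, flows)
# evaluates to [{:api_gateway, :redis_cache}]
```

The unencrypted order_service to redis_cache flow is not reported because both ends sit in the same boundary.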

Exposed Data Stores

Which databases are reachable from an external entity?

Analysis.exposed_data_stores(microservices)
|> IO.inspect(label: "Exposed data stores")
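
One way to think about "reachable" is a graph walk from an external entity. The sketch below assumes transitive reachability over directed flows, which may differ from what Choreo actually computes; `ReachabilitySketch` is a hypothetical module and the edge list is a hand-copied subset of the model above.

```elixir
defmodule ReachabilitySketch do
  # edges: list of {from, to}. Returns a MapSet of every node reachable
  # from `start` (including `start` itself).
  def reachable(edges, start), do: walk(edges, [start], MapSet.new())

  defp walk(_edges, [], seen), do: seen

  defp walk(edges, [node | rest], seen) do
    if MapSet.member?(seen, node) do
      walk(edges, rest, seen)
    else
      # Queue every direct successor of the current node.
      next = for {^node, to} <- edges, do: to
      walk(edges, next ++ rest, MapSet.put(seen, node))
    end
  end
end

edges = [
  {:spa, :api_gateway},
  {:api_gateway, :order_service},
  {:order_service, :orders_db},
  {:order_service, :redis_cache}
]

data_stores = MapSet.new([:orders_db, :redis_cache])

exposed =
  edges
  |> ReachabilitySketch.reachable(:spa)
  |> MapSet.intersection(data_stores)
  |> Enum.sort()
# exposed == [:orders_db, :redis_cache]
```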

High-Risk Processes

Which processes touch sensitive data?

Analysis.high_risk_processes(microservices)
|> IO.inspect(label: "High-risk processes")

Attack Paths

What are the complete paths from an external entry point to a data store?

Analysis.attack_paths(microservices)
|> Enum.each(fn path ->
  IO.puts("🎯 #{Enum.join(path, " → ")}")
end)
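
Path enumeration of this kind is typically a depth-first search that never revisits a node on the current path. The following is a sketch under that assumption, not Choreo's implementation; `AttackPathSketch` and its argument shapes are hypothetical.

```elixir
defmodule AttackPathSketch do
  # Enumerates simple (cycle-free) paths from `from` to any node in `targets`.
  # `visited` holds the current path in reverse order.
  def paths(edges, from, targets), do: walk(edges, from, targets, [from])

  defp walk(edges, node, targets, visited) do
    here = if node in targets, do: [Enum.reverse(visited)], else: []

    deeper =
      for {^node, next} <- edges,
          next not in visited,
          path <- walk(edges, next, targets, [next | visited]),
          do: path

    here ++ deeper
  end
end

edges = [
  {:spa, :api_gateway},
  {:api_gateway, :order_service},
  {:order_service, :orders_db},
  {:order_service, :redis_cache}
]

found = AttackPathSketch.paths(edges, :spa, [:orders_db, :redis_cache])

Enum.each(found, fn path -> IO.puts(Enum.join(path, " → ")) end)
# prints:
# spa → api_gateway → order_service → orders_db
# spa → api_gateway → order_service → redis_cache
```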

Threat Summary

summary = Analysis.threat_summary(microservices)

IO.puts("Total threats: #{summary.total}")
IO.puts("\nBy severity:")
Enum.each(summary.by_severity, fn {sev, count} ->
  IO.puts("  #{sev}: #{count}")
end)
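
A summary of this shape can be built from any list of threat maps with `Enum.frequencies_by/2`. The sketch below uses made-up threats, and the `by_category` key is an assumption; only `total` and `by_severity` appear in the code above.

```elixir
# Hand-written sample threats; the ids are invented for illustration.
threats = [
  %{id: "T1", category: :spoofing, severity: :high},
  %{id: "T2", category: :tampering, severity: :medium},
  %{id: "T3", category: :information_disclosure, severity: :high}
]

summary = %{
  total: length(threats),
  by_severity: Enum.frequencies_by(threats, & &1.severity),
  by_category: Enum.frequencies_by(threats, & &1.category)
}

IO.inspect(summary.by_severity)
# prints: %{high: 2, medium: 1}
```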

Example 3: Multi-Tenant SaaS with Custom Compliance Rules

Real organizations have compliance requirements beyond STRIDE. The :rules option lets you inject custom threat generators via the Choreo.ThreatModel.Analysis.Rule behaviour.

defmodule GDPRRule do
  @behaviour Analysis.Rule

  @impl true
  def threats_for_element(_model, id, data) do
    sensitivity = data[:sensitivity]

    if sensitivity in [:confidential, :restricted] do
      [
        %{
          id: "GDPR-#{id}-1",
          category: :compliance,
          target: id,
          description: "#{data.label} stores personal data without documented retention policy.",
          severity: :high,
          mitigation: "Implement automated data purging and Right-to-be-Forgotten endpoints."
        }
      ]
    else
      []
    end
  end

  @impl true
  def threats_for_flow(_model, from, to, meta) do
    # Flag every unencrypted flow for DPA review (this simple rule does not check boundaries or data type)
    if meta[:encrypted] do
      []
    else
      [
        %{
          id: "GDPR-FLOW-#{from}-#{to}",
          category: :compliance,
          target: {from, to},
          description: "Unencrypted data flow #{from} → #{to} may require a Data Processing Agreement.",
          severity: :high,
          mitigation: "Review DPA coverage and SCCs for third-party processors."
        }
      ]
    end
  end
end

saas =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("public", level: 0)
  |> ThreatModel.add_trust_boundary("tenant_isolation", level: 2)
  |> ThreatModel.add_external_entity(:tenant_admin, label: "Tenant Admin", boundary: "public")
  |> ThreatModel.add_process(:app_server,
    label: "App Server",
    boundary: "tenant_isolation",
    privilege: :user
  )
  |> ThreatModel.add_data_store(:tenant_db,
    label: "Tenant DB",
    boundary: "tenant_isolation",
    sensitivity: :confidential,
    retention: "90d"
  )
  |> ThreatModel.data_flow(:tenant_admin, :app_server, encrypted: true)
  |> ThreatModel.data_flow(:app_server, :tenant_db, encrypted: true)

# Base STRIDE only
base_threats = Analysis.stride_threats(saas)
IO.puts("Base STRIDE threats: #{length(base_threats)}")

# STRIDE + GDPR custom rules
all_threats = Analysis.stride_threats(saas, rules: [GDPRRule])
IO.puts("With GDPR rules: #{length(all_threats)}")

gdpr_only = Enum.filter(all_threats, &(&1.category == :compliance))
IO.puts("GDPR-specific threats:")
Enum.each(gdpr_only, fn t ->
  target = if is_tuple(t.target), do: "flow", else: t.target
  IO.puts("  #{t.id} @ #{target}: #{t.description}")
end)

Kino.VizJS.render(ThreatModel.to_dot(saas))

Example 4: Sequence Diagrams

While Data Flow Diagrams illustrate security boundaries, Sequence Diagrams visualize interactions chronologically. Choreo supports both Mermaid.js and PlantUML formats.

tabs = [
  {"Mermaid", Kino.Mermaid.new(ThreatModel.to_sequence(microservices))},
  {"PlantUML", Kino.Markdown.new("```plantuml\n#{ThreatModel.to_plantuml(microservices)}\n```")}
]

Kino.Layout.tabs(tabs)

You can paste these payloads directly into PlantText, the Mermaid Live Editor, or a Confluence macro block.


Deep Dive: Understanding Severity Scoring

Choreo assigns severity based on element properties and flow context. You don’t have to guess: the scoring is derived from the model itself.

Severity rules of thumb:

| Factor | Impact on Severity |
| --- | --- |
| Crosses a trust boundary | Bumps up by one level |
| Unencrypted | Bumps up by one level |
| sensitivity: :restricted | Critical |
| sensitivity: :confidential | High |
| sensitivity: :internal | Medium |
| privilege: :admin | Higher impact if compromised |
| privilege: :user | Standard impact |
| privilege: :none | Lower impact |

# Experiment: change sensitivity and watch severity shift
restricted_model =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_process(:api, boundary: "app", privilege: :admin)
  |> ThreatModel.add_data_store(:vault, boundary: "app", sensitivity: :restricted)
  |> ThreatModel.data_flow(:api, :vault)

Analysis.stride_threats(restricted_model)
|> Enum.filter(&(&1.target == :vault))
|> Enum.each(fn t ->
  IO.puts("#{t.category}: #{t.severity}")
end)
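
The sensitivity and bump rules of thumb could be combined roughly as follows. This is only a sketch of one plausible reading: the `:low` floor, the cap at `:critical`, the way the two bumps stack, and the module name `SeveritySketch` are assumptions, and privilege is left out. Choreo's actual scoring may differ.

```elixir
defmodule SeveritySketch do
  @levels [:low, :medium, :high, :critical]

  # Base level from sensitivity; anything unlisted is assumed to be :low.
  def base(:restricted), do: :critical
  def base(:confidential), do: :high
  def base(:internal), do: :medium
  def base(_other), do: :low

  # Crossing a boundary and lacking encryption each raise the level by one
  # step, capped at :critical.
  def score(sensitivity, crosses_boundary?, encrypted?) do
    bumps = Enum.count([crosses_boundary?, not encrypted?], & &1)
    index = Enum.find_index(@levels, &(&1 == base(sensitivity)))
    Enum.at(@levels, min(index + bumps, length(@levels) - 1))
  end
end

SeveritySketch.score(:internal, true, false)
# => :critical
SeveritySketch.score(:internal, false, true)
# => :medium
```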

Fixing Issues: Validation-Driven Hardening

Start with a sloppy model, then use validate/1 as a checklist to harden it.

sloppy =
  ThreatModel.new()
  |> ThreatModel.add_process(:api, label: "API")
  |> ThreatModel.add_data_store(:db, label: "DB")
  |> ThreatModel.data_flow(:api, :db)

IO.puts("Issues found:")

Analysis.validate(sloppy) |> Enum.each(fn {sev, msg} -> IO.puts(" <#{sev}> #{msg}") end)

Now fix each issue: add a trust boundary, assign both elements to it, declare privilege and sensitivity, and encrypt the flow:

hardened =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_process(:api, label: "API", boundary: "app", privilege: :user)
  |> ThreatModel.add_data_store(:db, label: "DB", boundary: "app", sensitivity: :confidential)
  |> ThreatModel.data_flow(:api, :db, encrypted: true)

IO.puts("After hardening:")
Analysis.validate(hardened) |> IO.inspect()

Kino.VizJS.render(ThreatModel.to_dot(hardened))
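
As a rough picture of what a checklist-style validator does, the sketch below checks plain element maps for missing attributes and returns `{severity, message}` tuples in the same shape the code above iterates over. The specific checks, messages, and severities are invented for illustration and are not Choreo's actual rules.

```elixir
defmodule ValidateSketch do
  # elements: list of maps with :id and :kind, plus optional :boundary,
  # :privilege, and :sensitivity keys. Returns a list of {severity, message}.
  def validate(elements) do
    Enum.flat_map(elements, fn el ->
      [
        is_nil(el[:boundary]) && {:warning, "#{el.id} has no trust boundary"},
        el.kind == :process && is_nil(el[:privilege]) &&
          {:warning, "#{el.id} has no privilege level"},
        el.kind == :data_store && is_nil(el[:sensitivity]) &&
          {:warning, "#{el.id} has no sensitivity"}
      ]
      # Checks that passed evaluate to false; keep only the issue tuples.
      |> Enum.filter(&is_tuple/1)
    end)
  end
end

sloppy_elements = [%{id: :api, kind: :process}, %{id: :db, kind: :data_store}]

hardened_elements = [
  %{id: :api, kind: :process, boundary: "app", privilege: :user},
  %{id: :db, kind: :data_store, boundary: "app", sensitivity: :confidential}
]

length(ValidateSketch.validate(sloppy_elements))
# => 4
ValidateSketch.validate(hardened_elements)
# => []
```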

Rendering for Stakeholders

Dark theme for presentations

Kino.VizJS.render(ThreatModel.to_dot(microservices, theme: :dark))

Example 5: Multigraph (Parallel Data Flows)

Sometimes multiple distinct data payloads cross boundaries between the same entities (e.g., an HTTPS request and a WebSocket notification channel).

parallel_flows =
  ThreatModel.new()
  |> ThreatModel.add_trust_boundary("internet", level: 0)
  |> ThreatModel.add_trust_boundary("app", level: 2)
  |> ThreatModel.add_external_entity(:user, boundary: "internet")
  |> ThreatModel.add_process(:api, boundary: "app")
  |> ThreatModel.data_flow(:user, :api, label: "Payload (HTTPS)", encrypted: true)
  |> ThreatModel.data_flow(:user, :api, label: "Signal (WS)", encrypted: false)

IO.puts("Number of data flows: #{length(ThreatModel.flows(parallel_flows))}")
Kino.VizJS.render(ThreatModel.to_dot(parallel_flows))
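
Why a multigraph matters can be seen with plain data: if flows are keyed only by their endpoints, the second flow overwrites the first. The snippet below illustrates that point with ordinary maps and lists; it says nothing about how Choreo stores flows internally.

```elixir
flows = [
  %{from: :user, to: :api, label: "Payload (HTTPS)", encrypted: true},
  %{from: :user, to: :api, label: "Signal (WS)", encrypted: false}
]

# A map keyed by {from, to} keeps only the last flow for each pair...
as_map = Map.new(flows, fn f -> {{f.from, f.to}, f} end)
map_size(as_map)
# => 1

# ...so the unencrypted WebSocket flow would hide the encrypted one.
as_map[{:user, :api}].label
# => "Signal (WS)"

# A list (or a map keyed by a unique flow id) keeps both.
length(flows)
# => 2
```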

Summary

| Task | Function |
| --- | --- |
| Model architecture | ThreatModel.new/1, add_trust_boundary/3, add_external_entity/3, add_process/3, add_data_store/3, data_flow/4 |
| Auto-generate threats | Analysis.stride_threats/2 |
| Custom rules | Implement Analysis.Rule behaviour, pass via rules: [MyRule] |
| Find unencrypted flows | Analysis.unencrypted_boundary_flows/1 |
| Find exposed databases | Analysis.exposed_data_stores/1 |
| Find risky processes | Analysis.high_risk_processes/1 |
| Attack paths | Analysis.attack_paths/2 |
| Validate model | Analysis.validate/1 |
| Summarise threats | Analysis.threat_summary/1 |
| Render DFD | ThreatModel.to_dot/2 (themes: :default, :dark, :warm, :forest, :ocean) |
| Render sequence | ThreatModel.to_sequence/2 (Mermaid), ThreatModel.to_plantuml/2 |

Threat modeling as code means your security review is version-controlled, diffable, and repeatable. Every pull request can include an updated threat model alongside the code changes.
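
As a small illustration of "diffable", the threat ids from two revisions of a model can be compared with set operations. The ids below are made up; in practice they would come from the `id` field of each generated threat on the base branch and on the pull request branch.

```elixir
# Hypothetical threat ids from two revisions of the same model.
old_ids = MapSet.new(["T-1", "T-2"])
new_ids = MapSet.new(["T-2", "T-3"])

added = MapSet.difference(new_ids, old_ids) |> Enum.sort()
resolved = MapSet.difference(old_ids, new_ids) |> Enum.sort()

IO.inspect(added, label: "New threats")
IO.inspect(resolved, label: "Resolved threats")
# prints:
# New threats: ["T-3"]
# Resolved threats: ["T-1"]
```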