Powered by AppSignal & Oban Pro

Flow Metrics

flow_metrics.livemd

Flow Metrics

Purpose

This notebook computes flow metrics from YouTrack issues to help understand team progress, energy, and sustainability. It covers:

  • Throughput — cards completed per week (trend)
  • Cycle time — time from “In Progress” to “Resolved” (distribution + per-stream)
  • WIP (Work in Progress) — concurrent active items per person over time
  • Context switching index — distinct workstreams per person per week
  • Bus factor — unique assignees per workstream (knowledge silo detection)
  • Rework detection — issues reopened after resolution
  • Knowledge silo trend — bus factor tracked week-over-week
  • Interrupt source analysis — which workstreams generate the most unplanned work
  • Workstream rotation — how people rotate between workstreams over time

These metrics map to the Progress, Energy, Togetherness, and Autonomy dimensions of the PETALS team health framework.

Setup

Mix.install([
  {:youtrack, path: "./youtrack"},
  {:kino, "~> 0.14"},
  {:kino_vega_lite, "~> 0.1"},
  {:vega_lite, "~> 0.1"}
])

alias VegaLite, as: Vl
alias Youtrack.{Client, Fields, Status, StartAt, WorkItems, Workstreams, WorkstreamsLoader, Rework, Rotation}

Inputs

base_url_in =
  Kino.Input.text(
    "YouTrack base URL",
    default: System.get_env("YOUTRACK_BASE_URL") || "https://your-instance.youtrack.cloud"
  )

token_in =
  Kino.Input.password(
    "Permanent token",
    default: System.get_env("YOUTRACK_TOKEN") || ""
  )

base_query_in =
  Kino.Input.text(
    "YouTrack base query (e.g. project: MYPROJECT)",
    default: System.get_env("YOUTRACK_BASE_QUERY") || "project: MYPROJECT"
  )

days_back_in =
  Kino.Input.number(
    "Days back to fetch",
    default: String.to_integer(System.get_env("YOUTRACK_DAYS_BACK") || "90")
  )

state_field_in =
  Kino.Input.text("State field name (custom field)",
    default: System.get_env("YOUTRACK_STATE_FIELD") || "State"
  )

assignees_field_in =
  Kino.Input.text("Assignees field name (custom field)",
    default: System.get_env("YOUTRACK_ASSIGNEES_FIELD") || "Assignee"
  )

in_progress_names_in =
  Kino.Input.text(
    "Comma-separated In Progress state names",
    default: System.get_env("YOUTRACK_IN_PROGRESS") || "In Progress"
  )

done_state_names_in =
  Kino.Input.text(
    "Comma-separated Done state names (for rework detection)",
    default: System.get_env("YOUTRACK_DONE_STATES") || "Done, Verified, Fixed"
  )

use_activities_in =
  Kino.Input.checkbox(
    "Compute start_at via activities (State -> In Progress)",
    default: true
  )

project_prefix_in =
  Kino.Input.text(
    "Filter by issue ID prefix (leave empty for all)",
    default: System.get_env("YOUTRACK_PROJECT_PREFIX") || ""
  )

excluded_logins_in =
  Kino.Input.text(
    "Excluded assignee logins (comma-separated)",
    default: System.get_env("YOUTRACK_EXCLUDED_LOGINS") || ""
  )

include_substreams_in =
  Kino.Input.checkbox(
    "Include substreams (expand to parent workstreams)",
    default: true
  )

unplanned_tag_in =
  Kino.Input.text(
    "Unplanned work tag",
    default: System.get_env("YOUTRACK_UNPLANNED_TAG") || "on the ankles"
  )

Kino.Layout.grid(
  [
    base_url_in, token_in,
    base_query_in, days_back_in,
    state_field_in, assignees_field_in,
    in_progress_names_in, done_state_names_in,
    use_activities_in, project_prefix_in,
    excluded_logins_in, include_substreams_in,
    unplanned_tag_in
  ],
  columns: 2
)

Workstreams Config

{workstream_rules, workstreams_path} = WorkstreamsLoader.load_from_default_paths()

if workstreams_path do
  Kino.render(Kino.Markdown.new("Loading workstreams from `#{workstreams_path}`"))
else
  Kino.render(Kino.Markdown.new("⚠️ No workstreams.yaml found, using empty config"))
end

Kino.nothing()

Fetch Issues

base_url = Kino.Input.read(base_url_in) |> String.trim()
token = Kino.Input.read(token_in) |> String.trim()
base_query = Kino.Input.read(base_query_in) |> String.trim()
days_back = Kino.Input.read(days_back_in)
state_field = Kino.Input.read(state_field_in) |> String.trim()
assignees_field = Kino.Input.read(assignees_field_in) |> String.trim()
project_prefix = Kino.Input.read(project_prefix_in) |> String.trim()

excluded_logins =
  Kino.Input.read(excluded_logins_in)
  |> String.split(",", trim: true)
  |> Enum.map(&String.trim/1)

in_progress_names =
  Kino.Input.read(in_progress_names_in)
  |> String.split(",", trim: true)
  |> Enum.map(&String.trim/1)

done_state_names =
  Kino.Input.read(done_state_names_in)
  |> String.split(",", trim: true)
  |> Enum.map(&String.trim/1)

include_substreams? = Kino.Input.read(include_substreams_in)
unplanned_tag = Kino.Input.read(unplanned_tag_in) |> String.trim()
use_activities? = Kino.Input.read(use_activities_in)

today = Date.utc_today() |> Date.to_iso8601()
start_date = Date.utc_today() |> Date.add(-days_back) |> Date.to_iso8601()
query = "#{base_query} updated: #{start_date} .. #{today}"

Kino.render(Kino.Markdown.new("**Query:** `#{query}`"))

req = Client.new!(base_url, token)
raw_issues = Client.fetch_issues!(req, query)

issues =
  if project_prefix == "" do
    raw_issues
  else
    Enum.filter(raw_issues, fn %{"idReadable" => id} ->
      String.starts_with?(id, project_prefix)
    end)
  end

Kino.Markdown.new("Fetched **#{length(issues)}** issues.")

Build Work Items

{issue_start_at, issue_activities} =
  if use_activities? do
    issues
    |> Task.async_stream(
      fn issue ->
        id = issue["id"]
        acts = Client.fetch_activities!(req, id)
        start_at = StartAt.from_activities(acts, state_field, in_progress_names)
        {id, start_at, acts}
      end,
      max_concurrency: 8,
      timeout: 60_000
    )
    |> Enum.reduce({%{}, %{}}, fn
      {:ok, {id, start_at, acts}}, {start_map, acts_map} ->
        {Map.put(start_map, id, start_at), Map.put(acts_map, id, acts)}

      _, acc ->
        acc
    end)
  else
    {%{}, %{}}
  end

work_items =
  WorkItems.build(issues,
    state_field: state_field,
    assignees_field: assignees_field,
    rules: workstream_rules,
    in_progress_names: in_progress_names,
    issue_start_at: issue_start_at,
    excluded_logins: excluded_logins,
    include_substreams: include_substreams?,
    unplanned_tag: unplanned_tag
  )

Kino.Markdown.new("Built **#{length(work_items)}** work items.")

Throughput (Progress)

Cards completed per week — the most fundamental progress metric.

finished_items =
  work_items
  |> Enum.filter(&(&1.status == "finished" and is_integer(&1.resolved)))
  |> Enum.map(fn wi ->
    resolved_date = DateTime.from_unix!(div(wi.resolved, 1000)) |> DateTime.to_date()
    week = Date.beginning_of_week(resolved_date, :monday)
    %{week: Date.to_iso8601(week), stream: wi.stream, person: wi.person_name}
  end)

throughput_by_week =
  finished_items
  |> Enum.frequencies_by(& &1.week)
  |> Enum.map(fn {week, count} -> %{week: week, completed: count} end)
  |> Enum.sort_by(& &1.week)

throughput_chart =
  Vl.new(width: 700, height: 300, title: "Throughput: Completed Items per Week")
  |> Vl.data_from_values(throughput_by_week)
  |> Vl.layers([
    Vl.new()
    |> Vl.mark(:bar, opacity: 0.6, color: "steelblue")
    |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
    |> Vl.encode_field(:y, "completed", type: :quantitative, title: "Completed"),
    Vl.new()
    |> Vl.mark(:line, color: "red", point: true)
    |> Vl.encode_field(:x, "week", type: :temporal)
    |> Vl.encode_field(:y, "completed", type: :quantitative)
  ])
  |> Vl.encode(:tooltip, [
    [field: "week", type: :temporal, title: "Week"],
    [field: "completed", type: :quantitative, title: "Completed"]
  ])

Kino.VegaLite.new(throughput_chart)

Throughput by Person

throughput_by_person =
  finished_items
  |> Enum.frequencies_by(& &1.person)
  |> Enum.map(fn {person, count} -> %{person: person, completed: count} end)
  |> Enum.sort_by(& &1.completed, :desc)

person_throughput_chart =
  Vl.new(width: 600, height: 300, title: "Throughput by Person")
  |> Vl.data_from_values(throughput_by_person)
  |> Vl.mark(:bar, tooltip: true, color: "steelblue")
  |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
  |> Vl.encode_field(:y, "completed", type: :quantitative, title: "Completed")
  |> Vl.encode(:tooltip, [
    [field: "person", type: :nominal, title: "Person"],
    [field: "completed", type: :quantitative, title: "Completed"]
  ])

Kino.VegaLite.new(person_throughput_chart)

Cycle Time (Progress)

Time from start (created or first “In Progress”) to resolved, for finished items.

cycle_time_data =
  work_items
  |> Enum.filter(&(&1.status == "finished" and is_integer(&1.start_at) and is_integer(&1.resolved)))
  |> Enum.map(fn wi ->
    cycle_days = (wi.resolved - wi.start_at) / (1000 * 60 * 60 * 24) |> Float.round(1)

    %{
      issue_id: wi.issue_id,
      title: wi.title,
      person: wi.person_name,
      stream: wi.stream,
      cycle_days: max(cycle_days, 0.0)
    }
  end)
  |> Enum.uniq_by(& &1.issue_id)

if length(cycle_time_data) > 0 do
  avg_cycle = Enum.map(cycle_time_data, & &1.cycle_days) |> then(&(Enum.sum(&1) / length(&1))) |> Float.round(1)
  median_idx = div(length(cycle_time_data), 2)
  median_cycle = cycle_time_data |> Enum.sort_by(& &1.cycle_days) |> Enum.at(median_idx) |> Map.get(:cycle_days)
  p85_idx = round(length(cycle_time_data) * 0.85)
  p85_cycle = cycle_time_data |> Enum.sort_by(& &1.cycle_days) |> Enum.at(min(p85_idx, length(cycle_time_data) - 1)) |> Map.get(:cycle_days)

  Kino.render(Kino.Markdown.new("""
  ### Cycle Time Summary
  - **Mean:** #{avg_cycle} days
  - **Median (P50):** #{median_cycle} days
  - **P85:** #{p85_cycle} days
  - **Items measured:** #{length(cycle_time_data)}
  """))

  histogram =
    Vl.new(width: 600, height: 300, title: "Cycle Time Distribution (days)")
    |> Vl.data_from_values(cycle_time_data)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "cycle_days",
      type: :quantitative,
      bin: [maxbins: 20],
      title: "Cycle Time (days)"
    )
    |> Vl.encode(:y, aggregate: :count, type: :quantitative, title: "Count")
    |> Vl.encode(:tooltip, [
      [field: "cycle_days", type: :quantitative, bin: true, title: "Days"],
      [aggregate: :count, type: :quantitative, title: "Count"]
    ])

  Kino.VegaLite.new(histogram)
else
  Kino.Markdown.new("*No finished items with valid start/end times found.*")
end

Cycle Time by Workstream

if length(cycle_time_data) > 0 do
  boxplot =
    Vl.new(width: 600, height: 300, title: "Cycle Time by Workstream")
    |> Vl.data_from_values(cycle_time_data)
    |> Vl.mark(:boxplot, extent: 1.5)
    |> Vl.encode_field(:x, "stream", type: :nominal, title: "Workstream", sort: :ascending)
    |> Vl.encode_field(:y, "cycle_days", type: :quantitative, title: "Cycle Time (days)")
    |> Vl.encode_field(:color, "stream", type: :nominal, legend: nil)

  Kino.VegaLite.new(boxplot)
else
  Kino.Markdown.new("*No cycle time data available.*")
end

WIP — Work in Progress (Energy)

How many items are concurrently “ongoing” per person. High WIP signals overcommitment and multitasking drag.

ongoing_items =
  work_items
  |> Enum.filter(&(&1.status == "ongoing"))

wip_by_person =
  ongoing_items
  |> Enum.group_by(& &1.person_name)
  |> Enum.map(fn {person, items} ->
    unique_issues = items |> Enum.uniq_by(& &1.issue_id) |> length()
    %{person: person, wip: unique_issues}
  end)
  |> Enum.sort_by(& &1.wip, :desc)

if length(wip_by_person) > 0 do
  avg_wip = Enum.map(wip_by_person, & &1.wip) |> then(&(Enum.sum(&1) / length(&1))) |> Float.round(1)

  Kino.render(Kino.Markdown.new("""
  ### Current WIP
  - **Average WIP per person:** #{avg_wip} items
  - **Highest WIP:** #{hd(wip_by_person).person} with #{hd(wip_by_person).wip} items
  """))

  wip_chart =
    Vl.new(width: 600, height: 300, title: "Current WIP per Person")
    |> Vl.data_from_values(wip_by_person)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
    |> Vl.encode_field(:y, "wip", type: :quantitative, title: "Active Items")
    |> Vl.encode_field(:color, "wip",
      type: :quantitative,
      scale: [scheme: "reds"],
      title: "WIP"
    )
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "wip", type: :quantitative, title: "WIP"]
    ])

  Kino.VegaLite.new(wip_chart)
else
  Kino.Markdown.new("*No ongoing items found.*")
end

WIP by Workstream

wip_by_stream =
  ongoing_items
  |> Enum.group_by(& &1.stream)
  |> Enum.map(fn {stream, items} ->
    unique_issues = items |> Enum.uniq_by(& &1.issue_id) |> length()
    %{stream: stream, wip: unique_issues}
  end)
  |> Enum.sort_by(& &1.wip, :desc)

if length(wip_by_stream) > 0 do
  stream_wip_chart =
    Vl.new(width: 600, height: 300, title: "Current WIP by Workstream")
    |> Vl.data_from_values(wip_by_stream)
    |> Vl.mark(:bar, tooltip: true, color: "teal")
    |> Vl.encode_field(:x, "stream", type: :nominal, title: "Workstream", sort: "-y")
    |> Vl.encode_field(:y, "wip", type: :quantitative, title: "Active Items")
    |> Vl.encode(:tooltip, [
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "wip", type: :quantitative, title: "WIP"]
    ])

  Kino.VegaLite.new(stream_wip_chart)
else
  Kino.Markdown.new("*No ongoing items found.*")
end

Context Switching Index (Energy)

Distinct workstreams per person per week. High values indicate fragmented focus.

context_switch_data =
  work_items
  |> Enum.filter(&is_integer(&1.created))
  |> Enum.map(fn wi ->
    date = DateTime.from_unix!(div(wi.created, 1000)) |> DateTime.to_date()
    week = Date.beginning_of_week(date, :monday)
    %{person: wi.person_name, week: Date.to_iso8601(week), stream: wi.stream}
  end)
  |> Enum.group_by(&{&1.person, &1.week})
  |> Enum.map(fn {{person, week}, items} ->
    distinct_streams = items |> Enum.map(& &1.stream) |> Enum.uniq() |> length()
    %{person: person, week: week, distinct_streams: distinct_streams}
  end)
  |> Enum.sort_by(&{&1.week, &1.person})

if length(context_switch_data) > 0 do
  # Average context switching per person
  person_avg =
    context_switch_data
    |> Enum.group_by(& &1.person)
    |> Enum.map(fn {person, weeks} ->
      avg = Enum.map(weeks, & &1.distinct_streams) |> then(&(Enum.sum(&1) / length(&1))) |> Float.round(1)
      %{person: person, avg_streams: avg}
    end)
    |> Enum.sort_by(& &1.avg_streams, :desc)

  Kino.render(
    Vl.new(width: 600, height: 300, title: "Avg Context Switching Index per Person")
    |> Vl.data_from_values(person_avg)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
    |> Vl.encode_field(:y, "avg_streams", type: :quantitative, title: "Avg Distinct Streams/Week")
    |> Vl.encode_field(:color, "avg_streams",
      type: :quantitative,
      scale: [scheme: "oranges"],
      title: "Streams"
    )
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "avg_streams", type: :quantitative, title: "Avg Streams/Week"]
    ])
    |> Kino.VegaLite.new()
  )

  # Heatmap: person × week
  Vl.new(width: 700, height: 300, title: "Context Switching: Streams per Person per Week")
  |> Vl.data_from_values(context_switch_data)
  |> Vl.mark(:rect, tooltip: true)
  |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
  |> Vl.encode_field(:y, "person", type: :nominal, title: "Person", sort: :ascending)
  |> Vl.encode_field(:color, "distinct_streams",
    type: :quantitative,
    title: "Distinct Streams",
    scale: [scheme: "oranges"]
  )
  |> Vl.encode(:tooltip, [
    [field: "person", type: :nominal, title: "Person"],
    [field: "week", type: :temporal, title: "Week"],
    [field: "distinct_streams", type: :quantitative, title: "Distinct Streams"]
  ])
  |> Kino.VegaLite.new()
else
  Kino.Markdown.new("*No data for context switching analysis.*")
end

Bus Factor (Togetherness)

Unique assignees per workstream. Streams with bus factor = 1 are knowledge silos.

bus_factor_data =
  work_items
  |> Enum.group_by(& &1.stream)
  |> Enum.map(fn {stream, items} ->
    unique_people = items |> Enum.map(& &1.person_login) |> Enum.uniq()
    %{
      stream: stream,
      bus_factor: length(unique_people),
      people: Enum.join(unique_people, ", "),
      total_items: items |> Enum.uniq_by(& &1.issue_id) |> length()
    }
  end)
  |> Enum.sort_by(& &1.bus_factor)

if length(bus_factor_data) > 0 do
  silos = Enum.filter(bus_factor_data, &amp;(&amp;1.bus_factor <= 1))

  if length(silos) > 0 do
    Kino.render(
      Kino.Markdown.new(
        "### ⚠️ Knowledge Silos (bus factor ≤ 1)\n" <>
        Enum.map_join(silos, "\n", fn s -> "- **#{s.stream}**: #{s.people} (#{s.total_items} items)" end)
      )
    )
  end

  bus_chart =
    Vl.new(width: 600, height: 300, title: "Bus Factor by Workstream")
    |> Vl.data_from_values(bus_factor_data)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "stream", type: :nominal, title: "Workstream", sort: "y")
    |> Vl.encode_field(:y, "bus_factor", type: :quantitative, title: "Unique Contributors")
    |> Vl.encode_field(:color, "bus_factor",
      type: :quantitative,
      scale: [scheme: "redyellowgreen", domain: [1, Enum.max_by(bus_factor_data, &amp; &amp;1.bus_factor).bus_factor]],
      title: "Bus Factor"
    )
    |> Vl.encode(:tooltip, [
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "bus_factor", type: :quantitative, title: "Bus Factor"],
      [field: "people", type: :nominal, title: "Contributors"],
      [field: "total_items", type: :quantitative, title: "Total Items"]
    ])

  Kino.render(Kino.VegaLite.new(bus_chart))
  Kino.DataTable.new(bus_factor_data, name: "Bus Factor by Stream")
else
  Kino.Markdown.new("*No workstream data available.*")
end

Long-Running Items (Energy)

Items that have been “ongoing” for an extended period. These often signal blockers or scope creep.

now_ms = System.system_time(:millisecond)

long_running =
  work_items
  |> Enum.filter(&amp;(&amp;1.status == "ongoing" and is_integer(&amp;1.start_at)))
  |> Enum.map(fn wi ->
    end_ms = wi.end_at || now_ms
    age_days = (end_ms - wi.start_at) / (1000 * 60 * 60 * 24) |> Float.round(1)
    start_date = DateTime.from_unix!(div(wi.start_at, 1000)) |> DateTime.to_date() |> Date.to_iso8601()
    end_date = DateTime.from_unix!(div(end_ms, 1000)) |> DateTime.to_date() |> Date.to_iso8601()
    %{
      issue_id: wi.issue_id,
      title: wi.title,
      person: wi.person_name,
      stream: wi.stream,
      start_date: start_date,
      end_date: end_date,
      age_days: age_days
    }
  end)
  |> Enum.uniq_by(&amp; &amp;1.issue_id)
  |> Enum.sort_by(&amp; &amp;1.age_days, :desc)

if length(long_running) > 0 do
  stale_threshold = 14.0
  stale_count = Enum.count(long_running, &amp;(&amp;1.age_days >= stale_threshold))

  unless use_activities? do
    Kino.render(Kino.Markdown.new("""
    > **⚠️ Accuracy warning:** "Compute start_at via activities" is **off**, so `start_date` is the issue
    > *creation* date, not when it moved to "In Progress". Enable the checkbox above and re-run
    > for accurate ages.
    """))
  end

  Kino.render(Kino.Markdown.new("""
  ### Long-Running Items
  - **Total ongoing:** #{length(long_running)}
  - **Stale (>#{round(stale_threshold)} days):** #{stale_count}
  - **Start date source:** #{if use_activities?, do: "first In Progress transition (activities)", else: "issue creation date (⚠️ inaccurate)"}
  """))

  age_chart =
    Vl.new(width: 700, height: 300, title: "Ongoing Items by Age (days)")
    |> Vl.data_from_values(long_running)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "issue_id", type: :nominal, title: "Issue", sort: "-y")
    |> Vl.encode_field(:y, "age_days", type: :quantitative, title: "Age (days)")
    |> Vl.encode_field(:color, "age_days",
      type: :quantitative,
      scale: [scheme: "orangered"],
      title: "Age"
    )
    |> Vl.encode(:tooltip, [
      [field: "issue_id", type: :nominal, title: "Issue"],
      [field: "title", type: :nominal, title: "Title"],
      [field: "person", type: :nominal, title: "Person"],
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "age_days", type: :quantitative, title: "Age (days)"]
    ])

  Kino.render(Kino.VegaLite.new(age_chart))
  Kino.DataTable.new(Enum.take(long_running, 20), name: "Long-Running Items")
else
  Kino.Markdown.new("*No ongoing items found.*")
end

Rework Detection (Progress)

Issues that were reopened after resolution indicate rework — work that wasn’t done right the first time or requirements that changed.

if use_activities? and map_size(issue_activities) > 0 do
  rework_counts = Rework.count_by_issue(issue_activities, state_field, done_state_names)

  if map_size(rework_counts) > 0 do
    # Match rework counts to work items for context
    rework_items =
      work_items
      |> Enum.filter(fn wi -> Map.has_key?(rework_counts, wi.issue_internal_id) end)
      |> Enum.uniq_by(&amp; &amp;1.issue_id)
      |> Enum.map(fn wi ->
        %{
          issue_id: wi.issue_id,
          title: wi.title,
          person: wi.person_name,
          stream: wi.stream,
          rework_count: Map.get(rework_counts, wi.issue_internal_id, 0)
        }
      end)
      |> Enum.sort_by(&amp; &amp;1.rework_count, :desc)

    total_rework_issues = length(rework_items)
    total_issues = issues |> Enum.uniq_by(&amp; &amp;1["id"]) |> length()
    rework_pct = Float.round(total_rework_issues / max(total_issues, 1) * 100, 1)

    Kino.render(Kino.Markdown.new("""
    ### Rework Summary
    - **Reworked issues:** #{total_rework_issues} / #{total_issues} (#{rework_pct}%)
    - **Total reopenings:** #{rework_counts |> Map.values() |> Enum.sum()}
    - **Done states checked:** #{Enum.join(done_state_names, ", ")}
    """))

    # Rework by workstream
    rework_by_stream =
      rework_items
      |> Enum.group_by(&amp; &amp;1.stream)
      |> Enum.map(fn {stream, items} ->
        %{stream: stream, rework_issues: length(items), total_reopenings: Enum.sum(Enum.map(items, &amp; &amp;1.rework_count))}
      end)
      |> Enum.sort_by(&amp; &amp;1.rework_issues, :desc)

    Kino.render(
      Vl.new(width: 600, height: 300, title: "Rework by Workstream")
      |> Vl.data_from_values(rework_by_stream)
      |> Vl.mark(:bar, tooltip: true, color: "coral")
      |> Vl.encode_field(:x, "stream", type: :nominal, title: "Workstream", sort: "-y")
      |> Vl.encode_field(:y, "rework_issues", type: :quantitative, title: "Reworked Issues")
      |> Vl.encode(:tooltip, [
        [field: "stream", type: :nominal, title: "Stream"],
        [field: "rework_issues", type: :quantitative, title: "Reworked Issues"],
        [field: "total_reopenings", type: :quantitative, title: "Total Reopenings"]
      ])
      |> Kino.VegaLite.new()
    )

    Kino.DataTable.new(rework_items, name: "Reworked Issues")
  else
    Kino.Markdown.new("*No rework detected — no issues were reopened after resolution.*")
  end
else
  Kino.Markdown.new("""
  *Rework detection requires activities data.*
  Enable **"Compute start_at via activities"** and re-run to see rework analysis.
  """)
end

Knowledge Silo Trend (Togetherness)

Bus factor tracked week-over-week. Spots whether knowledge concentration is improving or worsening over time.

silo_trend_data =
  work_items
  |> Enum.filter(&amp;is_integer(&amp;1.created))
  |> Enum.map(fn wi ->
    date = DateTime.from_unix!(div(wi.created, 1000)) |> DateTime.to_date()
    week = Date.beginning_of_week(date, :monday) |> Date.to_iso8601()
    %{week: week, stream: wi.stream, person: wi.person_login}
  end)
  |> Enum.group_by(&amp;{&amp;1.week, &amp;1.stream})
  |> Enum.map(fn {{week, stream}, items} ->
    unique_people = items |> Enum.map(&amp; &amp;1.person) |> Enum.uniq() |> length()
    %{week: week, stream: stream, bus_factor: unique_people}
  end)
  |> Enum.sort_by(&amp;{&amp;1.week, &amp;1.stream})

if length(silo_trend_data) > 0 do
  # Line chart: bus factor over time per stream
  Kino.render(
    Vl.new(width: 700, height: 350, title: "Knowledge Silo Trend: Bus Factor per Stream over Time")
    |> Vl.data_from_values(silo_trend_data)
    |> Vl.mark(:line, point: true, tooltip: true)
    |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
    |> Vl.encode_field(:y, "bus_factor", type: :quantitative, title: "Unique Contributors")
    |> Vl.encode_field(:color, "stream", type: :nominal, title: "Workstream")
    |> Vl.encode(:tooltip, [
      [field: "week", type: :temporal, title: "Week"],
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "bus_factor", type: :quantitative, title: "Bus Factor"]
    ])
    |> Kino.VegaLite.new()
  )

  # Heatmap: stream × week
  Vl.new(width: 700, height: 300, title: "Bus Factor Heatmap: Stream × Week")
  |> Vl.data_from_values(silo_trend_data)
  |> Vl.mark(:rect, tooltip: true)
  |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
  |> Vl.encode_field(:y, "stream", type: :nominal, title: "Workstream", sort: :ascending)
  |> Vl.encode_field(:color, "bus_factor",
    type: :quantitative,
    title: "Bus Factor",
    scale: [scheme: "redyellowgreen"]
  )
  |> Vl.encode(:tooltip, [
    [field: "stream", type: :nominal, title: "Stream"],
    [field: "week", type: :temporal, title: "Week"],
    [field: "bus_factor", type: :quantitative, title: "Bus Factor"]
  ])
  |> Kino.VegaLite.new()
else
  Kino.Markdown.new("*No data for knowledge silo trend.*")
end

Interrupt Source Analysis (Autonomy)

Which workstreams and projects generate the most unplanned work? Helps identify where interrupts originate.

unplanned_items = Enum.filter(work_items, &amp; &amp;1.is_unplanned)

if length(unplanned_items) > 0 do
  total_items_count = work_items |> Enum.uniq_by(&amp; &amp;1.issue_id) |> length()
  unplanned_unique = unplanned_items |> Enum.uniq_by(&amp; &amp;1.issue_id) |> length()
  unplanned_pct_val = Float.round(unplanned_unique / max(total_items_count, 1) * 100, 1)

  Kino.render(Kino.Markdown.new("""
  ### Interrupt Summary
  - **Unplanned issues:** #{unplanned_unique} / #{total_items_count} (#{unplanned_pct_val}%)
  - **Tag used:** `#{unplanned_tag}`
  """))

  # By workstream
  interrupts_by_stream =
    unplanned_items
    |> Enum.uniq_by(&amp;{&amp;1.issue_id, &amp;1.stream})
    |> Enum.frequencies_by(&amp; &amp;1.stream)
    |> Enum.map(fn {stream, count} -> %{stream: stream, unplanned: count} end)
    |> Enum.sort_by(&amp; &amp;1.unplanned, :desc)

  Kino.render(
    Vl.new(width: 600, height: 300, title: "Unplanned Work by Workstream")
    |> Vl.data_from_values(interrupts_by_stream)
    |> Vl.mark(:bar, tooltip: true, color: "salmon")
    |> Vl.encode_field(:x, "stream", type: :nominal, title: "Workstream", sort: "-y")
    |> Vl.encode_field(:y, "unplanned", type: :quantitative, title: "Unplanned Issues")
    |> Vl.encode(:tooltip, [
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "unplanned", type: :quantitative, title: "Unplanned"]
    ])
    |> Kino.VegaLite.new()
  )

  # By person — who absorbs the most interrupts?
  interrupts_by_person =
    unplanned_items
    |> Enum.uniq_by(&amp;{&amp;1.issue_id, &amp;1.person_login})
    |> Enum.frequencies_by(&amp; &amp;1.person_name)
    |> Enum.map(fn {person, count} -> %{person: person, unplanned: count} end)
    |> Enum.sort_by(&amp; &amp;1.unplanned, :desc)

  Kino.render(
    Vl.new(width: 600, height: 300, title: "Unplanned Work by Person (interrupt absorbers)")
    |> Vl.data_from_values(interrupts_by_person)
    |> Vl.mark(:bar, tooltip: true, color: "darkorange")
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
    |> Vl.encode_field(:y, "unplanned", type: :quantitative, title: "Unplanned Issues")
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "unplanned", type: :quantitative, title: "Unplanned"]
    ])
    |> Kino.VegaLite.new()
  )

  # Trend over time
  interrupt_trend =
    unplanned_items
    |> Enum.filter(&amp;is_integer(&amp;1.created))
    |> Enum.map(fn wi ->
      date = DateTime.from_unix!(div(wi.created, 1000)) |> DateTime.to_date()
      week = Date.beginning_of_week(date, :monday) |> Date.to_iso8601()
      %{week: week, issue_id: wi.issue_id}
    end)
    |> Enum.uniq_by(&amp;{&amp;1.week, &amp;1.issue_id})
    |> Enum.frequencies_by(&amp; &amp;1.week)
    |> Enum.map(fn {week, count} -> %{week: week, unplanned: count} end)
    |> Enum.sort_by(&amp; &amp;1.week)

  Vl.new(width: 700, height: 300, title: "Unplanned Work Trend (per week)")
  |> Vl.data_from_values(interrupt_trend)
  |> Vl.layers([
    Vl.new()
    |> Vl.mark(:bar, opacity: 0.5, color: "salmon")
    |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
    |> Vl.encode_field(:y, "unplanned", type: :quantitative, title: "Unplanned Issues"),
    Vl.new()
    |> Vl.mark(:line, color: "red", point: true)
    |> Vl.encode_field(:x, "week", type: :temporal)
    |> Vl.encode_field(:y, "unplanned", type: :quantitative)
  ])
  |> Vl.encode(:tooltip, [
    [field: "week", type: :temporal, title: "Week"],
    [field: "unplanned", type: :quantitative, title: "Unplanned"]
  ])
  |> Kino.VegaLite.new()
else
  Kino.Markdown.new("*No unplanned items found. Make sure the unplanned tag is set correctly.*")
end

Workstream Rotation (Togetherness / Autonomy)

How much do people rotate between workstreams? This section tracks rotation patterns, back-and-forth movements, and time spent in each stream before switching.

Rotation Metrics per Person

rotation_metrics = Rotation.metrics_by_person(work_items)

if length(rotation_metrics) > 0 do
  avg_switches =
    Enum.map(rotation_metrics, &amp; &amp;1.switches)
    |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1)))
    |> Float.round(1)

  avg_tenure =
    Enum.map(rotation_metrics, &amp; &amp;1.avg_tenure_weeks)
    |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1)))
    |> Float.round(1)

  avg_boomerang =
    rotation_metrics
    |> Enum.filter(&amp;(&amp;1.switches > 0))
    |> then(fn
      [] -> 0.0
      filtered -> Enum.map(filtered, &amp; &amp;1.boomerang_rate) |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1))) |> Float.round(1)
    end)

  Kino.render(Kino.Markdown.new("""
  ### Rotation Summary
  - **Avg stream switches per person:** #{avg_switches}
  - **Avg tenure before switching:** #{avg_tenure} weeks
  - **Avg boomerang rate:** #{avg_boomerang}% (switches back to a previously worked stream)
  """))

  # Switches per person chart
  switch_data =
    rotation_metrics
    |> Enum.map(fn m ->
      %{person: m.person, switches: m.switches, unique_streams: m.unique_streams, boomerang_rate: m.boomerang_rate}
    end)

  Kino.render(
    Vl.new(width: 600, height: 300, title: "Stream Switches per Person")
    |> Vl.data_from_values(switch_data)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
    |> Vl.encode_field(:y, "switches", type: :quantitative, title: "Stream Switches")
    |> Vl.encode_field(:color, "boomerang_rate",
      type: :quantitative,
      scale: [scheme: "purples"],
      title: "Boomerang %"
    )
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "switches", type: :quantitative, title: "Switches"],
      [field: "unique_streams", type: :quantitative, title: "Unique Streams"],
      [field: "boomerang_rate", type: :quantitative, title: "Boomerang %"]
    ])
    |> Kino.VegaLite.new()
  )

  # Tenure chart
  tenure_data =
    rotation_metrics
    |> Enum.map(fn m ->
      %{person: m.person, avg_tenure_weeks: m.avg_tenure_weeks, unique_streams: m.unique_streams}
    end)

  Kino.render(
    Vl.new(width: 600, height: 300, title: "Average Tenure per Stream (weeks)")
    |> Vl.data_from_values(tenure_data)
    |> Vl.mark(:bar, tooltip: true, color: "mediumpurple")
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person", sort: "-y")
    |> Vl.encode_field(:y, "avg_tenure_weeks", type: :quantitative, title: "Avg Weeks on Same Stream")
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "avg_tenure_weeks", type: :quantitative, title: "Avg Tenure (weeks)"],
      [field: "unique_streams", type: :quantitative, title: "Unique Streams"]
    ])
    |> Kino.VegaLite.new()
  )

  # Journey table
  journey_data =
    rotation_metrics
    |> Enum.map(fn m ->
      %{
        person: m.person,
        journey: m.journey,
        switches: m.switches,
        unique_streams: m.unique_streams,
        boomerang_rate: m.boomerang_rate,
        avg_tenure_weeks: m.avg_tenure_weeks,
        total_weeks: m.total_weeks
      }
    end)

  Kino.DataTable.new(journey_data, name: "Rotation Journeys")
else
  Kino.Markdown.new("*No rotation data available.*")
end

Workstream Heatmap per Person per Week

A heatmap showing which streams each person worked on each week — reveals focus, fragmentation, and rotation patterns at a glance.

pws_data = Rotation.person_week_stream(work_items)

if length(pws_data) > 0 do
  Vl.new(width: 700, height: 400, title: "Person × Week: Workstream Activity")
  |> Vl.data_from_values(pws_data)
  |> Vl.mark(:rect, tooltip: true)
  |> Vl.encode_field(:x, "week", type: :temporal, title: "Week")
  |> Vl.encode_field(:y, "person", type: :nominal, title: "Person")
  |> Vl.encode_field(:color, "stream", type: :nominal, title: "Workstream")
  |> Vl.encode(:tooltip, [
    [field: "person", type: :nominal, title: "Person"],
    [field: "week", type: :temporal, title: "Week"],
    [field: "stream", type: :nominal, title: "Stream"],
    [field: "item_count", type: :quantitative, title: "Items"]
  ])
  |> Kino.VegaLite.new()
else
  Kino.Markdown.new("*No data for workstream heatmap.*")
end

Stream Tenure by Person

How long does each person stay in a stream before switching? Identifies who is deeply invested vs. spreading thin.

tenure_data_detail = Rotation.stream_tenure(work_items)

if length(tenure_data_detail) > 0 do
  Kino.render(
    Vl.new(width: 700, height: 400, title: "Stream Tenure: Total Weeks per Person per Stream")
    |> Vl.data_from_values(tenure_data_detail)
    |> Vl.mark(:bar, tooltip: true)
    |> Vl.encode_field(:x, "person", type: :nominal, title: "Person")
    |> Vl.encode_field(:y, "total_weeks", type: :quantitative, title: "Total Weeks", stack: :zero)
    |> Vl.encode_field(:color, "stream", type: :nominal, title: "Workstream")
    |> Vl.encode(:tooltip, [
      [field: "person", type: :nominal, title: "Person"],
      [field: "stream", type: :nominal, title: "Stream"],
      [field: "total_weeks", type: :quantitative, title: "Total Weeks"],
      [field: "stints", type: :quantitative, title: "Separate Stints"],
      [field: "avg_stint_weeks", type: :quantitative, title: "Avg Stint (weeks)"]
    ])
    |> Kino.VegaLite.new()
  )

  Kino.DataTable.new(tenure_data_detail, name: "Stream Tenure Details")
else
  Kino.Markdown.new("*No tenure data available.*")
end

Summary Scorecard

A PETALS-oriented summary of the key metrics.

total_items = length(work_items)
finished_count = Enum.count(work_items, &amp;(&amp;1.status == "finished"))
ongoing_count = Enum.count(work_items, &amp;(&amp;1.status == "ongoing"))
unplanned_count = Enum.count(work_items, &amp; &amp;1.is_unplanned)

unique_people = work_items |> Enum.map(&amp; &amp;1.person_login) |> Enum.uniq() |> length()

avg_wip =
  if length(wip_by_person) > 0 do
    Enum.map(wip_by_person, &amp; &amp;1.wip) |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1))) |> Float.round(1)
  else
    0.0
  end

avg_cycle =
  if length(cycle_time_data) > 0 do
    Enum.map(cycle_time_data, &amp; &amp;1.cycle_days) |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1))) |> Float.round(1)
  else
    nil
  end

min_bus = if length(bus_factor_data) > 0, do: hd(bus_factor_data).bus_factor, else: nil

unplanned_pct = if total_items > 0, do: Float.round(unplanned_count / total_items * 100, 1), else: 0.0

rework_pct_val =
  if use_activities? and map_size(issue_activities) > 0 do
    rework_counts_final = Rework.count_by_issue(issue_activities, state_field, done_state_names)
    total_unique = issues |> Enum.uniq_by(&amp; &amp;1["id"]) |> length()
    if total_unique > 0, do: Float.round(map_size(rework_counts_final) / total_unique * 100, 1), else: 0.0
  else
    nil
  end

rotation_avg_switches =
  if length(rotation_metrics) > 0 do
    Enum.map(rotation_metrics, &amp; &amp;1.switches) |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1))) |> Float.round(1)
  else
    nil
  end

rotation_avg_tenure =
  if length(rotation_metrics) > 0 do
    Enum.map(rotation_metrics, &amp; &amp;1.avg_tenure_weeks) |> then(&amp;(Enum.sum(&amp;1) / length(&amp;1))) |> Float.round(1)
  else
    nil
  end

Kino.Markdown.new("""
## PETALS Scorecard

| Dimension | Metric | Value |
|-----------|--------|-------|
| **Progress** | Finished items | #{finished_count} / #{total_items} |
| **Progress** | Avg cycle time | #{avg_cycle || "N/A"} days |
| **Progress** | Rework rate | #{rework_pct_val || "N/A (enable activities)"}% |
| **Energy** | Avg WIP/person | #{avg_wip} items |
| **Energy** | Ongoing items | #{ongoing_count} |
| **Togetherness** | Team size | #{unique_people} |
| **Togetherness** | Min bus factor | #{min_bus || "N/A"} |
| **Togetherness** | Avg stream switches | #{rotation_avg_switches || "N/A"} |
| **Togetherness** | Avg stream tenure | #{rotation_avg_tenure || "N/A"} weeks |
| **Autonomy** | Unplanned % | #{unplanned_pct}% |
""")