Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Glides Terminal Missed/Extraneous Predictions

glides_terminal_missed_extraneous_predictions.livemd

Glides Terminal Missed/Extraneous Predictions

# The project root is the parent of the directory that holds this notebook.
project_dir = Path.expand("..", __DIR__)

# Notebook dependencies: Kino for the UI widgets, the local transit_data
# project, and tz, because transit_data needs a timezone DB for some
# date-related logic.
install_deps = [
  {:kino, "~> 0.12.0"},
  {:transit_data, path: project_dir},
  {:tz, "~> 0.26.5"}
]

# AWS credentials are read from the LB_-prefixed environment variables.
install_config = [
  elixir: [time_zone_database: Tz.TimeZoneDatabase],
  ex_aws: [
    access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
    secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
    region: "us-east-1"
  ]
]

Mix.install(install_deps, config: install_config)

alias TransitData.GlidesReport

Kino.nothing()

Instructions

This notebook provides an implementation of Rethinking Prediction Accuracy for Glides.

The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3 family of S3 buckets.

How to add your AWS credentials

  1. Open your Hub. This link should send you there.
  2. Under “Secrets”, add two secrets with the following names:
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
  3. Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.

Generating a report

Evaluate code cells from top to bottom.

Some cells produce controls that let you adjust how the report runs.

Cells that generate inputs / outputs are marked with a “👉”—you can skip to these, and when you evaluate them, any intermediate cells will automatically evaluate as well.

Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.

Setup

# Checkbox that switches the inputs below between the simple single-date mode
# and the advanced start/end date-time mode.
use_advanced_date? = Kino.Shorts.read_checkbox("Use advanced date/time input?")
# End the cell with Kino.nothing() rather than the checkbox value.
Kino.nothing()

Simple: Choose a date. The report will analyze the full service day on that date.

Advanced: Choose a start date+time and end date+time. The report will analyze within that window only.

👉 What data do you want to load?

# {value, label} pairs for the environment dropdown.
# NOTE(review): the values look like bucket-name suffixes ("" for prod) — confirm
# against GlidesReport.Settings.Load.
environment_options = [
  {"", "prod"},
  {"-dev-blue", "dev-blue"},
  {"-dev-green", "dev-green"},
  {"-dev", "dev"},
  {"-sandbox", "sandbox"}
]

env_input = Kino.Input.select("Environment", environment_options)

# Memoizes a value in the process dictionary under `key`.
#
# The first call for a key runs `default_fn`, stores the result wrapped as
# `{:value, result}`, and returns it; later calls return the stored result
# without running `default_fn` again. This keeps the inputs' default values
# stable, so the inputs do not reset to a new default every time this cell
# re-runs.
process_get = fn key, default_fn ->
  case Process.get(key) do
    {:value, cached} ->
      cached

    nil ->
      fresh = default_fn.()
      Process.put(key, {:value, fresh})
      fresh
  end
end


# Converts a wall-clock time on `date` in Eastern time to a naive UTC datetime.
eastern_to_naive_utc = fn date, time ->
  date
  |> DateTime.new!(time, "America/New_York")
  |> DateTime.shift_zone!("Etc/UTC")
  |> DateTime.to_naive()
end

# Current calendar date in Eastern time (cached after the first evaluation).
today_date =
  process_get.(:today_date, fn ->
    eastern_now = DateTime.shift_zone!(DateTime.utc_now(), "America/New_York")
    DateTime.to_date(eastern_now)
  end)

yesterday_date = process_get.(:yesterday_date, fn -> Date.add(today_date, -1) end)

# Default window: one full service day, from 04:00:00 yesterday (Eastern)
# through 03:59:59 today (Eastern), expressed as naive UTC datetimes.
yesterday_start_time =
  process_get.(:yesterday_start_time, fn ->
    eastern_to_naive_utc.(yesterday_date, ~T[04:00:00])
  end)

today_end_time =
  process_get.(:today_end_time, fn ->
    eastern_to_naive_utc.(today_date, ~T[03:59:59])
  end)

# Simple mode: a single service date, no later than today.
date_input = Kino.Input.date("Date", max: today_date, default: yesterday_date)

# Advanced mode: explicit start and end datetimes (UTC), neither later than
# the moment this cell is evaluated.
start_date_input =
  Kino.Input.utc_datetime(
    "Analyze data from...",
    max: DateTime.utc_now(),
    default: yesterday_start_time
  )

end_date_input =
  Kino.Input.utc_datetime(
    "to...",
    max: DateTime.utc_now(),
    default: today_end_time
  )

# Sampling controls, shared by both modes.
sample_rate_input =
  Kino.Input.range(
    "Sample data at (?)-minute intervals",
    default: 5,
    min: 1,
    max: 5,
    step: 1
  )

samples_per_minute_input =
  Kino.Input.number(
    "Take (?) samples per minute - leave blank for ALL",
    default: 1
  )

# Sampling inputs appear at the end of both layouts.
sampling_inputs = [sample_rate_input, samples_per_minute_input]

# Advanced layout: environment on its own row, then a 2-column grid holding
# the start/end datetime inputs followed by the sampling inputs.
advanced_layout =
  Kino.Layout.grid(
    [
      env_input,
      Kino.Layout.grid([start_date_input, end_date_input | sampling_inputs], columns: 2)
    ],
    columns: 1
  )

# Simple layout: everything in a single 2-column grid.
simple_layout = Kino.Layout.grid([env_input, date_input | sampling_inputs], columns: 2)

# Keyed by the value of the "Use advanced date/time input?" checkbox.
content_map = %{true => advanced_layout, false => simple_layout}

# Markdown "Setting Details" table for each input mode, keyed by the value of
# the "Use advanced date/time input?" checkbox. Each map value starts out as
# just the mode-specific table rows, which are then spliced into the rows that
# both modes share.
#
# Line breaks inside a table cell are written as <br/>, because a raw newline
# would end the Markdown table row.
doc_table_map =
  %{
    true: """
    | Analyze data from... | Start analyzing data from this date/time.<br/>Default is 4am yesterday, Eastern. |
    | to... | Analyze data up to this date/time.<br/>Default is 3:59am today, Eastern. |
    """,
    false: """
    | Date | Service date to analyze data from. A 24-hour period starting at 4am Eastern. |
    """
  }
  |> Map.new(fn {k, fragment} ->
    {k,
     Kino.Markdown.new("""
     ### Setting Details
     | Setting | Details |
     | ----------------------------------- | ------- |
     | Environment | Environment to analyze data from. |
     #{String.trim_trailing(fragment)}
     | Sample data at (?)-minute intervals | Sets interval at which data is sampled for analysis.<br/>Lower value = more samples and slower report generation. |
     | Take (?) samples per minute | Sets number of samples to take within each sampled minute.<br/>Higher value = more samples and slower report generation.<br/>Leave blank to analyze *all* data within each sampled minute. (slowest) |
     """)}
  end)

# Show the inputs and the matching documentation table for the selected mode.
frame = Kino.Frame.new(placeholder: false)
Kino.Frame.clear(frame)
Kino.Frame.append(frame, content_map[use_advanced_date?])
Kino.Frame.append(frame, doc_table_map[use_advanced_date?])
frame

Read inputs.

# Manual validation of the raw input values. Each failed check interrupts
# evaluation of the notebook with an error message.
sample_count = Kino.Input.read(samples_per_minute_input)

# A blank field is allowed; a number must be at least 1 once truncated.
if is_number(sample_count) and trunc(sample_count) <= 0 do
  Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end

if use_advanced_date? do
  window_start = Kino.Input.read(start_date_input)
  window_end = Kino.Input.read(end_date_input)

  # Checks run top to bottom; only the first failing one is reported.
  cond do
    is_nil(window_start) ->
      Kino.interrupt!(:error, "A start date/time must be selected.")

    is_nil(window_end) ->
      Kino.interrupt!(:error, "An end date/time must be selected.")

    NaiveDateTime.diff(window_end, window_start, :hour) >= 24 ->
      Kino.interrupt!(
        :error,
        "Sorry, time windows of 24 hours or more are not yet supported.\n" <>
          "(If this was unexpected, check whether the window includes the autumn DST change.)"
      )

    NaiveDateTime.diff(window_end, window_start) <= 0 ->
      Kino.interrupt!(:error, "Start date needs to come before end date.")

    true ->
      nil
  end
else
  if is_nil(Kino.Input.read(date_input)) do
    Kino.interrupt!(:error, "A date must be selected.")
  end
end

# The inputs passed validation, so collect them into a loader settings struct.
# Environment and sampling values are read the same way in both modes; only
# the date arguments differ.
loader_settings =
  [env_input, sample_rate_input, samples_per_minute_input]
  |> Enum.map(&Kino.Input.read/1)
  |> then(fn [env, sample_rate, samples_per_minute] ->
    if use_advanced_date? do
      # The datetime inputs yield naive datetimes; tag them as UTC.
      [window_start, window_end] =
        Enum.map([start_date_input, end_date_input], fn input ->
          input
          |> Kino.Input.read()
          |> DateTime.from_naive!("Etc/UTC")
        end)

      GlidesReport.Settings.Load.new(
        env,
        window_start,
        window_end,
        sample_rate,
        samples_per_minute
      )
    else
      GlidesReport.Settings.Load.new(
        env,
        Kino.Input.read(date_input),
        sample_rate,
        samples_per_minute
      )
    end
  end)

Load data into memory.

# Load the data selected by the loader settings. The returned counts say how
# many files were already present locally and how many were downloaded.
file_counts =
  GlidesReport.Loader.load_data(
    loader_settings.start_dt,
    loader_settings.end_dt,
    loader_settings.env_suffix,
    loader_settings.sample_rate,
    loader_settings.sample_count
  )

[
  "Found #{file_counts.local} existing local files.",
  "Downloaded #{file_counts.downloaded} new files."
]
|> Enum.each(&IO.puts/1)

# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
#   Kino.ETS.new(:TripUpdates),
#   Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()

👉 Now, choose how you want to filter the data.

# Stop (or stop group) to restrict the analysis to.
stop_choices = GlidesReport.Terminals.all_labeled_stops_and_groups()
stop_ids_input = Kino.Input.select("Stop(s)", stop_choices)

# Whether to keep only predictions that would have shown on countdown clocks.
limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")

# Optional minimum lead time, in seconds, between a prediction and its departure.
min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)")

# Show the three filter inputs side by side.
Kino.Layout.grid(
  [stop_ids_input, limit_to_next_2_predictions_input, min_advance_notice_input],
  columns: 3
)

Setting Details

Setting Details
Stop(s) Only analyze data concerning a specific stop, or group of stops.

Note: Terminals that do not have Glides predictions—Heath Street and Ashmont—are ignored regardless of which stop(s) you select.
Simulate countdown clocks? Only consider predictions that would have appeared on countdown clocks—those that were in the next 2 predictions for a stop at some point.
Minimum advance notice (seconds) Only consider predictions created at least this many seconds in advance of the departure time they predict.

For example: If this were set to 60, then a prediction generated at 12:00:00, with departure time predicted for 12:00:59, would be omitted from analysis.

Leave the field blank to disable this filter.

Read inputs.

# Read the three filter inputs, in order, and pack them into a filter settings struct.
filter_settings =
  [stop_ids_input, limit_to_next_2_predictions_input, min_advance_notice_input]
  |> Enum.map(&Kino.Input.read/1)
  |> then(fn [stop_ids, limit_to_next_2?, min_advance_notice] ->
    GlidesReport.Settings.Filter.new(stop_ids, limit_to_next_2?, min_advance_notice)
  end)

Main procedure

Filter trip updates based on your settings.

alias TransitData.GlidesReport.Departure

# Collection of {stop, timestamp} pairs used by the countdown clocks filter
# below, or nil when that filter is disabled.
top_twos =
  if filter_settings.limit_to_next_2_predictions do
    GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.stop_ids)
  end

# Predicted departures: one Departure per stop_time_update that survives the
# stop, advance-notice, and (optional) countdown clocks filters.
trip_updates =
  :TripUpdates
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
  # Filter each trip update's stop_time_update list.
  # If filtered list is empty for any trip update, the trip update is removed entirely.
  |> Stream.map(&GlidesReport.TripUpdate.filter_stops(&1, filter_settings.stop_ids))
  |> Stream.reject(&is_nil/1)
  |> Stream.map(
    &GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
  )
  |> Stream.reject(&is_nil/1)
  # Split each trip update into its individual stop_time_update items.
  # Note that we use the timestamp of the predicted departure,
  # not the timestamp of when the prediction was generated.
  |> Stream.flat_map(fn tr_upd ->
    trip_id = tr_upd.trip_update.trip.trip_id

    Enum.map(
      tr_upd.trip_update.stop_time_update,
      &Departure.new(trip_id, &1.stop_id, &1.departure.time)
    )
  end)
  # Apply the "appeared on countdown clocks" filter, if it's enabled.
  |> then(fn stream ->
    if is_nil(top_twos) do
      stream
    else
      Stream.filter(stream, &({&1.stop, &1.timestamp} in top_twos))
    end
  end)
  |> Enum.to_list()

Kino.nothing()

Filter vehicle positions based on your settings.

# Actual departures: one Departure per vehicle position at a selected stop,
# after stop IDs are normalized and GlidesReport.VehiclePosition.dedup_statuses/1
# has been applied.
vehicle_positions =
  :VehiclePositions
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
  |> Stream.filter(&(&1.vehicle.stop_id in filter_settings.stop_ids))
  |> GlidesReport.VehiclePosition.dedup_statuses()
  |> Stream.map(fn position ->
    vehicle = position.vehicle
    Departure.new(vehicle.trip.trip_id, vehicle.stop_id, vehicle.timestamp)
  end)
  |> Enum.to_list()

Kino.nothing()

Compute sets of predicted departure times and sets of actual departure times.

predictions_by_hour = Enum.group_by(trip_updates, & &1.hour)
departures_by_hour = Enum.group_by(vehicle_positions, & &1.hour)

# Turns %{hour => [departure]} into %{hour => MapSet of keys}, where `key_fn`
# maps a departure to the key it is compared by.
sets_by_hour = fn grouped, key_fn ->
  Map.new(grouped, fn {hour, departures} ->
    {hour, MapSet.new(departures, key_fn)}
  end)
end

# For every hour 0..23, formats the share of actual keys that were also
# predicted. Hours with no data are treated as empty sets.
coverage_by_hour = fn predicted_by_hour, actual_by_hour ->
  Map.new(0..23, fn hour ->
    predicted = Map.get(predicted_by_hour, hour, MapSet.new())
    actual = Map.get(actual_by_hour, hour, MapSet.new())

    # Number of keys that were both predicted and actually happened.
    actual_and_predicted_count =
      predicted
      |> MapSet.intersection(actual)
      |> MapSet.size()

    percentage =
      GlidesReport.Util.format_percent(
        actual_and_predicted_count,
        MapSet.size(actual),
        "N/A (0 actual departures)"
      )

    {hour, percentage}
  end)
end

#################################################
# Departure/prediction metrics by stop and time #
# (Gives the rider's perspective)               #
#################################################
predicted_times_by_hour = sets_by_hour.(predictions_by_hour, &{&1.stop, &1.minute})
actual_times_by_hour = sets_by_hour.(departures_by_hour, &{&1.stop, &1.minute})

predicted_time_percentages_by_hour =
  coverage_by_hour.(predicted_times_by_hour, actual_times_by_hour)

#################################################
# Departure/prediction metrics by stop and trip #
# (A more internal performance metric)          #
#################################################
predicted_trips_by_hour = sets_by_hour.(predictions_by_hour, &{&1.stop, &1.trip})
actual_trips_by_hour = sets_by_hour.(departures_by_hour, &{&1.stop, &1.trip})

predicted_trip_percentages_by_hour =
  coverage_by_hour.(predicted_trips_by_hour, actual_trips_by_hour)

Kino.nothing()

👉 Results

Per-Hour Counts of trips for which RTR made departure predictions vs. actual departures

Methodology:

  • From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a stop.[^1]
  • From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a stop.[^2]
  • If a vehicle actually departed stop S at the same minute that a vehicle was predicted to depart stop S, then that prediction is considered accurate.
  • Also compute the above, but with predictions and departures compared by trip ID instead of timestamp.

[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “IN_TRANSIT_TO” or “INCOMING_AT” the stop _after_ the target stop.

[^2]: This is the set of all times at which a vehicle was predicted to depart a stop. If at any moment, even just for a minute, a vehicle was predicted to depart stop S at time T, then that {time, stop} pair is added to the set.

# Service day starts at 4am, so both tables start at that hour and wrap
# around midnight (4, 5, ..., 23, 0, 1, 2, 3).
service_day_hours = Enum.map(0..23, &rem(&1 + 4, 24))

# Size of the set recorded for `hour`, or 0 when the hour has no data.
set_size_at = fn sets_by_hour, hour ->
  sets_by_hour
  |> Map.get(hour, MapSet.new())
  |> MapSet.size()
end

# One row per hour, comparing predicted and actual departures by {stop, minute}.
times_table =
  Enum.map(service_day_hours, fn hour ->
    [
      {"hour", "#{GlidesReport.Util.zero_pad(hour)}:00"},
      {"# of predicted departure times", set_size_at.(predicted_times_by_hour, hour)},
      {"# of actual departure times", set_size_at.(actual_times_by_hour, hour)},
      {"% of actual departure times that were also predicted",
       Map.fetch!(predicted_time_percentages_by_hour, hour)}
    ]
  end)

# One row per hour, comparing predicted and actual departures by {stop, trip}.
trips_table =
  Enum.map(service_day_hours, fn hour ->
    [
      {"hour", "#{GlidesReport.Util.zero_pad(hour)}:00"},
      {"# of predicted departures collapsed by trip", set_size_at.(predicted_trips_by_hour, hour)},
      {"# of actual departures collapsed by trip", set_size_at.(actual_trips_by_hour, hour)},
      # NOTE(review): this value is computed as (actual AND predicted) / actual,
      # like the last column of times_table, so the header reads as misleading.
      # It is left unchanged because it is also the exported CSV column name.
      {"% of predicted departures collapsed by trip",
       Map.fetch!(predicted_trip_percentages_by_hour, hour)}
    ]
  end)

# Caveat shown above the tables.
Kino.render(
  Kino.Markdown.new("""
  ### 📣 Please note:

  **The last table column is significantly affected by the sample rate / samples-per-minute settings.**

  Percentage will increase with more samples taken.

  ---
  """)
)

# Builds an "Export as CSV" download for `rows`; the file name is derived from
# the table name and the current loader/filter settings.
csv_download = fn rows, table_name ->
  Kino.Download.new(
    fn -> GlidesReport.Util.table_to_csv(rows) end,
    filename: GlidesReport.Util.build_csv_name(table_name, loader_settings, filter_settings),
    label: "Export as CSV"
  )
end

times_table_name = "Predicted vs actual departures"
Kino.render(csv_download.(times_table, times_table_name))
Kino.render(Kino.DataTable.new(times_table, name: times_table_name))

trips_table_name = "Predicted vs actual trips"
Kino.render(csv_download.(trips_table, trips_table_name))

# The final table is the cell's result, so it is not passed to Kino.render/1.
Kino.DataTable.new(trips_table, name: trips_table_name)