Glides Terminal Missed/Extraneous Predictions
project_dir = __DIR__ |> Path.join("..") |> Path.expand()
Mix.install(
[
{:kino, "~> 0.12.0"},
{:transit_data, path: project_dir},
# transit_data needs a timezone DB for some date-related logic.
{:tz, "~> 0.26.5"},
],
config: [
elixir: [time_zone_database: Tz.TimeZoneDatabase],
ex_aws: [
access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
region: "us-east-1"
]
]
)
alias TransitData.GlidesReport
Kino.nothing()
Instructions
This notebook provides an implementation of Rethinking Prediction Accuracy for Glides.
The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3
family of S3 buckets.
- Open your Hub. This link should send you there.
-
Under “Secrets”, add two secrets with the following names:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.
Generating a report
Evaluate code cells from top to bottom.
Some cells produce controls that let you adjust how the report runs.
Cells that generate inputs / outputs are marked with a “👉”—you can skip to these, and when you evaluate them, any intermediate cells will automatically evaluate as well.
Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.
Setup
use_advanced_date? = Kino.Shorts.read_checkbox("Use advanced date/time input?")
Kino.nothing()
Simple: Choose a date. The report will analyze the full service day on that date.
Advanced: Choose a start date+time and end date+time. The report will analyze within that window only.
👉 What data do you want to load?
env_input =
Kino.Input.select("Environment", [
{"", "prod"},
{"-dev-blue", "dev-blue"},
{"-dev-green", "dev-green"},
{"-dev", "dev"},
{"-sandbox", "sandbox"}
])
# DateTimes are cached in the process dictionary to prevent the input's default
# value from changing (and the input from resetting to that default) every time this cell re-runs.
process_get = fn key, default_fn ->
case Process.get(key, nil) do
nil ->
value = default_fn.()
Process.put(key, {:value, value})
value
{:value, value} ->
value
end
end
today_date =
process_get.(:today_date, fn ->
DateTime.utc_now()
|> DateTime.shift_zone!("America/New_York")
|> DateTime.to_date()
end)
yesterday_date = process_get.(:yesterday_date, fn -> Date.add(today_date, -1) end)
# Default setting: Analyze a full service day, starting at 4am yesterday (Eastern) and ending at 4am today (Eastern).
yesterday_start_time =
process_get.(:yesterday_start_time, fn ->
DateTime.new!(yesterday_date, ~T[04:00:00], "America/New_York")
|> DateTime.shift_zone!("Etc/UTC")
|> DateTime.to_naive()
end)
today_end_time =
process_get.(:today_end_time, fn ->
DateTime.new!(today_date, ~T[03:59:59], "America/New_York")
|> DateTime.shift_zone!("Etc/UTC")
|> DateTime.to_naive()
end)
# Simple date input
date_input = Kino.Input.date("Date", default: yesterday_date, max: today_date)
# Advanced start/end datetime inputs
start_date_input =
Kino.Input.utc_datetime("Analyze data from...",
default: yesterday_start_time,
max: DateTime.utc_now()
)
end_date_input =
Kino.Input.utc_datetime("to...", default: today_end_time, max: DateTime.utc_now())
sample_rate_input =
Kino.Input.range("Sample data at (?)-minute intervals", min: 1, max: 5, step: 1, default: 5)
samples_per_minute_input =
Kino.Input.number("Take (?) samples per minute - leave blank for ALL", default: 1)
content_map = %{
true:
Kino.Layout.grid(
[
env_input,
Kino.Layout.grid(
[
start_date_input,
end_date_input,
sample_rate_input,
samples_per_minute_input
],
columns: 2
)
],
columns: 1
),
false:
Kino.Layout.grid(
[
env_input,
date_input,
sample_rate_input,
samples_per_minute_input
],
columns: 2
)
}
doc_table_map =
%{
true: """
| Analyze data from... | Start analyzing data from this date/time.
Default is 4am yesterday, Eastern. |
| to... | Analyze data up to this date/time.
Default is 3:59am today, Eastern. |
""",
false: """
| Date | Service date to analyze data from. A 24-hour period starting at 4am Eastern. |
"""
}
|> Map.new(fn {k, fragment} ->
{k,
Kino.Markdown.new("""
### Setting Details
| Setting | Details |
| ----------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Environment | Environment to analyze data from. |
#{String.trim_trailing(fragment)}
| Sample data at (?)-minute intervals | Sets interval at which data is sampled for analysis.
Lower value = more samples and slower report generation. |
| Take (?) samples per minute | Sets number of samples to take within each sampled minute.
Higher value = more samples and slower report generation.
Leave blank to analyze *all* data within each sampled minute. (slowest) |
""")}
end)
frame = Kino.Frame.new(placeholder: false)
Kino.Frame.clear(frame)
Kino.Frame.append(frame, content_map[use_advanced_date?])
Kino.Frame.append(frame, doc_table_map[use_advanced_date?])
frame
Read inputs.
# Some manual validation:
sample_count = Kino.Input.read(samples_per_minute_input)
if is_number(sample_count) and trunc(sample_count) <= 0 do
Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end
if use_advanced_date? do
start_dt = Kino.Input.read(start_date_input)
end_dt = Kino.Input.read(end_date_input)
if is_nil(start_dt) do
Kino.interrupt!(:error, "A start date/time must be selected.")
end
if is_nil(end_dt) do
Kino.interrupt!(:error, "An end date/time must be selected.")
end
if NaiveDateTime.diff(end_dt, start_dt, :hour) >= 24 do
Kino.interrupt!(
:error,
"Sorry, time windows of 24 hours or more are not yet supported.\n" <>
"(If this was unexpected, check whether the window includes the autumn DST change.)"
)
end
if NaiveDateTime.diff(end_dt, start_dt) <= 0 do
Kino.interrupt!(:error, "Start date needs to come before end date.")
end
else
date = Kino.Input.read(date_input)
if is_nil(date) do
Kino.interrupt!(:error, "A date must be selected.")
end
end
# Inputs are valid. Drop them into a settings struct.
loader_settings =
if use_advanced_date? do
GlidesReport.Settings.Load.new(
Kino.Input.read(env_input),
Kino.Input.read(start_date_input) |> DateTime.from_naive!("Etc/UTC"),
Kino.Input.read(end_date_input) |> DateTime.from_naive!("Etc/UTC"),
Kino.Input.read(sample_rate_input),
Kino.Input.read(samples_per_minute_input)
)
else
GlidesReport.Settings.Load.new(
Kino.Input.read(env_input),
Kino.Input.read(date_input),
Kino.Input.read(sample_rate_input),
Kino.Input.read(samples_per_minute_input)
)
end
Load data into memory.
file_counts =
GlidesReport.Loader.load_data(
loader_settings.start_dt,
loader_settings.end_dt,
loader_settings.env_suffix,
loader_settings.sample_rate,
loader_settings.sample_count
)
IO.puts("Found #{file_counts.local} existing local files.")
IO.puts("Downloaded #{file_counts.downloaded} new files.")
# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
# Kino.ETS.new(:TripUpdates),
# Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()
👉 Now, choose how you want to filter the data.
stop_ids_input =
Kino.Input.select("Stop(s)", GlidesReport.Terminals.all_labeled_stops_and_groups())
limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")
min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)")
[
stop_ids_input,
limit_to_next_2_predictions_input,
min_advance_notice_input
]
|> Kino.Layout.grid(columns: 3)
Setting Details
Setting | Details |
---|---|
Stop(s) |
Only analyze data concerning a specific stop, or group of stops. Note: Terminals that do not have Glides predictions—Heath Street and Ashmont—are ignored regardless of which stop(s) you select. |
Simulate countdown clocks? | Only consider predictions that would have appeared on countdown clocks—those that were in the next 2 predictions for a stop at some point. |
Minimum advance notice (seconds) |
Only consider predictions created at least this many seconds in advance of the departure time they predict. For example: If this were set to 60 , then a prediction generated at 12:00:00, with departure time predicted for 12:00:59, would be omitted from analysis.Leave the field blank to disable this filter. |
Read inputs.
filter_settings =
GlidesReport.Settings.Filter.new(
Kino.Input.read(stop_ids_input),
Kino.Input.read(limit_to_next_2_predictions_input),
Kino.Input.read(min_advance_notice_input)
)
Main procedure
Filter trip updates based on your settings.
alias TransitData.GlidesReport.Departure
top_twos =
if filter_settings.limit_to_next_2_predictions do
GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.stop_ids)
else
nil
end
trip_updates =
:TripUpdates
|> GlidesReport.Util.stream_values()
|> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
# Filter each trip update's stop_time_update list.
# If filtered list is empty for any trip update, the trip update is removed entirely.
|> Stream.map(&GlidesReport.TripUpdate.filter_stops(&1, filter_settings.stop_ids))
|> Stream.reject(&is_nil/1)
|> Stream.map(
&GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
)
|> Stream.reject(&is_nil/1)
# Split each trip update into its individual stop_time_update items.
# Note that we use the timestamp of the predicted departure,
# not the timestamp of when the prediction was generated.
|> Stream.flat_map(fn tr_upd ->
Enum.map(
tr_upd.trip_update.stop_time_update,
&Departure.new(tr_upd.trip_update.trip.trip_id, &1.stop_id, &1.departure.time)
)
end)
# Apply the "appeared on countdown clocks" filter, if it's enabled.
|> then(fn stream ->
if not is_nil(top_twos) do
Stream.filter(stream, &({&1.stop, &1.timestamp} in top_twos))
else
stream
end
end)
|> Enum.to_list()
Kino.nothing()
Filter vehicle positions based on your settings.
vehicle_positions =
:VehiclePositions
|> GlidesReport.Util.stream_values()
|> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
|> Stream.filter(&(&1.vehicle.stop_id in filter_settings.stop_ids))
|> GlidesReport.VehiclePosition.dedup_statuses()
|> Stream.map(&Departure.new(&1.vehicle.trip.trip_id, &1.vehicle.stop_id, &1.vehicle.timestamp))
|> Enum.to_list()
Kino.nothing()
Compute sets of predicted departure times and sets of actual departure times,
predictions_by_hour = Enum.group_by(trip_updates, &(&1.hour))
departures_by_hour = Enum.group_by(vehicle_positions, &(&1.hour))
#################################################
# Departure/prediction metrics by stop and time #
# (Gives the rider's perspective) #
#################################################
predicted_times_by_hour =
Map.new(predictions_by_hour, fn {hour, departures} ->
{hour, MapSet.new(departures, &{&1.stop, &1.minute})}
end)
actual_times_by_hour =
Map.new(departures_by_hour, fn {hour, departures} ->
{hour, MapSet.new(departures, &{&1.stop, &1.minute})}
end)
predicted_time_percentages_by_hour =
Map.new(0..23, fn hour ->
predicted_times = Map.get(predicted_times_by_hour, hour, MapSet.new())
actual_times = Map.get(actual_times_by_hour, hour, MapSet.new())
actual_time_count = MapSet.size(actual_times)
# Number of departure times that were both predicted and actually happened.
actual_AND_predicted_time_count =
MapSet.intersection(predicted_times, actual_times)
|> MapSet.size()
percentage =
GlidesReport.Util.format_percent(
actual_AND_predicted_time_count,
actual_time_count,
"N/A (0 actual departures)"
)
{hour, percentage}
end)
#################################################
# Departure/prediction metrics by stop and trip #
# (A more internal performance metric) #
#################################################
predicted_trips_by_hour =
Map.new(predictions_by_hour, fn {hour, departures} ->
{hour, MapSet.new(departures, &{&1.stop, &1.trip})}
end)
actual_trips_by_hour =
Map.new(departures_by_hour, fn {hour, departures} ->
{hour, MapSet.new(departures, &{&1.stop, &1.trip})}
end)
predicted_trip_percentages_by_hour =
Map.new(0..23, fn hour ->
predicted_trips = Map.get(predicted_trips_by_hour, hour, MapSet.new())
actual_trips = Map.get(actual_trips_by_hour, hour, MapSet.new())
actual_trip_count = MapSet.size(actual_trips)
# Number of departures (represented as stop X trip)
# that were both predicted and actually happened.
actual_AND_predicted_trip_count =
MapSet.intersection(predicted_trips, actual_trips)
|> MapSet.size()
percentage =
GlidesReport.Util.format_percent(
actual_AND_predicted_trip_count,
actual_trip_count,
"N/A (0 actual departures)"
)
{hour, percentage}
end)
Kino.nothing()
👉 Results
Per-Hour Counts of trips for which RTR made departure predictions vs. actual departures
Methodology:
- From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a stop.[^1]
- From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a stop.[^2]
- If a vehicle actually departed stop S at the same minute that a vehicle was predicted to depart stop S, then that prediction is considered accurate.
- Also compute the above, but with predictions and departures compared by trip ID instead of timestamp.
[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “INTRANSIT_TO” or “INCOMING_AT” the stop _after the target stop.
[^2]: This is the set of all times at which a vehicle was predicted to depart a stop. If at any moment, even just for a minute, a vehicle was predicted to depart stop S at time T, then that {time, stop}
pair is added to the set.
times_table =
0..23
# Service day starts at 4am, so let's start the table at that hour.
|> Enum.map(&rem(&1 + 4, 24))
|> Enum.map(fn hour ->
predicted_time_count =
predicted_times_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
actual_time_count =
actual_times_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
time_percentage = Map.fetch!(predicted_time_percentages_by_hour, hour)
[
{"hour", "#{GlidesReport.Util.zero_pad(hour)}:00"},
{"# of predicted departure times", predicted_time_count},
{"# of actual departure times", actual_time_count},
{"% of actual departure times that were also predicted", time_percentage}
]
end)
trips_table =
0..23
# Service day starts at 4am, so let's start the table at that hour.
|> Enum.map(&rem(&1 + 4, 24))
|> Enum.map(fn hour ->
predicted_trip_count =
predicted_trips_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
actual_trip_count =
actual_trips_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
trip_percentage = Map.fetch!(predicted_trip_percentages_by_hour, hour)
[
{"hour", "#{GlidesReport.Util.zero_pad(hour)}:00"},
{"# of predicted departures collapsed by trip", predicted_trip_count},
{"# of actual departures collapsed by trip", actual_trip_count},
{"% of predicted departures collapsed by trip", trip_percentage}
]
end)
Kino.Markdown.new("""
### 📣 Please note:
**The last table column is significantly affected by the sample rate / samples-per-minute settings.**
Percentage will increase with more samples taken.
---
""")
|> Kino.render()
times_table_name = "Predicted vs actual departures"
Kino.Download.new(
fn -> GlidesReport.Util.table_to_csv(times_table) end,
filename: GlidesReport.Util.build_csv_name(times_table_name, loader_settings, filter_settings),
label: "Export as CSV"
)
|> Kino.render()
Kino.DataTable.new(times_table, name: times_table_name)
|> Kino.render()
trips_table_name = "Predicted vs actual trips"
Kino.Download.new(
fn -> GlidesReport.Util.table_to_csv(trips_table) end,
filename: GlidesReport.Util.build_csv_name(trips_table_name, loader_settings, filter_settings),
label: "Export as CSV"
)
|> Kino.render()
Kino.DataTable.new(trips_table, name: trips_table_name)