Glides Terminal Missed/Extraneous Predictions
project_dir = __DIR__ |> Path.join("..") |> Path.expand()

Mix.install(
  [
    {:kino, "~> 0.14.2"},
    {:transit_data, path: project_dir},
    # transit_data needs a timezone DB for some date-related logic.
    {:tz, "~> 0.26.5"}
  ],
  config: [
    elixir: [time_zone_database: Tz.TimeZoneDatabase],
    ex_aws: [
      access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
      secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
      region: "us-east-1"
    ]
  ]
)
alias TransitData.GlidesReport
Kino.nothing()
Instructions
This notebook provides an implementation of Rethinking Prediction Accuracy for Glides.
The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3
family of S3 buckets.
- Open your Hub. This link should send you there.
- Under “Secrets”, add two secrets with the following names:
  - AWS_ACCESS_KEY_ID
  - AWS_SECRET_ACCESS_KEY
- Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.
Generating a report
Evaluate code cells from top to bottom.
Some cells produce controls that let you adjust how the report runs.
Cells that generate inputs or outputs are marked with a “👉”. You can skip straight to these; when you evaluate one, any intermediate cells will automatically evaluate as well.
Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.
Setup
👉 What data do you want to load?
env_input =
  Kino.Input.select("Environment", [
    {"", "prod"},
    {"-dev-blue", "dev-blue"},
    {"-dev-green", "dev-green"},
    {"-dev", "dev"},
    {"-sandbox", "sandbox"}
  ])
# Default setting: Analyze a full service day, starting at 4am yesterday (Eastern) and ending at 4am today (Eastern).
today =
  DateTime.utc_now()
  |> DateTime.shift_zone!("America/New_York")
  |> DateTime.to_date()

yesterday = Date.add(today, -1)

start_date_input = Kino.Input.date("Analyze data from...", default: yesterday, max: today)
end_date_input = Kino.Input.date("to...", default: yesterday, max: today)

sample_rate_input =
  Kino.Input.range("Sample data at (?)-minute intervals", min: 1, max: 5, step: 1, default: 1)

samples_per_minute_input =
  Kino.Input.number("Take (?) samples per minute - leave blank for ALL", default: nil)
Kino.Layout.grid(
  [
    env_input,
    Kino.Layout.grid(
      [
        start_date_input,
        end_date_input,
        sample_rate_input,
        samples_per_minute_input
      ],
      columns: 2
    )
  ],
  columns: 1
)
Setting Details
Setting | Details |
---|---|
Environment | Environment to analyze data from. |
Analyze data from… | First service date to analyze. (Service dates start at 4am.) |
to… | Last service date to analyze. |
Sample data at (?)-minute intervals | Sets the interval at which data is sampled for analysis. Lower value = more samples and slower report generation. |
Take (?) samples per minute | Sets the number of samples to take within each sampled minute. Higher value = more samples and slower report generation. Leave blank to analyze all data within each sampled minute (slowest). See the sketch below for how these two settings combine. |
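To get a feel for how the two sampling settings combine, here is a small, purely illustrative calculation (it is not part of the report itself, and `estimate_samples` is a made-up helper): for a 24-hour service day, the sample rate determines how many minutes are sampled, and the samples-per-minute setting multiplies that.
# Illustrative only: rough sample volume for one 24-hour service day.
# sample_rate = minutes between sampled minutes;
# samples_per_minute = samples taken within each sampled minute.
estimate_samples = fn sample_rate, samples_per_minute ->
  sampled_minutes = div(24 * 60, sample_rate)
  %{sampled_minutes: sampled_minutes, total_samples: sampled_minutes * samples_per_minute}
end

estimate_samples.(1, 1)
#=> %{sampled_minutes: 1440, total_samples: 1440}

estimate_samples.(5, 2)
#=> %{sampled_minutes: 288, total_samples: 576}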
Read inputs.
# Some manual validation:
sample_count = Kino.Input.read(samples_per_minute_input)

if is_number(sample_count) and trunc(sample_count) <= 0 do
  Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end

start_d = Kino.Input.read(start_date_input)
end_d = Kino.Input.read(end_date_input)

if is_nil(start_d) do
  Kino.interrupt!(:error, "A start date must be selected.")
end

if is_nil(end_d) do
  Kino.interrupt!(:error, "An end date must be selected.")
end

if Date.compare(start_d, end_d) == :gt do
  Kino.interrupt!(:error, "Start date cannot be after end date.")
end
# Inputs are valid. Drop them into a settings struct.
loader_settings =
  GlidesReport.Settings.Load.new(
    Kino.Input.read(env_input),
    Kino.Input.read(start_date_input),
    Kino.Input.read(end_date_input),
    Kino.Input.read(sample_rate_input),
    Kino.Input.read(samples_per_minute_input)
  )
Load data into memory.
file_counts =
  GlidesReport.Loader.load_data(
    loader_settings.start_date,
    loader_settings.end_date,
    loader_settings.env_suffix,
    loader_settings.sample_rate,
    loader_settings.sample_count
  )
IO.puts("Found #{file_counts.local} existing local files.")
IO.puts("Downloaded #{file_counts.downloaded} new files.")
# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
# Kino.ETS.new(:TripUpdates),
# Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()
👉 Now, choose how you want to filter the data.
{terminals, groups} = GlidesReport.Terminal.all_labeled_terminals_and_groups()
terminal_ids_input = TransitData.GlidesReport.TerminalSelection.new(terminals, groups)
limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")
min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)", default: 1)
[
  terminal_ids_input,
  limit_to_next_2_predictions_input,
  min_advance_notice_input
]
|> Kino.Layout.grid(columns: 3)
Setting Details
Setting | Details |
---|---|
Stop(s) | Only analyze data concerning these terminal stops. |
Simulate countdown clocks? | Only consider predictions that would have appeared on countdown clocks, i.e. those that were among the next 2 predictions for a stop at some point. (See the sketch below the table.) |
Minimum advance notice (seconds) | Only consider predictions created at least this many seconds in advance of the departure time they predict. For example, if this were set to 60, a prediction created only 30 seconds before its predicted departure time would be excluded, while one created 2 minutes ahead would be included. |
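The countdown-clock logic itself lives in GlidesReport.CountdownClocksSimulation (used later in this notebook). The following is only a rough, hypothetical sketch of the idea, not that module's actual implementation: at any moment, a stop's countdown clocks show the two soonest predicted departures, so a prediction counts only if it was among the two soonest for its terminal at some point.
# Hypothetical sketch of the "next 2 predictions" idea, with made-up data.
# Each entry is {terminal_id, predicted_departure_time} visible in a single snapshot.
snapshot = [
  {"terminal_a", ~U[2024-09-05 12:05:00Z]},
  {"terminal_a", ~U[2024-09-05 12:11:00Z]},
  {"terminal_a", ~U[2024-09-05 12:19:00Z]},
  {"terminal_b", ~U[2024-09-05 12:08:00Z]}
]

# Keep only the two soonest predictions per terminal - what a countdown clock would display.
shown_on_clocks =
  snapshot
  |> Enum.group_by(fn {terminal, _time} -> terminal end, fn {_terminal, time} -> time end)
  |> Enum.flat_map(fn {terminal, times} ->
    times |> Enum.sort(DateTime) |> Enum.take(2) |> Enum.map(&{terminal, &1})
  end)
  |> MapSet.new()

# The third-soonest prediction for terminal_a never appears on the clocks in this snapshot.
{"terminal_a", ~U[2024-09-05 12:19:00Z]} in shown_on_clocks
#=> false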
Read inputs.
if Enum.empty?(GlidesReport.TerminalSelection.get_checked_terminals(terminal_ids_input)) do
  Kino.interrupt!(:error, "Please select at least one stop.")
end

filter_settings =
  GlidesReport.Settings.Filter.new(
    TransitData.GlidesReport.TerminalSelection.get_checked_terminals(terminal_ids_input),
    Kino.Input.read(limit_to_next_2_predictions_input),
    Kino.Input.read(min_advance_notice_input)
  )
Main procedure
Filter trip updates based on your settings.
alias TransitData.GlidesReport.Departure

top_twos =
  if filter_settings.limit_to_next_2_predictions do
    GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.terminal_ids)
  else
    nil
  end

trip_updates =
  :TripUpdates
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
  # Filter each trip update's stop_time_update list.
  # If filtered list is empty for any trip update, the trip update is removed entirely.
  |> Stream.map(&GlidesReport.TripUpdate.filter_terminals(&1, filter_settings.terminal_ids))
  |> Stream.reject(&is_nil/1)
  |> Stream.map(
    &GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
  )
  |> Stream.reject(&is_nil/1)
  # Split each trip update into its individual stop_time_update items.
  # Note that we use the timestamp of the predicted departure,
  # not the timestamp of when the prediction was generated.
  |> Stream.flat_map(fn tr_upd ->
    Enum.map(
      tr_upd.trip_update.stop_time_update,
      &Departure.new(tr_upd.trip_update.trip.trip_id, &1.terminal_id, &1.departure.time)
    )
  end)
  # Apply the "appeared on countdown clocks" filter, if it's enabled.
  |> then(fn stream ->
    if not is_nil(top_twos) do
      Stream.filter(stream, &({&1.terminal, &1.timestamp} in top_twos))
    else
      stream
    end
  end)
  |> Enum.to_list()
Kino.nothing()
Filter vehicle positions based on your settings.
vehicle_positions =
  :VehiclePositions
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
  |> Stream.filter(&(&1.vehicle.terminal_id in filter_settings.terminal_ids))
  |> GlidesReport.VehiclePosition.dedup_statuses()
  |> Stream.map(
    &Departure.new(&1.vehicle.trip.trip_id, &1.vehicle.terminal_id, &1.vehicle.timestamp)
  )
  |> Enum.to_list()
Kino.nothing()
Compute sets of predicted departure times and sets of actual departure times, grouped by hour, along with per-hour match percentages.
trunc_to_hour = fn dt -> %{dt | minute: 0, second: 0, microsecond: {0, 0}} end

predictions_by_hour = Enum.group_by(trip_updates, &trunc_to_hour.(&1.local_dt))
departures_by_hour = Enum.group_by(vehicle_positions, &trunc_to_hour.(&1.local_dt))

#####################################################
# Departure/prediction metrics by terminal and time #
# (Gives the rider's perspective)                   #
#####################################################

# `hour` from here on means a DateTime struct truncated to the hour.
predicted_times_by_hour =
  Map.new(predictions_by_hour, fn {hour, departures} ->
    {hour, MapSet.new(departures, &{&1.terminal, &1.local_dt.minute})}
  end)

actual_times_by_hour =
  Map.new(departures_by_hour, fn {hour, departures} ->
    {hour, MapSet.new(departures, &{&1.terminal, &1.local_dt.minute})}
  end)

all_hours =
  (
    start_hour = DateTime.new!(loader_settings.start_date, ~T[04:00:00], "America/New_York")

    end_hour =
      DateTime.new!(Date.shift(loader_settings.end_date, day: 1), ~T[03:00:00], "America/New_York")

    start_hour
    |> Stream.iterate(&DateTime.shift(&1, hour: 1))
    |> Stream.take_while(&(DateTime.compare(&1, end_hour) != :gt))
    # They should already be truncated to the hour, but let's just be safe...
    |> Enum.map(trunc_to_hour)
  )
predicted_time_percentages_by_hour =
  Map.new(all_hours, fn hour ->
    predicted_times = Map.get(predicted_times_by_hour, hour, MapSet.new())
    actual_times = Map.get(actual_times_by_hour, hour, MapSet.new())

    actual_time_count = MapSet.size(actual_times)

    # Number of departure times that were both predicted and actually happened.
    actual_AND_predicted_time_count =
      predicted_times
      |> MapSet.intersection(actual_times)
      |> MapSet.size()

    percentage =
      GlidesReport.Util.format_percent(
        actual_AND_predicted_time_count,
        actual_time_count,
        "N/A (0 actual departures)"
      )

    {hour, percentage}
  end)

#####################################################
# Departure/prediction metrics by terminal and trip #
# (A more internal performance metric)              #
#####################################################

predicted_trips_by_hour =
  Map.new(predictions_by_hour, fn {hour, departures} ->
    {hour, MapSet.new(departures, &{&1.terminal, &1.trip})}
  end)

actual_trips_by_hour =
  Map.new(departures_by_hour, fn {hour, departures} ->
    {hour, MapSet.new(departures, &{&1.terminal, &1.trip})}
  end)

predicted_trip_percentages_by_hour =
  Map.new(all_hours, fn hour ->
    predicted_trips = Map.get(predicted_trips_by_hour, hour, MapSet.new())
    actual_trips = Map.get(actual_trips_by_hour, hour, MapSet.new())

    actual_trip_count = MapSet.size(actual_trips)

    # Number of departures (each represented as terminal + trip)
    # that were both predicted and actually happened.
    actual_AND_predicted_trip_count =
      predicted_trips
      |> MapSet.intersection(actual_trips)
      |> MapSet.size()

    percentage =
      GlidesReport.Util.format_percent(
        actual_AND_predicted_trip_count,
        actual_trip_count,
        "N/A (0 actual departures)"
      )

    {hour, percentage}
  end)
Kino.nothing()
👉 Results
Per-hour counts of departures for which RTR made predictions vs. actual departures
Methodology:
- From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a stop.[^1]
- From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a stop.[^2]
- If a vehicle actually departed stop S at the same minute that a vehicle was predicted to depart stop S, then that prediction is considered accurate. (See the worked example after the footnotes below.)
- Also compute the above, but with predictions and departures compared by trip ID instead of timestamp.
[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “IN_TRANSIT_TO” or “INCOMING_AT” the stop _after_ the target stop.
[^2]: This is the set of all times at which a vehicle was predicted to depart a stop. If at any moment, even just for a minute, a vehicle was predicted to depart stop S at time T, then that {time, stop} pair is added to the set.
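To make the matching rule concrete, here is a tiny worked example with made-up values (the real sets are built from the filtered TripUpdates and VehiclePositions above): each element is a {stop, minute} pair, and an actual departure counts as predicted only when the same pair appears in both sets.
# Made-up example of the (stop, minute) matching rule.
predicted = MapSet.new([{"stop_s", 5}, {"stop_s", 12}, {"stop_s", 30}])
actual = MapSet.new([{"stop_s", 5}, {"stop_s", 31}])

# Only the minute-5 departure matches; the 30 vs. 31 pair misses by one minute.
matched = MapSet.intersection(predicted, actual)

MapSet.size(matched) / MapSet.size(actual)
#=> 0.5, i.e. 50% of actual departures had a matching prediction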
times_table =
  Enum.map(all_hours, fn hour ->
    predicted_time_count =
      predicted_times_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    actual_time_count =
      actual_times_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    time_percentage = Map.fetch!(predicted_time_percentages_by_hour, hour)

    [
      {"service day", to_string(GlidesReport.Loader.service_day(hour))},
      {"hour", Calendar.strftime(hour, "%H:00")},
      {"# of predicted departure times", predicted_time_count},
      {"# of actual departure times", actual_time_count},
      {"% of actual departure times that were also predicted", time_percentage}
    ]
  end)
trips_table =
  Enum.map(all_hours, fn hour ->
    predicted_trip_count =
      predicted_trips_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    actual_trip_count =
      actual_trips_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    trip_percentage = Map.fetch!(predicted_trip_percentages_by_hour, hour)

    [
      {"service day", to_string(GlidesReport.Loader.service_day(hour))},
      {"hour", Calendar.strftime(hour, "%H:00")},
      {"# of predicted departures collapsed by trip", predicted_trip_count},
      {"# of actual departures collapsed by trip", actual_trip_count},
      {"% of predicted departures collapsed by trip", trip_percentage}
    ]
  end)
times_table_name = "Predicted vs actual departures"

Kino.Download.new(
  fn -> GlidesReport.Util.table_to_csv(times_table) end,
  filename: GlidesReport.Util.build_csv_name(times_table_name, loader_settings, filter_settings),
  label: "Export as CSV"
)
|> Kino.render()

Kino.DataTable.new(times_table, name: times_table_name)
|> Kino.render()

trips_table_name = "Predicted vs actual trips"

Kino.Download.new(
  fn -> GlidesReport.Util.table_to_csv(trips_table) end,
  filename: GlidesReport.Util.build_csv_name(trips_table_name, loader_settings, filter_settings),
  label: "Export as CSV"
)
|> Kino.render()

Kino.DataTable.new(trips_table, name: trips_table_name)
About these tables
Predicted vs actual departures
For this table, predictions and departures were compared only on stop and time, not trip.
Column | Description |
---|---|
service day | Service day analyzed by this row. |
hour | Hour within the service day analyzed by this row. |
# of predicted departure times | Total number of predictions generated for this hour.[^1] |
# of actual departure times | Total number of actual departures that occurred during this hour.[^1] |
% of actual departure times that were also predicted | Equal to (number of actual departures that had a matching prediction) / (total number of actual departures). |
Predicted vs actual trips
For this table, predictions and departures were compared only on stop and trip, not time.
Column | Description |
---|---|
service day | Service day analyzed by this row. |
hour | Hour within the service day analyzed by this row. |
# of predicted departures collapsed by trip | Total number of predictions generated for this hour.[^2] |
# of actual departures collapsed by trip | Total number of actual departures that occurred during this hour.[^2] |
% of predicted departures collapsed by trip | Equal to (number of actual departures, collapsed by trip, that had a matching prediction) / (total number of actual departures, collapsed by trip). |
[^1]: Values are compared by (stop, minute). Two predictions are considered equivalent, and counted once, if they are for the same stop and minute. Two actual departures are considered equivalent, and counted once, if they are for the same stop and minute.
[^2]: Values are compared by (stop, trip). Two predictions are considered equivalent, and counted once, if they are for the same stop and trip. Two actual departures are considered equivalent, and counted once, if they are for the same stop and trip.
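To illustrate the difference between these two footnotes with hypothetical values: suppose two distinct trips both depart the same stop during the same minute.
# Hypothetical departures, each as {stop, minute, trip}.
departures = [
  {"stop_s", 5, "trip_a"},
  {"stop_s", 5, "trip_b"}
]

# Compared by (stop, minute), the two departures collapse to one.
departures |> MapSet.new(fn {stop, minute, _trip} -> {stop, minute} end) |> MapSet.size()
#=> 1

# Compared by (stop, trip), they count separately.
departures |> MapSet.new(fn {stop, _minute, trip} -> {stop, trip} end) |> MapSet.size()
#=> 2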