Glides Terminal Departure Accuracy
project_dir = __DIR__ |> Path.join("..") |> Path.expand()
Mix.install(
[
{:kino, "~> 0.14.2"},
{:transit_data, path: project_dir},
# transit_data needs a timezone DB for some date-related logic.
{:tz, "~> 0.26.5"},
{:kino_vega_lite, "~> 0.1.11"}
],
config: [
elixir: [time_zone_database: Tz.TimeZoneDatabase],
ex_aws: [
access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
region: "us-east-1"
]
]
)
alias TransitData.GlidesReport
Kino.nothing()
Instructions
This notebook provides an implementation of 🚃 Build External Glides Terminal Departure Accuracy Report.
The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3
family of S3 buckets.
- Open your Hub. This link should send you there.
-
Under “Secrets”, add two secrets with the following names:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.
Generating a report
Evaluate code cells from top to bottom.
Some cells produce controls that let you adjust how the report runs.
Cells that generate inputs / outputs are marked with a “👉”—you can skip to these, and when you evaluate them, any intermediate cells will automatically evaluate as well.
Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.
Setup
👉 What data do you want to load?
env_input =
Kino.Input.select("Environment", [
{"", "prod"},
{"-dev-blue", "dev-blue"},
{"-dev-green", "dev-green"},
{"-dev", "dev"},
{"-sandbox", "sandbox"}
])
# Default setting: Analyze a full service day, starting at 4am yesterday (Eastern) and ending at 4am today (Eastern).
today =
DateTime.utc_now()
|> DateTime.shift_zone!("America/New_York")
|> DateTime.to_date()
yesterday = Date.add(today, -1)
start_date_input = Kino.Input.date("Analyze data from...", default: yesterday, max: today)
end_date_input = Kino.Input.date("to...", default: yesterday, max: today)
sample_rate_input =
Kino.Input.range("Sample data at (?)-minute intervals", min: 1, max: 5, step: 1, default: 1)
samples_per_minute_input =
Kino.Input.number("Take (?) samples per minute - leave blank for ALL", default: nil)
Kino.Layout.grid(
[
env_input,
Kino.Layout.grid(
[
start_date_input,
end_date_input,
sample_rate_input,
samples_per_minute_input
],
columns: 2
)
],
columns: 1
)
Setting Details
Setting | Details |
---|---|
Environment | Environment to analyze data from. |
Analyze data from… |
First service day to analyze. (Service days start at 4am.) |
to… | Last service day to analyze. |
Sample data at (?)-minute intervals |
Sets interval at which data is sampled for analysis. Lower value = more samples and slower report generation. |
Take (?) samples per minute |
Sets number of samples to take within each sampled minute. Higher value = more samples and slower report generation. Leave blank to analyze all data within each sampled minute. (slowest) |
Read inputs.
# Some manual validation:
sample_count = Kino.Input.read(samples_per_minute_input)
if is_number(sample_count) and trunc(sample_count) <= 0 do
Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end
start_d = Kino.Input.read(start_date_input)
end_d = Kino.Input.read(end_date_input)
if is_nil(start_d) do
Kino.interrupt!(:error, "A start date must be selected.")
end
if is_nil(end_d) do
Kino.interrupt!(:error, "An end date must be selected.")
end
if Date.compare(start_d, end_d) == :gt do
Kino.interrupt!(:error, "Start date cannot be after end date.")
end
# Inputs are valid. Drop them into a settings struct.
loader_settings =
GlidesReport.Settings.Load.new(
Kino.Input.read(env_input),
Kino.Input.read(start_date_input),
Kino.Input.read(end_date_input),
Kino.Input.read(sample_rate_input),
Kino.Input.read(samples_per_minute_input)
)
Load data into memory.
file_counts =
GlidesReport.Loader.load_data(
loader_settings.start_date,
loader_settings.end_date,
loader_settings.env_suffix,
loader_settings.sample_rate,
loader_settings.sample_count
)
IO.puts("Found #{file_counts.local} existing local files.")
IO.puts("Downloaded #{file_counts.downloaded} new files.")
# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
# Kino.ETS.new(:TripUpdates),
# Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()
👉 Now, choose how you want to filter the data.
{terminals, groups} = GlidesReport.Terminal.all_labeled_terminals_and_groups()
terminal_ids_input = GlidesReport.TerminalSelection.new(terminals, groups)
limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")
min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)", default: 1)
[
terminal_ids_input,
limit_to_next_2_predictions_input,
min_advance_notice_input
]
|> Kino.Layout.grid(columns: 3)
Setting Details
Setting | Details |
---|---|
Stop(s) | Only analyze data concerning these terminal stops. |
Simulate countdown clocks? | Only consider predictions that would have appeared on countdown clocks—those that were in the next 2 predictions for a stop at some point. |
Minimum advance notice (seconds) |
Only consider predictions created at least this many seconds in advance of the departure time they predict. For example: If this were set to 60 , then:
|
Read inputs.
if Enum.empty?(GlidesReport.TerminalSelection.get_checked_terminals(terminal_ids_input)) do
Kino.interrupt!(:error, "Please select at least one stop.")
end
filter_settings =
GlidesReport.Settings.Filter.new(
GlidesReport.TerminalSelection.get_checked_terminals(terminal_ids_input),
Kino.Input.read(limit_to_next_2_predictions_input),
Kino.Input.read(min_advance_notice_input)
)
Main procedure
Filter trip updates based on your settings.
alias TransitData.GlidesReport.Departure
top_twos =
if filter_settings.limit_to_next_2_predictions do
GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.terminal_ids)
else
nil
end
trip_updates =
:TripUpdates
|> GlidesReport.Util.stream_values()
|> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
# Filter each trip update's stop_time_update list.
# If filtered list is empty for any trip update, the trip update is removed entirely.
|> Stream.map(&GlidesReport.TripUpdate.filter_terminals(&1, filter_settings.terminal_ids))
|> Stream.reject(&is_nil/1)
|> Stream.map(
&GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
)
|> Stream.reject(&is_nil/1)
# Split each trip update into its individual stop_time_update items.
# Note that we use the timestamp of the predicted departure,
# not the timestamp of when the prediction was generated.
|> Stream.flat_map(fn tr_upd ->
Enum.map(
tr_upd.trip_update.stop_time_update,
&Departure.new(tr_upd.trip_update.trip.trip_id, &1.terminal_id, &1.departure.time)
)
end)
# Apply the "appeared on countdown clocks" filter, if it's enabled.
|> then(fn stream ->
if not is_nil(top_twos) do
Stream.filter(stream, &({&1.terminal, &1.timestamp} in top_twos))
else
stream
end
end)
|> Enum.to_list()
Kino.nothing()
Filter vehicle positions based on your settings.
vehicle_positions =
:VehiclePositions
|> GlidesReport.Util.stream_values()
|> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
|> Stream.filter(&(&1.vehicle.terminal_id in filter_settings.terminal_ids))
|> GlidesReport.VehiclePosition.dedup_statuses()
|> Stream.map(
&Departure.new(&1.vehicle.trip.trip_id, &1.vehicle.terminal_id, &1.vehicle.timestamp)
)
|> Enum.to_list()
Kino.nothing()
Compute accuracy under a range of bucket sizes. (Or variances
, in the code)
variances = [1, 2, 3, 5, 10]
##################
# OVERALL VALUES #
##################
# %{
# variance => %{
# terminal_id => [actual_departure_range1, actual_departure_range2, ...]
# }
# }
actual_departure_windows_by_variance =
Map.new(variances, fn variance ->
# Convert to seconds, to match up with vehicle.timestamp
variance_sec = variance * 60
time_ranges_by_terminal_id =
vehicle_positions
|> Enum.group_by(& &1.terminal, & &1.timestamp)
|> Map.new(fn {terminal_id, timestamps} ->
time_ranges =
timestamps
|> Enum.map(fn t -> (t - variance_sec)..(t + variance_sec)//1 end)
|> GlidesReport.Util.merge_ranges()
{terminal_id, time_ranges}
end)
{variance, time_ranges_by_terminal_id}
end)
# %{
# variance => MapSet.t(Departure.t())
# }
accurate_predictions_by_variance =
Map.new(variances, fn variance ->
# look up the actuals
accuracy_windows_by_terminal = Map.fetch!(actual_departure_windows_by_variance, variance)
accurate_predictions =
Enum.filter(trip_updates, fn departure ->
accuracy_windows_by_terminal
|> Map.get(departure.terminal, [])
|> Enum.any?(fn window -> departure.timestamp in window end)
end)
{variance, MapSet.new(accurate_predictions)}
end)
###########################
# VALUES BUCKETED BY HOUR #
###########################
trunc_to_hour = fn dt -> %{dt | minute: 0, second: 0, microsecond: {0, 0}} end
# `hour` from here on means a DateTime struct truncated to the hour.
predictions_by_hour =
trip_updates
|> Enum.group_by(&trunc_to_hour.(&1.local_dt))
|> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)
departures_by_hour =
vehicle_positions
|> Enum.group_by(&trunc_to_hour.(&1.local_dt))
|> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)
# %{
# hour => %{ variance => MapSet.t(Departure.t()) }
# }
accurate_predictions_by_variance_by_hour =
accurate_predictions_by_variance
# First, flatten the map while maintaining all data.
# [{variance, Departure.t()}]
|> Enum.flat_map(fn {variance, departures} ->
Enum.map(departures, &{variance, &1})
end)
# Then, group by hour.
# %{hour => [{variance, Departure.t()}]}
|> Enum.group_by(fn {_variance, departure} ->
trunc_to_hour.(departure.local_dt)
end)
# Then, re-build the variance => stop_times map within each hour bucket.
|> Map.new(fn {hour, values} ->
accurate_predictions_by_variance =
Enum.group_by(
values,
fn {variance, _} -> variance end,
fn {_, departure} -> departure end
)
|> Map.new(fn {variance, departures} -> {variance, MapSet.new(departures)} end)
{hour, accurate_predictions_by_variance}
end)
Kino.nothing()
👉 Results
Per-Hour Counts of trips for which RTR made departure predictions vs. actual departures
Methodology:
- From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a terminal.[^1]
- From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a terminal.[^2]
- If a predicted departure time was within some number of minutes from the actual departure time at the same terminal, then that prediction is considered accurate. Each prediction is evaluated for accuracy under a number of different minute ranges: 1, 2, 3, 5, and 10 minutes.
[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “INTRANSIT_TO” or “INCOMING_AT” the stop _after the terminal. E.g. INCOMINGAT Green Street from Forest Hills.
[^2]: This is the set of _all times at which a vehicle was predicted to depart a terminal. If at any moment, even just for a minute, a vehicle was predicted to depart terminal S at time T, then that {time, terminal}
pair is added to the set.
all_hours =
(
start_hour = DateTime.new!(loader_settings.start_date, ~T[04:00:00], "America/New_York")
end_hour =
DateTime.new!(
Date.shift(loader_settings.end_date, day: 1),
~T[03:00:00],
"America/New_York"
)
start_hour
|> Stream.iterate(&DateTime.shift(&1, hour: 1))
|> Stream.take_while(&(DateTime.compare(&1, end_hour) != :gt))
# They should already be truncated to the hour, but let's just be safe...
|> Enum.map(trunc_to_hour)
)
table =
Enum.map(all_hours, fn hour ->
predicted_departure_time_count =
predictions_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
actual_departure_time_count =
departures_by_hour
|> Map.get(hour, MapSet.new())
|> MapSet.size()
accurate_predictions_by_variance =
accurate_predictions_by_variance_by_hour
|> Map.get(hour, Map.new(variances, &{&1, MapSet.new()}))
bucket_columns =
Enum.flat_map(variances, fn variance ->
accurate_prediction_count =
accurate_predictions_by_variance
|> Map.get(variance, MapSet.new())
|> MapSet.size()
percentage =
GlidesReport.Util.format_percent(
accurate_prediction_count,
predicted_departure_time_count,
"N/A"
)
[
{"# accurate (± #{variance})", accurate_prediction_count},
{"% accurate (± #{variance})", percentage}
]
end)
[
{"service day", to_string(GlidesReport.Loader.service_day(hour))},
{"hour", Calendar.strftime(hour, "%H:00")},
{"# predictions", predicted_departure_time_count},
{"# departures", actual_departure_time_count}
] ++ bucket_columns
end)
table_name = "Prediction accuracy by bucket"
Kino.Download.new(
fn -> GlidesReport.Util.table_to_csv(table) end,
filename: GlidesReport.Util.build_csv_name(table_name, loader_settings, filter_settings),
label: "Export as CSV"
)
|> Kino.render()
Kino.DataTable.new(table, name: table_name)
About this table
Column | Description |
---|---|
service day | Service day analyzed by this row. |
hour | Hour within the service day analyzed by this row. |
# predictions | Total number of predictions generated during this hour.[^1] |
# departures | Total number of departures that occurred during this hour. |
# accurate (± N ) |
Number of predictions generated during this hour, whose predicted times were within N minutes of a departure from the same stop. |
% accurate (± N ) |
equal to [# accurate (± N )] / [# predictions] |
[^1]: Predictions with the same trip, stop, and predicted minute of departure are considered equivalent and counted once.
overall_prediction_count = MapSet.size(MapSet.new(trip_updates))
accuracy_percentages_by_variance =
Enum.map(variances, fn variance ->
accurate_prediction_count =
accurate_predictions_by_variance
|> Map.get(variance, MapSet.new())
|> MapSet.size()
percentage =
if overall_prediction_count > 0 do
round(100.0 * (accurate_prediction_count / overall_prediction_count))
else
"N/A"
end
%{
"Bucket size (± n minutes)" => variance,
"Accuracy (% of total predictions)" => percentage
}
end)
Kino.nothing()
VegaLite.new(width: 500, height: 300, title: "Percent accurate by bucket size")
|> VegaLite.data_from_values(accuracy_percentages_by_variance,
only: ["Bucket size (± n minutes)", "Accuracy (% of total predictions)"]
)
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "Bucket size (± n minutes)", type: :quantitative)
|> VegaLite.encode_field(:y, "Accuracy (% of total predictions)", type: :quantitative)