Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Glides Terminal Departure Accuracy

glides_terminal_departure_accuracy.livemd

Glides Terminal Departure Accuracy

# The transit_data project is the parent directory of the one holding this notebook.
project_dir = Path.expand("..", __DIR__)

Mix.install(
  [
    {:kino, "~> 0.14.2"},
    # Installed from the local checkout rather than from Hex.
    {:transit_data, path: project_dir},
    # transit_data needs a timezone DB for some date-related logic.
    {:tz, "~> 0.26.5"},
    {:kino_vega_lite, "~> 0.1.11"}
  ],
  config: [
    # Use Tz as the time zone database for Elixir's calendar functions.
    elixir: [time_zone_database: Tz.TimeZoneDatabase],
    # AWS credentials are read from the LB_-prefixed environment variables.
    ex_aws: [
      access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
      secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
      region: "us-east-1"
    ]
  ]
)

alias TransitData.GlidesReport

# Suppress this cell's output.
Kino.nothing()

Instructions

This notebook provides an implementation of 🚃 Build External Glides Terminal Departure Accuracy Report.

The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3 family of S3 buckets.

How to add your AWS credentials

  1. Open your Hub. This link should send you there.
  2. Under “Secrets”, add two secrets with the following names:
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
  3. Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.

Generating a report

Evaluate code cells from top to bottom.

Some cells produce controls that let you adjust how the report runs.

Cells that generate inputs / outputs are marked with a “👉”—you can skip to these, and when you evaluate them, any intermediate cells will automatically evaluate as well.

Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.

Setup

👉 What data do you want to load?

# Each option is a {value, label} pair; the value is presumably used as the
# environment suffix when the data is loaded -- confirm against Settings.Load.
env_input =
  Kino.Input.select(
    "Environment",
    [
      {"", "prod"},
      {"-dev-blue", "dev-blue"},
      {"-dev-green", "dev-green"},
      {"-dev", "dev"},
      {"-sandbox", "sandbox"}
    ]
  )

# Both date pickers default to yesterday (Eastern time) and cannot go past today.
today = "America/New_York" |> DateTime.now!() |> DateTime.to_date()

yesterday = Date.add(today, -1)

start_date_input = Kino.Input.date("Analyze data from...", default: yesterday, max: today)
end_date_input = Kino.Input.date("to...", default: yesterday, max: today)

sample_rate_input =
  Kino.Input.range("Sample data at (?)-minute intervals", min: 1, max: 5, step: 1, default: 1)

# No default value: a blank field means "take every sample".
samples_per_minute_input =
  Kino.Input.number("Take (?) samples per minute - leave blank for ALL", default: nil)

# Lay the remaining inputs out as a 2-column grid beneath the environment selector.
detail_inputs =
  Kino.Layout.grid(
    [start_date_input, end_date_input, sample_rate_input, samples_per_minute_input],
    columns: 2
  )

Kino.Layout.grid([env_input, detail_inputs], columns: 1)

Setting Details

Setting Details
Environment Environment to analyze data from.
Analyze data from… First service day to analyze.
(Service days start at 4am.)
to… Last service day to analyze.
Sample data at (?)-minute intervals Sets interval at which data is sampled for analysis.
Lower value = more samples and slower report generation.
Take (?) samples per minute Sets number of samples to take within each sampled minute.
Higher value = more samples and slower report generation.
Leave blank to analyze all data within each sampled minute. (slowest)

Read inputs.

# Read and validate the data-loading inputs, then build the loader settings.
# Each `Kino.interrupt!/2` call below stops evaluation of this cell with an error message.

# Some manual validation:
sample_count = Kino.Input.read(samples_per_minute_input)

# A blank field reads as nil and skips this check.
if is_number(sample_count) and trunc(sample_count) <= 0 do
  Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end

start_d = Kino.Input.read(start_date_input)
end_d = Kino.Input.read(end_date_input)

if is_nil(start_d) do
  Kino.interrupt!(:error, "A start date must be selected.")
end

if is_nil(end_d) do
  Kino.interrupt!(:error, "An end date must be selected.")
end

if Date.compare(start_d, end_d) == :gt do
  Kino.interrupt!(:error, "Start date cannot be after end date.")
end

# Inputs are valid. Drop them into a settings struct.
# The values validated above are passed along directly rather than re-read from
# the inputs, so what was validated is exactly what gets used.
loader_settings =
  GlidesReport.Settings.Load.new(
    Kino.Input.read(env_input),
    start_d,
    end_d,
    Kino.Input.read(sample_rate_input),
    sample_count
  )

Load data into memory.

# Load the data selected by the loader settings; the result reports how many
# files were already present locally and how many were downloaded.
file_counts =
  GlidesReport.Loader.load_data(
    loader_settings.start_date,
    loader_settings.end_date,
    loader_settings.env_suffix,
    loader_settings.sample_rate,
    loader_settings.sample_count
  )

[
  "Found #{file_counts.local} existing local files.",
  "Downloaded #{file_counts.downloaded} new files."
]
|> Enum.each(&IO.puts/1)

# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
#   Kino.ETS.new(:TripUpdates),
#   Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()

👉 Now, choose how you want to filter the data.

# Build the terminal picker from the labeled terminals and terminal groups.
{terminals, groups} = GlidesReport.Terminal.all_labeled_terminals_and_groups()
terminal_ids_input = GlidesReport.TerminalSelection.new(terminals, groups)

limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")

min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)", default: 1)

# Show the three filter controls side by side.
Kino.Layout.grid(
  [terminal_ids_input, limit_to_next_2_predictions_input, min_advance_notice_input],
  columns: 3
)

Setting Details

Setting Details
Stop(s) Only analyze data concerning these terminal stops.
Simulate countdown clocks? Only consider predictions that would have appeared on countdown clocks—those that were in the next 2 predictions for a stop at some point.
Minimum advance notice (seconds) Only consider predictions created at least this many seconds in advance of the departure time they predict.

For example: If this were set to 60, then:
  • a prediction generated at 12:00:00, with departure time predicted for 12:00:30, would be omitted from analysis.
  • a prediction generated at 12:00:00, with departure time predicted for 12:01:00, would be included in analysis.
Leave the field blank to disable this filter.

Read inputs.

# Read the filter inputs and build the filter settings.
# The checked terminals are read once, so the value that is validated is the
# same value that goes into the settings.
checked_terminals = GlidesReport.TerminalSelection.get_checked_terminals(terminal_ids_input)

# Stop evaluation of this cell if no terminal is selected.
if Enum.empty?(checked_terminals) do
  Kino.interrupt!(:error, "Please select at least one stop.")
end

filter_settings =
  GlidesReport.Settings.Filter.new(
    checked_terminals,
    Kino.Input.read(limit_to_next_2_predictions_input),
    Kino.Input.read(min_advance_notice_input)
  )

Main procedure

Filter trip updates based on your settings.

alias TransitData.GlidesReport.Departure

# {terminal, timestamp} pairs used by the countdown-clock filter at the end of
# the pipeline below. Stays nil (no `else` branch) when that filter is disabled.
top_twos =
  if filter_settings.limit_to_next_2_predictions do
    GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.terminal_ids)
  end

# Predicted departures, as a list of Departure structs, after applying all filters.
trip_updates =
  :TripUpdates
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
  # Filter each trip update's stop_time_update list.
  # If filtered list is empty for any trip update, the trip update is removed entirely.
  |> Stream.map(&GlidesReport.TripUpdate.filter_terminals(&1, filter_settings.terminal_ids))
  |> Stream.reject(&is_nil/1)
  |> Stream.map(
    &GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
  )
  |> Stream.reject(&is_nil/1)
  # Split each trip update into its individual stop_time_update items.
  # Note that we use the timestamp of the predicted departure,
  # not the timestamp of when the prediction was generated.
  |> Stream.flat_map(fn tr_upd ->
    Enum.map(
      tr_upd.trip_update.stop_time_update,
      &Departure.new(tr_upd.trip_update.trip.trip_id, &1.terminal_id, &1.departure.time)
    )
  end)
  # Apply the "appeared on countdown clocks" filter, if it's enabled.
  |> then(fn stream ->
    if is_nil(top_twos) do
      stream
    else
      Stream.filter(stream, &({&1.terminal, &1.timestamp} in top_twos))
    end
  end)
  |> Enum.to_list()

Kino.nothing()

Filter vehicle positions based on your settings.

# Actual departures, as a list of Departure structs, for the selected terminals.
vehicle_positions =
  :VehiclePositions
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
  # Keep only positions associated with one of the selected terminals.
  |> Stream.filter(&(&1.vehicle.terminal_id in filter_settings.terminal_ids))
  # NOTE(review): presumably collapses repeated status reports for the same
  # vehicle/trip into one -- confirm against VehiclePosition.dedup_statuses.
  |> GlidesReport.VehiclePosition.dedup_statuses()
  |> Stream.map(
    &Departure.new(&1.vehicle.trip.trip_id, &1.vehicle.terminal_id, &1.vehicle.timestamp)
  )
  |> Enum.to_list()

Kino.nothing()

Compute accuracy under a range of bucket sizes (called “variances” in the code).

# Bucket sizes, in minutes, under which each prediction's accuracy is evaluated.
variances = [1, 2, 3, 5, 10]

##################
# OVERALL VALUES #
##################

# %{
#   variance => %{
#     terminal_id => [actual_departure_range1, actual_departure_range2, ...]
#   }
# }
actual_departure_windows_by_variance =
  Map.new(variances, fn variance ->
    # Convert to seconds, to match up with vehicle.timestamp
    variance_sec = variance * 60

    time_ranges_by_terminal_id =
      vehicle_positions
      |> Enum.group_by(& &1.terminal, & &1.timestamp)
      |> Map.new(fn {terminal_id, timestamps} ->
        # One window of +/- variance_sec around each actual departure; the
        # windows are then handed to merge_ranges (presumably to combine
        # overlapping ones -- confirm against GlidesReport.Util).
        time_ranges =
          timestamps
          |> Enum.map(fn t -> (t - variance_sec)..(t + variance_sec)//1 end)
          |> GlidesReport.Util.merge_ranges()

        {terminal_id, time_ranges}
      end)

    {variance, time_ranges_by_terminal_id}
  end)

# %{
#   variance => MapSet.t(Departure.t())
# }
# For each bucket size, the set of predicted departures whose timestamp falls
# inside at least one actual-departure window at the same terminal.
accurate_predictions_by_variance =
  for variance <- variances, into: %{} do
    windows_by_terminal = Map.fetch!(actual_departure_windows_by_variance, variance)

    hits =
      Enum.filter(trip_updates, fn predicted ->
        # Terminals with no recorded actual departures have no windows to match.
        windows = Map.get(windows_by_terminal, predicted.terminal, [])
        Enum.any?(windows, &(predicted.timestamp in &1))
      end)

    {variance, MapSet.new(hits)}
  end

###########################
# VALUES BUCKETED BY HOUR #
###########################

# Zeroes out everything below the hour on a struct with minute/second/microsecond fields.
trunc_to_hour = fn dt -> %{dt | minute: 0, second: 0, microsecond: {0, 0}} end

# `hour` from here on means a DateTime struct truncated to the hour.

# %{ hour => MapSet.t(Departure.t()) } of predicted departures.
predictions_by_hour =
  trip_updates
  |> Enum.group_by(&trunc_to_hour.(&1.local_dt))
  |> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)

# %{ hour => MapSet.t(Departure.t()) } of actual departures.
departures_by_hour =
  vehicle_positions
  |> Enum.group_by(&trunc_to_hour.(&1.local_dt))
  |> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)

# %{
#   hour => %{ variance => MapSet.t(Departure.t()) }
# }
# Hours with no accurate predictions have no entry, and within an hour only
# the variances that have at least one accurate prediction are present.
accurate_predictions_by_variance_by_hour =
  accurate_predictions_by_variance
  # First, flatten the map while maintaining all data.
  # [{variance, Departure.t()}]
  |> Enum.flat_map(fn {variance, departures} ->
    Enum.map(departures, &{variance, &1})
  end)
  # Then, group by hour.
  # %{hour => [{variance, Departure.t()}]}
  |> Enum.group_by(fn {_variance, departure} ->
    trunc_to_hour.(departure.local_dt)
  end)
  # Then, re-build the variance => departures map within each hour bucket.
  |> Map.new(fn {hour, tagged_departures} ->
    # Named differently from the top-level `accurate_predictions_by_variance`
    # so the two are not confused.
    by_variance =
      tagged_departures
      |> Enum.group_by(
        fn {variance, _departure} -> variance end,
        fn {_variance, departure} -> departure end
      )
      |> Map.new(fn {variance, departures} -> {variance, MapSet.new(departures)} end)

    {hour, by_variance}
  end)

Kino.nothing()

👉 Results

Per-Hour Counts of trips for which RTR made departure predictions vs. actual departures

Methodology:

  • From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a terminal.[^1]
  • From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a terminal.[^2]
  • If a predicted departure time was within some number of minutes from the actual departure time at the same terminal, then that prediction is considered accurate. Each prediction is evaluated for accuracy under a number of different minute ranges: 1, 2, 3, 5, and 10 minutes.

[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “IN_TRANSIT_TO” or “INCOMING_AT” the stop _after_ the terminal. E.g. INCOMING_AT Green Street from Forest Hills.

[^2]: This is the set of _all_ times at which a vehicle was predicted to depart a terminal. If at any moment, even just for a minute, a vehicle was predicted to depart terminal S at time T, then that {time, terminal} pair is added to the set.

# Every hour to report on, in order: from 04:00 on the first service day
# through 03:00 on the day after the last service day (both inclusive), Eastern time.
all_hours =
  (
    start_hour = DateTime.new!(loader_settings.start_date, ~T[04:00:00], "America/New_York")

    end_hour =
      DateTime.new!(
        Date.shift(loader_settings.end_date, day: 1),
        ~T[03:00:00],
        "America/New_York"
      )

    start_hour
    |> Stream.iterate(&DateTime.shift(&1, hour: 1))
    # `!= :gt` keeps end_hour itself in the list.
    |> Stream.take_while(&(DateTime.compare(&1, end_hour) != :gt))
    # They should already be truncated to the hour, but let's just be safe...
    |> Enum.map(trunc_to_hour)
  )

# One row per hour in `all_hours`. Each row is an ordered list of
# {column name, value} pairs: the fixed columns first, then a count and a
# percentage column for every bucket size in `variances`.
table =
  Enum.map(all_hours, fn hour ->
    predicted_departure_time_count =
      predictions_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    actual_departure_time_count =
      departures_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    # Hours without any accurate prediction have no entry; an empty map is
    # enough because each variance lookup below has its own empty-set default.
    accurate_by_variance = Map.get(accurate_predictions_by_variance_by_hour, hour, %{})

    bucket_columns =
      Enum.flat_map(variances, fn variance ->
        accurate_prediction_count =
          accurate_by_variance
          |> Map.get(variance, MapSet.new())
          |> MapSet.size()

        # "N/A" is passed as the third argument, presumably the text shown
        # when there are no predictions to divide by -- confirm against Util.
        percentage =
          GlidesReport.Util.format_percent(
            accurate_prediction_count,
            predicted_departure_time_count,
            "N/A"
          )

        [
          {"# accurate (± #{variance})", accurate_prediction_count},
          {"% accurate (± #{variance})", percentage}
        ]
      end)

    [
      {"service day", to_string(GlidesReport.Loader.service_day(hour))},
      {"hour", Calendar.strftime(hour, "%H:00")},
      {"# predictions", predicted_departure_time_count},
      {"# departures", actual_departure_time_count}
    ] ++ bucket_columns
  end)

table_name = "Prediction accuracy by bucket"

# Render the CSV download button first; the data table below is the cell's output.
Kino.Download.new(
  fn -> GlidesReport.Util.table_to_csv(table) end,
  filename: GlidesReport.Util.build_csv_name(table_name, loader_settings, filter_settings),
  label: "Export as CSV"
)
|> Kino.render()

Kino.DataTable.new(table, name: table_name)

About this table

Column Description
service day Service day analyzed by this row.
hour Hour within the service day analyzed by this row.
# predictions Total number of predictions generated during this hour.[^1]
# departures Total number of departures that occurred during this hour.
# accurate (± N) Number of predictions generated during this hour, whose predicted times were within N minutes of a departure from the same stop.
% accurate (± N) equal to [# accurate (± N)] / [# predictions]

[^1]: Predictions with the same trip, stop, and predicted minute of departure are considered equivalent and counted once.

# Number of distinct predicted departures across the whole analysis period.
overall_prediction_count =
  trip_updates
  |> MapSet.new()
  |> MapSet.size()

# One chart row per bucket size: the share of all predictions that were
# accurate within that bucket, as a whole-number percentage.
accuracy_percentages_by_variance =
  for variance <- variances do
    accurate_set = Map.get(accurate_predictions_by_variance, variance, MapSet.new())
    accurate_prediction_count = MapSet.size(accurate_set)

    # With no predictions at all there is nothing to divide by.
    percentage =
      if overall_prediction_count > 0 do
        round(100.0 * (accurate_prediction_count / overall_prediction_count))
      else
        "N/A"
      end

    %{
      "Bucket size (± n minutes)" => variance,
      "Accuracy (% of total predictions)" => percentage
    }
  end

Kino.nothing()
# Bar chart of overall accuracy (y) against bucket size (x).
# The field names must match the keys of the maps in `accuracy_percentages_by_variance`.
bucket_field = "Bucket size (± n minutes)"
accuracy_field = "Accuracy (% of total predictions)"

chart = VegaLite.new(width: 500, height: 300, title: "Percent accurate by bucket size")

chart
|> VegaLite.data_from_values(accuracy_percentages_by_variance,
  only: [bucket_field, accuracy_field]
)
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, bucket_field, type: :quantitative)
|> VegaLite.encode_field(:y, accuracy_field, type: :quantitative)