
Glides Terminal Departure Accuracy

project_dir = __DIR__ |> Path.join("..") |> Path.expand()

Mix.install(
  [
    {:kino, "~> 0.12.0"},
    {:transit_data, path: project_dir},
    # transit_data needs a timezone DB for some date-related logic.
    {:tz, "~> 0.26.5"},
    {:kino_vega_lite, "~> 0.1.11"}
  ],
  config: [
    elixir: [time_zone_database: Tz.TimeZoneDatabase],
    ex_aws: [
      access_key_id: System.get_env("LB_AWS_ACCESS_KEY_ID"),
      secret_access_key: System.get_env("LB_AWS_SECRET_ACCESS_KEY"),
      region: "us-east-1"
    ]
  ]
)

alias TransitData.GlidesReport

Kino.nothing()

Instructions

This notebook provides an implementation of 🚃 Build External Glides Terminal Departure Accuracy Report.

The notebook requires AWS credentials for a user with access to the mbta-gtfs-s3 family of S3 buckets.

How to add your AWS credentials

  1. Open your Hub. This link should send you there.
  2. Under “Secrets”, add two secrets with the following names:
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
  3. Return to this notebook. Click the 🔒 icon in the left sidebar and toggle on both secrets.
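Note: Livebook exposes each secret to the attached runtime as an environment variable prefixed with LB_, which is why the setup cell above reads LB_AWS_ACCESS_KEY_ID and LB_AWS_SECRET_ACCESS_KEY.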

Generating a report

Evaluate code cells from top to bottom.

Some cells produce controls that let you adjust how the report runs.

Cells that generate inputs / outputs are marked with a “👉”—you can skip to these, and when you evaluate them, any intermediate cells will automatically evaluate as well.

Tip: If you want to generate another report with different settings, simply change the settings, scroll back down to Results, and click “Evaluate” again.

Setup

use_advanced_date? = Kino.Shorts.read_checkbox("Use advanced date/time input?")
Kino.nothing()

Simple: Choose a date. The report will analyze the full service day on that date.

Advanced: Choose a start date+time and end date+time. The report will analyze within that window only.
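For reference, here is a minimal sketch (with a made-up service date) of how the "Simple" mode's window is derived. A service day runs from 4am Eastern on the chosen date to just before 4am Eastern the next day, converted to UTC; the defaults in the next cell are computed the same way.

# Sketch only -- `service_date` is a hypothetical example; the notebook reads
# the real date from the input below.
service_date = ~D[2024-01-15]

# The service day begins at 4am Eastern on the chosen date...
window_start =
  DateTime.new!(service_date, ~T[04:00:00], "America/New_York")
  |> DateTime.shift_zone!("Etc/UTC")

# ...and ends just before 4am Eastern the following day.
window_end =
  DateTime.new!(Date.add(service_date, 1), ~T[03:59:59], "America/New_York")
  |> DateTime.shift_zone!("Etc/UTC")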

env_input =
  Kino.Input.select("Environment", [
    {"", "prod"},
    {"-dev-blue", "dev-blue"},
    {"-dev-green", "dev-green"},
    {"-dev", "dev"},
    {"-sandbox", "sandbox"}
  ])

# DateTimes are cached in the process dictionary to prevent the input's default
# value from changing (and the input from resetting to that default) every time this cell re-runs.
process_get = fn key, default_fn ->
  case Process.get(key, nil) do
    nil ->
      value = default_fn.()
      Process.put(key, {:value, value})
      value
    {:value, value} ->
      value
  end
end


today_date =
  process_get.(:today_date, fn ->
    DateTime.utc_now()
    |> DateTime.shift_zone!("America/New_York")
    |> DateTime.to_date()
  end)

yesterday_date = process_get.(:yesterday_date, fn -> Date.add(today_date, -1) end)

# Default setting: Analyze a full service day, starting at 4am yesterday (Eastern) and ending at 4am today (Eastern).
yesterday_start_time =
  process_get.(:yesterday_start_time, fn ->
    DateTime.new!(yesterday_date, ~T[04:00:00], "America/New_York")
    |> DateTime.shift_zone!("Etc/UTC")
    |> DateTime.to_naive()
  end)

today_end_time =
  process_get.(:today_end_time, fn ->
    DateTime.new!(today_date, ~T[03:59:59], "America/New_York")
    |> DateTime.shift_zone!("Etc/UTC")
    |> DateTime.to_naive()
  end)

# Simple date input
date_input = Kino.Input.date("Date", default: yesterday_date, max: today_date)

# Advanced start/end datetime inputs
start_date_input =
  Kino.Input.utc_datetime("Analyze data from...",
    default: yesterday_start_time,
    max: DateTime.utc_now()
  )

end_date_input =
  Kino.Input.utc_datetime("to...", default: today_end_time, max: DateTime.utc_now())

sample_rate_input =
  Kino.Input.range("Sample data at (?)-minute intervals", min: 1, max: 5, step: 1, default: 5)

samples_per_minute_input =
  Kino.Input.number("Take (?) samples per minute - leave blank for ALL", default: 1)

content_map = %{
  true:
    Kino.Layout.grid(
      [
        env_input,
        Kino.Layout.grid(
          [
            start_date_input,
            end_date_input,
            sample_rate_input,
            samples_per_minute_input
          ],
          columns: 2
        )
      ],
      columns: 1
    ),
  false:
    Kino.Layout.grid(
      [
        env_input,
        date_input,
        sample_rate_input,
        samples_per_minute_input
      ],
      columns: 2
    )
}

doc_table_map =
  %{
    true: """
    | Analyze data from... | Start analyzing data from this date/time.<br>Default is 4am yesterday, Eastern. |
    | to... | Analyze data up to this date/time.<br>Default is 3:59am today, Eastern. |
    """,
    false: """
    | Date | Service date to analyze data from. A 24-hour period starting at 4am Eastern. |
    """
  }
  |> Map.new(fn {k, fragment} ->
    {k,
     Kino.Markdown.new("""
     ### Setting Details

     | Setting | Details |
     | ------- | ------- |
     | Environment | Environment to analyze data from. |
     #{String.trim_trailing(fragment)}
     | Sample data at (?)-minute intervals | Sets interval at which data is sampled for analysis.<br>Lower value = more samples and slower report generation. |
     | Take (?) samples per minute | Sets number of samples to take within each sampled minute.<br>Higher value = more samples and slower report generation.<br>Leave blank to analyze *all* data within each sampled minute. (slowest) |
     """)}
  end)

frame = Kino.Frame.new(placeholder: false)
Kino.Frame.clear(frame)
Kino.Frame.append(frame, content_map[use_advanced_date?])
Kino.Frame.append(frame, doc_table_map[use_advanced_date?])
frame

Read inputs.

# Some manual validation:
sample_count = Kino.Input.read(samples_per_minute_input)

if is_number(sample_count) and trunc(sample_count) <= 0 do
  Kino.interrupt!(:error, "Samples per minute must be either blank or a positive integer.")
end

if use_advanced_date? do
  start_dt = Kino.Input.read(start_date_input)
  end_dt = Kino.Input.read(end_date_input)

  if is_nil(start_dt) do
    Kino.interrupt!(:error, "A start date/time must be selected.")
  end

  if is_nil(end_dt) do
    Kino.interrupt!(:error, "An end date/time must be selected.")
  end

  if NaiveDateTime.diff(end_dt, start_dt, :hour) >= 24 do
    Kino.interrupt!(
      :error,
      "Sorry, time windows of 24 hours or more are not yet supported.\n" <>
        "(If this was unexpected, check whether the window includes the autumn DST change.)"
    )
  end

  if NaiveDateTime.diff(end_dt, start_dt) <= 0 do
    Kino.interrupt!(:error, "Start date needs to come before end date.")
  end
else
  date = Kino.Input.read(date_input)

  if is_nil(date) do
    Kino.interrupt!(:error, "A date must be selected.")
  end
end

# Inputs are valid. Drop them into a settings struct.
loader_settings =
  if use_advanced_date? do
    GlidesReport.Settings.Load.new(
      Kino.Input.read(env_input),
      Kino.Input.read(start_date_input) |> DateTime.from_naive!("Etc/UTC"),
      Kino.Input.read(end_date_input) |> DateTime.from_naive!("Etc/UTC"),
      Kino.Input.read(sample_rate_input),
      Kino.Input.read(samples_per_minute_input)
    )
  else
    GlidesReport.Settings.Load.new(
      Kino.Input.read(env_input),
      Kino.Input.read(date_input),
      Kino.Input.read(sample_rate_input),
      Kino.Input.read(samples_per_minute_input)
    )
  end

Load data into memory.

file_counts =
  GlidesReport.Loader.load_data(
    loader_settings.start_dt,
    loader_settings.end_dt,
    loader_settings.env_suffix,
    loader_settings.sample_rate,
    loader_settings.sample_count
  )

IO.puts("Found #{file_counts.local} existing local files.")
IO.puts("Downloaded #{file_counts.downloaded} new files.")

# Uncomment to inspect the ETS tables:
# Kino.Layout.grid([
#   Kino.ETS.new(:TripUpdates),
#   Kino.ETS.new(:VehiclePositions)
# ], columns: 2)
Kino.nothing()

👉 Now, choose how you want to filter the data.

stop_ids_input =
  Kino.Input.select("Stop(s)", GlidesReport.Terminals.all_labeled_stops_and_groups())

limit_to_next_2_predictions_input = Kino.Input.checkbox("Simulate countdown clocks?")

min_advance_notice_input = Kino.Input.number("Minimum advance notice (seconds)")

[
  stop_ids_input,
  limit_to_next_2_predictions_input,
  min_advance_notice_input
]
|> Kino.Layout.grid(columns: 3)

Setting Details

| Setting | Details |
| ------- | ------- |
| Stop(s) | Only analyze data concerning a specific stop, or group of stops.<br>Note: Terminals that do not have Glides predictions—Heath Street and Ashmont—are ignored regardless of which stop(s) you select. |
| Simulate countdown clocks? | Only consider predictions that would have appeared on countdown clocks—those that were in the next 2 predictions for a stop at some point. |
| Minimum advance notice (seconds) | Only consider predictions created at least this many seconds in advance of the departure time they predict.<br>For example: If this were set to 60, then a prediction generated at 12:00:00, with departure time predicted for 12:00:59, would be omitted from analysis.<br>Leave the field blank to disable this filter. (A sketch of this check follows the table.) |
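As a rough sketch of the advance-notice check (illustrative timestamps and names only, not the actual GlidesReport internals), the filter boils down to one comparison between when a prediction was generated and the departure time it predicts:

# Sketch only. A prediction generated at 12:00:00 for a departure at 12:00:59
# gives 59 seconds of notice, so a 60-second minimum drops it.
min_advance_notice_sec = 60
generated_at = 1_700_000_000
predicted_departure = generated_at + 59

keep? = predicted_departure - generated_at >= min_advance_notice_sec
# => false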

Read inputs.

filter_settings =
  GlidesReport.Settings.Filter.new(
    Kino.Input.read(stop_ids_input),
    Kino.Input.read(limit_to_next_2_predictions_input),
    Kino.Input.read(min_advance_notice_input)
  )

Main procedure

Filter trip updates based on your settings.

alias TransitData.GlidesReport.Departure

top_twos =
  if filter_settings.limit_to_next_2_predictions do
    GlidesReport.CountdownClocksSimulation.get_all_top_two_times(filter_settings.stop_ids)
  else
    nil
  end

trip_updates =
  :TripUpdates
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.TripUpdate.normalize_stop_ids/1)
  # Filter each trip update's stop_time_update list.
  # If filtered list is empty for any trip update, the trip update is removed entirely.
  |> Stream.map(&GlidesReport.TripUpdate.filter_stops(&1, filter_settings.stop_ids))
  |> Stream.reject(&is_nil/1)
  |> Stream.map(
    &GlidesReport.TripUpdate.filter_by_advance_notice(&1, filter_settings.min_advance_notice_sec)
  )
  |> Stream.reject(&is_nil/1)
  # Split each trip update into its individual stop_time_update items.
  # Note that we use the timestamp of the predicted departure,
  # not the timestamp of when the prediction was generated.
  |> Stream.flat_map(fn tr_upd ->
    Enum.map(
      tr_upd.trip_update.stop_time_update,
      &Departure.new(tr_upd.trip_update.trip.trip_id, &1.stop_id, &1.departure.time)
    )
  end)
  # Apply the "appeared on countdown clocks" filter, if it's enabled.
  |> then(fn stream ->
    if not is_nil(top_twos) do
      Stream.filter(stream, &({&1.stop, &1.timestamp} in top_twos))
    else
      stream
    end
  end)
  |> Enum.to_list()

Kino.nothing()

Filter vehicle positions based on your settings.

vehicle_positions =
  :VehiclePositions
  |> GlidesReport.Util.stream_values()
  |> Stream.map(&GlidesReport.VehiclePosition.normalize_stop_id/1)
  |> Stream.filter(&(&1.vehicle.stop_id in filter_settings.stop_ids))
  |> GlidesReport.VehiclePosition.dedup_statuses()
  |> Stream.map(&Departure.new(&1.vehicle.trip.trip_id, &1.vehicle.stop_id, &1.vehicle.timestamp))
  |> Enum.to_list()

Kino.nothing()

Compute accuracy under a range of bucket sizes (called "variances" in the code).
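The cell below leans on GlidesReport.Util.merge_ranges/1 to coalesce overlapping departure windows. Its source isn't shown here, so as an assumption-labeled sketch, merging overlapping integer ranges can be done in plain Elixir like this:

# Sketch only: an assumed behavior for merge_ranges/1, not the library source.
merge_ranges = fn ranges ->
  ranges
  |> Enum.sort_by(& &1.first)
  |> Enum.reduce([], fn
    # First range starts the accumulator.
    range, [] -> [range]
    # Overlapping or adjacent ranges collapse into one.
    a..b//1, [c..d//1 | rest] when a <= d + 1 -> [c..max(b, d)//1 | rest]
    # Disjoint ranges are kept separate.
    range, acc -> [range | acc]
  end)
  |> Enum.reverse()
end

merge_ranges.([1..5//1, 4..9//1, 12..14//1])
# => [1..9//1, 12..14//1]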

variances = [1, 2, 3, 5, 10]

##################
# OVERALL VALUES #
##################

# %{
#   variance => %{
#     stop_id => [actual_departure_range1, actual_departure_range2, ...]
#   }
# }
actual_departure_windows_by_variance =
  Map.new(variances, fn variance ->
    # Convert to seconds, to match up with vehicle.timestamp
    variance_sec = variance * 60

    time_ranges_by_stop_id =
      vehicle_positions
      |> Enum.group_by(&(&1.stop), &(&1.timestamp))
      |> Map.new(fn {stop_id, timestamps} ->
        time_ranges =
          timestamps
          |> Enum.map(fn t -> (t - variance_sec)..(t + variance_sec)//1 end)
          |> GlidesReport.Util.merge_ranges()

        {stop_id, time_ranges}
      end)

    {variance, time_ranges_by_stop_id}
  end)

# %{
#   variance => MapSet.t(Departure.t())
# }
accurate_predictions_by_variance =
  Map.new(variances, fn variance ->
    # lookup the actuals
    accuracy_windows_by_stop = Map.fetch!(actual_departure_windows_by_variance, variance)

    accurate_predictions =
      trip_updates
      |> Enum.filter(fn departure ->
        accuracy_windows_by_stop
        |> Map.get(departure.stop, [])
        |> Enum.any?(fn window -> departure.timestamp in window end)
      end)
      |> MapSet.new()

    {variance, accurate_predictions}
  end)

###########################
# VALUES BUCKETED BY HOUR #
###########################

predictions_by_hour =
  trip_updates
  |> Enum.group_by(&(&1.hour))
  |> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)

departures_by_hour =
  vehicle_positions
  |> Enum.group_by(&(&1.hour))
  |> Map.new(fn {k, l} -> {k, MapSet.new(l)} end)

# %{
#   hour => %{
#     variance => MapSet.t(Departure.t())
#   }
# }
accurate_predictions_by_variance_by_hour =
  accurate_predictions_by_variance
  # First, flatten the map while maintaining all data.
  # [{variance, Departure.t()}]
  |> Enum.flat_map(fn {variance, departures} ->
    Enum.map(departures, &{variance, &1})
  end)
  # Then, group by hour.
  # %{hour => [{variance, Departure.t()}]}
  |> Enum.group_by(fn {_variance, departure} ->
    GlidesReport.Util.unix_timestamp_to_local_hour(departure.timestamp)
  end)
  # Then, re-build the variance => stop_times map within each hour bucket.
  |> Map.new(fn {hour, values} ->
    accurate_predictions_by_variance =
      Enum.group_by(values,
        fn {variance, _} -> variance end,
        fn {_, departure} -> departure end
      )
      |> Map.new(fn {variance, departures} -> {variance, MapSet.new(departures)} end)

    {hour, accurate_predictions_by_variance}
  end)

Kino.nothing()

👉 Results

Per-Hour Counts of trips for which RTR made departure predictions vs. actual departures

Methodology:

  • From VehiclePositions, get all timestamps (truncated to minute) at which a vehicle actually departed a stop.[^1]
  • From TripUpdates, get all timestamps (truncated to minute) at which a vehicle was predicted to depart a stop.[^2]
  • If a predicted departure time was within some number of minutes from the actual departure time at the same stop, then that prediction is considered accurate. Each prediction is evaluated for accuracy under several different window sizes: 1, 2, 3, 5, and 10 minutes. (A worked sketch follows the footnotes below.)

[^1]: There is no “departing stop” vehicle status, so we look for events where the vehicle is “INTRANSIT_TO” or “INCOMING_AT” the stop _after_ the target stop.

[^2]: This is the set of all times at which a vehicle was predicted to depart a stop. If at any moment, even just for a minute, a vehicle was predicted to depart stop S at time T, then that {time, stop} pair is added to the set.
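To make the accuracy rule concrete, here is a worked sketch (made-up timestamps) of the ±variance window check that the main procedure performs:

# Sketch only. For a ±2-minute bucket, an actual departure at `actual` yields
# the window (actual - 120)..(actual + 120); a prediction counts as accurate
# if its predicted departure timestamp falls inside that window.
variance_sec = 2 * 60
actual = 1_700_000_000
window = (actual - variance_sec)..(actual + variance_sec)//1

(actual + 90) in window
# => true (predicted 90 seconds off -- inside the ±2-minute bucket)
(actual + 150) in window
# => false (150 seconds off -- outside the bucket)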

table =
  0..23
  # Service day starts at 4am, so let's start the table at that hour.
  |> Enum.map(&rem(&1 + 4, 24))
  |> Enum.map(fn hour ->
    predicted_departure_time_count =
      predictions_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    actual_departure_time_count =
      departures_by_hour
      |> Map.get(hour, MapSet.new())
      |> MapSet.size()

    accurate_predictions_by_variance =
      accurate_predictions_by_variance_by_hour
      |> Map.get(hour, Map.new(variances, &{&1, MapSet.new()}))

    bucket_columns =
      variances
      |> Enum.flat_map(fn variance ->
        accurate_prediction_count =
          accurate_predictions_by_variance
          |> Map.get(variance, MapSet.new())
          |> MapSet.size()

        percentage =
          if predicted_departure_time_count > 0 do
            p =
              round(100.0 * (accurate_prediction_count / predicted_departure_time_count))

            "#{p}%"
          else
            "N/A"
          end

        [
          {"# accurate (± #{variance})", accurate_prediction_count},
          {"% accurate (± #{variance})", percentage}
        ]
      end)

    [
      {"hour", "#{GlidesReport.Util.zero_pad(hour)}:00"},
      {"# predictions", predicted_departure_time_count},
      {"# departures", actual_departure_time_count}
    ] ++ bucket_columns
  end)

table_name = "Prediction accuracy by bucket"

Kino.Download.new(
  fn -> GlidesReport.Util.table_to_csv(table) end,
  filename: GlidesReport.Util.build_csv_name(table_name, loader_settings, filter_settings),
  label: "Export as CSV"
)
|> Kino.render()

Kino.DataTable.new(table, name: table_name)

overall_prediction_count = Enum.count(trip_updates)

accuracy_percentages_by_variance =
  variances
  |> Enum.map(fn variance ->
    accurate_prediction_count =
      accurate_predictions_by_variance
      |> Map.get(variance, MapSet.new())
      |> MapSet.size()

    percentage =
      if overall_prediction_count > 0 do
        round(100.0 * (accurate_prediction_count / overall_prediction_count))
      else
        "N/A"
      end

    %{
      "Bucket size (± n minutes)" => variance,
      "Accuracy (% of total predictions)" => percentage
    }
  end)

Kino.nothing()

VegaLite.new(width: 500, height: 300, title: "Percent accurate by bucket size")
|> VegaLite.data_from_values(accuracy_percentages_by_variance,
  only: ["Bucket size (± n minutes)", "Accuracy (% of total predictions)"]
)
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "Bucket size (± n minutes)", type: :quantitative)
|> VegaLite.encode_field(:y, "Accuracy (% of total predictions)", type: :quantitative)