Ride Along Prediction Accuracy
Mix.install([
{:explorer, "~> 0.9"},
{:kino, "~> 0.13"},
{:kino_vega_lite, "~> 0.1.13"},
{:scholar, "~> 0.3.0"},
{:exla, ">= 0.0.0"},
{:exgboost, "~> 0.5"},
{:axon, "~> 0.6"},
# {:axon, github: "elixir-nx/axon"},
# {:table_rex, "~> 3.1.1", override: true}
])
Nx.global_default_backend(EXLA.Backend)
# Client can also be set to :cuda / :rocm
Nx.Defn.global_default_options(compiler: EXLA, client: :host)
defmodule Support do
require Explorer.DataFrame, as: DF
alias Explorer.Series
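# Shared helpers: timestamp rounding, Duration-to-seconds conversion, and
# accuracy scoring against the binned tolerance tables defined below.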
def truncate_to_minute(%DateTime{} = dt) do
Map.merge(dt, %{second: 0, microsecond: {0, 0}})
end
# Round a DateTime up to the next whole minute when seconds or microseconds remain.
def round_up_to_minute(%DateTime{second: second, microsecond: {microsecond, _precision}} = dt)
when second > 0 or microsecond > 0 do
dt
|> Map.put(:time_zone, "Etc/UTC")
|> DateTime.add(1, :minute)
|> Map.merge(%{second: 0, microsecond: {0, 0}})
end
def round_up_to_minute(dt) do
dt
end
# Convert a Duration series to whole seconds (assumes microsecond precision).
def duration_to_seconds(col) do
col
|> Series.cast(:integer)
|> Series.divide(1_000_000)
|> Series.cast(:integer)
end
def diff_seconds(first, second) do
duration_to_seconds(Series.subtract(first, second))
end
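# Overall accuracy: the unweighted mean of the per-bin accuracies
# computed by grouped_accuracy/5.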
def overall_accuracy(df, time_col, actual_col, prediction_col, accuracy) do
df
|> grouped_accuracy(time_col, actual_col, prediction_col, accuracy)
|> DF.summarise(accuracy: round(mean(accuracy), 1))
end
def grouped_accuracy(df, time_col, actual_col, prediction_col, accuracy) do
df
|> DF.distinct([time_col, actual_col, prediction_col])
|> with_accuracy(time_col, actual_col, prediction_col, accuracy)
|> DF.group_by(:category)
|> DF.summarise(
size: size(accurate?),
accurate_count: sum(accurate?)
)
|> DF.mutate(accuracy: round(100 * cast(accurate_count, {:u, 32}) / size, 1))
|> DF.ungroup()
|> DF.sort_by(asc: category)
end
def with_accuracy(df, time_col, actual_col, prediction_col, accuracy) do
time_ahead_seconds = diff_seconds(df[actual_col], df[time_col])
diff_seconds = diff_seconds(df[prediction_col], df[actual_col])
binned = accuracy.(time_ahead_seconds)
df
|> DF.put(:diff, diff_seconds)
|> DF.put(:category, binned[:category])
|> DF.mutate(accurate?: diff >= ^binned[:allowed_early] and diff <= ^binned[:allowed_late])
end
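# Tolerance tables: each lead-time bin (minutes until arrival) allows a bounded
# early/late error in seconds; a prediction is "accurate" inside that window.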
def transit_app_accuracy(series) do
cat =
series
|> Explorer.Series.cut([3 * 60, 6 * 60, 10 * 60, 15 * 60],
labels: ["0-3", "3-6", "6-10", "10-15", "15+"]
)
bins =
DF.new(
%{
category: ["0-3", "3-6", "6-10", "10-15", "15+"],
allowed_early: [-30, -60, -60, -90, -120],
allowed_late: [90, 150, 210, 270, 330]
},
dtypes: %{
category: :category,
allowed_early: {:s, 16},
allowed_late: {:u, 16}
}
)
DF.join(cat, bins, how: :left, on: :category)
end
def ibi_accuracy(series) do
cat =
series
|> Explorer.Series.cut([3 * 60, 6 * 60, 12 * 60, 30 * 60],
labels: ["0-3", "3-6", "6-12", "12-30", "30+"]
)
bins =
DF.new(
%{
category: ["0-3", "3-6", "6-12", "12-30", "30+"],
allowed_early: [-60, -90, -150, -240, -330],
allowed_late: [60, 120, 210, 360, 510]
},
dtypes: %{
category: :category,
allowed_early: {:s, 16},
allowed_late: {:u, 16}
}
)
cat
|> DF.join(bins, how: :left, on: :category)
end
end
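As a quick sanity check of the tolerance tables (hypothetical lead times; the module above must already be evaluated):
# lead times of 2, 7, and 45 minutes, expressed in seconds
Support.ibi_accuracy(Explorer.Series.from_list([120, 420, 2_700]))
# => a DataFrame whose rows land in the "0-3", "6-12", and "30+" bins,
#    each joined to its allowed_early / allowed_late window in seconds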
Load/Group Data
require Explorer.DataFrame, as: DF
alias Explorer.{Duration, Series}
alias VegaLite, as: Vl
df =
"data-2024-08-26.csv"
|> Kino.FS.file_path()
|> DF.from_csv!(
parse_dates: true,
nil_values: [""],
dtypes: %{status: :category}
)
|> DF.filter(route > 0)
Kino.DataTable.new(df)
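# Reconstruct each trip's arrival time: prefer the earliest "arrived" ping;
# when a trip has none, fall back to the "picked_up" ping minus the scheduled
# load time.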
grouped = DF.group_by(df, :trip_id)
pure_arrival_times =
grouped
|> DF.filter(status == "arrived")
|> DF.summarise(pure_arrival_time: Series.min(time))
pickup_arrival_times =
grouped
|> DF.filter(status == "picked_up")
|> DF.mutate(load_time: load_time * %Duration{value: 60_000, precision: :millisecond})
|> DF.mutate(time: time - load_time)
|> DF.summarise(pickup_arrival_time: Series.min(time))
arrival_times = df
|> DF.distinct([:trip_id])
|> DF.join(pure_arrival_times, how: :left, on: :trip_id)
|> DF.join(pickup_arrival_times, how: :left, on: :trip_id)
|> DF.mutate(arrival_time: select(is_nil(pure_arrival_time), pickup_arrival_time, pure_arrival_time))
|> DF.select([:trip_id, :arrival_time])
|> DF.filter(not is_nil(arrival_time))
Kino.DataTable.new(arrival_times)
#Kino.nothing()
allowed_early_arrival = %Duration{value: 5 * 60_000, precision: :millisecond}
df =
df
|> DF.join(arrival_times, on: "trip_id")
|> DF.filter(arrival_time > time)
|> DF.filter(Support.diff_seconds(arrival_time, time) < 7200)
|> DF.filter(status in ["enroute", "waiting"])
|> DF.filter(not is_nil(ors_eta))
|> DF.mutate(
# shift hours by -4, presumably aligning UTC timestamps with local service time
hour: Series.remainder(Series.hour(time) + 24 - 4, 24),
# earliest acceptable ETA: the promise time minus the allowed early-arrival window
min_ors_eta: promise - ^allowed_early_arrival,
ahead: Support.diff_seconds(promise, time)
)
|> DF.mutate(naive_ors_eta: select(ors_eta > min_ors_eta, ors_eta, min_ors_eta))
df = df
|> DF.put(:naive_ors_eta, Series.transform(df[:naive_ors_eta], &Support.round_up_to_minute/1))
|> DF.mutate(
# when the promise is more than 20 minutes out, trust the scheduled pick
# time whenever it is later than the clamped ETA
new: select(
ahead > 1200,
select(pick > naive_ors_eta, pick, naive_ors_eta),
naive_ors_eta)
)
accuracy = &Support.ibi_accuracy/1
fields = [:pick, :ors_eta, :naive_ors_eta, :new] # :calculated
#Kino.DataTable.new(DF.mutate(df, ahead: Support.diff_seconds(pick, time)))
#Kino.nothing()
ahead_chart = for ahead <- 600..1800//60 do
df =
DF.mutate(df,
new: select(ahead > ^ahead, select(pick > min_ors_eta, pick, min_ors_eta), naive_ors_eta)
)
# offset by a constant baseline so differences between cutoffs stand out in the chart
calc = Support.overall_accuracy(df, :time, :arrival_time, :new, accuracy)[:accuracy][0] - 42
%{ahead: ahead, accuracy: calc}
end
Vl.new()
|> Vl.data_from_values(ahead_chart)
|> Vl.mark(:bar, tooltip: true)
|> Vl.encode_field(:x, "ahead", type: :quantitative)
|> Vl.encode_field(:y, "accuracy", type: :quantitative)
Accuracy Analysis
for field <- fields do
%{
"field" => "#{field}",
"accuracy" => Support.overall_accuracy(df, :time, :arrival_time, field, accuracy)[:accuracy][0]
}
end
|> Kino.DataTable.new(name: "Overall Accuracy %", keys: ["field", "accuracy"])
for field <- fields do
Support.grouped_accuracy(df, :time, :arrival_time, field, accuracy) |> Kino.DataTable.new(name: field)
end
|> Kino.Layout.grid(columns: 2)
accuracies = for hour <- 0..23 do
overall = Support.overall_accuracy(DF.filter(df, hour == ^hour), :time, :arrival_time, :new, accuracy)[:accuracy]
overall = if Series.size(overall) == 0 do
0.0
else
overall[0]
end
%{
"hour" => hour,
"accuracy" => overall
}
end
Vl.new()
|> Vl.data_from_values(accuracies)
|> Vl.mark(:bar, tooltip: true)
|> Vl.encode_field(:x, "hour", type: :nominal)
|> Vl.encode_field(:y, "accuracy", type: :quantitative)
Machine Learning
alias Scholar.Metrics.Regression, as: Metrics
df = df
#|> DF.filter(ors_eta >= min_ors_eta)
|> DF.mutate(
time_of_day: (hour + 3) / 24,
ors_duration: Support.diff_seconds(ors_eta, time),
promise_duration: Support.diff_seconds(promise, time),
pick_duration: Support.diff_seconds(pick, time),
actual_duration: Support.diff_seconds(arrival_time, time)
)
|> DF.mutate(
min_ors_duration: select(promise_duration < 1200, 0, promise_duration - 1200),
ors_to_add: select(actual_duration > ors_duration, actual_duration - ors_duration, 0)
#ors_scale: Series.divide(actual_duration, ors_duration)
)
df = df
|> DF.mutate(
# day_of_week/1 returns Monday=1 .. Sunday=7, so > 5 means Saturday or Sunday
weekend?: select(Series.day_of_week(noon) > 5, 1.0, 0.0),
waiting?: select(status == "waiting", 1.0, 0.0),
early?: select(ors_duration < min_ors_duration, 1.0, 0.0),
within_30m?: select(promise_duration < 1800, 1.0, 0.0)
)
#Kino.DataTable.new(DF.describe(DF.select(df, [:ors_duration, :promise_duration, :pick_duration, :actual_duration, :hour])))
#Kino.DataTable.new(DF.filter(df, actual_duration > 7200) |> DF.select([:time, :arrival_time, :trip_id, :status]))
#Kino.DataTable.new(DF.filter(df, trip_id==95211040))
fields = ~w[ors_duration ors_to_add]
Vl.new()
|> Vl.data_from_values(df |> DF.shuffle() |> DF.slice(0..500))
|> Vl.repeat(fields, Vl.new()
|> Vl.mark(:point, tooltip: true)
|> Vl.encode_field(:color, "early?", type: :nominal)
|> Vl.encode_field(:x, "actual_duration", type: :quantitative)
|> Vl.encode_repeat(:y, :repeat, type: :quantitative))
df = DF.shuffle(df)
train_df =
df
# first 30,000 shuffled rows for training; rows from 30,000 on are held out below
|> DF.slice(0..29_999)
|> DF.shuffle()
# alternative training set: DF.filter(df, ors_eta >= min_ors_eta)
training_fields = [
:ors_duration,
#:promise_duration,
:pick_duration,
:min_ors_duration,
:hour,
:weekend?,
#:time_of_day,
:waiting?
#:early?,
#:within_30m?
]
x =
train_df
|> DF.select(training_fields)
|> Nx.stack(axis: 1)
|> Nx.as_type(:s32)
y = DF.select(train_df, :ors_to_add) |> Nx.concatenate() |> Nx.as_type(:s32)
{x_train, x_test} = Nx.split(x, 0.9)
{y_train, y_test} = Nx.split(y, 0.9)
y
Linear Regression
model =
Scholar.Linear.LinearRegression.fit(
x_train,
y_train
)
y_pred = Scholar.Linear.LinearRegression.predict(model, x_test)
IO.inspect(model)
rmse =
Metrics.mean_square_error(y_test, y_pred)
|> Nx.sqrt()
mae = Metrics.mean_absolute_error(y_test, y_pred)
[
RMSE: Nx.to_number(rmse),
MAE: Nx.to_number(mae),
mean: Nx.to_number(Nx.mean(y))
]
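For reference, the two error metrics above reduce to short Nx expressions (a toy sketch on made-up tensors, not the notebook's data):
y_true = Nx.tensor([100.0, 200.0, 300.0])
y_hat = Nx.tensor([110.0, 190.0, 330.0])
# MAE: mean of |error|; RMSE: square root of the mean squared error
mae = y_true |> Nx.subtract(y_hat) |> Nx.abs() |> Nx.mean()
rmse = y_true |> Nx.subtract(y_hat) |> Nx.pow(2) |> Nx.mean() |> Nx.sqrt()
{Nx.to_number(mae), Nx.to_number(rmse)} # => approximately {16.67, 19.15}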
all_x =
df
|> DF.select(training_fields)
|> Nx.stack(axis: 1)
pred = Scholar.Linear.LinearRegression.predict(model, all_x)
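# Clamp negative corrections to zero, then shift ors_eta by scaling a
# one-second Duration (1_000 ms) per predicted second to add.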
df
|> DF.put(:add, Nx.max(pred, 0))
|> DF.mutate(regression: ors_eta + %Duration{value: 1_000, precision: :millisecond} * add)
#|> DF.mutate(regression: select(regression > min_ors_eta, regression, min_ors_eta))
|> Support.overall_accuracy(:time, :arrival_time, :regression, accuracy)
|> Kino.DataTable.new()
Polynomial Regression
model =
Scholar.Linear.PolynomialRegression.fit(
x_train,
y_train
)
y_pred = Scholar.Linear.PolynomialRegression.predict(model, x_test)
IO.inspect(model)
rmse =
Metrics.mean_square_error(y_test, y_pred)
|> Nx.sqrt()
mae = Metrics.mean_absolute_error(y_test, y_pred)
[
RMSE: Nx.to_number(rmse),
MAE: Nx.to_number(mae),
mean: Nx.to_number(Nx.mean(y))
] |> IO.inspect()
# predict over the full frame (all_x); x only covers the training slice and
# its row count would not match df below
pred = Scholar.Linear.PolynomialRegression.predict(model, all_x)
df
|> DF.put(:add, Nx.max(pred, 0))
|> DF.mutate(regression: ors_eta + %Duration{value: 1_000, precision: :millisecond} * add)
|> Support.overall_accuracy(:time, :arrival_time, :regression, accuracy)
|> Kino.DataTable.new()
Boosted Trees / Random Forest
alias Scholar.ModelSelection
boosted_grid = [
booster: [:gbtree],
device: [:cuda],
objective: [:reg_absoluteerror],
verbose_eval: [true],
tree_method: [:approx, :hist],
max_depth: [10, 50, 100, 200],
num_boost_rounds: [100, 200],
subsample: [0.25, 0.5, 0.75, 1.0]
#evals: [[{x_train, y_train, "training"}]]
]
random_forest_grid = [
booster: [:gbtree],
device: [:cuda],
objective: [:reg_squarederror, :reg_absoluteerror],
verbose_eval: [true],
tree_method: [:exact], # :approx
max_depth: [2, 4, 6], # 3, 5
num_parallel_tree: [10, 50, 100], # 30
num_boost_rounds: [1],
colsample_bynode: [0.75, 0.99], # 0.25, 0.5
subsample: [0.75, 0.99], # 0.25, 0.5
learning_rate: [1]
# evals: [[{x_train, y_train, "training"}]]
]
grid = boosted_grid # or: random_forest_grid
folding_fn = fn a -> [Nx.split(a, 0.9)] end
scoring_fn = fn x, y, hyperparams ->
IO.inspect(Keyword.delete(hyperparams, :evals))
{x_train, x_test} = x
{y_train, y_test} = y
y_pred =
EXGBoost.train(
x_train,
y_train,
Keyword.merge(hyperparams, evals: [{x_train, y_train, "training"}])
)
|> EXGBoost.predict(x_test)
mae = Metrics.mean_absolute_error(y_test, y_pred)
rmse = Nx.sqrt(Metrics.mean_square_error(y_test, y_pred))
[mae, rmse]
end
#gs_scores = ModelSelection.grid_search(x, y, folding_fn, scoring_fn, grid)
Kino.nothing()
# [best_config | _]=
# Enum.sort_by(gs_scores, fn %{score: score} ->
# score[1]
# |> Nx.squeeze()
# |> Nx.to_number()
# end) |> IO.inspect()
# %{hyperparameters: boosted_opts} = best_config
# IO.inspect(best_config)
boosted_opts = [
booster: :gbtree,
device: :cuda,
objective: :reg_absoluteerror,
verbose_eval: false,
tree_method: :hist,
max_depth: 5,
num_boost_rounds: 100,
subsample: 0.75,
# colsample_bytree: 0.9,
# colsample_bylevel: 0.9,
colsample_bynode: 0.9,
# grow_policy: :lossguide,
# early_stopping_rounds: 5,
# monotone_constraints: [1],
learning_rate: 0.3,
seed: :erlang.unique_integer([:positive]),
# feature_name: Enum.map(training_fields, &Atom.to_string/1),
# evals: [{x_test, y_test, "validation"}],
# validate_features: false
]
# random_forest_opts = [
# booster: :gbtree,
# device: :cuda,
# objective: :reg_squarederror,
# verbose_eval: false,
# tree_method: :hist,
# max_depth: 10,
# num_parallel_tree: 100,
# subsample: 0.5,
# colsample_bynode: 0.75,
# learning_rate: 1,
# evals: [{x_test, y_test, "training"}]
# ]
opts = boosted_opts
model = EXGBoost.train(x_train, y_train, opts)
#EXGBoost.Plotting.to_tabular(model)
:ok
#EXGBoost.plot_tree(model, rankdir: :lr, index: nil)
IO.inspect(EXGBoost.write_model(model, "Projects/github/ride_along/priv/model", overwrite: true))
y_pred = EXGBoost.predict(model, x_test)
IO.inspect(y_test)
IO.inspect(y_pred)
rmse =
Metrics.mean_square_error(y_test, y_pred)
|> Nx.sqrt()
mae = Metrics.mean_absolute_error(y_test, y_pred)
[
RMSE: Nx.to_number(rmse),
MAE: Nx.to_number(mae),
mean: Nx.to_number(Nx.mean(y_test)),
std: Nx.to_number(Nx.standard_deviation(y_test)),
pred_mean: Nx.to_number(Nx.mean(y_pred)),
pred_std: Nx.to_number(Nx.standard_deviation(y_pred))
]
validate_df = df |> DF.slice(30000..-1//1) |> DF.shuffle() |> DF.slice(0..15000)
x =
validate_df
|> DF.select(training_fields)
|> Nx.stack(axis: 1)
pred = EXGBoost.predict(model, x)
validate_df
|> DF.put(:add, Nx.as_type(pred, :s32))
|> DF.mutate(regression: ors_eta + %Duration{value: 1_000, precision: :millisecond} * add)
#|> DF.mutate(regression: select(regression > min_ors_eta, regression, min_ors_eta))
|> Support.overall_accuracy(:time, :arrival_time, :regression, accuracy)
|> Kino.DataTable.new()
boosted_opts = [
booster: :gbtree,
device: :cuda,
objective: :reg_squarederror,
verbose_eval: false,
tree_method: :hist,
max_depth: 10,
# num_parallel_tree: 100,
num_boost_rounds: 100,
subsample: 0.5,
# colsample_bytree: 0.9,
# colsample_bylevel: 0.9,
# colsample_bynode: 0.9,
grow_policy: :lossguide,
early_stopping_rounds: 5,
# monotone_constraints: [1],
learning_rate: 0.2,
seed: :erlang.unique_integer([:positive]),
# feature_name: Enum.map(training_fields, &Atom.to_string/1),
# the held-out pair drives early stopping
evals: [{x_test, y_test, "validation"}]
# validate_features: false
]
acc =
for max_depth <- [5],
objective <- [:reg_absoluteerror],
subsample <- [0.75],
colsample_bynode <- [0.9],
num_boost_rounds <- [100],
tree_method <- [:approx], # :hist
booster <- [:gbtree], # :dart
grow_policy <- [:depthwise], # :lossguide
learning_rate <- [0.3], # 0.1, 0.2, 0.5, 0.7, 0.9, 1.0
# start from an accuracy floor so only better configurations are recorded
reduce: %{accuracy: 46} do
acc ->
new_opts = [
objective: objective,
max_depth: max_depth,
subsample: subsample,
colsample_bynode: colsample_bynode,
num_boost_rounds: num_boost_rounds,
tree_method: tree_method,
grow_policy: grow_policy,
learning_rate: learning_rate,
booster: booster
]
opts = Keyword.merge(boosted_opts, new_opts)
model = EXGBoost.train(x_train, y_train, opts)
pred = EXGBoost.predict(model, x)
overall =
(validate_df
|> DF.put(:add, Nx.as_type(pred, :s32))
|> DF.mutate(
regression: ors_eta + %Duration{value: 1_000, precision: :millisecond} * add
)
|> Support.overall_accuracy(:time, :arrival_time, :regression, accuracy))[:accuracy][0]
if overall > acc.accuracy do
IO.inspect({:new_accuracy, overall})
IO.inspect({:opts, new_opts})
Map.merge(acc, %{accuracy: overall, opts: new_opts, model: model})
else
acc
end
end
acc
model
Neural Network
input = Axon.input("input", shape: {nil, length(training_fields)})
neurons = div(length(training_fields), 2)
# ReLU between the dense layers; without a nonlinearity the stacked dense
# layers would collapse into a single linear map
model =
input
|> Axon.dense(neurons, activation: :relu)
|> Axon.dense(neurons, activation: :relu)
|> Axon.dense(1)
model
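To see the resulting layer stack, Axon can render a summary table from an input template (a sketch; it assumes the table_rex dependency commented out in the setup cell is re-enabled):
template = Nx.template({1, length(training_fields)}, :f32)
Axon.Display.as_table(model, template) |> IO.puts()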
# roughly center and rescale the features so they fall near [-0.5, 0.5]
train_df =
train_df
|> DF.mutate(
ors_duration: (ors_duration - 1800) / 3600,
pick_duration: (pick_duration - 1800) / 3600,
min_ors_duration: (min_ors_duration - 1800) / 3600,
hour: (hour - 12) / 24,
ors_to_add: (ors_to_add - 1800) / 3600
)
train_df
|> DF.select(training_fields)
|> DF.describe()
|> Kino.DataTable.new()
|> Kino.render()
x = train_df
|> DF.select(training_fields)
|> Nx.stack(axis: 1)
|> Nx.as_type(:f32)
y = DF.select(train_df, :ors_to_add) |> Nx.concatenate() |> Nx.as_type(:f32)
{x_train, x_test} = Nx.split(x, 0.9)
{y_train, y_test} = Nx.split(y, 0.9)
batch_size = 32
y_train_batches = Nx.to_batched(y_train, batch_size)
x_train_batches = Nx.to_batched(x_train, batch_size)
train_data = for {x_batch, y_batch} <- Enum.zip(x_train_batches, y_train_batches) do
{%{"input" => Nx.as_type(x_batch, :f32)}, Nx.as_type(y_batch, :f32)}
end
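A quick shape check on the batched data (a sketch; the exact batch count depends on the slices above):
{inputs, targets} = hd(train_data)
Nx.shape(inputs["input"]) # => {32, 6}: batch_size rows by the six training fields
Nx.shape(targets) # => {32}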
plot =
Vl.new()
|> Vl.mark(:line)
|> Vl.encode_field(:x, "step", type: :quantitative)
|> Vl.encode_field(:y, "loss", type: :quantitative) # scale: [domain: [574, 579]]
|> Kino.VegaLite.new()
|> Kino.render()
empty = %{} # Axon.ModelState.empty()
optimizer = Polaris.Optimizers.adam() # learning_rate: 0.001
params = model
|> Axon.Loop.trainer(:mean_absolute_error, optimizer)
#|> Axon.Loop.metric(:mean_squared_error)
#|> Axon.Loop.validate(model, [{%{"input" => x_test}, y_test}])
|> Axon.Loop.kino_vega_lite_plot(plot, "loss", event: :epoch_completed)
|> Axon.Loop.run(train_data, empty, epochs: 20) # iterations: 1000
#params = Axon.Loop.run(loop, train_data, Axon.ModelState.empty(), epochs: 20, iterations: 1000)
y_pred =
Axon.predict(model, params, %{"input" => x_test})
|> Nx.flatten()
IO.inspect(y_test)
IO.inspect(y_pred)
rmse =
Metrics.mean_square_error(y_test, y_pred)
|> Nx.sqrt()
mae = Metrics.mean_absolute_error(y_test, y_pred)
[
RMSE: Nx.to_number(rmse),
MAE: Nx.to_number(mae),
mean: Nx.to_number(Nx.mean(y)),
std: Nx.to_number(Nx.standard_deviation(y)),
pred_mean: Nx.to_number(Nx.mean(y_pred)),
pred_std: Nx.to_number(Nx.standard_deviation(y_pred))
]
df = df |> DF.slice(30000..-1//1) |> DF.shuffle() |> DF.slice(0..15000)
all_x =
df
|> DF.mutate(
ors_duration: (ors_duration - 1800) / 3600,
pick_duration: (pick_duration - 1800) / 3600,
min_ors_duration: (min_ors_duration - 1800) / 3600,
hour: (hour - 12) / 24,
ors_to_add: (ors_to_add - 1800) / 3600
)
|> DF.select(training_fields)
|> Nx.stack(axis: 1)
|> Nx.as_type(:f32)
pred =
Axon.predict(model, params, %{"input" => all_x})
|> Nx.flatten()
df =
df
|> DF.put(:add_orig, pred)
# undo the (x - 1800) / 3600 normalization applied to ors_to_add
|> DF.mutate(add: add_orig * 3600 + 1800)
|> DF.discard(:add_orig)
|> DF.mutate(add: select(add > 0, add, 0))
|> DF.mutate(regression: ors_eta + %Duration{value: 1_000, precision: :millisecond} * add)
df |> DF.describe() |> Kino.DataTable.new() |> Kino.render()
#|> DF.mutate(regression: select(regression > min_ors_eta, regression, min_ors_eta))
df
|> Support.overall_accuracy(:time, :arrival_time, :regression, accuracy)
# |> DF.filter(add != 1.0)
# |> Kino.DataTable.new()