Ch 10: RNNs for time series
Mix.install([
{:explorer, "~> 0.7.0"},
{:nx, "~> 0.6"},
{:exla, "~> 0.6"},
{:axon, "~> 0.5"},
{:vega_lite, "~> 0.1.6"},
{:kino, "~> 0.8.0"},
{:kino_vega_lite, "~> 0.1.7"}
])
alias VegaLite, as: Vl
alias Explorer.DataFrame, as: Df
Exploring the data
# Location of the locally stored Kaggle DJIA stocks dataset.
base_path = "Dev/Education/Elixir/ml/Datasets/"
csv_file = "stocks/all_stocks_2006-01-01_to_2018-01-01.csv"

df = Df.from_csv!(base_path <> csv_file, parse_dates: true)
IO.inspect(df.names)

# Only the date, closing price, and ticker name are needed below.
df = Df.select(df, ["Date", "Close", "Name"])

# Just taking a look at what to_columns does
Df.to_columns(df)
# Renders a line chart of closing price over time, one colour per ticker.
mk_graph = fn data_frame, chart_title ->
  columns = Df.to_columns(data_frame)

  Vl.new(title: chart_title, width: 640, height: 480)
  |> Vl.data_from_values(columns)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "Date", type: :temporal)
  |> Vl.encode_field(:y, "Close", type: :quantitative)
  |> Vl.encode_field(:color, "Name", type: :nominal)
  |> Kino.VegaLite.new()
end

mk_graph.(df, "DJIA Stock Prices")
We’re only concerned with the price of the AAPL stock, so we can filter the data accordingly:
# Narrow the frame down to the AAPL ticker only.
aapl_df =
  Df.filter_with(df, fn ldf ->
    Explorer.Series.equal(ldf["Name"], "AAPL")
  end)

mk_graph.(aapl_df, "AAPL Stock Prices")
Prep data
# Z-score normalisation: (x - mean) / standard deviation.
# NOTE: the original divided by the *variance*, which neither yields
# unit-variance data nor matches the denormalisation used later in the
# notebook (which multiplies by sqrt(variance), i.e. the std deviation).
normalized_aapl_df =
  Df.mutate_with(aapl_df, fn ldf ->
    std = Explorer.Series.standard_deviation(ldf["Close"])
    mean = Explorer.Series.mean(ldf["Close"])
    centered = Explorer.Series.subtract(ldf["Close"], mean)
    [Close: Explorer.Series.divide(centered, std)]
  end)

# Get a sense of the normalised range.
min = Explorer.Series.min(normalized_aapl_df["Close"])
max = Explorer.Series.max(normalized_aapl_df["Close"])
IO.inspect({min, max})

mk_graph.(normalized_aapl_df, "AAPL normalised stock price")
defmodule Data do
  @moduledoc """
  Converts a flat series of prices into windowed, batched tensors
  suitable for Axon training loops.
  """

  @doc """
  Slides a window of `window_size + target_window_size` elements over
  `inputs` one step at a time, splitting each window into a
  `{features, targets}` pair of column tensors (shapes `{window_size, 1}`
  and `{target_window_size, 1}`). Incomplete trailing windows are dropped.
  """
  @spec window(
          inputs :: Enumerable.t(),
          window_size :: non_neg_integer(),
          target_window_size :: non_neg_integer()
        ) :: Enumerable.t({Enumerable.t(), Enumerable.t()})
  def window(inputs, window_size, target_window_size) do
    span = window_size + target_window_size

    inputs
    |> Stream.chunk_every(span, 1, :discard)
    |> Stream.map(fn chunk ->
      {feature_vals, target_vals} = Enum.split(chunk, window_size)
      {to_column_tensor(feature_vals), to_column_tensor(target_vals)}
    end)
  end

  @doc """
  Groups `{features, targets}` pairs into stacked batches of `batch_size`,
  discarding any incomplete final batch. Each emitted element is a
  `{batched_features, batched_targets}` tuple with a leading batch axis.
  """
  @spec batch(
          inputs :: Enumerable.t({Enumerable.t(), Enumerable.t()}),
          batch_size :: non_neg_integer()
        ) :: Enumerable.t({any(), any()})
  def batch(inputs, batch_size) do
    inputs
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn pairs ->
      {feature_list, target_list} = Enum.unzip(pairs)
      {Nx.stack(feature_list), Nx.stack(target_list)}
    end)
  end

  # Turns a flat list into a rank-2 column tensor of shape {n, 1}.
  defp to_column_tensor(values) do
    values
    |> Nx.tensor()
    |> Nx.new_axis(1)
  end
end
# Train on everything before 2016; hold out 2016-2017 for testing.
cutoff = Date.new!(2016, 1, 1)

train_df =
  Df.filter_with(normalized_aapl_df, fn ldf ->
    Explorer.Series.less(ldf["Date"], cutoff)
  end)

test_df =
  Df.filter_with(normalized_aapl_df, fn ldf ->
    Explorer.Series.greater_equal(ldf["Date"], cutoff)
  end)

# Five days of history per window, 32 windows per batch.
window_size = 5
batch_size = 32

train_prices = Explorer.Series.to_list(train_df["Close"])
test_prices = Explorer.Series.to_list(test_df["Close"])
# Build the single-step training stream: each element pairs a 5-day
# window of normalised prices with the next day's price, batched by 32.
single_step_train_data =
  train_prices
  |> Data.window(window_size, 1)
  # Stream of {features, target} pairs:
  # features shape {5, 1}, target shape {1, 1}.
  |> Data.batch(batch_size)
  # Stream of stacked batches:
  # features shape {32, 5, 1}, targets shape {32, 1, 1}.

single_step_test_data =
  test_prices
  |> Data.window(window_size, 1)
  |> Data.batch(batch_size)

# Peek at the first batch to sanity-check the shapes above.
Enum.take(single_step_train_data, 1)
# Builds and runs an Axon training loop over the captured single-step
# training batches; returns the trained model state.
train = fn model, epochs, optimizer, loss, metric ->
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(metric)
  |> Axon.Loop.run(single_step_train_data, %{}, epochs: epochs, compiler: EXLA)
end

# Runs the captured single-step test batches through an evaluation loop
# for the given trained state, reporting the requested metric.
evaluate = fn model, trained_state, metric ->
  model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(metric)
  |> Axon.Loop.run(single_step_test_data, trained_state, compiler: EXLA)
end
defmodule Analysis do
  @moduledoc """
  Plots a model's predictions against the actual price series.
  """

  @doc """
  Runs `model` (with `model_state`) over every window of `prices`, plots
  the predicted series against the actual one, and returns the flat list
  of predictions (nil-padded to align with the price series).

  The optional `title` labels the chart; the default keeps the original
  CNN single-shot caption for backwards compatibility.
  """
  def visualize_predictions(
        model,
        model_state,
        prices,
        window_size,
        target_window_size,
        batch_size,
        title \\ "AAPL Stock Price vs. Predicted, CNN Single-Shot"
      ) do
    {_, predict_fn} = Axon.build(model, compiler: EXLA)

    windows =
      prices
      |> Data.window(window_size, target_window_size)
      |> Data.batch(batch_size)
      |> Stream.map(&elem(&1, 0))

    predicted =
      Enum.flat_map(windows, fn window ->
        predict_fn.(model_state, window)
        |> Nx.to_flat_list()
      end)

    # The first prediction is for the day right after the first feature
    # window, so left-pad with nils up to that day. (The original
    # hard-coded 10 nils, which misaligned the predicted curve.)
    # NOTE(review): alignment assumes target_window_size == 1; multi-step
    # targets flatten into one series — verify before using larger values.
    lead = window_size + target_window_size - 1
    predicted = List.duplicate(nil, lead) ++ predicted

    # Batching discards any partial trailing batch, so right-pad with nils
    # to keep every plotted column the same length as the price series.
    tail_pad = max(length(prices) - length(predicted), 0)
    predicted = predicted ++ List.duplicate(nil, tail_pad)

    types =
      List.duplicate("AAPL", length(prices)) ++
        List.duplicate("Predicted", length(prices))

    days =
      Enum.to_list(0..(length(prices) - 1)) ++
        Enum.to_list(0..(length(prices) - 1))

    plot(
      %{
        "day" => days,
        "prices" => prices ++ predicted,
        "types" => types
      },
      title
    )

    predicted
  end

  # Line chart of `prices` over `day`, coloured by `types`.
  defp plot(values, title) do
    Vl.new(title: title, width: 640, height: 480)
    |> Vl.data_from_values(values)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "day", type: :temporal)
    |> Vl.encode_field(:y, "prices", type: :quantitative)
    |> Vl.encode_field(:color, "types", type: :nominal)
    |> Kino.VegaLite.new()
  end
end
CNN for single step prediction
# CNN for single-step prediction: each {window, 1} price window is
# treated as a tiny image and convolved with a kernel spanning it fully.
cnn_model =
  Axon.input("stock_price")
  # Add a trailing channel axis so the conv layer receives rank-4 input
  # (e.g. {batch, window, 1} -> {batch, window, 1, 1}).
  |> Axon.nx(&Nx.new_axis(&1, -1))
  |> Axon.conv(32, kernel_size: {window_size, 1}, activation: :relu)
  |> Axon.dense(32, activation: :relu)
  # Single linear output: the predicted next-day (normalised) price.
  |> Axon.dense(1)

# Dummy input template, used only to render the model graph.
# NOTE(review): the step dimension is 10 here although training windows
# use 5 — this only affects the displayed graph, not training.
template = Nx.template({32, 10, 1}, :f32)
Axon.Display.as_graph(cnn_model, template)

# Train for 10 epochs with Adam on MSE, tracking MAE as the metric.
cnn_trained_model_state =
  train.(cnn_model, 10, :adam, :mean_squared_error, :mean_absolute_error)
# Convert the normalised MAE back into dollars. The model was trained on
# z-scored *AAPL* prices, so the error scales by the standard deviation
# of the AAPL series. (The original took the variance of the whole DJIA
# frame `df`, and also added the mean — but an error is a difference, so
# the mean shift does not apply.)
std = :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))

denormalise = fn %{0 => error_map} ->
  error_map
  |> Map.get("mean_absolute_error")
  |> Nx.to_number()
  |> Kernel.*(std)
end

error =
  evaluate.(cnn_model, cnn_trained_model_state, :mean_absolute_error)
  |> IO.inspect()
  |> denormalise.()

IO.puts("""
The denormalised error is #{error}.
You can interpret the result as meaning that over the course of two years,
your model had an absolute error of $#{Float.round(error, 2)} off the next day’s closing stock price across each batch.
""")
# Overlay the CNN's predictions on the full AAPL closing-price series.
# NOTE(review): this passes the *raw* closing prices, but the model was
# trained on normalised prices — confirm whether the normalised series
# should be plotted/predicted here instead.
Analysis.visualize_predictions(
  cnn_model,
  cnn_trained_model_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)
RNN for time series
# RNN for single-step prediction: a 32-unit LSTM over the price window.
rnn_model =
  Axon.input("stock_prices")
  |> Axon.lstm(32)
  # Axon.lstm returns {output_sequence, state}; keep the output sequence.
  |> elem(0)
  # Slice out the last time step: every batch row, step -1, every unit.
  |> Axon.nx(& &1[[0..-1//1, -1, 0..-1//1]])
  # Single linear output: the predicted next-day (normalised) price.
  |> Axon.dense(1)

# Dummy input template, used only to render the model graph.
template = Nx.template({32, 10, 1}, :f32)
Axon.Display.as_graph(rnn_model, template)

# Train for 30 epochs with Adam on MSE, tracking MAE as the metric.
rnn_trained_state =
  train.(rnn_model, 30, :adam, :mean_squared_error, :mean_absolute_error)
# Same denormalisation as for the CNN: scale the normalised MAE by the
# standard deviation of the *AAPL* series. (The original used the
# variance of the whole DJIA frame `df` and shifted by the mean — but an
# error is a difference, so the mean shift does not apply.)
std = :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))

denormalise = fn %{0 => error_map} ->
  error_map
  |> Map.get("mean_absolute_error")
  |> Nx.to_number()
  |> Kernel.*(std)
end

error =
  evaluate.(rnn_model, rnn_trained_state, :mean_absolute_error)
  |> IO.inspect()
  |> denormalise.()

IO.puts("""
The denormalised error is #{error}.
You can interpret the result as meaning that over the course of two years,
your model had an absolute error of $#{Float.round(error, 2)} off the next day’s closing stock price across each batch.
""")
# Overlay the RNN's predictions on the full AAPL closing-price series.
# NOTE(review): the chart title inside Analysis mentions "CNN
# Single-Shot" even though this plots the RNN — verify the caption.
# NOTE(review): raw prices are passed to a model trained on normalised
# prices — confirm whether the normalised series should be used here.
Analysis.visualize_predictions(
  rnn_model,
  rnn_trained_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)