Time Series Forecasting with Axon

Mix.install([
  {:axon, "~> 0.6.1"},
  {:kino_vega_lite, "~> 0.1.13"},
  {:nx, "~> 0.7.3"},
  {:tucan, "~> 0.3.1"},
  {:polaris, "~> 0.1.0"},
  {:exla, "~> 0.7.3"},
  {:req, "~> 0.5.2"}
])

Nx.global_default_backend(EXLA.Backend)
Nx.Defn.global_default_options(compiler: EXLA)

Introduction

In this project, we will build a predictor for a time series using a recurrent neural network (RNN) regressor. The model will predict Apple's stock price one step ahead, based on a sliding window of the previous 7 days of the historical series.

An architecture known as Long Short-Term Memory (LSTM) will be used for the RNN.

First, we load the historical data. Then we preprocess it so it can be fed to an RNN model. The initial step is to normalize the range of the series: activation functions such as tanh saturate for large inputs (whether positive or negative), squashing very different values to nearly the same output and making their derivatives vanish, so keeping the series in a small range avoids these numerical problems.
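As a quick illustration of the saturation issue, tanh maps anything even moderately far from zero to roughly ±1, so feeding it unscaled prices would produce nearly identical activations and vanishingly small gradients:

Nx.tanh(Nx.tensor([-150.0, -5.0, 0.0, 5.0, 150.0]))
# => roughly [-1.0, -0.9999, 0.0, 0.9999, 1.0]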

data_url =
  "https://raw.githubusercontent.com/santiago-imelio/my_livebooks/main/apple_stock_prices/files/normalized_apple_prices.csv"

raw_csv = Req.get!(data_url).body

# split on line endings (\r\n first, so Windows line endings aren't split twice)
raw_data = Regex.split(~r/\r\n|\r|\n/, raw_csv)

data =
  raw_data
  # drop the last element (the empty string left by the trailing newline)
  |> List.pop_at(-1)
  |> elem(1)
  |> Enum.map(&Float.parse/1)
  |> Enum.map(&elem(&1, 0))
  |> Nx.tensor()

The series has already been normalized to the range [-1, 1] using min-max scaling. It's also common to see normalization done as z-score standardization, i.e. using the mean and standard deviation.
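As a rough sketch of both approaches (the CSV above is already normalized, so the values here are made up purely for illustration), each scaling is a one-liner with Nx:

raw = Nx.tensor([120.0, 123.5, 119.8, 131.2])

# min-max scaling to [-1, 1]
{min, max} = {Nx.reduce_min(raw), Nx.reduce_max(raw)}
minmax = raw |> Nx.subtract(min) |> Nx.divide(Nx.subtract(max, min)) |> Nx.multiply(2) |> Nx.subtract(1)

# z-score standardization: subtract the mean, divide by the standard deviation
zscore = raw |> Nx.subtract(Nx.mean(raw)) |> Nx.divide(Nx.standard_deviation(raw))

Let's visualize the loaded data.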

plt_domain = Nx.linspace(0, 137, n: 137, type: {:u, 8})

plt_data = [
  prices: data,
  t: plt_domain
]

Tucan.lineplot(plt_data, "t", "prices", height: 400, width: 600, line_color: "black")

# Comment out the :ok below to render the plot with KinoVegaLite
:ok

*Apple stock prices normalized*

Sliding window

Generally, a time-series is mathematically represented as:

$$ \langle s_0, s_1, s_2, \ldots, s_P \rangle $$

where $s_p$ is the numerical value of the series at time interval $p$, and $P$ is the total length of the series. To apply an RNN, the prediction should be treated as a regression problem. This involves using a sliding window to construct a set of input-output pairs on which regression will be applied.

For example, with a window size $T = 3$, the following pairs need to be produced:

$$ \begin{array}{c|c} \text{Input} & \text{Output} \\ \hline \langle s_{1}, s_{2}, s_{3} \rangle & s_{4} \\ \langle s_{2}, s_{3}, s_{4} \rangle & s_{5} \\ \vdots & \vdots \\ \langle s_{P-3}, s_{P-2}, s_{P-1} \rangle & s_{P} \end{array} $$

In each pair, the input consists of a sequence of $T$ consecutive values from the series, and the output is the subsequent value immediately following the input sequence. This approach prepares the data for training the RNN model for time-series prediction.

sliding_window = fn series, win_size ->
  {p} = Nx.shape(series)

  # start index of each window, vectorized so we can slice per element
  x_idx =
    series
    |> Nx.shape()
    |> Nx.iota()
    |> Nx.reshape({:auto, 1})
    |> Nx.vectorize(:elements)

  # index of the target value that follows each window
  y_idx =
    series
    |> Nx.shape()
    |> Nx.iota()
    |> Nx.add(win_size)
    |> Nx.reshape({:auto, 1})
    |> Nx.slice([0, 0], [p - win_size, 1])

  # build every window of length win_size, then drop the trailing
  # windows that would run past the end of the series
  x =
    Nx.slice(series, [x_idx[0]], [win_size])
    |> Nx.devectorize(keep_names: false)
    |> Nx.slice([0, 0], [p - win_size, win_size])

  # gather the target value for each window
  y = Nx.gather(series, y_idx, axes: [0])

  {x, y}
end

Next, we test our sliding window function with the first 10 numbers of the Fibonacci sequence.

test_series = Nx.tensor([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])

# expect x to be the 8 windows [0, 1], [1, 1], [1, 2], ..., [13, 21]
# and y the value following each window: [1, 2, 3, 5, 8, 13, 21, 34]
sliding_window.(test_series, 2)

Now we apply the sliding window to our dataset with a window size of 7.

# keep 133 points so that the resulting 133 - 7 = 126 window/target pairs
# split evenly into train/test sets and into batches of 6 later on
p = 133
win_size = 7
trunc_data = data[0..(p - 1)]

{x, y} = sliding_window.(trunc_data, win_size)
{dataset_size, _} = Nx.shape(x)

Train-test splitting

Here we split the dataset into train and test sets. We will use two thirds of the data for training and the remainder for validation. Since the dataset is an ordered time series, we shouldn't shuffle the data before splitting.

train_test_split = 2 / 3

{x_train, x_test} = Nx.split(x, train_test_split)
{y_train, y_test} = Nx.split(y, train_test_split)

Building and training our RNN regressor

We will use Axon to build a neural network with two hidden RNN layers with the following specifications:

  1. The first layer uses an LSTM module with 5 hidden units. Its per-sample input shape is {window_size, 1}.
  2. The second layer is a fully connected (dense) layer with a single unit.

For the loss function we will use mean squared error.

batch_size = 6
input_shape = {batch_size, 7, 1}

model =
  Axon.input("input", shape: input_shape)
  |> Axon.lstm(5, name: "LSTM")
  # Axon.lstm returns {output_sequence, state}; keep only the output sequence
  |> then(fn {out, _} -> out end)
  # take the last element of the sequence for each batch entry
  |> Axon.nx(fn t -> t[[0..-1//1, -1]] end)
  |> Axon.dense(1)

Axon.Display.as_graph(model, Nx.template(input_shape, :f32))
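The Axon.nx layer above is what turns the LSTM's output sequence into a single vector per sample: it keeps every batch entry (0..-1//1) and only the last timestep (-1). The same slice on a toy tensor, with a shape chosen purely for illustration, behaves like this:

t = Nx.iota({2, 3, 4})

# full batch axis, last timestep -> shape {2, 4}
t[[0..-1//1, -1]]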

Before training, we split our train and test data into batches using the specified batch size.

y_train_batches = Nx.to_batched(y_train, batch_size)

x_train_batches =
  x_train
  |> Nx.reshape({:auto, win_size, 1})
  |> Nx.to_batched(batch_size)

y_test_batches = Nx.to_batched(y_test, batch_size)

x_test_batches =
  x_test
  |> Nx.reshape({:auto, win_size, 1})
  |> Nx.to_batched(batch_size)

train_data = Stream.zip([x_train_batches, y_train_batches])
test_data = Stream.zip([x_test_batches, y_test_batches])

We are ready to create our training loop and run it. Since our network is simple, we choose SGD as the optimizer.

optimizer = Polaris.Optimizers.sgd(learning_rate: 0.08)
loop = Axon.Loop.trainer(model, :mean_squared_error, optimizer)

trained_model_state = Axon.Loop.run(loop, train_data, %{}, epochs: 500)
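Optionally, the test batches could also be monitored during training by attaching a validation step to the loop. This is only a sketch of Axon's Loop API, not something done in the run above:

loop_with_val =
  model
  |> Axon.Loop.trainer(:mean_squared_error, optimizer)
  # report the loss on test_data at the end of every epoch
  |> Axon.Loop.validate(model, test_data)

# Axon.Loop.run(loop_with_val, train_data, %{}, epochs: 500)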

Finally, we make predictions over the train and test sets so we can evaluate our model.

y_pred_train =
  x_train_batches
  |> Enum.map(&Axon.predict(model, trained_model_state, &1))
  |> Nx.concatenate()

y_pred_test =
  x_test_batches
  |> Enum.map(&Axon.predict(model, trained_model_state, &1))
  |> Nx.concatenate()

y_train
|> Axon.Losses.mean_squared_error(y_pred_train, reduction: :mean)
|> Nx.to_number()
|> IO.inspect(label: "MSE train")

y_test
|> Axon.Losses.mean_squared_error(y_pred_test, reduction: :mean)
|> Nx.to_number()
|> IO.inspect(label: "MSE test")

:ok
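As an aside, Axon also ships an evaluation loop that computes metrics batch by batch over a dataset. A minimal sketch, using mean absolute error since it is one of the metrics built into Axon.Metrics:

# run the trained parameters over the test batches and report MAE
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error, "MAE")
|> Axon.Loop.run(test_data, trained_model_state)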

Visualizing predictions

Let’s visualize the predictions compared to the real trend of the prices. First we build the plot data required for each layer.

interval = Nx.iota({dataset_size})

real_plt_data = [
  Prices: trunc_data[(win_size - 1)..(dataset_size - 1)],
  Day: interval[(win_size - 1)..(dataset_size - 1)],
  Eval: List.duplicate("Real prices", dataset_size)
]

{train_size} = Nx.shape(y_train)
{test_size} = Nx.shape(y_test)

data_eval_train = [
  Prices: Nx.to_flat_list(y_pred_train),
  Day: interval[(win_size - 1)..(train_size + win_size - 1)],
  Eval: List.duplicate("Predictions train", train_size)
]

data_eval_test = [
  Prices: Nx.to_flat_list(y_pred_test),
  Day: interval[(train_size + win_size - 1)..(dataset_size - 1)],
  Eval: List.duplicate("Predictions test", test_size)
]

opts = [stroke_width: 2]
Tucan.layers([
  Tucan.lineplot(real_plt_data, "Day", "Prices", Keyword.put(opts, :stroke_dash, [2])),
  Tucan.lineplot(data_eval_train, "Day", "Prices", opts),
  Tucan.lineplot(data_eval_test, "Day", "Prices", opts)
])
|> Tucan.color_by("Eval")
|> Tucan.set_theme(:urban_institute)
|> Tucan.set_size(600, 400)
|> Tucan.Scale.set_x_domain(win_size, dataset_size)

# Comment out the :ok below to render the plot with KinoVegaLite
:ok

*Predictions over the train and test sets compared to real prices (epochs = 500)*