# Ch 10: RNNs for Time Series

(Livebook: "Ch10 - Time series.livemd")

# Notebook dependencies: Nx/EXLA for tensors and compilation, Axon for the
# models, Explorer for dataframes, VegaLite + Kino for plotting.
Mix.install([
  {:axon, "~> 0.5"},
  {:exla, "~> 0.6"},
  {:explorer, "~> 0.7.0"},
  {:kino, "~> 0.8.0"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:nx, "~> 0.6"},
  {:vega_lite, "~> 0.1.6"}
])

# Short aliases used throughout the notebook.
alias VegaLite, as: Vl
alias Explorer.DataFrame, as: Df

Exploring the data

# NOTE(review): machine-specific path — adjust to your local dataset location.
base_path = "Dev/Education/Elixir/ml/Datasets/"
csv_file = "stocks/all_stocks_2006-01-01_to_2018-01-01.csv"

df = Df.from_csv!(base_path <> csv_file, parse_dates: true)
IO.inspect(df.names)

# Only the date, closing price and ticker symbol are needed.
df = Df.select(df, ["Date", "Close", "Name"])

# Just taking a look at what to_columns does
Explorer.DataFrame.to_columns(df)

# Renders a line chart of closing price over time, one colour per ticker.
mk_graph = fn data_frame, title ->
  Vl.new(title: title, width: 640, height: 480)
  |> Vl.data_from_values(Explorer.DataFrame.to_columns(data_frame))
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "Date", type: :temporal)
  |> Vl.encode_field(:y, "Close", type: :quantitative)
  |> Vl.encode_field(:color, "Name", type: :nominal)
  |> Kino.VegaLite.new()
end

mk_graph.(df, "DJIA Stock Prices")

We’re only concerned with the price of the AAPL stock, so we can filter the data accordingly:

# Keep only the AAPL rows.
aapl_df =
  Df.filter_with(df, fn frame ->
    Explorer.Series.equal(frame["Name"], "AAPL")
  end)

mk_graph.(aapl_df, "AAPL Stock Prices")

Prep data

# Standardise the AAPL closing prices (zero mean, unit variance).
# NOTE(review): the original divided by the *variance*; a z-score divides by
# the standard deviation (sqrt of the variance). This also makes the later
# denormalisation step — which multiplies by :math.sqrt(variance) — a true
# inverse of this transformation.
normalized_aapl_df =
  Df.mutate_with(aapl_df, fn df ->
    std = df["Close"] |> Explorer.Series.variance() |> :math.sqrt()
    mean = Explorer.Series.mean(df["Close"])
    centered = Explorer.Series.subtract(df["Close"], mean)
    [Close: Explorer.Series.divide(centered, std)]
  end)

# Sanity-check the range of the normalised series.
min = Explorer.Series.min(normalized_aapl_df["Close"])
max = Explorer.Series.max(normalized_aapl_df["Close"])
IO.inspect({min, max})

mk_graph.(normalized_aapl_df, "AAPL normalised stock price")
defmodule Data do
  @moduledoc """
  Helpers for turning a flat list of prices into batched
  `{features, targets}` tensor pairs for training.
  """

  @doc """
  Slides a window of `window_size` feature points followed by
  `target_window_size` target points across `inputs`, one step at a time.

  Each element of the resulting stream is a `{features, targets}` pair of
  tensors, each with a trailing axis of size 1.
  """
  @spec window(
          inputs :: Enumerable.t(),
          window_size :: non_neg_integer(),
          target_window_size :: non_neg_integer()
        ) :: Enumerable.t({Enumerable.t(), Enumerable.t()})
  def window(inputs, window_size, target_window_size) do
    total = window_size + target_window_size

    inputs
    |> Stream.chunk_every(total, 1, :discard)
    |> Stream.map(fn chunk ->
      {feature_vals, target_vals} = Enum.split(chunk, window_size)
      {tensorize(feature_vals), tensorize(target_vals)}
    end)
  end

  @doc """
  Groups `{features, targets}` pairs into batches of `batch_size`, stacking
  each side into a single tensor. Incomplete trailing batches are discarded.
  """
  @spec batch(
          inputs :: Enumerable.t({Enumerable.t(), Enumerable.t()}),
          batch_size :: non_neg_integer()
        ) :: Enumerable.t({any(), any()})
  def batch(inputs, batch_size) do
    inputs
    |> Stream.chunk_every(batch_size, batch_size, :discard)
    |> Stream.map(fn pairs ->
      {feature_list, target_list} = Enum.unzip(pairs)
      {Nx.stack(feature_list), Nx.stack(target_list)}
    end)
  end

  # Turns a list of numbers into a rank-2 tensor with a trailing size-1 axis.
  defp tensorize(values) do
    values
    |> Nx.tensor()
    |> Nx.new_axis(1)
  end
end
# Split chronologically at 2016-01-01: earlier rows train, later rows test.
cutoff = Date.new!(2016, 1, 1)

train_df =
  Explorer.DataFrame.filter_with(normalized_aapl_df, fn frame ->
    Explorer.Series.less(frame["Date"], cutoff)
  end)

test_df =
  Explorer.DataFrame.filter_with(normalized_aapl_df, fn frame ->
    Explorer.Series.greater_equal(frame["Date"], cutoff)
  end)

# Five days of prices predict the next day's price; train in batches of 32.
window_size = 5
batch_size = 32

train_prices = Explorer.Series.to_list(train_df["Close"])
test_prices = Explorer.Series.to_list(test_df["Close"])

# Each element of these streams is one batch:
# {features, targets} — tensors stacked along a new leading batch axis.
single_step_train_data =
  train_prices
  |> Data.window(window_size, 1)
  |> Data.batch(batch_size)

single_step_test_data =
  test_prices
  |> Data.window(window_size, 1)
  |> Data.batch(batch_size)

# Peek at the first batch to inspect its shape.
Enum.take(single_step_train_data, 1)
# Builds an Axon training loop for `model` and runs it over the single-step
# training data for `epochs` epochs, compiled with EXLA.
train = fn model, epochs, optimizer, loss, metrics ->
  model
  |> Axon.Loop.trainer(loss, optimizer)
  |> Axon.Loop.metric(metrics)
  |> Axon.Loop.run(single_step_train_data, %{}, epochs: epochs, compiler: EXLA)
end

# Evaluates a trained model state against the held-out test data.
evaluate = fn model, trained_state, metrics ->
  model
  |> Axon.Loop.evaluator()
  |> Axon.Loop.metric(metrics)
  |> Axon.Loop.run(single_step_test_data, trained_state, compiler: EXLA)
end
defmodule Analysis do
  @moduledoc """
  Plots a model's predictions on top of the actual AAPL closing prices.
  """

  @doc """
  Runs `model` with `model_state` over sliding windows of `prices` and plots
  the predicted series against the actual one.

  `title` defaults to the original CNN caption for backward compatibility;
  pass a different one when visualising other models.

  Returns the nil-padded list of predictions.
  """
  def visualize_predictions(
        model,
        model_state,
        prices,
        window_size,
        target_window_size,
        batch_size,
        title \\ "AAPL Stock Price vs. Predicted, CNN Single-Shot"
      ) do
    {_, predict_fn} = Axon.build(model, compiler: EXLA)

    # Feature batches only — drop the targets from each {features, targets}.
    # NOTE: the original had HTML-escaped `&amp;` entities here, which would
    # not compile; restored to plain capture syntax.
    batches =
      prices
      |> Data.window(window_size, target_window_size)
      |> Data.batch(batch_size)
      |> Stream.map(&elem(&1, 0))

    predicted =
      Enum.flat_map(batches, fn batch ->
        Nx.to_flat_list(predict_fn.(model_state, batch))
      end)

    # The first prediction corresponds to day `window_size`, so shift the
    # predicted series right by that many days (the original hard-coded 10,
    # which misaligned the plot for window_size = 5). Batching with :discard
    # can also drop trailing windows, so pad the tail with nils to keep every
    # plotted column the same length.
    predicted = List.duplicate(nil, window_size) ++ predicted
    predicted = predicted ++ List.duplicate(nil, max(length(prices) - length(predicted), 0))

    types =
      List.duplicate("AAPL", length(prices)) ++
        List.duplicate("Predicted", length(prices))

    days =
      Enum.to_list(0..(length(prices) - 1)) ++
        Enum.to_list(0..(length(prices) - 1))

    plot(
      %{
        "day" => days,
        "prices" => prices ++ predicted,
        "types" => types
      },
      title
    )

    predicted
  end

  # Line chart of actual vs. predicted price, coloured by series type.
  defp plot(values, title) do
    Vl.new(title: title, width: 640, height: 480)
    |> Vl.data_from_values(values)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "day", type: :temporal)
    |> Vl.encode_field(:y, "prices", type: :quantitative)
    |> Vl.encode_field(:color, "types", type: :nominal)
    |> Kino.VegaLite.new()
  end
end

CNN for single-step prediction

# 1-D convolution over each price window, then two dense layers down to a
# single next-day prediction.
# NOTE: the original had HTML-escaped `&amp;` entities in the capture, which
# would not compile; restored to plain `&` syntax.
cnn_model =
  Axon.input("stock_price")
  # Add a trailing channel axis for the conv layer.
  |> Axon.nx(&Nx.new_axis(&1, -1))
  |> Axon.conv(32, kernel_size: {window_size, 1}, activation: :relu)
  |> Axon.dense(32, activation: :relu)
  |> Axon.dense(1)

template = Nx.template({32, 10, 1}, :f32)
Axon.Display.as_graph(cnn_model, template)

cnn_trained_model_state =
  train.(cnn_model, 10, :adam, :mean_squared_error, :mean_absolute_error)
# Scale used to undo the normalisation.
# NOTE(review): the original read df["Close"] — every DJIA ticker — but the
# model was trained on prices normalised with aapl_df's statistics, so those
# are the ones to invert with.
std = :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))

# Converts the normalised mean-absolute-error back to dollars. An *error* is
# a difference of prices, so only the scale (std) applies — the original also
# added the mean, which offset the error by the average share price.
denormalise = fn %{0 => error_map} ->
  error_map
  |> Map.get("mean_absolute_error")
  |> Nx.to_number()
  |> Kernel.*(std)
end

error =
  evaluate.(cnn_model, cnn_trained_model_state, :mean_absolute_error)
  |> IO.inspect()
  |> denormalise.()

IO.puts("""

The denormalised error is #{error}. 

You can interpret the result as meaning that over the course of two years, 
your model had an absolute error of $#{Float.round(error, 2)} off the next day’s closing stock price across each batch.
""")
# Plot the CNN's predictions against the raw (un-normalised) AAPL prices.
# NOTE(review): the model was trained on *normalised* prices but is fed raw
# prices here — the predicted curve's scale will not match; verify intent.
Analysis.visualize_predictions(
  cnn_model,
  cnn_trained_model_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)

RNN for time series

# LSTM over the price sequence, with a dense layer on the final timestep's
# output producing the single next-day prediction.
# NOTE: the original had HTML-escaped `&amp;` entities in the capture, which
# would not compile; restored to plain `&` syntax.
rnn_model =
  Axon.input("stock_prices")
  |> Axon.lstm(32)
  # Axon.lstm returns {output_sequence, state}; keep only the sequence.
  |> elem(0)
  # Slice out the last timestep across every batch element and unit.
  |> Axon.nx(& &1[[0..-1//1, -1, 0..-1//1]])
  |> Axon.dense(1)

template = Nx.template({32, 10, 1}, :f32)
Axon.Display.as_graph(rnn_model, template)
rnn_trained_state =
  train.(rnn_model, 30, :adam, :mean_squared_error, :mean_absolute_error)

# Scale used to undo the normalisation.
# NOTE(review): the original read df["Close"] — every DJIA ticker — but the
# model was trained on prices normalised with aapl_df's statistics, so those
# are the ones to invert with.
std = :math.sqrt(Explorer.Series.variance(aapl_df["Close"]))

# Converts the normalised mean-absolute-error back to dollars. An *error* is
# a difference of prices, so only the scale (std) applies — the original also
# added the mean, which offset the error by the average share price.
denormalise = fn %{0 => error_map} ->
  error_map
  |> Map.get("mean_absolute_error")
  |> Nx.to_number()
  |> Kernel.*(std)
end

error =
  evaluate.(rnn_model, rnn_trained_state, :mean_absolute_error)
  |> IO.inspect()
  |> denormalise.()

IO.puts("""

The denormalised error is #{error}. 

You can interpret the result as meaning that over the course of two years, 
your model had an absolute error of $#{Float.round(error, 2)} off the next day’s closing stock price across each batch.
""")
# Plot the RNN's predictions against the raw (un-normalised) AAPL prices.
# NOTE(review): the model was trained on *normalised* prices but is fed raw
# prices here — the predicted curve's scale will not match; verify intent.
Analysis.visualize_predictions(
  rnn_model,
  rnn_trained_state,
  Explorer.Series.to_list(aapl_df["Close"]),
  window_size,
  1,
  batch_size
)