Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

warragamba.livebook

warragamba.livemd

warragamba.livebook

Mix.install(
  [
    {:httpoison, "~> 1.8"},
    {:jason, "~> 1.4"},
    {:vega_lite, "~> 0.1.5"},
    {:kino, "~> 0.8.0"},
    {:kino_vega_lite, "~> 0.1.1"},
    {:explorer, "~> 0.4.0"},
    {:axon, "~> 0.3.0"},
    {:exla, "~> 0.4.0"},
    {:nx, "~> 0.4.0"},
    {:table_rex, "~> 3.1"}
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)

alias VegaLite, as: Vl
alias Explorer.DataFrame, as: DF
alias Explorer.Series

# Sets the global compilation options, so `defn` code is compiled with EXLA
Nx.Defn.global_default_options(compiler: EXLA)
# Sets the process-level compilation options too, so the process evaluating
# this notebook uses the same compiler
Nx.Defn.default_options(compiler: EXLA)

Fetch Data From API

# defining api calls to get data

require Explorer.DataFrame

defmodule WaterAPI do
  @moduledoc """
  Small client for the WaterNSW real-time data web service.

  Each public function builds a JSON request body, POSTs it to the endpoint
  and unwraps the decoded JSON response.
  """

  require Logger

  @water_nsw_api_endpoint "https://realtimedata.waternsw.com.au/cgi/webservice.pl"

  # Data source sent when the caller does not pick one.
  @default_data_source "A"

  @doc """
  Queries the `site` table for active sites inside the circle described by
  `longitude`, `latitude` and `radius`.

  Returns `{:ok, data}` or `{:error, error_msg}`.
  """
  def get_station_by_coords(longitude, latitude, radius) do
    %{
      "function" => "get_db_info",
      "version" => 3,
      "params" => %{
        "table_name" => "site",
        "return_type" => "array",
        "filter_values" => %{"active" => "true"},
        "geo_filter" => %{
          "circle" => [longitude, latitude, radius]
        }
      }
    }
    |> Jason.encode!()
    |> post_to_api()
  end

  @doc """
  Requests the variable list for the given `station_ids`.

  ## Options

    * `:data_source` (alias `:datasource`) - data source code, defaults to `"A"`.

  Returns `{:ok, data}` or `{:error, error_msg}`.
  """
  def get_variable_list(station_ids, opts \\ []) do
    %{
      "function" => "get_variable_list",
      "version" => 1,
      "params" => %{
        "site_list" => Enum.join(station_ids, ","),
        "datasource" => data_source_from(opts)
      }
    }
    |> Jason.encode!()
    |> post_to_api()
  end

  @doc """
  Requests a time-series trace for `station_id` between `start_time` and
  `end_time`.

  `var_list` is a list of variable codes (joined with commas for the request),
  `interval` is the sampling interval and `aggregate` is sent as the request's
  `data_type`.

  ## Options

    * `:data_source` (alias `:datasource`) - data source code, defaults to `"A"`.
    * `:multiplier` - interval multiplier, defaults to `"1"`.

  On success returns the `"trace"` entry of the first element of the
  response's `"traces"` list. On an API error the error message is logged and
  returned as-is.
  """
  def get_ts_trace(station_id, start_time, end_time, var_list, interval, aggregate, opts \\ []) do
    %{
      function: "get_ts_traces",
      version: 2,
      params: %{
        site_list: station_id,
        # Callers spell this option both `:data_source` and `:datasource`;
        # honouring only one of them silently fell back to the default.
        datasource: data_source_from(opts),
        start_time: start_time,
        end_time: end_time,
        var_list: Enum.join(var_list, ","),
        interval: interval,
        multiplier: Keyword.get(opts, :multiplier, "1"),
        data_type: aggregate
      }
    }
    |> Jason.encode!()
    |> post_to_api()
    |> case do
      {:ok, data} ->
        data["traces"]
        |> List.first()
        |> Map.get("trace")

      {:error, msg} ->
        Logger.error("WaterNSW get_ts_traces request failed: #{inspect(msg)}")
        msg
    end
  end

  # Resolves the data source option, accepting both spellings used by callers.
  defp data_source_from(opts) do
    Keyword.get_lazy(opts, :data_source, fn ->
      Keyword.get(opts, :datasource, @default_data_source)
    end)
  end

  # POSTs the encoded payload and tags the decoded body as ok/error.
  # Raises on transport failures, invalid JSON or an unrecognised body shape.
  defp post_to_api(payload) do
    HTTPoison.post!(
      @water_nsw_api_endpoint,
      payload,
      [],
      timeout: 50_000,
      recv_timeout: 50_000
    )
    |> then(fn r -> r.body end)
    |> Jason.decode!()
    |> case do
      %{"_return" => data} -> {:ok, data}
      %{"error_msg" => error_msg} -> {:error, error_msg}
    end
  end
end
defmodule Helper do
  @moduledoc """
  Data-frame and tensor helpers used to prepare the station data for training.

  Relies on the `DF` alias for `Explorer.DataFrame` being in scope.
  """

  @doc """
  Renames the `q` and `v` columns of each data frame to
  `q_<station_id>_<variable_no>` and `v_<station_id>_<variable_no>`.

  `dfs` and `station_ids` are paired positionally.
  """
  def rename_cols_by_id(dfs, station_ids, variable_no) do
    for {df, station_id} <- Enum.zip(dfs, station_ids) do
      DF.rename(
        df,
        q: "q_#{station_id}_#{variable_no}",
        v: "v_#{station_id}_#{variable_no}"
      )
    end
  end

  @doc """
  Inner-joins a non-empty list of data frames into a single data frame.
  """
  def join_dfs(dfs) when is_list(dfs) do
    Enum.reduce(dfs, fn df, acc ->
      DF.join(df, acc, how: :inner)
    end)
  end

  @doc """
  Randomly splits `df` into two data frames; the first receives
  `round(decimal * n_rows)` rows and the second receives the rest.
  """
  def split_data(df, decimal) do
    row_no = DF.n_rows(df)

    {first, second} =
      df
      |> DF.to_rows()
      |> Enum.shuffle()
      |> Enum.split(round(decimal * row_no))

    {DF.new(first), DF.new(second)}
  end

  @doc """
  Builds a stream of `{features, labels}` tensor batches from `df`.

  Rows are shuffled, and the same permutation is applied to the features and
  to the labels so every feature row stays paired with its own label. Rows
  that do not fill a whole batch are discarded.
  """
  def df_to_batches(df, feature_cols, label_col, batch_size \\ 1) do
    # Shuffling the two tensors independently would draw two different
    # permutations and mismatch features with labels, so draw one and reuse it.
    permutation =
      0..(DF.n_rows(df) - 1)//1
      |> Enum.shuffle()
      |> Nx.tensor()

    to_batches = fn tensor ->
      tensor
      |> Nx.take(permutation, axis: 0)
      |> Nx.to_batched(batch_size, leftover: :discard)
    end

    Stream.zip(
      to_batches.(df_to_tensor(df[feature_cols])),
      to_batches.(df_to_tensor(df[[label_col]]))
    )
  end

  @doc """
  Converts `df` to a tensor, shuffles its rows and splits it into batches of
  `batch_size` rows, discarding leftovers.

  Each call shuffles independently, so batches from two separate calls are
  not row-aligned with each other.
  """
  def df_to_tensor_batches(df, batch_size) do
    df
    |> df_to_tensor()
    |> Nx.shuffle(axis: 0)
    |> Nx.to_batched(batch_size, leftover: :discard)
  end

  @doc """
  Stacks the columns of `df` into a tensor with one row per data-frame row
  and one column per data-frame column.
  """
  def df_to_tensor(df) do
    df
    |> DF.names()
    |> Enum.map(fn name -> Explorer.Series.to_tensor(df[name]) end)
    |> Nx.stack(axis: 1)
  end
end
# Weather and stream station ids closest to the dam; see
# https://realtimedata.waternsw.com.au/ for more info.
weather_station_ids = ~w(563035 563046 563079 568045 568051)
stream_station_ids = ~w(212250 212270)

# Fetches the daily trace of one variable for one station over the study
# period and returns it as a data frame whose `v` column is cast to float.
fetch_daily_trace = fn station_id, variable, aggregate ->
  station_id
  |> WaterAPI.get_ts_trace(
    "20080130000000",
    "20220112000000",
    [variable],
    "day",
    aggregate,
    datasource: "CP"
  )
  |> DF.new()
  |> DF.mutate(v: Series.cast(v, :float))
end

# Variable 130.00, daily mean, for station 212242.
water_level_df = fetch_daily_trace.("212242", "130.00", "mean")

# Variable 10.00, daily total, for every weather station.
rainfall_dfs =
  Enum.map(weather_station_ids, fn station_id ->
    fetch_daily_trace.(station_id, "10.00", "tot")
  end)

# Variable 100.00, daily mean, for every stream station.
stream_dfs =
  Enum.map(stream_station_ids, fn station_id ->
    fetch_daily_trace.(station_id, "100.00", "mean")
  end)

Deriving ‘Water Level Difference’

# One-column frame holding each row's next-day water level (`v` shifted by -1).
shifted_levels = Series.shift(water_level_df["v"], -1)
water_level_tomorrow = DF.new(water_level_tomorrow: shifted_levels)

# Attach the shifted column, derive the day-over-day change from it, then
# drop the helper column again.
levels_with_tomorrow = DF.concat_columns(water_level_df, water_level_tomorrow)

levels_with_difference =
  DF.mutate(levels_with_tomorrow, water_level_difference: water_level_tomorrow - v)

water_level_df = DF.discard(levels_with_difference, "water_level_tomorrow")

Cleaning

# Water level rows whose `q` code is 201 or 255 are removed outright.
water_level_df = DF.filter(water_level_df, q != 201 and q != 255)

# For the input series the rows are kept, but `v` is multiplied by a 0/1 mask
# so readings with `q` code 201 or 255 become zero.
zero_flagged_values = fn station_df ->
  station_df
  |> DF.mutate(m: cast(q != 201 and q != 255, :integer))
  |> DF.mutate(v: v * m)
  |> DF.discard(:m)
end

rainfall_dfs = Enum.map(rainfall_dfs, zero_flagged_values)
stream_dfs = Enum.map(stream_dfs, zero_flagged_values)

Renaming Columns

# Suffix the `q`/`v` columns with station id and variable number so they stay
# distinguishable once the frames are joined.
water_level_column_names = [q: "q_212242_130", v: "v_212242_130"]
water_level_df = DF.rename(water_level_df, water_level_column_names)

renamed_rainfall_dfs = Helper.rename_cols_by_id(rainfall_dfs, weather_station_ids, "10")
rainfall_df = Helper.join_dfs(renamed_rainfall_dfs)

renamed_stream_dfs = Helper.rename_cols_by_id(stream_dfs, stream_station_ids, "100")
stream_df = Helper.join_dfs(renamed_stream_dfs)

Joining Data Frames

# Join the rainfall frame and then the stream frame onto the water level frame.
df =
  Enum.reduce([rainfall_df, stream_df], water_level_df, fn right, left ->
    DF.join(left, right)
  end)

DF.names(df)

Prepare Data for Neural Net

# Model target and inputs: the `v` columns of the weather and stream stations.
label_column = "water_level_difference"

feature_columns = ~w(
  v_568051_10
  v_568045_10
  v_563079_10
  v_563046_10
  v_563035_10
  v_212250_100
  v_212270_100
)

model_columns = [label_column] ++ feature_columns

df = DF.select(df, model_columns)

# 80/20 split into (train + validation) and test, then 80/20 again into
# train and validation.
{train_and_validation_df, test_df} = Helper.split_data(df, 0.8)
{train_df, validation_df} = Helper.split_data(train_and_validation_df, 0.8)

# Training uses batches of 8 rows; validation and testing use single rows.
[training_batches, validation_batches, testing_batches] =
  for {split_df, batch_size} <- [{train_df, 8}, {validation_df, 1}, {test_df, 1}] do
    Helper.df_to_batches(split_df, feature_columns, label_column, batch_size)
  end
row_no = DF.n_rows(df)

# Pair every label value with a 1-based row counter to use as the x axis.
# The stepped range yields an empty list (not [1, 0]) when there are no rows.
graph_data =
  DF.concat_columns(
    df[[label_column]],
    DF.new(%{"count" => Enum.to_list(1..row_no//1)})
  )
  |> DF.to_rows()

# Line chart of the first 300 label values; the interval selection bound to
# the scales enables panning and zooming.
Vl.new(width: 800, height: 600)
|> Vl.data_from_values(graph_data |> Enum.take(300))
|> Vl.encode(:y, field: "water_level_difference", type: :quantitative)
|> Vl.encode(:x, field: "count", type: :quantitative)
|> Vl.mark(:line)
|> Vl.param("grid", select: :interval, bind: :scales)

Building the Model

# Input -> dropout -> 16-unit dense -> dropout -> ReLU -> single-unit output.
input_layer = Axon.input("stream_and_rain_model")

hidden_layer =
  input_layer
  |> Axon.dropout(rate: 0.5)
  |> Axon.dense(16)
  |> Axon.dropout(rate: 0.5)
  |> Axon.relu()

model = Axon.dense(hidden_layer, 1)

Training the Model

# Train for 50 epochs with mean absolute error as the loss and Adam (learning
# rate 0.001) as the optimizer.
#
# The metric is attached BEFORE `validate/3`: the validation handler only
# evaluates the metrics already present on the loop when it is added, so a
# metric attached afterwards is never computed on the validation set. It keeps
# its default name because it is reported for the training batches as well.
model_params =
  model
  |> Axon.Loop.trainer(:mean_absolute_error, Axon.Optimizers.adam(0.001))
  |> Axon.Loop.metric(:mean_absolute_error)
  |> Axon.Loop.validate(model, validation_batches)
  |> Axon.Loop.run(training_batches, %{}, epochs: 50)

Evaluating the Model

# Single pass over the held-out test batches, reporting mean absolute error.
# The option is `:epochs`; the previous `epoch: 1` is not a `run` option.
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(testing_batches, model_params, epochs: 1)

Prediction vs Actual Change in Water Level

# Run the trained model over every row and collect predictions and actual
# labels as flat lists.
all_features = Helper.df_to_tensor(df[feature_columns])

all_labels =
  df[label_column]
  |> Series.to_tensor()
  |> Nx.to_flat_list()

{_init_fn, predict_fn} = Axon.build(model, mode: :inference)

predictions =
  predict_fn.(model_params, all_features)
  |> Nx.to_flat_list()

# Derive the row count from the predictions instead of hard-coding it, so the
# three chart columns always have the same length whatever the API returned.
row_no = length(predictions)

chart_data =
  DF.new(
    prediction: predictions,
    actual: all_labels,
    count: Enum.to_list(1..row_no//1)
  )
  |> DF.to_rows()

# Orange line: model predictions. This layer also carries the interval
# selection bound to the scales, which enables panning and zooming.
prediction_layer =
  Vl.new()
  |> Vl.param("prediction_chart", select: :interval, bind: :scales, encodings: ["x", "y"])
  |> Vl.encode(:y, field: "prediction", type: :quantitative)
  |> Vl.encode(:x, field: "count", type: :quantitative)
  |> Vl.mark(:line, color: "orange")

# Blue line: actual change in water level.
actual_layer =
  Vl.new()
  |> Vl.encode(:x, field: "count", type: :quantitative)
  |> Vl.encode(:y, field: "actual", type: :quantitative)
  |> Vl.mark(:line, color: "blue")

# Overlay both lines for the first 300 rows.
first_rows = Enum.take(chart_data, 300)

Vl.new(width: 400, height: 400)
|> Vl.data_from_values(first_rows)
|> Vl.layers([prediction_layer, actual_layer])