warragamba.livebook
Mix.install(
[
{:httpoison, "~> 1.8"},
{:jason, "~> 1.4"},
{:vega_lite, "~> 0.1.5"},
{:kino, "~> 0.8.0"},
{:kino_vega_lite, "~> 0.1.1"},
{:explorer, "~> 0.4.0"},
{:axon, "~> 0.3.0"},
{:exla, "~> 0.4.0"},
{:nx, "~> 0.4.0"},
{:table_rex, "~> 3.1"}
],
config: [
nx: [default_backend: EXLA.Backend]
]
)
alias VegaLite, as: Vl
alias Explorer.DataFrame, as: DF
alias Explorer.Series
# Sets the global compilation options
Nx.Defn.global_default_options(compiler: EXLA)
# Sets the process-level compilation options
Nx.Defn.default_options(compiler: EXLA)
Fetch Data From API
# defining api calls to get data
require Explorer.DataFrame
defmodule WaterAPI do
require Logger
@water_nsw_api_endpoint "https://realtimedata.waternsw.com.au/cgi/webservice.pl"
def get_station_by_coords(longitude, latitude, radius) do
%{
"function" => "get_db_info",
"version" => 3,
"params" => %{
"table_name" => "site",
"return_type" => "array",
"filter_values" => %{"active" => "true"},
"geo_filter" => %{
"circle" => [longitude, latitude, radius]
}
}
}
|> Jason.encode!()
|> post_to_api()
end
def get_variable_list(station_ids, opts \\ []) do
data_source = Keyword.get(opts, :data_source, "A")
station_ids_as_string = Enum.join(station_ids, ",")
%{
"function" => "get_variable_list",
"version" => 1,
"params" => %{
"site_list" => station_ids_as_string,
"datasource" => data_source
}
}
|> Jason.encode!()
|> post_to_api()
end
def get_ts_trace(station_id, start_time, end_time, var_list, interval, aggregate, opts) do
datasource = Keyword.get(opts, :data_source, "A")
multiplier = Keyword.get(opts, :multiplier, "1")
var_list_string = Enum.join(var_list, ",")
%{
function: "get_ts_traces",
version: 2,
params: %{
site_list: station_id,
datasource: datasource,
start_time: start_time,
end_time: end_time,
var_list: var_list_string,
interval: interval,
multiplier: multiplier,
data_type: aggregate
}
}
|> Jason.encode!()
|> post_to_api()
|> case do
{:ok, data} ->
data["traces"]
|> List.first()
|> Map.get("trace")
{:error, msg} ->
msg
end
end
defp post_to_api(payload) do
HTTPoison.post!(
@water_nsw_api_endpoint,
payload,
[],
timeout: 50_000,
recv_timeout: 50_000
)
|> then(fn r -> r.body end)
|> Jason.decode!()
|> case do
%{"_return" => data} -> {:ok, data}
%{"error_msg" => error_msg} -> {:error, error_msg}
end
end
end
defmodule Helper do
def rename_cols_by_id(dfs, station_ids, variable_no) do
for {df, station_id} <- Enum.zip(dfs, station_ids) do
DF.rename(
df,
q: "q_#{station_id}_#{variable_no}",
v: "v_#{station_id}_#{variable_no}"
)
end
end
def join_dfs(dfs) when is_list(dfs) do
Enum.reduce(dfs, fn df, acc ->
DF.join(df, acc, how: :inner)
end)
end
def split_data(df, decimal) do
row_no = DF.n_rows(df)
{first, second} =
df
|> DF.to_rows()
|> Enum.shuffle()
|> Enum.split(round(decimal * row_no))
{DF.new(first), DF.new(second)}
end
def df_to_batches(df, feature_cols, label_col, batch_size \\ 1) do
Stream.zip(
df_to_tensor_batches(df[feature_cols], batch_size),
df_to_tensor_batches(df[[label_col]], batch_size)
)
end
def df_to_tensor_batches(df, batch_size) do
df
|> df_to_tensor()
|> Nx.shuffle(axis: 0)
|> Nx.to_batched(batch_size, leftover: :discard)
end
def df_to_tensor(df) do
df
|> DF.names()
|> Enum.map(&Explorer.Series.to_tensor(df[&1]))
|> Nx.stack(axis: 1)
end
end
# weather and stream station ids, closest to the dam, see here for more info:
# https://realtimedata.waternsw.com.au/
weather_station_ids = ["563035", "563046", "563079", "568045", "568051"]
stream_station_ids = ["212250", "212270"]
water_level_df =
WaterAPI.get_ts_trace(
"212242",
"20080130000000",
"20220112000000",
["130.00"],
"day",
"mean",
datasource: "CP"
)
|> DF.new()
|> DF.mutate(v: Series.cast(v, :float))
rainfall_dfs =
for station_id <- weather_station_ids do
WaterAPI.get_ts_trace(
station_id,
"20080130000000",
"20220112000000",
["10.00"],
"day",
"tot",
datasource: "CP"
)
|> DF.new()
|> DF.mutate(v: Series.cast(v, :float))
end
stream_dfs =
for station_id <- stream_station_ids do
WaterAPI.get_ts_trace(
station_id,
"20080130000000",
"20220112000000",
["100.00"],
"day",
"mean",
datasource: "CP"
)
|> DF.new()
|> DF.mutate(v: Series.cast(v, :float))
end
Deriving ‘Water Level Difference’
# making new column which is water level shifted by one day
water_level_tomorrow =
water_level_df["v"]
|> Series.shift(-1)
|> then(fn s -> DF.new(water_level_tomorrow: s) end)
water_level_df =
water_level_df
|> DF.concat_columns(water_level_tomorrow)
|> DF.mutate(water_level_difference: water_level_tomorrow - v)
|> DF.discard("water_level_tomorrow")
Cleaning
water_level_df =
DF.filter(
water_level_df,
q != 201 and q != 255
)
rainfall_dfs =
for df <- rainfall_dfs do
df
|> DF.mutate(m: cast(q != 201 and q != 255, :integer))
|> DF.mutate(v: v * m)
|> DF.discard(:m)
end
stream_dfs =
for df <- stream_dfs do
df
|> DF.mutate(m: cast(q != 201 and q != 255, :integer))
|> DF.mutate(v: v * m)
|> DF.discard(:m)
end
Renaming Columns
water_level_df = DF.rename(water_level_df, q: "q_212242_130", v: "v_212242_130")
rainfall_df =
rainfall_dfs
|> Helper.rename_cols_by_id(weather_station_ids, "10")
|> Helper.join_dfs()
stream_df =
stream_dfs
|> Helper.rename_cols_by_id(stream_station_ids, "100")
|> Helper.join_dfs()
Joining Data Frames
df =
water_level_df
|> DF.join(rainfall_df)
|> DF.join(stream_df)
DF.names(df)
Prepare Data for Neural Net
feature_columns = [
"v_568051_10",
"v_568045_10",
"v_563079_10",
"v_563046_10",
"v_563035_10",
"v_212250_100",
"v_212270_100"
]
label_column = "water_level_difference"
model_columns = [label_column | feature_columns]
df = DF.select(df, model_columns)
{train_df, test_df} = Helper.split_data(df, 0.8)
{train_df, validation_df} = Helper.split_data(train_df, 0.8)
training_batches = Helper.df_to_batches(train_df, feature_columns, label_column, 8)
validation_batches = Helper.df_to_batches(validation_df, feature_columns, label_column, 1)
testing_batches = Helper.df_to_batches(test_df, feature_columns, label_column, 1)
row_no = DF.n_rows(df)
graph_data =
DF.concat_columns(
df[[label_column]],
DF.new(%{"count" => Enum.map(1..row_no, & &1)})
)
|> DF.to_rows()
Vl.new(width: 800, height: 600)
|> Vl.data_from_values(graph_data |> Enum.take(300))
|> Vl.encode(:y, field: "water_level_difference", type: :quantitative)
|> Vl.encode(:x, field: "count", type: :quantitative)
|> Vl.mark(:line)
|> Vl.param("grid", select: :interval, bind: :scales)
Building the Model
model =
Axon.input("stream_and_rain_model")
|> Axon.dropout(rate: 0.5)
|> Axon.dense(16)
|> Axon.dropout(rate: 0.5)
|> Axon.relu()
|> Axon.dense(1)
Training the Model
model_params =
model
|> Axon.Loop.trainer(:mean_absolute_error, Axon.Optimizers.adam(0.001))
|> Axon.Loop.validate(model, validation_batches)
|> Axon.Loop.metric(:mean_absolute_error, "validation_loss")
|> Axon.Loop.run(training_batches, %{}, epochs: 50)
Evaluating the Model
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(testing_batches, model_params, epoch: 1)
Prediction vs Actual Change in Water Level
all_features = Helper.df_to_tensor(df[feature_columns])
all_labels =
df[label_column]
|> Series.to_tensor()
|> Nx.to_flat_list()
{_init_fn, predict_fn} = Axon.build(model, mode: :inference)
predictions =
predict_fn.(model_params, all_features)
|> Nx.to_flat_list()
row_no = 4640
chart_data =
DF.new(
prediction: predictions,
actual: all_labels,
count: Enum.map(1..row_no, & &1)
)
|> DF.to_rows()
Vl.new(width: 400, height: 400)
|> Vl.data_from_values(Enum.take(chart_data, 300))
|> Vl.layers([
Vl.new()
|> Vl.param("prediction_chart", select: :interval, bind: :scales, encodings: ["x", "y"])
|> Vl.encode(:y, field: "prediction", type: :quantitative)
|> Vl.encode(:x, field: "count", type: :quantitative)
|> Vl.mark(:line, color: "orange"),
Vl.new()
|> Vl.encode(:x, field: "count", type: :quantitative)
|> Vl.encode(:y, field: "actual", type: :quantitative)
|> Vl.mark(:line, color: "blue")
])