MET-1246
Mix.install([
{:nx, "~> 0.5"},
{:exla, "~> 0.5.1"},
{:explorer, "~> 0.5"},
{:scholar, git: "https://github.com/elixir-nx/scholar"}
])
Nx.global_default_backend(EXLA.Backend)
Covariance and correlation
The new [Scholar](https://github.com/elixir-nx/scholar) package contains this, but it hasn’t been published yet, so there are no docs to refer to. Therefore, here is a copy of the most relevant bit we’re interested in for now, the correlation matrix:
$$ Corr(X_i, X_j) = \frac{Cov(X_i, X_j)}{\sqrt{Cov(X_i, X_i)Cov(X_j, X_j)}} $$
Where:
- $X_i$ is the $i$th row of the input
- $Cov(X_i, X_j)$ is the covariance between features $X_i$ and $X_j$
# Testing the above, example from the docs
Scholar.Covariance.correlation_matrix(Nx.tensor([[3, 6, 5], [26, 75, 3], [23, 4, 1]]))
Scholar.Covariance.correlation_matrix(Nx.tensor([[1, 10], [5, 51], [10, 101]]))
# An example to show how things are laid out
Scholar.Covariance.correlation_matrix(Nx.tensor([[1, 10, 100], [5, 50, 50], [10, 100, 10]]))
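To make the formula concrete, here is a minimal hand-rolled sketch in plain Nx (for illustration only; it assumes rows are observations and columns are features, and Scholar’s own implementation may differ in details such as the normalization denominator):
corr = fn x ->
  # Rows are observations, columns are features
  x = Nx.as_type(x, :f32)
  {n, _} = Nx.shape(x)
  # Center each feature on its mean
  centered = Nx.subtract(x, Nx.mean(x, axes: [0]))
  # Cov(X_i, X_j) for every pair of features
  cov = Nx.divide(Nx.dot(Nx.transpose(centered), centered), n)
  # Divide by sqrt(Cov(X_i, X_i) * Cov(X_j, X_j))
  std = Nx.sqrt(Nx.take_diagonal(cov))
  Nx.divide(cov, Nx.outer(std, std))
end

corr.(Nx.tensor([[1, 10], [5, 51], [10, 101]]))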
Obtaining data
System.schedulers_online()
Data was exported to S3 from RDS; we have the raw tables in Parquet format. First, we need to read these tables and extract just the telemetry and error events. The Explorer package was made for this.
# S3 Fuse stuff with AWS SSO does not work, so we, alas, have to start from
# local files
base_dir = "/data/livebooks/data/bulk/s3/production_eventstore/public.events/1"
out = "/data/livebooks/data/bulk/telemetry_added.csv"
out_file = File.open!(out, [:write, {:delayed_write, 1024 * 1024 * 100, 10 * 100}])
wanted_event_type = "Elixir.Domain.Monitor.Events.TelemetryAdded"
base_dir
|> File.ls!()
|> Enum.filter(&String.ends_with?(&1, ".parquet"))
|> Task.async_stream(
fn file ->
df = Explorer.DataFrame.from_parquet!(Path.join(base_dir, file))
df
|> Explorer.DataFrame.filter_with(fn d ->
Explorer.Series.equal(d["event_type"], wanted_event_type)
end)
|> Explorer.DataFrame.select("data")
|> Explorer.DataFrame.to_series()
|> Map.get("data")
|> Explorer.Series.to_list()
|> Enum.map(fn s ->
# The "data" column is JSON-encoded twice, hence the double decode
telem = Jason.decode!(Jason.decode!(s))
# created_at has no timezone suffix, so append "Z" to parse it as UTC
{:ok, dt, _} = DateTime.from_iso8601(telem["created_at"] <> "Z")
ts = DateTime.to_unix(dt, :second)
IO.write(
out_file,
"#{ts},#{telem["id"]},#{telem["check_logical_name"]},#{telem["instance_name"]},#{telem["value"]}\n"
)
end)
end,
timeout: 100_000,
ordered: false
)
|> Stream.run()
File.close(out_file)
Fetching errors
As we may need them, we can fetch errors in much the same way.
base_dir = "/data/livebooks/data/bulk/s3/production_eventstore/public.events/1"
out = "/data/livebooks/data/bulk/error_added.csv"
out_file = File.open!(out, [:write, {:delayed_write, 1024 * 1024, 10 * 100}])
wanted_event_type = "Elixir.Domain.Monitor.Events.ErrorAdded"
base_dir
|> File.ls!()
|> Enum.filter(&String.ends_with?(&1, ".parquet"))
|> Enum.take(10)
|> Task.async_stream(
fn file ->
df = Explorer.DataFrame.from_parquet!(Path.join(base_dir, file))
df
|> Explorer.DataFrame.filter_with(fn d ->
Explorer.Series.equal(d["event_type"], wanted_event_type)
end)
|> Explorer.DataFrame.select("data")
|> Explorer.DataFrame.to_series()
|> Map.get("data")
|> Explorer.Series.to_list()
|> Enum.map(fn s ->
telem = Jason.decode!(Jason.decode!(s))
{:ok, dt, _} = DateTime.from_iso8601(telem["created_at"] <> "Z")
ts = DateTime.to_unix(dt, :second)
# It smells like error added events have a bad ID. Form it on the fly instead.
IO.write(
out_file,
"#{ts},#{telem["account_id"]}#{telem["monitor_logical_name"]},#{telem["check_logical_name"]},#{telem["instance_name"]}\n"
)
end)
end,
timeout: 100_000,
ordered: false
)
|> Stream.run()
File.close(out_file)
Sorting and reading back
Sorting is best done on the command line:
time sort --field-separator=, --key=1n --parallel=$(expr $(nproc) / 4 \* 3) --output=telemetry_added.srt --buffer-size=75% telemetry_added.csv
GNU sort already has options to throw lots of memory and CPU at the problem, so it gets the job done in a couple of minutes.
Next: the hard bit. We want to convert the time series with (date, monitor_logical_name, check, region, value) into columns of a matrix:
Date | Acct_Monitor1 | Acct_Monitor2 | … |
---|---|---|---|
date1 | value1 | value2 | … |
date2 | value3 | value4 | … |
… | … | … | … |
So that we can create a correlation matrix over all the values. For now, we drop checks as a dimension; we can always change that.
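As a toy illustration of the pivot (hypothetical in-memory samples; the real version below streams from disk and averages duplicate values per cell): one row per minute, one column per monitor key, missing cells defaulted to 0.0.
samples = [
  {1, "acct_monitor1", 1.0},
  {1, "acct_monitor2", 2.0},
  {2, "acct_monitor1", 3.0}
]

keys =
  samples
  |> Enum.map(fn {_minute, key, _value} -> key end)
  |> Enum.uniq()
  |> Enum.sort()

samples
|> Enum.group_by(fn {minute, _key, _value} -> minute end)
|> Enum.sort()
|> Enum.map(fn {_minute, row} ->
  values = Map.new(row, fn {_minute, key, value} -> {key, value} end)
  Enum.map(keys, &Map.get(values, &1, 0.0))
end)
|> Nx.tensor()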
First, let’s get a feel for the size of the final matrix:
awk -F, '{print $2}' < telemetry_added.srt | sort -u --parallel=20 --buffer-size=40% | wc -l
gives us the number of columns, which is 111. Doing the same for fields 2 and 3 combined (id plus check), we get 360. We should probably do both.
For the rows, we have 21 months’ worth of data, so an upper limit of around a million minutes (the matrix will be sparse, of course, so we need a way to supply or interpolate missing values). A hundred million floats should fit in memory, and the output matrix will be just 111x111 or 360x360 (or around 500x500 if we do everything at once).
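A quick back-of-the-envelope check in the notebook (assuming 30-day months and 4-byte :f32 values):
# Rows: 21 months of minutes, floats for 111 columns, bytes at :f32 for 360 columns
minutes = 21 * 30 * 24 * 60
{minutes, minutes * 111, minutes * 360 * 4}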
Building the tensor
input = "/home/eng/dev/backend/livebooks/data/bulk/telemetry_added.csv"
sorted = String.replace(input, ".csv", ".srt")
do_stream_csv = fn filename ->
filename
|> File.stream!()
|> Stream.map(fn line ->
[dtstring, id, check, instance, valuestring] = String.split(line, ",")
{dt, _} = Integer.parse(dtstring)
{value, _} = Float.parse(valuestring)
{dt, id, check, instance, value}
end)
end
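A quick sanity check of the parser on the first few lines (this assumes the CSV written above is present at `input`):
do_stream_csv.(input)
|> Enum.take(3)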
Finding the columns
keys =
do_stream_csv.(input)
|> Stream.map(fn {_dt, id, check, _instance, _value} ->
shortkey = "#{id}"
longkey = "#{id}_#{check}"
{shortkey, longkey}
end)
# |> Enum.take(100)
|> Stream.transform(
# first
fn -> MapSet.new() end,
# reducer
fn {shortkey, longkey}, set ->
set =
set
|> MapSet.put(shortkey)
|> MapSet.put(longkey)
{[], set}
end,
# last
fn acc -> {Enum.to_list(acc), acc} end,
# after
fn _ -> nil end
)
|> Enum.sort()
|> Enum.with_index()
# For now, dump the keys in a temporary file
File.write!("/data/livebooks/data/bulk/columns.bin", :erlang.term_to_binary(keys))
Finding the rows
keys =
do_stream_csv.(input)
|> Stream.map(fn {dt, _id, _check, _instance, _value} ->
# Convert to minutes
div(dt, 60)
end)
# |> Enum.take(100)
|> Stream.transform(
# first
fn -> MapSet.new() end,
# reducer
fn minutes, set ->
set = MapSet.put(set, minutes)
{[], set}
end,
# last
fn acc -> {Enum.to_list(acc), acc} end,
# after
fn _ -> nil end
)
|> Enum.sort()
|> Enum.with_index()
# For now, dump the keys in a temporary file
File.write!("/data/livebooks/data/bulk/rows.bin", :erlang.term_to_binary(keys))
Footnote: this took way too long, but as it is a one-off, we’ll let it slide. This approach takes hours where sort on the command line takes minutes. So we know for next time :)
Creating the datastructure
With this, we can now build up the matrix: we have the row/column layout, and the data is sorted, so we fill one minute’s row and then continue to the next.
It’s hard to find methods for building up these data structures incrementally in memory, but as this is an intermediate result that’s expensive to create, it won’t hurt to build it up on disk. We create the matrix row by row and append each row’s binary to a file; this way we can recreate the whole matrix by simply reading the file back in one go and reshaping the resulting tensor to the correct size.
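Here is the round-trip idea in miniature, with toy tensors standing in for the real rows: append each row’s :f32 binary to one blob, read the blob back, and reshape.
row_a = Nx.tensor([1.0, 2.0, 3.0], type: :f32)
row_b = Nx.tensor([4.0, 5.0, 6.0], type: :f32)

blob = Nx.to_binary(row_a) <> Nx.to_binary(row_b)

blob
|> Nx.from_binary(:f32)
|> Nx.reshape({2, 3})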
base = "/home/eng/dev/backend/livebooks/data/bulk"
rows = File.read!(Path.join(base, "rows.bin")) |> :erlang.binary_to_term()
cols = File.read!(Path.join(base, "columns.bin")) |> :erlang.binary_to_term()
col_count = Enum.count(cols)
col_max = col_count - 1
row_map = Map.new(rows)
col_map = Map.new(cols)
table = :ets.new(:column_map, [:private])
out = File.open!(Path.join(base, "matrix.bin"), [:write, :binary])
result =
do_stream_csv.(sorted)
# |> Stream.take(100000)
|> Stream.chunk_by(fn {dt, _, _, _, _} -> div(dt, 60) end)
|> Task.async_stream(
fn chunk ->
{_row, cols} =
Enum.reduce(chunk, {-1, %{}}, fn {dt, id, check, _instance, value}, {row, cols} ->
# If we don't have the row number yet, fetch it
row =
case row do
-1 -> Map.get(row_map, div(dt, 60)) || raise("Row for minute #{div(dt, 60)} not in map!")
row -> row
end
short_key = "#{id}"
long_key = "#{id}_#{check}"
cols =
[short_key, long_key]
|> Enum.reduce(cols, fn key, cols ->
col = Map.get(col_map, key)
Map.update(cols, col, [value], fn cur -> [value | cur] end)
end)
{row, cols}
end)
0..col_max
|> Enum.map(fn i ->
cols
|> Map.get(i, [0.0])
|> Nx.tensor(type: :f32)
|> Nx.mean()
end)
|> Nx.stack()
|> Nx.to_binary()
end,
timeout: 100_000
)
|> Stream.map(fn {:ok, bytes} ->
IO.binwrite(out, bytes)
end)
|> Enum.count()
File.close(out)
"matrix.bin created as :f32 with #{result} rows. Reshape on reading with col count #{col_count} and the first dimension calculated from actual size."