
MET-1246 Make Pretty Picture

livebooks/MET-1246-analysis.livemd


Mix.install([
  {:nx, "~> 0.5"},
  {:exla, "~> 0.5.1"},
  {:explorer, "~> 0.5"},
  {:scholar, git: "https://github.com/elixir-nx/scholar"},
  {:jason, "~> 1.4"},
  {:kino, "~> 0.8.1"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:chameleon, "~> 2.5"}
])

Nx.global_default_backend(EXLA.Backend)

Read in data

The first step is to read back the data we have generated. We have three main files:

  1. The main data table. This is a file that has a column for each monitor and each monitor/check combination, and a row for each minute since May 25th, 2021 that has any measurements. If the raw data has more than one value for a given cell, we took the average of all measurements. Note that we dropped the “instance” dimension; everything here is aggregated over all instances.
  2. A file with row labels, containing lines with (row number, minute) tuples in case we want to translate back to the actual minute something happened.
  3. A file with column labels, containing lines with (column number, column name) tuples in case we want to translate back to the actual account/monitor/check where we find interesting stuff.

We’re not really likely to use 2. and 3., but they’re small, so it does not hurt to have them around and it keeps all the initialization code in a single block. A sketch of the assumed shape of these label files follows.
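For reference, these label files are plain Erlang terms written with :erlang.term_to_binary/1. Below is a minimal sketch of their assumed shape, with made-up labels; note that the Map.new and Enum.sort_by calls further down treat the column entries as {label, index} pairs, so that ordering is used here as well.

# Assumed shape of the column label file, with made-up labels. The downstream
# code (Map.new/1 and the Enum.sort_by later on) treats entries as {label, index}.
example_cols = [
  {"acme/homepage/http", 0},
  {"aws-us-east-1/ec2/ping", 1}
]

# Round-tripping through the same serialization the real files use.
example_cols
|> :erlang.term_to_binary()
|> :erlang.binary_to_term()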

base = "/home/eng/dev/backend/livebooks/data/bulk"
rows = File.read!(Path.join(base, "rows.bin")) |> :erlang.binary_to_term()
cols = File.read!(Path.join(base, "columns.bin")) |> :erlang.binary_to_term()
col_count = Enum.count(cols)
row_count = Enum.count(rows)
col_max = col_count - 1

row_map = Map.new(rows)
col_map = Map.new(cols)

data_file = Path.join(base, "matrix.bin")

tensor =
  data_file
  |> File.read!()
  |> Nx.from_binary(:f32)

{points} = Nx.shape(tensor)
data_rows = div(points, col_count)
# Sanity check
^data_rows = row_count

tensor = Nx.reshape(tensor, {row_count, col_count})
as_image = fn tensor ->
  tensor
  # Quick-and-dirty greyscale rendering: scale to 0..255 so that a correlation
  # of 1.0 does not overflow the u8 range.
  |> Nx.multiply(255.0)
  |> Nx.as_type(:u8)
  |> Nx.reshape({col_count, col_count, 1})
  |> Kino.Image.new()
end

# Quick verification: self-correlation
tensor
|> Scholar.Covariance.correlation_matrix()
|> as_image.()

Normalize data

The first step is to normalize the data by converting them into z-scores. This makes correlation calculations (of which we have to do a lot) much simpler:

$$ z_i = \frac{x_i - \mu}{\sigma} $$
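With both series expressed as z-scores, the Pearson correlation coefficient reduces to a plain average of element-wise products, which is why a matrix dot product is all we will need later on:

$$ r_{xy} = \frac{1}{n}\sum_{i=1}^{n} \frac{x_i - \mu_x}{\sigma_x} \cdot \frac{y_i - \mu_y}{\sigma_y} = \frac{1}{n}\sum_{i=1}^{n} z_{x,i}\, z_{y,i} $$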

The axes: [0] arguments in the invocations below give us a column-wise result, in other words one value per time series (monitor/check).

means = Nx.mean(tensor, axes: [0])
stddevs = Nx.standard_deviation(tensor, axes: [0])
centered = Nx.subtract(tensor, means)
zscores = Nx.divide(centered, stddevs)
# Quick verification, this should look like the one above
zscores
|> Scholar.Covariance.correlation_matrix()
|> as_image.()
# And if things are properly normalized, this should pretty much
# result in the same thing. 
zscores
|> Nx.dot([0], zscores, [0])
|> Nx.divide(row_count)
|> as_image.()
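As one extra sanity check (not part of the original output), the z-scores should now have a per-column mean of roughly zero and a per-column standard deviation of roughly one; any column with zero variance would show up here as NaN:

# Largest per-column |mean| and largest per-column standard deviation of the
# z-scores; expect values close to 0.0 and 1.0 up to floating point error.
{
  zscores |> Nx.mean(axes: [0]) |> Nx.abs() |> Nx.reduce_max(),
  zscores |> Nx.standard_deviation(axes: [0]) |> Nx.reduce_max()
}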

Calculating lagged correlations

Now that we have massaged the data, we can calculate the lags. What we want is, for each data series, to see how well it correlates with every other data series when we shift time; dependencies between services don’t happen immediately. So what we do next is the heavy lifting: for every column we calculate the correlation coefficient between that column and every other column while it “slides” in time from 0 to 60 minutes.

Given that we have massaged the data, calculating the correlation coefficient of every column against every other column is now a simple matrix dot product. To do the time shifting, we take one copy that has the first rows sliced off and one copy that has the last rows sliced off. A sketch makes it easy to see that this is correct. For the one-minute shift we slide one copy “up”:

[  1  2  3 ]
[  4  5  6 ]  [  1  2  3 ]
[  7  8  9 ]  [  4  5  6 ]
[ 10 11 12 ]  [  7  8  9 ]
              [ 10 11 12 ]

and then drop the left matrix’s first row and the right matrix’s last row to get the offset data series:

[  4  5  6 ]  [  1  2  3 ]
[  7  8  9 ]  [  4  5  6 ]
[ 10 11 12 ]  [  7  8  9 ]

We can now get all correlation coefficients between all the left and all the right time series.

Note that we normalized the data based on the standard deviation and mean of the whole series, not the “sliced off” time series, so technically everything will be a little bit off; however, given that our time series are very long and we’re only removing a handful of data points, it is unlikely that this will meaningfully change the end result.

Thanks to the z-score massaging, the correlation matrix is just the matrix dot product divided by the number of values in each series, so all we need to do is calculate that once for every minute of shift from 0 to 60.
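To make the slicing concrete before running it on the real data, here is the one-minute shift from the sketch above on the same toy 4×3 matrix (illustration only, without the z-score normalization and the division by the row count):

# The 4x3 matrix from the sketch above.
toy = Nx.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
toy_shift = 1
toy_rows = 4 - toy_shift

# Drop the first toy_shift rows on the left and the last toy_shift rows on the right.
toy_left = Nx.slice(toy, [toy_shift, 0], [toy_rows, 3])
toy_right = Nx.slice(toy, [0, 0], [toy_rows, 3])

# Contracting over the row axis gives a 3x3 column-vs-column matrix, the same
# shape as the lagged correlation matrices computed below.
Nx.dot(toy_left, [0], toy_right, [0])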

lags = 60

correlations =
  0..lags
  |> Enum.map(fn shift ->
    cur_row_count = row_count - shift
    left = Nx.slice(zscores, [shift, 0], [cur_row_count, col_count])
    right = Nx.slice(zscores, [0, 0], [cur_row_count, col_count])
    dot = Nx.dot(left, [0], right, [0]) |> Nx.divide(cur_row_count)
    # We need to be aggressive here, otherwise XLA will happily eat all the memory
    # and get the VM OOM killed.
    :erlang.garbage_collect()
    dot
  end)

Visualization

For visualization, we can simply convert one of these 500x500-ish matrices into a greyscale image to see whether anything interesting is in there.

# There are lags + 1 correlation matrices, one for each shift from 0 to 60 minutes.
pos = Kino.Input.range("Lag", min: 0, max: lags)
cur_pos = Kino.Input.read(pos) |> round()
img =
  correlations
  |> Enum.at(cur_pos)
  |> as_image.()

But we can do a bit better: for every point, we can look at the maximum correlation value across all lags and use that as the brightness, with the amount of shift determining the color.

# Stack all the correlations and find the depth/location of the maxima for each pixel
stacked = Nx.stack(correlations, axis: -1)
maxima = Nx.argmax(stacked, axis: -1)

# Time to leave Nx and go at it oldskool
maxima_enum = Nx.to_list(maxima)
stacks_enum = Nx.to_list(stacked)

values =
  for i <- 0..(col_count - 1) do
    for j <- 0..(col_count - 1) do
      max = Enum.at(maxima_enum, i) |> Enum.at(j)
      coeff = Enum.at(stacks_enum, i) |> Enum.at(j) |> Enum.at(max)
      {max, coeff}
    end
  end
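The nested Enum.at lookups do the job, but every lookup is linear in the list length, so this step is roughly cubic in the column count. If it ever becomes a bottleneck, the same {lag, coefficient} extraction can be done without leaving Nx, for example with Nx.take_along_axis; a sketch of that alternative (not what the rest of this notebook uses):

# For every (row, column) pair, gather the correlation coefficient at the lag
# where it is maximal, staying entirely in Nx.
max_coeffs =
  stacked
  |> Nx.take_along_axis(Nx.new_axis(maxima, 2), axis: 2)
  |> Nx.squeeze(axes: [2])

# maxima holds the best lag per pair, max_coeffs the coefficient at that lag.
{maxima, max_coeffs}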
cutoff_r = 0.65

color_data =
  for row <- values do
    for {max, coeff} <- row do
      # HSL hue goes around the circle from red back to red,
      # so we only use the 0-240 part of it to make things go from red to blue.
      h = max / lags * 240
      # lightness is correlation coefficient, but we cut it off to focus on the 
      # "interesting" values. 
      l = if coeff > cutoff_r, do: coeff * 100, else: 0
      color = Chameleon.HSL.new(h, 100, l)
      color = Chameleon.convert(color, Chameleon.Color.RGB)
      [color.r, color.g, color.b]
    end
  end
color_data
|> Nx.tensor(type: :u8)
|> Kino.Image.new()

Images like the above help in quickly setting a reasonable cut-off value for the correlation coefficients we want to zoom in on, but now we need to convert the data back to a table with the proper labels.

labels =
  col_map
  |> Enum.sort_by(fn {_label, index} -> index end)
  |> Enum.map(fn {label, _index} -> label end)

# If this returns true, we keep the data point. We are looking for SaaS services
# being dependent on cloud services; as we don't have tags here, we wing it with a regex.
is_cloud = ~r/az|gcp|aws/

keep? = fn row_label, col_label ->
  row_label != col_label and
    Regex.match?(is_cloud, col_label) and
    not Regex.match?(is_cloud, row_label)
end
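A quick illustration of the filter with made-up labels (the real labels come from columns.bin): pairs where a non-cloud check is compared against a cloud check are kept, everything else is dropped.

# Hypothetical labels, purely for illustration.
{
  keep?.("acme/homepage/http", "aws-us-east-1/ec2-status"),
  keep?.("aws-us-east-1/ec2-status", "acme/homepage/http"),
  keep?.("acme/homepage/http", "acme/homepage/http")
}
# => {true, false, false}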

labeled =
  for {row_label, row} <- Enum.zip(labels, values) do
    row =
      row
      |> Enum.zip(labels)
      |> Enum.map(fn {{lag_index, value}, col_label} ->
        if keep?.(row_label, col_label) && value > cutoff_r do
          {value, lag_index, col_label}
        else
          nil
        end
      end)
      # We have data labeled now so no need for absolute row/col positions
      |> Enum.reject(&is_nil/1)

    case row do
      [] -> nil
      row -> {row_label, row}
    end
  end
  |> Enum.reject(&is_nil/1)
# We now have a list of {row_label, [{r, lag, col_label}, ...]} tuples and we need
# to flatten each entry into a map to make a proper table.
labeled =
  labeled
  |> Enum.map(fn {row_label, row} ->
    row_map = %{id: row_label}

    Enum.reduce(row, row_map, fn {r, lag, col_label}, row_map ->
      Map.put(row_map, col_label, {r, lag})
    end)
  end)
# We can't do sparse rows in the table, so the above is a nice, concise summary, but we have to expand everything with nils.
nils =
  labeled
  |> Enum.map(&Map.keys/1)
  |> List.flatten()
  |> Enum.uniq()
  |> Enum.map(fn k -> {k, nil} end)
  |> Map.new()

unsparse =
  labeled
  |> Enum.map(fn row ->
    Map.merge(nils, row)
  end)
Kino.DataTable.new(unsparse)