
MET-1246

Mix.install([
  {:nx, "~> 0.5"},
  {:exla, "~> 0.5.1"},
  {:explorer, "~> 0.5"},
  {:scholar, git: "https://github.com/elixir-nx/scholar"}
])

Nx.global_default_backend(EXLA.Backend)

Covariance and correlation

The new [Scholar](https://github.com/elixir-nx/scholar) package contains the covariance and correlation functions we need, but it hasn’t been published yet, so there are no docs to refer to. We therefore copy-paste the most relevant bit we’re interested in for now, the correlation matrix:

$$ Corr(X_i, X_j) = \frac{Cov(X_i, X_j)}{\sqrt{Cov(X_i, X_i)Cov(X_j, X_j)}} $$

Where:

  • $X_i$ is the $i$th row of the input
  • $Cov(X_i, X_j)$ is the covariance between features $X_i$ and $X_j$
# Testing the above, example from the docs
Scholar.Covariance.correlation_matrix(Nx.tensor([[3, 6, 5], [26, 75, 3], [23, 4, 1]]))
Scholar.Covariance.correlation_matrix(Nx.tensor([[1, 10], [5, 51], [10, 101]]))
# An example to show how things are laid out
Scholar.Covariance.correlation_matrix(Nx.tensor([[1, 10, 100], [5, 50, 50], [10, 100, 10]]))
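
As a cross-check, the same result can be computed straight from the formula above with plain Nx (a minimal sketch, not Scholar’s actual implementation):

# Minimal sketch of the formula above: center the columns, build the covariance
# matrix, then normalise by the per-feature standard deviations.
correlation = fn x ->
  x = Nx.as_type(x, :f32)
  n = elem(Nx.shape(x), 0)
  centered = Nx.subtract(x, Nx.mean(x, axes: [0]))
  cov = Nx.divide(Nx.dot(Nx.transpose(centered), centered), n)
  std = Nx.sqrt(Nx.take_diagonal(cov))
  Nx.divide(cov, Nx.outer(std, std))
end

correlation.(Nx.tensor([[1, 10, 100], [5, 50, 50], [10, 100, 10]]))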

Obtaining data

System.schedulers_online()

Data was exported from RDS to S3, so we have the raw tables in Parquet format. First, we need to read these tables and filter out just the telemetry and error events. The Explorer package is made for exactly this.
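
To get oriented, it helps to peek at a single Parquet file first (a small sketch; it assumes the `event_type` and `data` columns used below):

# Peek at one Parquet file to confirm the schema before processing all of them.
dir = "/data/livebooks/data/bulk/s3/production_eventstore/public.events/1"

sample =
  dir
  |> File.ls!()
  |> Enum.filter(&String.ends_with?(&1, ".parquet"))
  |> List.first()

dir
|> Path.join(sample)
|> Explorer.DataFrame.from_parquet!()
|> Explorer.DataFrame.names()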

# S3 Fuse stuff with AWS SSO does not work, so we, alas, have to start from 
# local files
base_dir = "/data/livebooks/data/bulk/s3/production_eventstore/public.events/1"
out = "/data/livebooks/data/bulk/telemetry_added.csv"

out_file = File.open!(out, [:write, {:delayed_write, 1024 * 1024 * 100, 10 * 100}])

wanted_event_type = "Elixir.Domain.Monitor.Events.TelemetryAdded"

base_dir
|> File.ls!()
|> Enum.filter(&String.ends_with?(&1, ".parquet"))
|> Task.async_stream(
  fn file ->
    df = Explorer.DataFrame.from_parquet!(Path.join(base_dir, file))

    df
    |> Explorer.DataFrame.filter_with(fn d ->
      Explorer.Series.equal(d["event_type"], wanted_event_type)
    end)
    |> Explorer.DataFrame.select("data")
    |> Explorer.DataFrame.to_series()
    |> Map.get("data")
    |> Explorer.Series.to_list()
    |> Enum.map(fn s ->
      # The data column is double-encoded JSON, hence the double decode
      telem = Jason.decode!(Jason.decode!(s))
      # created_at is stored without a timezone, so append "Z" to parse it as UTC
      {:ok, dt, _} = DateTime.from_iso8601(telem["created_at"] <> "Z")
      ts = DateTime.to_unix(dt, :second)

      IO.write(
        out_file,
        "#{ts},#{telem["id"]},#{telem["check_logical_name"]},#{telem["instance_name"]},#{telem["value"]}\n"
      )
    end)
  end,
  timeout: 100_000,
  ordered: false
)
|> Stream.run()

File.close(out_file)

Fetching errors

As we may need them, we can fetch errors in much the same way.

base_dir = "/data/livebooks/data/bulk/s3/production_eventstore/public.events/1"
out = "/data/livebooks/data/bulk/error_added.csv"

out_file = File.open!(out, [:write, {:delayed_write, 1024 * 1024, 10 * 100}])

wanted_event_type = "Elixir.Domain.Monitor.Events.ErrorAdded"

base_dir
|> File.ls!()
|> Enum.filter(&String.ends_with?(&1, ".parquet"))
|> Enum.take(10)
|> Task.async_stream(
  fn file ->
    df = Explorer.DataFrame.from_parquet!(Path.join(base_dir, file))

    df
    |> Explorer.DataFrame.filter_with(fn d ->
      Explorer.Series.equal(d["event_type"], wanted_event_type)
    end)
    |> Explorer.DataFrame.select("data")
    |> Explorer.DataFrame.to_series()
    |> Map.get("data")
    |> Explorer.Series.to_list()
    |> Enum.map(fn s ->
      # The data column is double-encoded JSON here as well
      error = Jason.decode!(Jason.decode!(s))
      {:ok, dt, _} = DateTime.from_iso8601(error["created_at"] <> "Z")
      ts = DateTime.to_unix(dt, :second)

      # It smells like error added events have a bad ID. Form it on the fly instead.
      IO.write(
        out_file,
        "#{ts},#{error["account_id"]}#{error["monitor_logical_name"]},#{error["check_logical_name"]},#{error["instance_name"]}\n"
      )
    end)
  end,
  timeout: 100_000,
  ordered: false
)
|> Stream.run()

File.close(out_file)

Sorting and reading back

Sorting is best done on the command line:

time sort --field-separator=, --key=1n --parallel=$(expr $(nproc) / 4 \* 3) --output=telemetry_added.srt --buffer-size=75%  telemetry_added.csv 

given that GNU sort already has options to use lots of memory and CPU, getting the job done in a couple of minutes.

Next: the hard bit. We want to convert the time series with (date, monitor_logical_name, check, region, value) into columns of a matrix:

| Date  | Acct_Monitor1 | Acct_Monitor2 |
| ----- | ------------- | ------------- |
| date1 | value1        | value2        |
| date2 | value3        | value4        |
| …     | …             | …             |

This lets us create a correlation matrix over all the values. For now, we drop checks as a dimension; we can always change that later.
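
As a toy illustration of the pivot we’re after (hypothetical monitor names and values, not the real export):

# Group samples by minute, then place each value in its monitor's column.
samples = [
  {0, "acct_monitor1", 1.0},
  {30, "acct_monitor2", 2.0},
  {60, "acct_monitor1", 3.0},
  {90, "acct_monitor2", 4.0}
]

columns = samples |> Enum.map(fn {_ts, m, _v} -> m end) |> Enum.uniq() |> Enum.sort()

matrix =
  samples
  |> Enum.group_by(fn {ts, _m, _v} -> div(ts, 60) end)
  |> Enum.sort()
  |> Enum.map(fn {_minute, tuples} ->
    by_monitor = Map.new(tuples, fn {_ts, m, v} -> {m, v} end)
    Enum.map(columns, &Map.get(by_monitor, &1, 0.0))
  end)

Nx.tensor(matrix)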

First, let’s get a feel for the size of the final matrix:

awk -F, '{print $2}' < telemetry_added.srt | sort -u --parallel=20 --buffer-size=40% | wc -l

gives us the number of columns, which is 111. Doing the same for fields 2 and 3 combined (monitor plus check) gives 360. We should probably do both.

For the rows, we have 21 months’ worth of data, so an upper limit of around 1 million minutes (the matrix will be sparse, of course, so we need a way to supply/interpolate missing values). A hundred million floats should fit in memory, and the output correlation matrix should be just 111x111 or 360x360 (or roughly 500x500 if we do everything at once).
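
A quick back-of-the-envelope estimate, assuming 32-bit floats at 4 bytes each:

# Rough upper bound on the dense intermediate matrix, assuming :f32 values.
minutes = 21 * 30 * 24 * 60
bytes_per_value = 4

Enum.each([111, 360], fn cols ->
  gb = minutes * cols * bytes_per_value / 1024 / 1024 / 1024
  IO.puts("#{cols} columns: ~#{Float.round(gb, 2)} GB")
end)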

Building the tensor

input = "/home/eng/dev/backend/livebooks/data/bulk/telemetry_added.csv"
sorted = String.replace(input, ".csv", ".srt")

do_stream_csv = fn filename ->
  filename
  |> File.stream!()
  |> Stream.map(fn line ->
    [dtstring, id, check, instance, valuestring] = String.split(line, ",")
    {dt, _} = Integer.parse(dtstring)
    {value, _} = Float.parse(valuestring)
    {dt, id, check, instance, value}
  end)
end
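
A quick peek to make sure the parser returns what we expect:

# Take a few parsed tuples to verify field order and types before the heavy runs.
do_stream_csv.(input) |> Enum.take(3)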

Finding the columns

keys =
  do_stream_csv.(input)
  |> Stream.map(fn {_dt, id, check, _instance, _value} ->
    shortkey = "#{id}"
    longkey = "#{id}_#{check}"
    {shortkey, longkey}
  end)
  # |> Enum.take(100)
  |> Stream.transform(
    # first
    fn -> MapSet.new() end,
    # reducer
    fn {shortkey, longkey}, set ->
      set =
        set
        |> MapSet.put(shortkey)
        |> MapSet.put(longkey)

      {[], set}
    end,
    # last
    fn acc -> {Enum.to_list(acc), acc} end,
    # after 
    fn _ -> nil end
  )
  |> Enum.sort()
  |> Enum.with_index()

# For now, dump the keys in a temporary file 
File.write!("/data/livebooks/data/bulk/columns.bin", :erlang.term_to_binary(keys))

Finding the rows

keys =
  do_stream_csv.(input)
  |> Stream.map(fn {dt, _id, _check, _instance, _value} ->
    # Convert to minutes
    div(dt, 60)
  end)
  # |> Enum.take(100)
  |> Stream.transform(
    # first
    fn -> MapSet.new() end,
    # reducer
    fn minutes, set ->
      set = MapSet.put(set, minutes)
      {[], set}
    end,
    # last
    fn acc -> {Enum.to_list(acc), acc} end,
    # after 
    fn _ -> nil end
  )
  |> Enum.sort()
  |> Enum.with_index()

# For now, dump the keys in a temporary file 
File.write!("/data/livebooks/data/bulk/rows.bin", :erlang.term_to_binary(keys))

Footnote: this took way too long, but as it is a one-off, we’ll let it slide. This takes hours, while sort on the command line takes minutes. So we know for next time :)

Creating the data structure

With this, we can now build up the matrix: we have the row/column layout, and because the data is sorted we can fill one minute row at a time before moving on to the next.

It’s hard to find methods for building up these data structures incrementally in memory, but as this is an intermediate result that’s expensive to create, it won’t hurt to build it up on disk. We create the matrix row by row and append each row’s binary to a file; this way, we can recreate the whole matrix by simply reading the file in one go and reshaping the resulting tensor to the correct size.

base = "/home/eng/dev/backend/livebooks/data/bulk"
rows = File.read!(Path.join(base, "rows.bin")) |> :erlang.binary_to_term()
cols = File.read!(Path.join(base, "columns.bin")) |> :erlang.binary_to_term()
col_count = Enum.count(cols)
col_max = col_count - 1

row_map = Map.new(rows)
col_map = Map.new(cols)
out = File.open!(Path.join(base, "matrix.bin"), [:write, :binary])

result =
  do_stream_csv.(sorted)
  # |> Stream.take(100000)
  |> Stream.chunk_by(fn {dt, _, _, _, _} -> div(dt, 60) end)
  |> Task.async_stream(
    fn chunk ->
      {_row, cols} =
        Enum.reduce(chunk, {-1, %{}}, fn {dt, id, check, _instance, value}, {row, cols} ->
          # If we don't have the row number yet, fetch it
          row =
            case row do
              -1 -> Map.get(row_map, div(dt, 60)) || raise("Minute #{div(dt, 60)} not in row map!")
              row -> row
            end

          short_key = "#{id}"
          long_key = "#{id}_#{check}"

          cols =
            [short_key, long_key]
            |> Enum.reduce(cols, fn key, cols ->
              col = Map.get(col_map, key)
              Map.update(cols, col, [value], fn cur -> [value | cur] end)
            end)

          {row, cols}
        end)

      0..col_max
      |> Enum.map(fn i ->
        cols
        |> Map.get(i, [0.0])
        |> Nx.tensor(type: :f32)
        |> Nx.mean()
      end)
      |> Nx.stack()
      |> Nx.to_binary()
    end,
    timeout: 100_000
  )
  |> Stream.map(fn {:ok, bytes} ->
    IO.binwrite(out, bytes)
  end)
  |> Enum.to_list()
  |> Enum.count()

File.close(out)

"matrix.bin created as :f32 with #{result} rows. Reshape on reading with col count #{col_count} and the first dimension calculated from actual size."