Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

MET-1246 Extract errors from data dump

livebooks/MET-1246-extract-errors.livemd

MET-1246 Extract errors from data dump

# Installs the notebook's dependencies into this runtime.
# NOTE(review): Nx, EXLA and Scholar are not referenced by any cell visible here; only
# Explorer (parquet reading/filtering) and Jason (JSON decoding) are used — confirm
# before dropping them.
# NOTE(review): Scholar is pulled from git with no `ref`/`tag`/`branch`, so the installed
# version can change between runs.
Mix.install([
  {:nx, "~> 0.5"},
  {:exla, "~> 0.5.1"},
  {:explorer, "~> 0.5"},
  {:scholar, git: "https://github.com/elixir-nx/scholar"},
  {:jason, "~> 1.4"}
])

# Makes EXLA the global default backend for Nx tensors.
Nx.global_default_backend(EXLA.Backend)

Globals

# Local bulk-data root; the input directory and the output file both live under it.
bulk_root = "/home/eng/dev/backend/livebooks/data/bulk"

# Directory scanned for `*.parquet` files (the path suggests an export of the
# production event store's `public.events` table — confirm).
base_dir = Path.join(bulk_root, "s3/production_eventstore/public.events/1")

# Destination for the length-prefixed ErrorAdded records written by the next cell.
out = Path.join(bulk_root, "error_added.bin")

Creation

# Output format: a sequence of length-prefixed records. Each record is a 32-bit
# big-endian unsigned byte count followed by `:erlang.term_to_binary/1` of
# `{unix_seconds, id, check_logical_name, instance_name}`.
#
# `File.open!/2` without `:raw` returns an IO-server process, so the concurrent tasks
# below can share this device. Each record is sent as ONE `IO.binwrite/2` call, so its
# length prefix and payload cannot be separated by another task's write.
# `delayed_write` buffers up to 100 MiB or 1000 ms of data before flushing to disk.
out_file = File.open!(out, [:write, :binary, {:delayed_write, 1024 * 1024 * 100, 10 * 100}])

wanted_event_type = "Elixir.Domain.Monitor.Events.ErrorAdded"

# Each task handles one parquet file and returns how many records it wrote.
# With `ordered: false`, records from different files land in completion order.
processed =
  try do
    base_dir
    |> File.ls!()
    |> Enum.filter(&String.ends_with?(&1, ".parquet"))
    |> Task.async_stream(
      fn file ->
        base_dir
        |> Path.join(file)
        |> Explorer.DataFrame.from_parquet!()
        |> Explorer.DataFrame.filter_with(fn d ->
          Explorer.Series.equal(d["event_type"], wanted_event_type)
        end)
        |> Explorer.DataFrame.select("data")
        |> Explorer.DataFrame.to_series()
        |> Map.get("data")
        |> Explorer.Series.to_list()
        |> Enum.reduce(0, fn s, written ->
          # The `data` column holds a JSON string that was itself JSON-encoded.
          telem = Jason.decode!(Jason.decode!(s))
          # `time` is presumably stored without an offset; appending "Z" parses it as UTC.
          {:ok, dt, _} = DateTime.from_iso8601(telem["time"] <> "Z")
          ts = DateTime.to_unix(dt, :second)

          record = {ts, telem["id"], telem["check_logical_name"], telem["instance_name"]}

          bytes = :erlang.term_to_binary(record)
          :ok = IO.binwrite(out_file, <<byte_size(bytes)::32, bytes::binary>>)
          written + 1
        end)
      end,
      timeout: 1_000_000,
      ordered: false
    )
    # Sum the per-file record counts (counting stream items would count files, not errors).
    |> Enum.reduce(0, fn {:ok, written}, total -> total + written end)
  after
    # With `delayed_write`, a failed buffered write may only be reported when closing.
    :ok = File.close(out_file)
  end

"Found #{processed} errors (should be > 370000)"

Verification

# verification
defmodule Binfile do
  @moduledoc """
  Reads the length-prefixed record file produced by the "Creation" cell.

  Each record is a 32-bit big-endian unsigned byte count followed by that many bytes
  of `:erlang.term_to_binary/1` output.
  """

  @doc """
  Lazily streams every decoded record stored in `filename`.

  The file is opened when the stream is first consumed and closed when it halts
  (including when reading raises).
  """
  def stream_from(filename) do
    Stream.resource(
      fn -> File.open!(filename, [:read, :binary]) end,
      fn handle ->
        case record(handle) do
          nil -> {:halt, handle}
          record -> {[record], handle}
        end
      end,
      &File.close/1
    )
  end

  @doc """
  Reads and decodes the next record from `handle`.

  Returns `nil` at a clean end of file. Raises if the file ends partway through a
  record or a read fails.
  """
  def record(handle) do
    case read_exact!(handle, 4) do
      :eof ->
        nil

      <<size::32>> ->
        case read_exact!(handle, size) do
          :eof -> raise "truncated record: missing #{size}-byte payload"
          # Trusted input: this file is written by the creation cell of this notebook.
          payload -> :erlang.binary_to_term(payload)
        end
    end
  end

  # Reads exactly `count` bytes. Returns `:eof` only when nothing is left; raises on a
  # short read or an I/O error.
  defp read_exact!(handle, count) do
    case :file.read(handle, count) do
      :eof ->
        :eof

      {:ok, data} when byte_size(data) == count ->
        data

      {:ok, data} ->
        raise "truncated record: expected #{count} bytes, got #{byte_size(data)}"

      {:error, reason} ->
        raise "failed to read record file: #{inspect(reason)}"
    end
  end
end

# Re-reads the whole output file; the count should equal the number of records written.
Enum.count(Binfile.stream_from(out))