# MET-1246 Extract errors from data dump
# Install the notebook's dependencies at runtime. Scholar is fetched straight
# from its Git repository; the other packages use version requirements.
Mix.install([
{:nx, "~> 0.5"},
{:exla, "~> 0.5.1"},
{:explorer, "~> 0.5"},
{:scholar, git: "https://github.com/elixir-nx/scholar"},
{:jason, "~> 1.4"}
])
# Make EXLA the default Nx backend for the rest of the notebook.
Nx.global_default_backend(EXLA.Backend)
## Globals
# Directory that is scanned for the `.parquet` input files.
base_dir = "/home/eng/dev/backend/livebooks/data/bulk/s3/production_eventstore/public.events/1"
# Destination of the length-prefixed binary file holding the extracted records.
out = "/home/eng/dev/backend/livebooks/data/bulk/error_added.bin"
## Creation
# Open the output once and share the handle with every task. Writes are
# buffered (`:delayed_write`), so a failure may only surface on a later write
# or on close -- both results are asserted below.
out_file = File.open!(out, [:write, :binary, {:delayed_write, 1024 * 1024 * 100, 10 * 100}])
wanted_event_type = "Elixir.Domain.Monitor.Events.ErrorAdded"

# Process every parquet file concurrently. Each task returns the number of
# records it wrote, and the per-file counts are summed so that `processed` is
# the number of extracted errors rather than the number of files.
processed =
  base_dir
  |> File.ls!()
  |> Enum.filter(&String.ends_with?(&1, ".parquet"))
  |> Task.async_stream(
    fn file ->
      df = Explorer.DataFrame.from_parquet!(Path.join(base_dir, file))

      df
      |> Explorer.DataFrame.filter_with(fn d ->
        Explorer.Series.equal(d["event_type"], wanted_event_type)
      end)
      |> Explorer.DataFrame.select("data")
      |> Explorer.DataFrame.to_series()
      |> Map.get("data")
      |> Explorer.Series.to_list()
      |> Enum.reduce(0, fn s, written ->
        # The payload is decoded twice: the column value is a JSON string
        # whose content is itself a JSON document.
        telem = Jason.decode!(Jason.decode!(s))
        # "Z" is appended so the timestamp parses as an ISO 8601 datetime
        # with an offset (assumes the stored time is UTC -- TODO confirm).
        {:ok, dt, _} = DateTime.from_iso8601(telem["time"] <> "Z")
        ts = DateTime.to_unix(dt, :second)
        record = {ts, telem["id"], telem["check_logical_name"], telem["instance_name"]}
        bytes = :erlang.term_to_binary(record)
        size = byte_size(bytes)

        # Frame: 32-bit length followed by the encoded term, matching the
        # reader, which reads 4 bytes and then `size` bytes. Header and payload
        # go out in a single call so that one record is never split across two
        # writes while other tasks write to the same handle.
        :ok = IO.binwrite(out_file, <<size::32, bytes::binary>>)
        written + 1
      end)
    end,
    timeout: 1_000_000,
    ordered: false
  )
  |> Enum.reduce(0, fn {:ok, written}, total -> total + written end)

# Closing flushes the delayed-write buffer; crash instead of silently losing data.
:ok = File.close(out_file)
"Found #{processed} errors (should be > 370000)"
## Verification
# verification
defmodule Binfile do
  @moduledoc """
  Reader for files made of length-prefixed Erlang terms.

  Each record is a 32-bit unsigned big-endian byte count followed by that many
  bytes of `:erlang.term_to_binary/1` output.
  """

  @doc """
  Returns a lazy stream of the terms stored in `filename`.

  The file is opened when the stream is first consumed and closed when the
  stream finishes or is halted. Raises if the file cannot be opened.
  """
  @spec stream_from(Path.t()) :: Enumerable.t()
  def stream_from(filename) do
    Stream.resource(
      fn -> File.open!(filename, [:read, :binary]) end,
      fn handle ->
        case record(handle) do
          nil -> {:halt, handle}
          record -> {[record], handle}
        end
      end,
      fn handle -> File.close(handle) end
    )
  end

  @doc """
  Reads the next record from `handle`.

  Returns the decoded term, or `nil` once the end of the file is reached.
  Raises if the length header or the payload cannot be read in full.
  """
  @spec record(File.io_device()) :: term()
  def record(handle) do
    case :file.read(handle, 4) do
      :eof ->
        nil

      # The 4-byte header is the payload size in bytes.
      {:ok, <<size::32>>} ->
        {:ok, binary} = :file.read(handle, size)
        # NOTE(review): decoding is not `:safe`; only use on files this
        # notebook produced itself, never on untrusted input.
        :erlang.binary_to_term(binary)
    end
  end
end
# Read the output file back and count its records; the result should equal the
# number of errors reported by the creation step.
Enum.count(Binfile.stream_from(out))