Flow quick test
Section
Mix.install([
  {:flow, "~> 1.2.4"},
  {:ex_aws, "~> 2.5"},
  {:ex_aws_s3, "~> 2.5"},
  {:hackney, "~> 1.9"},
  {:sweet_xml, "~> 0.7.5"},
  {:jason, "~> 1.4.4"},
  {:adbc, "~> 0.7.9"},
  {:kino_db, "~> 0.3"},
  {:explorer, "~> 0.10.1"},
  {:kino_explorer, "~> 0.1.24"}
])
defmodule Utils do
  @moduledoc "Helpers for loading NDJSON event data from S3 and normalizing it into Explorer dataframes."

  # Fixed aggregation window: five minutes, expressed in seconds.
  @window_size_secs 300

  # Lists every .ndjson object key under the given bucket/prefix.
  def list_ndjson_files(bucket, prefix, auth) do
    ExAws.S3.list_objects_v2(bucket, prefix: prefix)
    |> ExAws.request!(auth)
    |> get_in([:body, :contents])
    |> Enum.map(& &1[:key])
    |> Enum.filter(&String.ends_with?(&1, ".ndjson"))
  end
  # Downloads one NDJSON object and decodes each line into a map.
  def load_ndjson_file(bucket, key, auth) do
    ExAws.S3.get_object(bucket, key)
    |> ExAws.request!(auth)
    |> Map.get(:body)
    |> String.split("\n", trim: true)
    |> Enum.map(&Jason.decode!/1)
  end
  # Loads and concatenates every NDJSON file under the prefix.
  def load_all_ndjson(bucket, prefix, auth) do
    list_ndjson_files(bucket, prefix, auth)
    |> Enum.flat_map(&load_ndjson_file(bucket, &1, auth))
  end
  # Rounds an epoch timestamp (in seconds) down to the start of its 5-minute window.
  def round_down_epoch_to_5_min(timestamp) when is_integer(timestamp) do
    div(timestamp, @window_size_secs) * @window_size_secs
  end

  # Same, for a list of epoch timestamps.
  def round_down_epoch_list(timestamps) when is_list(timestamps) do
    Enum.map(timestamps, &round_down_epoch_to_5_min/1)
  end

  # Same, for an Explorer.Series of epoch timestamps.
  def round_down_epoch_series(timestamps) do
    timestamps
    |> Explorer.Series.to_list()
    |> Enum.map(&round_down_epoch_to_5_min/1)
  end
  # Normalizes a list of heterogeneous event maps so every row has the same
  # keys (and the same nested shapes), then builds an Explorer dataframe.
  def mixed_field_maps_to_df(list_of_maps) do
    new_old_base = %{"new" => nil, "old" => nil}

    count_base = %{"files" => nil, "revisions" => nil}

    log_params_base = %{
      "userid" => nil,
      "edits" => nil,
      "derived" => nil,
      "img_sha1" => nil,
      "img_timestamp" => nil,
      "newuser" => nil,
      "olduser" => nil,
      "action" => nil,
      "filter" => nil,
      "actions" => nil,
      "log" => nil,
      "auto" => nil,
      "curid" => nil,
      "previd" => nil,
      "target" => nil,
      "noredir" => nil,
      "count" => count_base,
      "blockId" => nil,
      "duration" => nil,
      "flags" => nil,
      "sitewide" => nil,
      "details" => nil,
      "description" => nil,
      "cascade" => nil,
      "suppressredirects" => nil,
      "lista_o_algo" => nil,
      "movepages" => nil,
      "added" => nil,
      "removed" => nil
    }

    meta_base = %{
      "domain" => nil,
      "dt" => nil,
      "id" => nil,
      "offset" => nil,
      "partition" => nil,
      "request_id" => nil,
      "stream" => nil,
      "topic" => nil,
      "uri" => nil
    }

    # Union of all keys seen across the input maps.
    all_keys =
      list_of_maps
      |> Enum.flat_map(&Map.keys/1)
      |> Enum.uniq()

    # Normalize each map concurrently; every row ends up with the same columns.
    normalized_maps =
      list_of_maps
      |> Task.async_stream(fn map ->
        all_keys
        |> Enum.map(fn key ->
          cond do
            key in ["length", "revision"] ->
              {key, Map.merge(new_old_base, Map.get(map, key, new_old_base))}

            key == "log_params" ->
              log_params_val = Map.get(map, key, log_params_base)
              # log_params_val = Utils.annoying_log_params(log_params_val)
              # {key, Map.merge(log_params_base, log_params_val)}
              {key, inspect(log_params_val)}

            key == "count" ->
              count_val = Map.get(map, key, count_base)

              # Some events carry "count" as a scalar; normalize those to the nested shape.
              count_val =
                if is_number(count_val) or is_bitstring(count_val) do
                  count_base
                else
                  count_val
                end

              {key, Map.merge(count_base, count_val)}

            key == "meta" ->
              {key, Map.merge(meta_base, Map.get(map, key, meta_base))}

            true ->
              {key, Map.get(map, key, nil)}
          end
        end)
        # Materialize each row as a map so DataFrame.new/1 reads it as tabular rows.
        |> Map.new()
      end)
      |> Enum.map(fn {:ok, row} -> row end)

    Explorer.DataFrame.new(normalized_maps)
  end
  # log_params occasionally arrives as a bare list; stringify its elements and
  # tuck them under a single key so the shape stays map-like.
  def annoying_log_params(log_params_val) when is_list(log_params_val) do
    %{"lista_o_algo" => Enum.map(log_params_val, &to_string/1)}
  end

  def annoying_log_params(log_params_val) do
    log_params_val
  end
  # Alternative loader: let Explorer read each NDJSON file straight from S3,
  # then concatenate the resulting dataframes.
  def from_multiple_ndjson(explorer_s3_config, aws_config, bucket, prefix) do
    ndjson_files =
      ExAws.S3.list_objects_v2(bucket, prefix: prefix)
      |> ExAws.stream!(aws_config)
      |> Stream.map(& &1.key)
      |> Stream.filter(&String.ends_with?(&1, ".ndjson"))
      |> Enum.to_list()

    dfs =
      Enum.map(ndjson_files, fn file ->
        s3_path =
          "s3://#{bucket}/#{file}"
          |> String.replace(" ", "%20")

        Explorer.DataFrame.from_ndjson!(s3_path, config: explorer_s3_config)
      end)

    IO.inspect(dfs)
    Explorer.DataFrame.concat_rows(dfs)
  end
end
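A tiny smoke test of the helpers above, with made-up values (purely illustrative; these are not from the real event stream):

# 1_614_556_925 is 2021-03-01 00:02:05 UTC; it rounds down to the window start.
Utils.round_down_epoch_to_5_min(1_614_556_925)
#=> 1_614_556_800

# Heterogeneous rows come back with a shared set of columns; missing values become nil.
Utils.mixed_field_maps_to_df([
  %{"wiki" => "enwiki", "timestamp" => 1_614_556_925},
  %{"wiki" => "dewiki"}
])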
# Credentials for the local Garage (S3-compatible) dev endpoint, not real AWS keys.
access_key_id = "GK21f88119e76b969d6a1ae052"
secret_access_key = "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff"

auth_config = [
  access_key_id: access_key_id,
  secret_access_key: secret_access_key,
  region: "garage",
  scheme: "http",
  host: "garage",
  port: "3900"
]
# The same endpoint in the FSS shape that Explorer expects for direct S3 reads.
s3_config = %FSS.S3.Config{
  access_key_id: access_key_id,
  endpoint: "http://garage:3900",
  region: "garage",
  secret_access_key: secret_access_key
}
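An optional sanity check before loading anything: list the bucket through ExAws to confirm the endpoint and credentials work. Illustrative only; skip it if the Garage instance is known-good.

ExAws.S3.list_objects_v2("testbucket")
|> ExAws.request(auth_config)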
data =
  Utils.load_all_ndjson("testbucket", "test_folder_2025", auth_config)

# Each row's "data" column holds a JSON-encoded event; sample 100 rows and decode them.
test_sample_data_maps =
  data
  |> Utils.mixed_field_maps_to_df()
  |> Explorer.DataFrame.sample(100)
  |> Explorer.DataFrame.pull("data")
  |> Explorer.Series.to_list()
  |> Enum.map(&Jason.decode!/1)

test_sample_data_df =
  test_sample_data_maps
  |> Utils.mixed_field_maps_to_df()
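Optionally, peek at the sampled frame's dimensions before aggregating (illustrative only):

Explorer.DataFrame.shape(test_sample_data_df)
# Expected: {100, n} — 100 sampled rows, one column per normalized key.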
# Earlier manual-windowing attempt, kept commented out for reference:
# require Explorer.DataFrame
# require Explorer.Series
# timestamps =
#   test_sample_data_df
#   |> Explorer.DataFrame.sort_by(timestamp)
#   |> (&Explorer.Series.to_list(&1["timestamp"])).()
#   |> Enum.map(fn
#     nil -> 0
#     x -> x
#   end)
# sorted = Enum.sort(timestamps)
# # Group
# grouped =
#   Enum.reduce(sorted, [[]], fn x, acc ->
#     pass = List.last(acc)
#     ref = List.first(pass, x)
#     new_acc =
#       if x > ref + 300 do
#         Enum.reverse([[x] | Enum.reverse(acc)])
#       else
#         [tail | head] = Enum.reverse(acc)
#         new_tail = Enum.reverse([x | Enum.reverse(tail)])
#         Enum.reverse([new_tail | head])
#       end
#     new_acc
#   end)
# # Interval
# intervals =
#   Enum.map(grouped, fn
#     [] -> []
#     [h | _t] = sublist -> List.duplicate(h, length(sublist))
#   end)
# # Flatten
# result = List.flatten(intervals)

# A second commented-out sketch, sorting with Explorer instead:
# require Explorer.DataFrame
# require Explorer.Series
# sorted_df = Explorer.DataFrame.sort_with(test_sample_data_df, &[asc: &1["timestamp"]], nils: :first)
# sorted_df[["timestamp", "wiki"]]
require Explorer.DataFrame
require Explorer.Series

# Reference aggregation: bucket each event into its 5-minute window with plain
# Elixir, then count events per {interval, wiki} pair with Explorer.
grouped =
  test_sample_data_df["timestamp"]
  |> Explorer.Series.to_list()
  |> Enum.map(fn
    nil -> 0
    x -> x
  end)
  |> Utils.round_down_epoch_list()
  |> Explorer.Series.from_list()
  |> then(&Explorer.DataFrame.put(test_sample_data_df, "interval", &1))
  |> Explorer.DataFrame.group_by(["interval", "wiki"])
  |> Explorer.DataFrame.summarise(total: count(interval))
  |> Explorer.DataFrame.rename(total: "n_elems")
  |> Explorer.DataFrame.sort_with(&[asc: &1["interval"]], nils: :first)
# Flow's fixed windows expect event time in milliseconds; timestamps here are
# epoch seconds, hence the * 1000. Events up to one minute late are still admitted.
window =
  Flow.Window.fixed(5, :minute, fn %{"timestamp" => timestamp} -> timestamp * 1000 end)
  |> Flow.Window.allowed_lateness(1, :minute)
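Flow's fixed windows count in fixed intervals from the Unix epoch, so their starts should line up with Utils.round_down_epoch_to_5_min, which truncates to the same 5-minute boundary. A quick illustrative check with a made-up timestamp:

ts = 1_614_556_925
window_ms = 5 * 60 * 1000
# Window start in ms, converted back to seconds, should match the helper.
window_start_s = div(div(ts * 1000, window_ms) * window_ms, 1000)
window_start_s == Utils.round_down_epoch_to_5_min(ts)
#=> true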
# Flow-based aggregation over the same sample: partition by wiki, window into the
# fixed 5-minute windows above, and emit per-window counts when each window closes.
flow_result =
  test_sample_data_maps
  |> Enum.sort_by(&Map.get(&1, "timestamp"))
  |> Flow.from_enumerable()
  |> Flow.filter(&Map.has_key?(&1, "wiki"))
  |> Flow.partition(window: window, key: {:key, "wiki"})
  |> Flow.group_by(& &1["wiki"])
  |> Flow.on_trigger(fn acc, _partition_info, {_type, window, trigger} ->
    # Only emit once the window is closed; intermediate triggers emit nothing.
    if trigger == :done do
      results =
        Enum.map(acc, fn {key, events} ->
          timestamps = get_in(events, [Access.all(), "timestamp"])

          %{
            wiki: key,
            n_elems: Enum.count(events),
            # The window identifier is the window start in ms; convert to epoch seconds.
            window: trunc(window / 1000),
            timestamps: Enum.sort(timestamps)
          }
        end)

      {results, acc}
    else
      {[], acc}
    end
  end)
  |> Flow.stream(link: false)
  |> Enum.to_list()
  |> Enum.sort_by(&Map.get(&1, :n_elems), :desc)
  |> Explorer.DataFrame.new()
  |> Explorer.DataFrame.rename(window: "interval")
  |> Explorer.DataFrame.select(["interval", "wiki", "n_elems"])
  |> Explorer.DataFrame.sort_with(&[asc: &1["interval"]], nils: :first)
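With both aggregations in hand, a cheap consistency check is to compare their sizes: each frame should hold one row per {interval, wiki} pair, so matching row counts suggest the Flow pipeline reproduces the reference grouping. Illustrative only:

{Explorer.DataFrame.n_rows(grouped), Explorer.DataFrame.n_rows(flow_result)}
# Expected to be equal if every window fired its :done trigger.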
# Persist all three dataframes back to the bucket as NDJSON: the source sample,
# the reference aggregation, and the Flow result.
ndjson = Explorer.DataFrame.dump_ndjson!(test_sample_data_df)

ExAws.S3.put_object("testbucket", "test_data/test_src_data.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)

ndjson = Explorer.DataFrame.dump_ndjson!(grouped)

ExAws.S3.put_object("testbucket", "test_data/test_ref.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)

ndjson = Explorer.DataFrame.dump_ndjson!(flow_result)

ExAws.S3.put_object("testbucket", "test_data/test_result.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)
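Finally, the uploads can be confirmed with the helper defined earlier (illustrative; assumes the three put_object calls above succeeded):

Utils.list_ndjson_files("testbucket", "test_data", auth_config)
# Expected to include test_src_data.ndjson, test_ref.ndjson, and test_result.ndjson.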