Wikipedia data exploration
Mix.install([
{:adbc, "~> 0.7.9"},
{:kino_db, "~> 0.3"},
{:explorer, "~> 0.10.1"},
{:kino_explorer, "~> 0.1.24"},
{:jason, "~> 1.4.4"},
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:hackney, "~> 1.9"},
{:sweet_xml, "~> 0.7.5"}
])
Section
defmodule Utils do
@window_size_secs 300
def round_down_epoch_to_5_min(timestamp) when is_integer(timestamp) do
(timestamp |> div(@window_size_secs)) * @window_size_secs
end
# For a list of epoch timestamps:
def round_down_epoch_list(timestamps) when is_list(timestamps) do
Enum.map(timestamps, &Utils.round_down_epoch_to_5_min/1)
end
def round_down_epoch_ExplorerSeries(timestamps) do
timestamps
|> Explorer.Series.to_list()
|> Enum.map(&round_down_epoch_to_5_min/1)
end
def mixed_field_maps_to_df(list_of_maps) do
new_old_base = %{"new" => nil, "old" => nil}
log_params_base = %{"userid" => nil,
"edits" => nil,
"derived" => nil,
"img_sha1" => nil,
"img_timestamp" => nil,
"newuser" => nil,
"olduser" => nil,
"action" => nil,
"filter" => nil,
"actions" => nil,
"log" => nil,
"auto" => nil,
"curid" => nil,
"previd" => nil,
"target" => nil,
"noredir" => nil,
"count" => %{"files" => nil, "revisions" => nil},
"blockId" => nil,
"duration" => nil,
"flags" => nil,
"sitewide" => nil,
"details" => nil,
"description" => nil,
"cascade" => nil,
"suppressredirects" => nil,
"lista_o_algo" => nil,
"movepages" => nil,
"added" => nil,
"removed" => nil}
meta_base = %{"domain" => nil,
"dt" => nil,
"id" => nil,
"offset" => nil,
"partition" => nil,
"request_id" => nil,
"stream" => nil,
"topic" => nil,
"uri" => nil
}
all_keys =
list_of_maps
|> Enum.flat_map(&Map.keys/1)
|> Enum.uniq()
normalized_maps =
list_of_maps
|> Task.async_stream(fn map ->
all_keys
|> Task.async_stream(fn key ->
cond do
Enum.member?(["length", "revision"], key) ->
{key, Map.merge(new_old_base, Map.get(map, key, new_old_base))}
key == "log_params" ->
log_params_val = Map.get(map, key, log_params_base)
# log_params_val = Utils.annoying_log_params(log_params_val)
# {key, Map.merge(log_params_base, log_params_val)}
{key, inspect(log_params_val)}
key == "count" ->
log_params_val = Map.get(map, key, log_params_base)
log_params_val = if is_number(log_params_val) or is_bitstring(log_params_val) do
%{"files" => nil, "revisions" => nil}
else
log_params_val
end
{key, Map.merge(log_params_base, log_params_val)}
key == "meta" ->
{key, Map.merge(meta_base, Map.get(map, key, meta_base))}
true ->
{key, Map.get(map, key, nil)}
end
end)
|> Enum.map(&elem(&1, 1))
end)
|> Enum.map(&elem(&1, 1))
Explorer.DataFrame.new(normalized_maps)
end
def annoying_log_params(log_params_val) when is_list(log_params_val) do
list_stringified_elems = Enum.map(log_params_val, &to_string/1)
%{"lista_o_algo" => list_stringified_elems}
end
def annoying_log_params(log_params_val) do
log_params_val
end
def from_multiple_ndjson(explorer_s3_config, aws_config, bucket, prefix) do
ndjson_files =
ExAws.S3.list_objects_v2(bucket, prefix: prefix)
|> ExAws.stream!(aws_config)
|> Stream.map(& &1.key)
|> Stream.filter(&String.ends_with?(&1, ".ndjson"))
|> Enum.to_list()
dfs =
Enum.map(ndjson_files, fn file ->
s3_path =
"s3://#{bucket}/#{file}"
|> String.replace(" ", "%20")
Explorer.DataFrame.from_ndjson!(s3_path, config: explorer_s3_config)
end)
IO.inspect dfs
Explorer.DataFrame.concat_rows(dfs)
end
end
s3_config = %FSS.S3.Config{
access_key_id: "GK21f88119e76b969d6a1ae052",
endpoint: "http://garage:3900",
region: "garage",
secret_access_key: "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff",
}
auth_config = [
access_key_id: "GK21f88119e76b969d6a1ae052",
secret_access_key: "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff",
region: "garage",
scheme: "http",
host: "garage",
port: "3900"
]
source_df = Utils.from_multiple_ndjson(s3_config, auth_config, "testbucket", "test_folder_2025-07-03/test_file_2025-07-0")
# https://github.com/pola-rs/polars/issues/10047
data_df =
source_df
|> Explorer.DataFrame.pull("data")
|> Explorer.Series.to_list()
|> Enum.map(&Jason.decode!/1)
|> Utils.mixed_field_maps_to_df
data_df
|> Explorer.DataFrame.frequencies([:wiki])
data_df
|> Explorer.DataFrame.frequencies([:bot])
data_df
|> Explorer.DataFrame.frequencies([:log_type])
data_df
|> Explorer.DataFrame.frequencies([:type])
data_df
|> Explorer.DataFrame.group_by([:wiki])
|> Explorer.DataFrame.frequencies([:bot])
require Explorer.DataFrame
data_df
|> Explorer.DataFrame.mutate_with(&[interval: Utils.round_down_epoch_ExplorerSeries(&1["timestamp"])])
|> Explorer.DataFrame.group_by([:wiki, :interval])
|> Explorer.DataFrame.mutate(count: count(b))
require Explorer.DataFrame
require Explorer.Series
grouped =
Explorer.Series.to_list(data_df["timestamp"])
|> Enum.map(fn
nil -> 0
x -> x
end)
|> Utils.round_down_epoch_list()
|> Explorer.Series.from_list()
|> (&Explorer.DataFrame.put(data_df, "interval", &1)).()
|> Explorer.DataFrame.group_by([:interval, :wiki])
|> Explorer.DataFrame.summarise(total: count(interval))