

Wikipedia data exploration

Mix.install([
  {:adbc, "~> 0.7.9"},
  {:kino_db, "~> 0.3"},
  {:explorer, "~> 0.10.1"},
  {:kino_explorer, "~> 0.1.24"},
  {:jason, "~> 1.4.4"},
  {:ex_aws, "~> 2.5"},
  {:ex_aws_s3, "~> 2.5"},
  {:hackney, "~> 1.9"},
  {:sweet_xml, "~> 0.7.5"}
])

Section

defmodule Utils do
  # Bucket epoch timestamps into 5-minute (300-second) windows.
  @window_size_secs 300

  # Round a single epoch timestamp down to the start of its 5-minute window.
  # Missing timestamps (nil) pass through unchanged.
  def round_down_epoch_to_5_min(nil), do: nil

  def round_down_epoch_to_5_min(timestamp) when is_integer(timestamp) do
    div(timestamp, @window_size_secs) * @window_size_secs
  end

  # For a list of epoch timestamps:
  def round_down_epoch_list(timestamps) when is_list(timestamps) do
    Enum.map(timestamps, &round_down_epoch_to_5_min/1)
  end

  # Same, but for an Explorer.Series; note it returns a plain list, not a Series.
  def round_down_epoch_ExplorerSeries(timestamps) do
    timestamps
    |> Explorer.Series.to_list()
    |> Enum.map(&round_down_epoch_to_5_min/1)
  end

  # Normalize a list of heterogeneous event maps so every row exposes the same
  # keys (missing ones become nil) before handing them to Explorer.
  def mixed_field_maps_to_df(list_of_maps) do
    new_old_base = %{"new" => nil, "old" => nil}

    log_params_base = %{
      "userid" => nil,
      "edits" => nil,
      "derived" => nil,
      "img_sha1" => nil,
      "img_timestamp" => nil,
      "newuser" => nil,
      "olduser" => nil,
      "action" => nil,
      "filter" => nil,
      "actions" => nil,
      "log" => nil,
      "auto" => nil,
      "curid" => nil,
      "previd" => nil,
      "target" => nil,
      "noredir" => nil,
      "count" => %{"files" => nil, "revisions" => nil},
      "blockId" => nil,
      "duration" => nil,
      "flags" => nil,
      "sitewide" => nil,
      "details" => nil,
      "description" => nil,
      "cascade" => nil,
      "suppressredirects" => nil,
      "lista_o_algo" => nil,
      "movepages" => nil,
      "added" => nil,
      "removed" => nil
    }

    meta_base = %{
      "domain" => nil,
      "dt" => nil,
      "id" => nil,
      "offset" => nil,
      "partition" => nil,
      "request_id" => nil,
      "stream" => nil,
      "topic" => nil,
      "uri" => nil
    }
    
    # Union of every key observed across all events.
    all_keys =
      list_of_maps
      |> Enum.flat_map(&Map.keys/1)
      |> Enum.uniq()

    # For each event, fill in every key; nested maps are merged over a base map
    # so their fields line up across rows.
    normalized_maps =
      list_of_maps
      |> Task.async_stream(fn map ->
        all_keys
        |> Task.async_stream(fn key ->
          cond do
            Enum.member?(["length", "revision"], key) ->
              {key, Map.merge(new_old_base, Map.get(map, key, new_old_base))}

            key == "log_params" ->
              # log_params is too irregular to normalize; keep it as a string.
              log_params_val = Map.get(map, key, log_params_base)
              # log_params_val = Utils.annoying_log_params(log_params_val)
              # {key, Map.merge(log_params_base, log_params_val)}
              {key, inspect(log_params_val)}

            key == "count" ->
              log_params_val = Map.get(map, key, log_params_base)

              log_params_val =
                if is_number(log_params_val) or is_bitstring(log_params_val) do
                  %{"files" => nil, "revisions" => nil}
                else
                  log_params_val
                end

              {key, Map.merge(log_params_base, log_params_val)}

            key == "meta" ->
              {key, Map.merge(meta_base, Map.get(map, key, meta_base))}

            true ->
              {key, Map.get(map, key, nil)}
          end
        end)
        # unwrap the {:ok, result} tuples produced by async_stream
        |> Enum.map(&elem(&1, 1))
      end)
      |> Enum.map(&elem(&1, 1))

    Explorer.DataFrame.new(normalized_maps)
  end

  # Some log_params payloads arrive as bare lists; stringify the elements and
  # wrap them in a map so they fit the base shape. (Currently unused; see the
  # commented-out call above.)
  def annoying_log_params(log_params_val) when is_list(log_params_val) do
    list_stringified_elems = Enum.map(log_params_val, &to_string/1)
    %{"lista_o_algo" => list_stringified_elems}
  end

  def annoying_log_params(log_params_val), do: log_params_val

  # Lists every .ndjson object under bucket/prefix, reads each into a
  # DataFrame, and concatenates them row-wise.
  def from_multiple_ndjson(explorer_s3_config, aws_config, bucket, prefix) do
    ndjson_files =
      ExAws.S3.list_objects_v2(bucket, prefix: prefix)
      |> ExAws.stream!(aws_config)
      |> Stream.map(& &1.key)
      |> Stream.filter(&String.ends_with?(&1, ".ndjson"))
      |> Enum.to_list()

    dfs =
      Enum.map(ndjson_files, fn file ->
        # Escape spaces in object keys so the s3:// path parses cleanly.
        s3_path =
          "s3://#{bucket}/#{file}"
          |> String.replace(" ", "%20")

        Explorer.DataFrame.from_ndjson!(s3_path, config: explorer_s3_config)
      end)

    IO.inspect(dfs)
    Explorer.DataFrame.concat_rows(dfs)
  end
end
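
A quick sanity check of the 5-minute windowing helper. The timestamps below are made-up examples, not values from the stream; both fall inside the window starting at 1_720_000_200.

Utils.round_down_epoch_to_5_min(1_720_000_201)
#=> 1720000200

Utils.round_down_epoch_list([1_720_000_201, 1_720_000_499])
#=> [1720000200, 1720000200]
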
s3_config = %FSS.S3.Config{
  access_key_id: "GK21f88119e76b969d6a1ae052",
  endpoint: "http://garage:3900",
  region: "garage",
  secret_access_key: "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff"
}
auth_config = [
  access_key_id: "GK21f88119e76b969d6a1ae052",
  secret_access_key: "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff",
  region: "garage",
  scheme: "http",
  host: "garage",
  port: "3900"
]
source_df = Utils.from_multiple_ndjson(s3_config, auth_config, "testbucket", "test_folder_2025-07-03/test_file_2025-07-0")
# https://github.com/pola-rs/polars/issues/10047

# The "data" column comes through as a raw JSON string per row;
# decode each one and normalize the maps into a proper DataFrame.
data_df =
  source_df
  |> Explorer.DataFrame.pull("data")
  |> Explorer.Series.to_list()
  |> Enum.map(&Jason.decode!/1)
  |> Utils.mixed_field_maps_to_df()
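
A quick look at the schema Explorer inferred from the normalized maps; the exact column names and dtypes depend on which events were fetched, so treat this as an illustrative check rather than expected output.

Explorer.DataFrame.n_rows(data_df)
Explorer.DataFrame.names(data_df)
Explorer.DataFrame.dtypes(data_df)
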
data_df
|> Explorer.DataFrame.frequencies([:wiki])
data_df
|> Explorer.DataFrame.frequencies([:bot])
data_df
|> Explorer.DataFrame.frequencies([:log_type])
data_df
|> Explorer.DataFrame.frequencies([:type])
data_df
|> Explorer.DataFrame.group_by([:wiki])
|> Explorer.DataFrame.frequencies([:bot])
require Explorer.DataFrame

# Bucket each event into its 5-minute window and count events per wiki per window.
data_df
|> Explorer.DataFrame.put(
  "interval",
  Explorer.Series.from_list(Utils.round_down_epoch_ExplorerSeries(data_df["timestamp"]))
)
|> Explorer.DataFrame.group_by([:wiki, :interval])
|> Explorer.DataFrame.mutate(count: count(interval))
require Explorer.DataFrame
require Explorer.Series

# Alternative: map nil timestamps to 0 before windowing, then collapse to one
# row per (interval, wiki) with summarise.
grouped =
  Explorer.Series.to_list(data_df["timestamp"])
  |> Enum.map(fn
    nil -> 0
    x -> x
  end)
  |> Utils.round_down_epoch_list()
  |> Explorer.Series.from_list()
  |> then(&Explorer.DataFrame.put(data_df, "interval", &1))
  |> Explorer.DataFrame.group_by([:interval, :wiki])
  |> Explorer.DataFrame.summarise(total: count(interval))
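
To read the result chronologically, the summarised frame can be sorted by window start; this is just one way to inspect it, not part of the original pipeline.

grouped
|> Explorer.DataFrame.sort_by(asc: interval, asc: wiki)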