Flow quick test

flow_experimenting.livemd

Section

Mix.install([
  {:flow, "~> 1.2.4"},
  {:ex_aws, "~> 2.5"},
  {:ex_aws_s3, "~> 2.5"},
  {:hackney, "~> 1.9"},
  {:sweet_xml, "~> 0.7.5"},
  {:jason, "~> 1.4.4"},
  {:adbc, "~> 0.7.9"},
  {:kino_db, "~> 0.3"},
  {:explorer, "~> 0.10.1"},
  {:kino_explorer, "~> 0.1.24"}
])
defmodule Utils do
  @window_size_secs 300

  # List the keys of all .ndjson objects under `prefix` in `bucket`.
  def list_ndjson_files(bucket, prefix, auth) do
    ExAws.S3.list_objects_v2(bucket, prefix: prefix)
    |> ExAws.request!(auth)
    |> get_in([:body, :contents])
    |> Enum.map(& &1[:key])
    |> Enum.filter(&String.ends_with?(&1, ".ndjson"))
  end

  # Download one NDJSON object and decode it into a list of maps, one per line.
  def load_ndjson_file(bucket, key, auth) do
    ExAws.S3.get_object(bucket, key)
    |> ExAws.request!(auth)
    |> Map.get(:body)
    |> String.split("\n", trim: true)
    |> Enum.map(&Jason.decode!/1)
  end

  # Download and decode every NDJSON file under the prefix into one flat list of maps.
  def load_all_ndjson(bucket, prefix, auth) do
    list_ndjson_files(bucket, prefix, auth)
    |> Enum.flat_map(&load_ndjson_file(bucket, &1, auth))
  end
  
  # Round an epoch timestamp (in seconds) down to its 5-minute window start.
  def round_down_epoch_to_5_min(timestamp) when is_integer(timestamp) do
    div(timestamp, @window_size_secs) * @window_size_secs
  end

  # Same, for a list of epoch timestamps.
  def round_down_epoch_list(timestamps) when is_list(timestamps) do
    Enum.map(timestamps, &round_down_epoch_to_5_min/1)
  end

  # Same, for an Explorer series; returns a plain list.
  def round_down_epoch_series(timestamps) do
    timestamps
    |> Explorer.Series.to_list()
    |> Enum.map(&round_down_epoch_to_5_min/1)
  end

  # Normalize heterogeneous event maps so every row has the same keys (and the
  # same nested keys), then build an Explorer dataframe from the result.
  def mixed_field_maps_to_df(list_of_maps) do
    new_old_base = %{"new" => nil, "old" => nil}
    count_base = %{"files" => nil, "revisions" => nil}

    log_params_base = %{
      "userid" => nil,
      "edits" => nil,
      "derived" => nil,
      "img_sha1" => nil,
      "img_timestamp" => nil,
      "newuser" => nil,
      "olduser" => nil,
      "action" => nil,
      "filter" => nil,
      "actions" => nil,
      "log" => nil,
      "auto" => nil,
      "curid" => nil,
      "previd" => nil,
      "target" => nil,
      "noredir" => nil,
      "count" => count_base,
      "blockId" => nil,
      "duration" => nil,
      "flags" => nil,
      "sitewide" => nil,
      "details" => nil,
      "description" => nil,
      "cascade" => nil,
      "suppressredirects" => nil,
      "lista_o_algo" => nil,
      "movepages" => nil,
      "added" => nil,
      "removed" => nil
    }

    meta_base = %{
      "domain" => nil,
      "dt" => nil,
      "id" => nil,
      "offset" => nil,
      "partition" => nil,
      "request_id" => nil,
      "stream" => nil,
      "topic" => nil,
      "uri" => nil
    }

    # The union of keys across all maps becomes the column set.
    all_keys =
      list_of_maps
      |> Enum.flat_map(&Map.keys/1)
      |> Enum.uniq()

    normalized_maps =
      list_of_maps
      |> Task.async_stream(fn map ->
        Map.new(all_keys, fn key ->
          cond do
            key in ["length", "revision"] ->
              {key, Map.merge(new_old_base, Map.get(map, key, new_old_base))}

            key == "log_params" ->
              # log_params is too irregular to normalize; keep it as a string.
              # (An earlier attempt merged it into log_params_base via
              # annoying_log_params/1.)
              {key, inspect(Map.get(map, key, log_params_base))}

            key == "count" ->
              count_val = Map.get(map, key, count_base)

              # Some events carry a scalar count; coerce those to the base shape.
              count_val =
                if is_number(count_val) or is_bitstring(count_val),
                  do: count_base,
                  else: count_val

              {key, Map.merge(count_base, count_val)}

            key == "meta" ->
              {key, Map.merge(meta_base, Map.get(map, key, meta_base))}

            true ->
              {key, Map.get(map, key)}
          end
        end)
      end)
      |> Enum.map(fn {:ok, row} -> row end)

    Explorer.DataFrame.new(normalized_maps)
  end

  # log_params sometimes arrives as a bare list; stringify its elements and wrap
  # them so the value is map-shaped like every other log_params. Anything else
  # passes through unchanged.
  def annoying_log_params(log_params_val) when is_list(log_params_val) do
    %{"lista_o_algo" => Enum.map(log_params_val, &to_string/1)}
  end

  def annoying_log_params(log_params_val), do: log_params_val

  # Stream the .ndjson keys under `prefix`, load each one directly into an
  # Explorer dataframe via its s3:// URL, and concatenate the results.
  def from_multiple_ndjson(explorer_s3_config, aws_config, bucket, prefix) do
    ndjson_files =
      ExAws.S3.list_objects_v2(bucket, prefix: prefix)
      |> ExAws.stream!(aws_config)
      |> Stream.map(& &1.key)
      |> Stream.filter(&String.ends_with?(&1, ".ndjson"))
      |> Enum.to_list()

    dfs =
      Enum.map(ndjson_files, fn file ->
        s3_path = String.replace("s3://#{bucket}/#{file}", " ", "%20")
        Explorer.DataFrame.from_ndjson!(s3_path, config: explorer_s3_config)
      end)

    IO.inspect(dfs)
    Explorer.DataFrame.concat_rows(dfs)
  end
end
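
A quick sanity check of the bucketing helpers (illustrative values, not taken from the dataset):

Utils.round_down_epoch_to_5_min(1_700_000_123)
#=> 1_700_000_100 (23 seconds past a 300 s boundary rounds down to it)

Utils.round_down_epoch_list([0, 299, 300, 601])
#=> [0, 0, 300, 600]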
# Credentials for the local Garage S3 endpoint (throwaway dev keys; never
# commit real credentials to a notebook).
access_key_id = "GK21f88119e76b969d6a1ae052"
secret_access_key = "abf070f8d57bc050c4c2bcc3fba6cc70028441ade9c5fc6d87d9a7b5f9949eff"

auth_config = [
  access_key_id: access_key_id,
  secret_access_key: secret_access_key,
  region: "garage",
  scheme: "http",
  host: "garage",
  port: 3900
]
s3_config = %FSS.S3.Config{
  access_key_id: access_key_id,
  endpoint: "http://garage:3900",
  region: "garage",
  secret_access_key: secret_access_key
}
data =
  Utils.load_all_ndjson("testbucket", "test_folder_2025", auth_config)

# The "data" column holds a JSON-encoded event per row; sample 100 rows and
# decode them back into maps to feed the Flow pipeline below.
test_sample_data_maps =
  data
  |> Utils.mixed_field_maps_to_df()
  |> Explorer.DataFrame.sample(100)
  |> Explorer.DataFrame.pull("data")
  |> Explorer.Series.to_list()
  |> Enum.map(&Jason.decode!/1)
test_sample_data_df =
  test_sample_data_maps
  |> Utils.mixed_field_maps_to_df()

# Earlier manual attempt at grouping timestamps into 5-minute windows, kept
# for reference:
# require Explorer.DataFrame
# require Explorer.Series

# timestamps = 
#   test_sample_data_df
#   |> Explorer.DataFrame.sort_by(timestamp)
#   |> (&Explorer.Series.to_list(&1["timestamp"])).()
#   |> Enum.map(fn
#       nil -> 0
#       x -> x
#     end)

# sorted = Enum.sort(timestamps)

# # Group
# grouped =
#   Enum.reduce(sorted, [[]], fn x, acc  ->
#     pass = List.last(acc)
#     ref = List.first(pass, x)
#     new_acc = if x > ref + 300 do
#       Enum.reverse([ [x] | Enum.reverse(acc) ])
#     else
#       [tail | head] = Enum.reverse(acc)
#       new_tail = Enum.reverse([ x | Enum.reverse(tail) ])
#       Enum.reverse([ new_tail | head ])
#     end
#     new_acc
#   end)

# # Interval
# intervals =
#   Enum.map(grouped, fn
#     [] -> []
#     [h | _t] = sublist -> List.duplicate(h, length(sublist))
#   end)

# # Flatten
# result = List.flatten(intervals)
# Another kept-for-reference snippet: sorting by timestamp with nils first.
# require Explorer.DataFrame
# require Explorer.Series

# sorted_df = Explorer.DataFrame.sort_with(test_sample_data_df, &[asc: &1["timestamp"]], nils: :first)
# sorted_df[["timestamp", "wiki"]]
require Explorer.DataFrame
require Explorer.Series

# Reference aggregation in Explorer: bucket timestamps into 5-minute intervals,
# then count events per {interval, wiki}.
grouped =
  test_sample_data_df
  |> Explorer.DataFrame.pull("timestamp")
  |> Explorer.Series.to_list()
  # Treat missing timestamps as epoch 0 so the series stays integer-typed.
  |> Enum.map(fn
    nil -> 0
    x -> x
  end)
  |> Utils.round_down_epoch_list()
  |> Explorer.Series.from_list()
  |> then(&Explorer.DataFrame.put(test_sample_data_df, "interval", &1))
  |> Explorer.DataFrame.group_by([:interval, :wiki])
  |> Explorer.DataFrame.summarise(total: count(interval))
  |> Explorer.DataFrame.rename(total: "n_elems")
  |> Explorer.DataFrame.sort_with(&[asc: &1["interval"]], nils: :first)
window = 
  Flow.Window.fixed(5, :minute, fn %{"timestamp" => timestamp} -> timestamp * 1000 end)
  |> Flow.Window.allowed_lateness(1, :minute)
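
Flow's fixed windows are laid out from the Unix epoch in milliseconds, so an event should land in the same bucket that Utils.round_down_epoch_to_5_min/1 computes; a quick check with an illustrative timestamp:

# 5 minutes = 300_000 ms; a fixed window is identified by its start in ms.
ms = 1_700_000_123 * 1000
div(ms, 300_000) * 300_000
#=> 1_700_000_100_000, which is round_down_epoch_to_5_min(1_700_000_123) * 1000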

# Same aggregation with Flow: partition by wiki into the 5-minute fixed windows
# defined above, and emit one row per {window, wiki} when a window closes.
flow_result =
  test_sample_data_maps
  |> Enum.sort_by(&Map.get(&1, "timestamp"))
  |> Flow.from_enumerable()
  |> Flow.filter(&Map.has_key?(&1, "wiki"))
  |> Flow.partition(window: window, key: {:key, "wiki"})
  |> Flow.group_by(& &1["wiki"])
  |> Flow.on_trigger(fn acc, _partition_info, {_type, window_start_ms, trigger} ->
    # Only emit on the :done trigger, i.e. once the window has closed.
    if trigger == :done do
      results =
        Enum.map(acc, fn {key, events} ->
          timestamps = get_in(events, [Access.all(), "timestamp"])

          %{
            wiki: key,
            n_elems: Enum.count(events),
            window: trunc(window_start_ms / 1000),
            timestamps: Enum.sort(timestamps)
          }
        end)

      {results, acc}
    else
      {[], acc}
    end
  end)
  |> Flow.stream(link: false)
  |> Enum.to_list()
  |> Enum.sort_by(&Map.get(&1, :n_elems), :desc)
  |> Explorer.DataFrame.new()
  |> Explorer.DataFrame.rename(window: "interval")
  |> Explorer.DataFrame.select(["interval", "wiki", "n_elems"])
  |> Explorer.DataFrame.sort_with(&[asc: &1["interval"]], nils: :first)
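
As a rough cross-check (a sketch, not run here): the Flow output should line up with the Explorer reference for rows that have a wiki, since the Flow pipeline drops maps without a "wiki" key while the Explorer aggregation keeps them as a nil group:

ref = Explorer.DataFrame.filter(grouped, not is_nil(wiki))
Explorer.DataFrame.n_rows(ref) == Explorer.DataFrame.n_rows(flow_result)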
# Dump the source sample, the Explorer reference, and the Flow result to S3
# as NDJSON so they can be compared later.
ndjson =
  test_sample_data_df
  |> Explorer.DataFrame.dump_ndjson!()

ExAws.S3.put_object("testbucket", "test_data/test_src_data.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)
ndjson = 
  grouped
  |> Explorer.DataFrame.dump_ndjson!()

ExAws.S3.put_object("testbucket", "test_data/test_ref.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)
ndjson = 
  flow_result
  |> Explorer.DataFrame.dump_ndjson!()

ExAws.S3.put_object("testbucket", "test_data/test_result.ndjson", ndjson, content_type: "application/x-ndjson")
|> ExAws.request(auth_config)
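
To read any of the dumped files back into a dataframe, the s3_config defined above can be reused (a sketch):

Explorer.DataFrame.from_ndjson!("s3://testbucket/test_data/test_result.ndjson", config: s3_config)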