Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

one billion rows challenge

multikata/onebr/onebr.livemd

one billion rows challenge

# Flow supplies the parallel, GenStage-based pipelines used by the cells below.
Mix.install([{:flow, "~> 1.2"}])

Section

# Prints a `:timer.tc` result: first the elapsed time converted from
# microseconds to seconds, then the inspected return value. Returns :ok.
# (To print only part of a large result, e.g. the first 100 entries,
# take a slice of `value` before inspecting it.)
print_result = fn {elapsed_us, value} ->
  seconds = elapsed_us / 1_000_000
  IO.puts(seconds)

  value
  |> inspect()
  |> IO.puts()
end

Lee los primeros 10 millones de líneas del fichero sin hacer casi nada (solo las cuenta)

defmodule Onebr1 do
  @moduledoc """
  Baseline read benchmark: streams the measurements file line by line and
  only counts the lines, so the timing reflects raw read throughput.
  """

  @doc """
  Counts the first `limit` lines of the file at `path`.

  The defaults (`"./measurements.txt"`, 10 million lines) reproduce the
  original benchmark, so `run/0` behaves exactly as before. Returns the
  number of lines read, which is less than `limit` when the file is shorter.
  Raises `File.Error` if the file cannot be opened.
  """
  def run(path \\ "./measurements.txt", limit \\ 10_000_000) do
    path
    |> File.stream!()
    |> Stream.take(limit)
    |> Enum.count()
  end
end

#:timer.tc(Onebr1, :run, []) |> print_result.()

Lee el primer millón de líneas del fichero “parseando” cada una

defmodule Onebr2 do
  @moduledoc """
  Sequential parsing benchmark: streams the measurements file, parses every
  line into a `{station, temperature}` tuple and counts the parsed rows.
  """

  @doc """
  Parses the first `limit` lines of `path` and returns how many were parsed.

  Each line is expected to look like `"Station;12.3\\n"`. A line that does
  not split into exactly two fields raises `FunctionClauseError`, and a
  value that is not a float literal raises `ArgumentError`.

  Note: the default `limit` is 1 million lines. Only compare timings with
  other runs made at the same `limit`.
  """
  def run(path \\ "./measurements.txt", limit \\ 1_000_000) do
    path
    |> File.stream!()
    |> Stream.take(limit)
    |> Stream.map(&parse_line/1)
    |> Enum.count()
  end

  # Turns "Station;12.3\n" into {"Station", 12.3}.
  defp parse_line(line), do: line |> String.trim() |> String.split(";") |> to_reading()

  # Only a two-field split is accepted; anything else fails the clause match.
  defp to_reading([station, value]), do: {station, String.to_float(value)}
end

# Time Onebr2.run/0 and print the elapsed seconds plus its result.
:timer.tc(&Onebr2.run/0) |> then(print_result)

Lee los primeros 10 millones de líneas del fichero procesando cada una en paralelo con Flow

defmodule Onebr3 do
  @moduledoc """
  Parallel parsing benchmark: parses each line into a `{station, temperature}`
  tuple inside parallel Flow stages, then counts the parsed rows.
  """

  @doc """
  Parses the first `limit` lines of `path` using `stages` parallel Flow
  stages and returns how many lines were parsed.

  The defaults (`"./measurements.txt"`, 10 million lines, 4 stages)
  reproduce the original benchmark, so `run/0` behaves as before. Lines are
  expected as `"Station;12.3\\n"`; malformed lines raise inside the stage.
  """
  def run(path \\ "./measurements.txt", limit \\ 10_000_000, stages \\ 4) do
    path
    |> File.stream!()
    |> Stream.take(limit)
    # NOTE(review): `partition: false` does not look like a documented
    # `Flow.from_enumerable/2` option; confirm it has any effect.
    |> Flow.from_enumerable(stages: stages, partition: false)
    |> Flow.map(&parse_line/1)
    |> Enum.count()
  end

  # Turns "Station;12.3\n" into {"Station", 12.3}.
  defp parse_line(line), do: line |> String.trim() |> String.split(";") |> to_reading()

  # Only a two-field split is accepted; anything else fails the clause match.
  defp to_reading([station, value]), do: {station, String.to_float(value)}
end

# Time Onebr3.run/0 and print the elapsed seconds plus its result.
:timer.tc(&Onebr3.run/0) |> then(print_result)

Cuenta de mediciones por estación con Flow (primeras 100.000 líneas)

# Counts how many measurements each station has in the first 100_000 lines,
# returning `{station, count}` tuples sorted by station name.
# Lines look like "Station;12.3\n": the station is everything before the
# first ";". (Splitting on " " would count whole lines instead, and would
# break station names that contain spaces.)
# The default `Flow.partition/1` hashes the whole event, which here is the
# station name, so every reading of a station reaches the same reducer and
# each station is emitted exactly once.
"./measurements.txt"
|> File.stream!()
|> Stream.take(100_000)
|> Flow.from_enumerable()
|> Flow.map(fn line ->
  [station | _value] = String.split(line, ";", parts: 2)
  station
end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn station, acc ->
  Map.update(acc, station, 1, &(&1 + 1))
end)
|> Enum.sort()

Solución completa con Flow

defmodule Onebr4 do
  @moduledoc """
  Full Flow-based solution: computes the minimum, maximum and mean
  temperature of every station in the measurements file.
  """

  @doc """
  Aggregates the first `limit` lines of `path` and returns a list of
  `{station, min, max, mean}` tuples sorted by station name.

  The defaults (`"./measurements.txt"`, 10 million lines) match the original
  cell, so `run/0` keeps working. Lines are expected as `"Station;12.3\\n"`;
  malformed lines raise inside the Flow stage.
  """
  def run(path \\ "./measurements.txt", limit \\ 10_000_000) do
    path
    |> File.stream!()
    |> Stream.take(limit)
    |> Flow.from_enumerable()
    |> Flow.map(&parse_line/1)
    # Hash on the station (element 0) rather than the whole {station, value}
    # event, so every reading of a station lands in the same partition. Each
    # partition then owns a disjoint set of stations, and no cross-partition
    # merge is needed afterwards.
    |> Flow.partition(key: {:elem, 0})
    |> Flow.reduce(fn -> %{} end, &accumulate/2)
    |> Enum.sort_by(fn {station, _stats} -> station end)
    |> Enum.map(fn {station, {low, high, sum, count}} ->
      {station, low, high, sum / count}
    end)
  end

  # Turns "Station;12.3\n" into {"Station", 12.3}.
  defp parse_line(line), do: line |> String.trim() |> String.split(";") |> to_reading()

  # Only a two-field split is accepted; anything else fails the clause match.
  defp to_reading([station, value]), do: {station, String.to_float(value)}

  # Folds one reading into the station's {min, max, sum, count} stats.
  # The first reading seeds the stats with count = 1. The previous code
  # seeded count with the temperature itself, which corrupted every mean.
  defp accumulate({station, value}, acc) do
    Map.update(acc, station, {value, value, value, 1}, fn {low, high, sum, count} ->
      {min(low, value), max(high, value), sum + value, count + 1}
    end)
  end
end

# Time Onebr4.run/0 and print the elapsed seconds plus its result.
:timer.tc(&Onebr4.run/0) |> then(print_result)