One Billion Row Challenge
Mix.install([
{:flow, "~> 1.2"}
])
Section
# Prints the `{time, result}` tuple produced by `:timer.tc`: first the elapsed
# time divided by 1_000_000 (microseconds to seconds), then the inspected result.
print_result = fn {elapsed_us, result} ->
  elapsed_s = elapsed_us / 1_000_000
  IO.puts(elapsed_s)

  result
  |> inspect()
  |> IO.puts()

  # Alternative for long results: print only the first 100 entries, one per line.
  # result
  # |> Enum.take(100)
  # |> Enum.each(&IO.puts(inspect(&1)))
end
Lee un fichero completo sin hacer casi nada
defmodule Onebr1 do
  @moduledoc """
  Baseline: streams the measurements file and does nothing but count its lines.
  """

  @path "./measurements.txt"
  @max_lines 10_000_000

  @doc """
  Returns the number of lines read from `./measurements.txt`, capped at 10,000,000.
  """
  def run do
    @path
    |> File.stream!()
    |> Stream.take(@max_lines)
    |> Enum.count()
  end
end
#:timer.tc(Onebr1, :run, []) |> print_result.()
Lee un fichero completo “parseando” cada línea
defmodule Onebr2 do
  @moduledoc """
  Streams the measurements file and parses every line into `{station, value}`.

  The parsed tuples are discarded; only the number of processed lines is returned.
  """

  @doc """
  Parses up to 1,000,000 lines of `./measurements.txt` and returns how many were read.

  Every line must have the form `station;value`, where `value` is accepted by
  `String.to_float/1`; any other line raises.
  """
  def run do
    "./measurements.txt"
    |> File.stream!()
    |> Stream.take(1_000_000)
    |> Stream.map(&split_fields/1)
    |> Stream.map(&to_measurement/1)
    |> Enum.count()
  end

  # Strips surrounding whitespace (including the newline) and splits on ";".
  defp split_fields(line), do: String.split(String.trim(line), ";")

  # Only a two-field line matches; the value is converted to a float.
  defp to_measurement([station, value]), do: {station, String.to_float(value)}
end
# Time Onebr2.run/0 and hand the {time, result} tuple to print_result.
print_result.(:timer.tc(Onebr2, :run, []))
Lee un fichero completo procesando cada línea en paralelo con Flow
defmodule Onebr3 do
  @moduledoc """
  Parses the measurements file with the per-line work running inside a Flow.

  The parsed tuples are discarded; only the number of parsed events is returned.
  """

  @doc """
  Parses up to 10,000,000 lines of `./measurements.txt` through a Flow configured
  with 4 stages and returns how many events came out of it.

  Every line must have the form `station;value`, where `value` is accepted by
  `String.to_float/1`; any other line raises.
  """
  def run do
    lines =
      "./measurements.txt"
      |> File.stream!()
      |> Stream.take(10_000_000)

    lines
    # NOTE(review): `partition: false` is kept exactly as written; confirm that
    # Flow.from_enumerable/2 actually honours this option.
    |> Flow.from_enumerable(stages: 4, partition: false)
    |> Flow.map(&split_fields/1)
    |> Flow.map(&to_measurement/1)
    |> Enum.reduce(0, fn _measurement, seen -> seen + 1 end)
  end

  # Strips surrounding whitespace (including the newline) and splits on ";".
  defp split_fields(line), do: String.split(String.trim(line), ";")

  # Only a two-field line matches; the value is converted to a float.
  defp to_measurement([station, value]), do: {station, String.to_float(value)}
end
# Time Onebr3.run/0 and hand the {time, result} tuple to print_result.
print_result.(:timer.tc(Onebr3, :run, []))
Cuenta por estación con Flow
# Counts how many measurements each station has in the first 100,000 lines and
# returns a sorted list of `{station, count}` tuples.
#
# Lines have the form `station;value`, so the station is the text before the
# first ";". Splitting on " " (as in a word count) would instead count fragments
# such as "Hamburg;12.0\n" and break multi-word station names apart.
"./measurements.txt"
|> File.stream!()
|> Stream.take(100_000)
|> Flow.from_enumerable()
|> Flow.map(fn line ->
  [station | _value] = String.split(line, ";", parts: 2)
  station
end)
# Events are now bare station names, so partitioning routes every occurrence of
# a station to the same stage and each station is counted in one place.
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn station, counts ->
  Map.update(counts, station, 1, &(&1 + 1))
end)
|> Enum.sort()
Solución completa con Flow
defmodule Onebr4 do
  @moduledoc """
  Full solution with Flow: minimum, maximum and mean value per station.
  """

  @doc """
  Aggregates up to 10,000,000 lines of `./measurements.txt`.

  Every line must have the form `station;value`, where `value` is accepted by
  `String.to_float/1`; any other line raises.

  Returns a list of `{station, min, max, mean}` tuples sorted by station name.
  """
  def run do
    "./measurements.txt"
    |> File.stream!()
    |> Stream.take(10_000_000)
    |> Flow.from_enumerable()
    |> Flow.map(&parse_line/1)
    # Hash on the station name (tuple element 0) so that every reading of a
    # station is routed to the same stage.
    |> Flow.partition(key: {:elem, 0})
    |> Flow.reduce(fn -> %{} end, &add_reading/2)
    # Every stage emits its own `{station, stats}` entries; fold them into one
    # map, combining entries should a station show up more than once.
    |> Enum.reduce(%{}, &merge_stats/2)
    |> Enum.sort_by(fn {station, _stats} -> station end)
    |> Enum.map(fn {station, {lo, hi, sum, count}} -> {station, lo, hi, sum / count} end)
  end

  # Turns a raw "station;value\n" line into `{station, float}`.
  defp parse_line(line) do
    line
    |> String.trim()
    |> String.split(";")
    |> to_reading()
  end

  # Only a two-field line matches; the value is converted to a float.
  defp to_reading([station, value]), do: {station, String.to_float(value)}

  # Folds one reading into the per-station `{min, max, sum, count}` stats.
  defp add_reading({station, value}, stats_by_station) do
    # The first reading of a station is exactly one sample, so count starts at 1
    # (seeding it with the value itself would corrupt every mean).
    Map.update(stats_by_station, station, {value, value, value, 1}, fn {lo, hi, sum, count} ->
      {min(lo, value), max(hi, value), sum + value, count + 1}
    end)
  end

  # Combines one emitted `{station, stats}` entry with what was merged so far.
  defp merge_stats({station, {lo, hi, sum, count} = stats}, merged) do
    Map.update(merged, station, stats, fn {prev_lo, prev_hi, prev_sum, prev_count} ->
      {min(lo, prev_lo), max(hi, prev_hi), sum + prev_sum, count + prev_count}
    end)
  end
end
# Time Onebr4.run/0 and hand the {time, result} tuple to print_result.
print_result.(:timer.tc(Onebr4, :run, []))