stream
Mix.install([
{:benchee, "~> 1.0"},
{:kino, "~> 0.8.0"},
{:req, "~> 0.3.0"},
{:nimble_csv, "~> 1.0"},
{:vega_lite, "~> 0.1.0"},
{:kino_vega_lite, "~> 0.1.0"}
])
alias VegaLite, as: Vl
Stream
-
uses Enumerable protocol
-
lazy
-
the enumerable is traversed only once
resources
Enum vs Stream
-
make sure you measure
-
generally Stream implementation outperforms an Enum implementation with large enough datasets
-
there is some CPU cost associated with using Stream as opposed to Enum
# normal Enum
1..3
|> Enum.map(fn num -> IO.inspect(num * 2, label: "Step 1") end)
|> Enum.map(fn num -> IO.inspect(num + 2, label: "Step 2") end)
# Stream executed with Enum function
1..3
|> Stream.map(fn num -> IO.inspect(num * 2, label: "Step 1") end)
|> Stream.map(fn num -> IO.inspect(num + 2, label: "Step 2") end)
|> Enum.sum()
# Stream executed with Stream.run/1
1..3
|> Stream.map(fn num -> IO.inspect(num * 2, label: "Step 1") end)
|> Stream.map(fn num -> IO.inspect(num + 2, label: "Step 2") end)
|> Stream.run()
Measure execution times
measure_enum_time = fn upper_bound ->
:timer.tc(fn ->
1..upper_bound
|> Enum.reject(&(rem(&1, 2) == 1))
|> Enum.map(&(&1 * 2))
|> Enum.sum()
end)
end
measure_stream_time = fn upper_bound ->
:timer.tc(fn ->
1..upper_bound
|> Stream.reject(&(rem(&1, 2) == 1))
|> Stream.map(&(&1 * 2))
|> Enum.sum()
end)
end
:ok
data =
for n <- [5, 50, 5_000, 500_000, 5_000_000] do
{enum_time_us, _output} = measure_enum_time.(n)
{stream_time_us, _output} = measure_stream_time.(n)
%{
n: n,
enum_time_us: enum_time_us,
stream_time_us: stream_time_us
}
end
Kino.DataTable.new(
data,
keys: [:n, :enum_time_us, :stream_time_us]
)
bench_results =
for n <- [5, 50, 5_000, 500_000, 5_000_000] do
Benchee.run(%{
"enum_time_us #{n}" => fn -> measure_enum_time.(n) end,
"stream_time_us #{n}" => fn -> measure_stream_time.(n) end
})
end
:ok
infinite stream with Stream.iterate/2
-
useful when we do not know the size of data to iterate over
-
be sure to have some sort of safeguard in place as
Stream.iterate
will generate values indefinitely if left unchecked
## Getting three hacker news articles that contain the word "elixir"
initial_item_id = 31_424_400
fetch_item = fn item_id ->
Req.get!("https://hacker-news.firebaseio.com/v0/item/#{item_id}.json")
end
initial_item_id
|> Stream.iterate(&(&1 + 1))
|> Enum.reduce_while([], fn
# halt once the accumulator has at least three elements in it
_item, acc when length(acc) >= 3 ->
{:halt, acc}
item_id, acc ->
case fetch_item.(item_id).body do
%{"type" => "comment", "text" => text} = data
when not is_nil(text) ->
is_elixir_article =
text
|> String.downcase()
|> String.contains?("elixir")
updated_acc =
if is_elixir_article do
[data | acc]
else
acc
end
{:cont, updated_acc}
_data ->
{:cont, acc}
end
end)
processing large files with File.stream!/1
NimbleCSV.define(MyParser, separator: ",")
data_file_path = Path.join(System.tmp_dir!(), "population.csv")
data_source_url =
"https://raw.githubusercontent.com/datasets/population/master/data/population.csv"
%{status: 200} = Req.get!(data_source_url, output: data_file_path)
data =
File.stream!(data_file_path)
|> MyParser.parse_stream()
|> Stream.filter(fn
["Japan", _, _, _] -> true
_ -> false
end)
|> Stream.map(fn [_, _, year, population] ->
%{year: year, population: population}
end)
|> Enum.to_list()
|> dbg()
Vl.new(width: 1600, height: 400)
|> Vl.data_from_values(data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "year", type: :nominal, axis: [label_angle: 0])
|> Vl.encode_field(:y, "population", type: :quantitative)
Stream.duplicate/2
sentence = "元気があれば何でもできる!"
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end
duplicate_count = 5
stream = sentence |> String.codepoints() |> Stream.duplicate(duplicate_count)
for chars <- stream,
c <- chars do
color_char.(c) |> IO.write()
end
:ok
Stream.cycle/1
-
NOTE: This would run forever without
Enum.take/2
sentence = "元気があれば何でもできる!"
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end
cycle_count = 100
stream = sentence |> to_charlist() |> Stream.cycle()
for c <- stream |> Enum.take(cycle_count) do
color_char.(c) |> IO.write()
end
:ok
Stream.repeatedly/1
sentence = "元気があれば何でもできる!"
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end
repeat_count = 10
stream = Stream.repeatedly(fn -> sentence |> to_charlist() end)
for c <- stream |> Enum.take(repeat_count) |> List.flatten() do
color_char.(c) |> IO.write()
end
:ok