Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

stream

notebooks/stream.livemd

stream

Mix.install([
  {:benchee, "~> 1.0"},
  {:kino, "~> 0.8.0"},
  {:req, "~> 0.3.0"},
  {:nimble_csv, "~> 1.0"},
  {:vega_lite, "~> 0.1.0"},
  {:kino_vega_lite, "~> 0.1.0"}
])

alias VegaLite, as: Vl

Stream

  • uses Enumerable protocol
  • lazy
  • the enumerable is traversed only once

resources

Enum vs Stream

  • make sure you measure
  • generally, a Stream implementation outperforms an Enum implementation on large enough datasets
  • there is some CPU cost associated with using Stream as opposed to Enum
# Eager Enum: the first map finishes the whole range before the second starts,
# so every "Step 1" line is printed before any "Step 2" line.
1..3
|> Enum.map(&IO.inspect(&1 * 2, label: "Step 1"))
|> Enum.map(&IO.inspect(&1 + 2, label: "Step 2"))

# Lazy Stream consumed by an Enum function: nothing runs until Enum.sum/1
# pulls elements, and each element goes through both steps in turn.
1..3
|> Stream.map(&IO.inspect(&1 * 2, label: "Step 1"))
|> Stream.map(&IO.inspect(&1 + 2, label: "Step 2"))
|> Enum.sum()

# Lazy Stream consumed by Stream.run/1: evaluated for its side effects only.
1..3
|> Stream.map(&IO.inspect(&1 * 2, label: "Step 1"))
|> Stream.map(&IO.inspect(&1 + 2, label: "Step 2"))
|> Stream.run()

Measure execution times

# Times an eager Enum pipeline over 1..upper_bound.
# Returns the `:timer.tc/1` result: `{elapsed_microseconds, sum}`, where `sum`
# is the total of the doubled even numbers in the range.
measure_enum_time = fn upper_bound ->
  eager_pipeline = fn ->
    evens = Enum.reject(1..upper_bound, fn n -> rem(n, 2) == 1 end)
    doubled = Enum.map(evens, fn n -> n * 2 end)
    Enum.sum(doubled)
  end

  :timer.tc(eager_pipeline)
end

# Times a lazy Stream pipeline over 1..upper_bound.
# Returns the `:timer.tc/1` result: `{elapsed_microseconds, sum}`, where `sum`
# is the total of the doubled even numbers in the range.
measure_stream_time = fn upper_bound ->
  lazy_pipeline = fn ->
    evens = Stream.reject(1..upper_bound, fn n -> rem(n, 2) == 1 end)
    doubled = Stream.map(evens, fn n -> n * 2 end)
    # Enum.sum/1 is the step that actually runs the composed stream.
    Enum.sum(doubled)
  end

  :timer.tc(lazy_pipeline)
end

:ok
# Time both pipelines once per input size and collect one row per size.
data =
  Enum.map([5, 50, 5_000, 500_000, 5_000_000], fn n ->
    {enum_time_us, _enum_sum} = measure_enum_time.(n)
    {stream_time_us, _stream_sum} = measure_stream_time.(n)

    %{n: n, enum_time_us: enum_time_us, stream_time_us: stream_time_us}
  end)

# Show the rows as a Kino data table with the listed keys.
data
|> Kino.DataTable.new(keys: [:n, :enum_time_us, :stream_time_us])
# Run one Benchee suite per input size, comparing the Enum and Stream variants.
# Each job name carries the input size so the reports can be told apart.
bench_results =
  [5, 50, 5_000, 500_000, 5_000_000]
  |> Enum.map(fn n ->
    jobs = %{
      "enum_time_us #{n}" => fn -> measure_enum_time.(n) end,
      "stream_time_us #{n}" => fn -> measure_stream_time.(n) end
    }

    Benchee.run(jobs)
  end)

:ok

infinite stream with Stream.iterate/2

  • useful when we do not know the size of data to iterate over
  • be sure to have some sort of safeguard in place as Stream.iterate will generate values indefinitely if left unchecked
## Getting three Hacker News comments whose text contains the word "elixir"

initial_item_id = 31_424_400

# Requests a single item from the Hacker News API and returns the Req response.
fetch_item = fn item_id ->
  Req.get!("https://hacker-news.firebaseio.com/v0/item/#{item_id}.json")
end

# Walk item ids upwards from `initial_item_id`. Stream.iterate/2 never ends on
# its own, so the `:halt` clause below is the only thing stopping the traversal.
# NOTE(review): if fewer than three matches exist past this id, this keeps
# requesting forever — consider capping the id range.
initial_item_id
|> Stream.iterate(&(&1 + 1))
|> Enum.reduce_while([], fn
  # halt once the accumulator has at least three elements in it
  _item, acc when length(acc) >= 3 ->
    {:halt, acc}

  item_id, acc ->
    case fetch_item.(item_id).body do
      # only comments that actually carry text are candidates
      %{"type" => "comment", "text" => text} = data when not is_nil(text) ->
        mentions_elixir? =
          text
          |> String.downcase()
          |> String.contains?("elixir")

        # matches are prepended, so the newest match comes first in the result
        updated_acc = if mentions_elixir?, do: [data | acc], else: acc

        {:cont, updated_acc}

      # any other body shape is skipped
      _data ->
        {:cont, acc}
    end
end)

processing large files with File.stream!/1

# Define a CSV parser module named MyParser that splits fields on commas.
NimbleCSV.define(MyParser, separator: ",")

data_source_url =
  "https://raw.githubusercontent.com/datasets/population/master/data/population.csv"

data_file_path = System.tmp_dir!() |> Path.join("population.csv")

# Download to disk; the match fails unless the response status is 200.
%{status: 200} = Req.get!(data_source_url, output: data_file_path)

# Stream the file through the parser, keep only four-column rows whose first
# column is "Japan", and turn each into a %{year, population} map.
data =
  data_file_path
  |> File.stream!()
  |> MyParser.parse_stream()
  |> Stream.filter(&match?(["Japan", _, _, _], &1))
  |> Stream.map(fn [_country, _, year, population] ->
    %{year: year, population: population}
  end)
  |> Enum.to_list()
  |> dbg()

# Bar chart with one bar per year and population on the y axis.
[width: 1600, height: 400]
|> Vl.new()
|> Vl.data_from_values(data)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "year", type: :nominal, axis: [label_angle: 0])
|> Vl.encode_field(:y, "population", type: :quantitative)

Stream.duplicate/2

sentence = "元気があれば何でもできる!"
duplicate_count = 5

# Pairs a character with a randomly chosen ANSI colour (0..255) as iodata.
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end

# A stream that yields the sentence's codepoint list `duplicate_count` times.
stream = sentence |> String.codepoints() |> Stream.duplicate(duplicate_count)

# Flatten the repeated lists lazily and print each character in its own colour.
stream
|> Stream.concat()
|> Enum.each(fn c -> IO.write(color_char.(c)) end)

:ok

Stream.cycle/1

  • NOTE: This would run forever without Enum.take/2
sentence = "元気があれば何でもできる!"
cycle_count = 100

# Pairs a character with a randomly chosen ANSI colour (0..255) as iodata.
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end

# An endless stream that loops over the sentence's characters.
stream = sentence |> to_charlist() |> Stream.cycle()

# Only the first `cycle_count` characters are consumed and printed.
stream
|> Stream.take(cycle_count)
|> Enum.each(fn c -> IO.write(color_char.(c)) end)

:ok

Stream.repeatedly/1

sentence = "元気があれば何でもできる!"
repeat_count = 10

# Pairs a character with a randomly chosen ANSI colour (0..255) as iodata.
color_char = fn c -> [IO.ANSI.color(Enum.random(0..255)), c] end

# An endless stream that produces the sentence's charlist on every pull.
stream = Stream.repeatedly(fn -> to_charlist(sentence) end)

# Take `repeat_count` charlists, join them lazily, and print each character.
stream
|> Stream.take(repeat_count)
|> Stream.concat()
|> Enum.each(fn c -> IO.write(color_char.(c)) end)

:ok