Streams

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"},
  {:benchee, "~> 1.1"}
])

Learning Outcomes

Upon completing this lesson, a student should be able to answer the following questions.

  • How do you use streams to reduce memory usage and avoid executing more instructions than necessary?
  • When might you use Enum vs Stream?

Enum Vs Stream

So far, we have typically used the Enum module to work with collections. The Enum module uses eager evaluation, meaning any instruction we provide is executed immediately. Each step therefore holds every element of its result in memory during execution.

1..10
|> Enum.map(fn each -> each * 2 end)
|> Enum.filter(fn each -> each <= 10 end)
|> Enum.take(4)

Each Enum function builds a complete copy of the enumerable it creates before the next function runs, so the steps execute sequentially as seen in this diagram.

box = fn value, color ->
  # The original inline markup was lost in extraction; this <span> is an assumed
  # stand-in that colors each value.
  "<span style='background-color: #{color}; padding: 4px; margin: 2px;'>#{value}</span>"
end

get_color = fn in_memory ->
  if in_memory, do: "green", else: "lightgray"
end

# Cycle through rows 0..3, one step per second.
Kino.animate(1000, 1, fn _, row ->
  md =
    Kino.Markdown.new("""
    1..10:

    #{Enum.map(1..10, fn index -> box.(index, get_color.(row >= 0)) end)}

    Enum.map

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(row >= 1)) end)}

    Enum.filter

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(index <= 5 and row >= 2)) end)}

    Enum.take

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(index <= 4 and row >= 3)) end)}
    """)

  {:cont, md, rem(row + 1, 4)}
end)

While the operation above is a bit contrived for the sake of simplifying the example, you may notice that by the end of the execution the Enum.take/2 function only needs four elements.

It’s a waste of resources to execute every operation on every element. It would be far more performant if we could instead build up a series of functions and run them one by one only when needed.

By using streams and lazy evaluation, we can significantly reduce memory usage, as seen in the animation below.

box = fn value, color ->
  # The original inline markup was lost in extraction; this <span> is an assumed
  # stand-in that colors each value.
  "<span style='background-color: #{color}; padding: 4px; margin: 2px;'>#{value}</span>"
end

get_color = fn in_memory ->
  if in_memory, do: "green", else: "lightgray"
end

Kino.animate(500, {1, 1}, fn _, {column, row} = position ->
  md =
    Kino.Markdown.new("""
    In Memory:

    #{Enum.map(1..10, fn index -> box.(index, get_color.(row == 1 and column == index)) end)}

    In Memory:

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(row == 2 and column == index)) end)}

    In Memory:

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(row == 3 and column == index)) end)}

    In Memory:

    #{Enum.map(1..10, fn index -> box.(index * 2, get_color.(column > index || (row == 4 and column == index))) end)}
    """)

  # Move down the four rows for the current column, then on to the next column.
  next_position =
    case position do
      {4, 4} -> {1, 1}
      {column, 4} -> {column + 1, 1}
      {column, row} -> {column, row + 1}
    end

  {:cont, md, next_position}
end)

We can’t accomplish this with the Enum module, but we can with Stream. Streams are composable, lazy enumerables. Lazy means each element passes through the whole pipeline one at a time, and only when a result is requested. Composable means we can chain stream functions together to build up the work to perform on each element.

Notice that the following does not execute. Instead, it builds up future work and returns a Stream data structure with a list of functions to apply.

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
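
As a rough illustration of what "building up future work" looks like, the sketch below binds the same pipeline to a variable and inspects it. The variable names are only for illustration, and `is_struct/2` assumes Elixir 1.11 or later.

```elixir
# Build the lazy pipeline; no element has been processed yet.
pipeline =
  1..10
  |> Stream.map(fn each -> each * 2 end)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)

# The pipeline is a Stream struct rather than a list of results.
is_stream = is_struct(pipeline, Stream)
is_a_list = is_list(pipeline)

# Each Enum call below runs the pipeline again from the start.
first_two = Enum.take(pipeline, 2)
total = Enum.sum(pipeline)

{is_stream, is_a_list, first_two, total}
```

Because the pipeline is only a description of work, it can be handed to different Enum functions, and each call evaluates just as much of it as that function needs.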

The Stream stores the enumerable and the list of functions to call on it. It is only evaluated when it is passed to an eager function, such as one from the Enum module. For example:

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
|> Enum.to_list()
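
One way to see the difference between the two approaches is to count how often the mapping function actually runs. This is a minimal sketch, assuming Erlang's `:counters` module (OTP 21.2 or later) is available; the names `counter` and `counted_double` are illustrative and not part of the lesson.

```elixir
# :counters ships with Erlang/OTP; a single slot is enough to count calls.
counter = :counters.new(1, [])

# Hypothetical helper: doubles a number and records that it was called.
counted_double = fn each ->
  :counters.add(counter, 1, 1)
  each * 2
end

enum_result =
  1..10
  |> Enum.map(counted_double)
  |> Enum.filter(fn each -> each <= 10 end)
  |> Enum.take(4)

enum_calls = :counters.get(counter, 1)

# Reset the counter before running the lazy version.
:counters.put(counter, 1, 0)

stream_result =
  1..10
  |> Stream.map(counted_double)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)
  |> Enum.to_list()

stream_calls = :counters.get(counter, 1)

{enum_result, enum_calls, stream_result, stream_calls}
```

Both pipelines produce the same list, but the eager version calls `counted_double` for all 10 elements, while the lazy version calls it only 4 times because `Stream.take/2` halts the stream once it has its fourth element.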

We can see this more clearly if we use IO.puts/1 to trigger a side effect. Notice that nothing is printed.

Stream.each(1..10, &IO.puts/1)

IO.puts/1 is only called when we run the stream with Stream.run/1 or when we use an Enum function.

1..10
|> Stream.each(&IO.puts/1)
|> Stream.run()

1..10
|> Stream.each(&IO.puts/1)
|> Enum.to_list()
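
The two ways of running a stream differ in what they return. The sketch below is one way to check this without relying on printed output: it swaps `IO.puts/1` for a message sent to the current process, which is an assumption made purely so the side effect can be observed in code.

```elixir
parent = self()

# Stream.each/2 only describes the side effect; nothing is sent yet.
stream = Stream.each(1..3, fn number -> send(parent, {:visited, number}) end)

# Check the mailbox without waiting: no message should have arrived.
visited_before_run =
  receive do
    {:visited, _number} -> true
  after
    0 -> false
  end

# Stream.run/1 executes the stream for its side effects and returns :ok.
run_result = Stream.run(stream)

# Enum.to_list/1 executes the stream again and also collects the elements.
list_result = Enum.to_list(stream)

{visited_before_run, run_result, list_result}
```

`Stream.run/1` is the better fit when only the side effects matter, since it does not build a list of results.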

As the collection grows, the performance benefits become clearer. Let’s compare the same functionality from the Stream module and the Enum module.

Performance Comparison

Benchee.run(
  %{
    "enum" => fn ->
      1..200
      |> Enum.map(fn each -> each * 2 end)
      |> Enum.filter(fn each -> each <= 10 end)
      |> Enum.take(4)
    end,
    "stream" => fn ->
      1..200
      |> Stream.map(fn each -> each * 2 end)
      |> Stream.filter(fn each -> each <= 10 end)
      |> Stream.take(4)
      |> Enum.to_list()
    end
  },
  memory_time: 2
)

While the exact results may vary, you should notice that the stream runs faster and consumes less memory.

Name             ips        average  deviation         median         99th %
stream      172.86 K        5.79 μs   ±418.69%           4 μs       18.50 μs
enum          9.64 K      103.76 μs    ±21.83%       98.70 μs      190.13 μs

Comparison: 
stream      172.86 K
enum          9.64 K - 17.94x slower +97.97 μs

Memory usage statistics:

Name      Memory usage
stream         8.54 KB
enum         181.20 KB - 21.22x memory usage +172.66 KB

**All measurements for memory usage were the same**

Your Turn

In the Elixir cell below, try refactoring this code using the Stream module.

1..100
|> Enum.map(fn each -> div(each, 2) end)
|> Enum.map(fn each -> each * 3 end)
|> Enum.filter(fn each -> rem(each, 2) === 0 end)

1..100
|> Stream.map(fn each -> div(each, 2) end)
|> Stream.map(fn each -> each * 3 end)
|> Stream.filter(fn each -> rem(each, 2) === 0 end)
|> Enum.to_list()

Stream Gotchas

While the Stream module can improve performance, it’s worth mentioning that it’s not a silver bullet. For example, a single Stream.map is not faster than Enum.map.

{stream_time, _result} =
  :timer.tc(fn -> Stream.map(1..1000, &(&1 * 2)) |> Enum.to_list() end)

{enum_time, _result} = :timer.tc(fn -> Enum.map(1..1000, &(&1 * 2)) end)

%{stream_time: stream_time, enum_time: enum_time}

That’s because streams only improve performance when processing elements one at a time reduces the amount of computation or memory required.

Notice how when we string multiple maps together, the Enum module is still just as fast or even faster than the Stream module.

stream_fn = fn ->
  1..1000
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Enum.to_list()
end

enum_fn = fn ->
  1..1000
  |> Enum.map(&(&1 * 2))
  |> Enum.map(&(&1 * 2))
  |> Enum.map(&(&1 * 2))
end

{stream_time, _result} = :timer.tc(stream_fn)
{enum_time, _result} = :timer.tc(enum_fn)

%{stream_time: stream_time, enum_time: enum_time}

In terms of improving performance, Streams will generally provide the greatest benefits when operations reduce the number of elements in the enumerable. Whenever optimizing, use benchmarks to verify your assumptions rather than relying on theoretical performance gains.

Stream

As you’ve seen, Streams generate elements one by one and can improve performance. They are often beneficial when working with large data as they only consume the amount of data necessary for each operation. On the other hand, Enum will create an entire copy of the collection for each intermediate step.

flowchart
  subgraph Enum
    direction LR
    E1[Enum] --> EO1[Operation] --> L1[Collection] --> EO2[Operation] --> L2[Collection]
  end
  subgraph Stream
    S1[Stream] --> SE1[Enum] --> SO1[Operation] --> SR[Collection]
  end

Stream provides equivalents of many functions found in the Enum module. You’ve already seen map/2, filter/2, and take/2.

We also have access to equivalent functions such as with_index/1, drop/2, chunk_every/2, and each/2. The pattern matches below demonstrate that each pair behaves the same way.

[{"a", 0}, {"b", 1}, {"c", 2}] = Enum.with_index(["a", "b", "c"])
[{"a", 0}, {"b", 1}, {"c", 2}] = Stream.with_index(["a", "b", "c"]) |> Enum.to_list()

["b", "c"] = Enum.drop(["a", "b", "c"], 1)
["b", "c"] = Stream.drop(["a", "b", "c"], 1) |> Enum.to_list()

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Enum.chunk_every(1..10, 2)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Stream.chunk_every(1..10, 2) |> Enum.to_list()
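
The lazy versions matter most when the input is large or when several steps are chained. The following sketch uses an arbitrarily large range and a short list chosen for illustration.

```elixir
# Only the first few elements of the large range are ever produced.
first_pairs =
  1..1_000_000
  |> Stream.chunk_every(2)
  |> Enum.take(3)

# Lazy functions compose with each other before any Enum call runs them.
indexed_tail =
  ["a", "b", "c", "d"]
  |> Stream.with_index()
  |> Stream.drop(2)
  |> Enum.to_list()

{first_pairs, indexed_tail}
```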

Streams As Generators

However, Streams are more than a performant alternative to Enum. Streams are also used to generate a (potentially infinite) amount of data.

Streams only execute necessary computation, so we can have a theoretically infinite stream such as 1, 2, 3, 1, 2, 3, 1, 2, 3, and so on forever. However, we’ll only ever generate the number of elements we actually ask for.

flowchart LR
1 --> 2 --> 3 --> 1

Here’s that shown using Stream.cycle/1, which cycles through an enumerable.

Stream.cycle([1, 2, 3])
|> Enum.take(300)
|> Enum.chunk_every(3)
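
An infinite cycle becomes practical when paired with a finite collection. As a hedged sketch, the example below zips a made-up list of tasks with a cycled list of workers to assign them round-robin; the task and worker names are hypothetical.

```elixir
# Hypothetical data: five tasks shared among three workers.
tasks = ["task_a", "task_b", "task_c", "task_d", "task_e"]
workers = [:worker_1, :worker_2, :worker_3]

# Stream.zip/2 stops as soon as the shorter enumerable (the tasks) runs out,
# so the infinite cycle is only consumed five times.
assignments =
  tasks
  |> Stream.zip(Stream.cycle(workers))
  |> Enum.to_list()
```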

Lazy Evaluation

We’ve been using lazy evaluation for a while now! Ranges only create each element in the list when necessary, so we can create a massive range without any noticeable performance impact.

When we use that massive range with an Enum operation, then we pay the computational price!

{creation_time, _result} = :timer.tc(fn -> 1..10_000_000 end)
{execution_time, _result} = :timer.tc(fn -> Enum.to_list(1..10_000_000) end)

%{creation_time: creation_time, execution_time: execution_time}
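
The reason creating the range is cheap is that a range only stores its bounds. The sketch below pokes at the same range to illustrate this; the variable names are illustrative.

```elixir
range = 1..10_000_000

# A range stores its bounds, not its elements.
{first, last} = {range.first, range.last}

# These answers are computed from the bounds, so no list is built.
size = Enum.count(range)
contains_middle = 5_000_000 in range

# Only the requested elements are produced.
first_three = Enum.take(range, 3)

{first, last, size, contains_middle, first_three}
```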

Stream.iterate/2

We can use Stream.iterate/2 to iterate over an accumulator. For example, we could start at 0, and then add 1 to it for every iteration.

flowchart LR
  0 --> 1 --> 2 --> 3 --> 4
  n1[n] --> n2[n + 1] --> n3[n + 2] --> n4[n + 3] --> n5[n + 4]

Stream.iterate(0, fn accumulator -> accumulator + 1 end) |> Enum.take(5)

The initial value of the accumulator is 0. The next value is accumulator + 1, which is 0 + 1. This pattern continues, so taking the first five elements produces [0, 1, 2, 3, 4].

flowchart LR
Accumulator --> Function --> A2[Next Accumulator] --> Function
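
The accumulator does not have to grow by one, or even be a number. As a small sketch under that idea, the examples below double an integer and step a date forward by a week; the starting date is arbitrary.

```elixir
# Each element is the previous element multiplied by two.
powers_of_two =
  1
  |> Stream.iterate(fn accumulator -> accumulator * 2 end)
  |> Enum.take(8)

# Each element is the previous date plus seven days.
weekly_dates =
  ~D[2023-01-01]
  |> Stream.iterate(fn date -> Date.add(date, 7) end)
  |> Enum.take(3)

{powers_of_two, weekly_dates}
```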

You could build more complicated sequences this way too. One example is the Collatz Conjecture, also known as the simplest math problem that no one can solve.

YouTube.new("https://www.youtube.com/watch?v=094y1Z2wpJg")

The Collatz Conjecture states that if you take any positive integer and repeatedly apply the following two rules:

  1. If the number is odd, multiply the number by three and add 1
  2. If the number is even, divide by two.

Eventually, no matter the starting number, the sequence should reach 1 and then repeat 4, 2, 1 forever.

flowchart LR
integer --> even
integer --> odd
  even --> a[x / 2]
  odd --> b[3x + 1]
  a --> 4
  b --> 4
  4 --> 2 --> 1

For example, let’s take the number 5 to see how it quickly repeats 4, 2, 1.

flowchart LR
  5 -- 5 * 3 + 1 --> 16 --16 / 2--> 8 -- 8 / 2 --> 4 -- 4 / 2--> 2 -- 2 / 2 --> 1
  -- 1 * 3 + 1--> a[4] --4 / 2--> b[2] --2 / 2--> c[1]

Your Turn

Use Stream.iterate/2 to implement the Collatz Conjecture given a starting_integer such as 20 as the initial accumulator.

  1. If the number is odd, multiply the number by three and add 1
  2. If the number is even, divide by two.

Use Enum.take_while/2 to generate elements from the stream until it returns 4, showing that the sequence eventually reaches the repeating 4, 2, 1 loop. Try changing your starting_integer to check this for larger numbers.

Example Solution

starting_integer = 20

Stream.iterate(starting_integer, fn int ->
  if rem(int, 2) == 0 do
    div(int, 2)
  else
    int * 3 + 1
  end
end)
|> Enum.take_while(fn integer -> integer != 4 end)

100
|> Stream.iterate(fn int ->
  if rem(int, 2) == 0 do
    div(int, 2)
  else
    int * 3 + 1
  end
end)
|> Enum.take_while(fn integer -> integer != 4 end)
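
Building on the solution above, the same rules can be wrapped in a reusable function. This is a minimal sketch: the `CollatzSketch` module and its function names are hypothetical, and it stops at 1 rather than 4 so that it can count the steps needed to reach 1.

```elixir
defmodule CollatzSketch do
  # Hypothetical helper module, not part of the lesson's exercises.

  # Lazily produces the (infinite) sequence starting at `starting_integer`.
  def sequence(starting_integer) when is_integer(starting_integer) and starting_integer > 0 do
    Stream.iterate(starting_integer, fn
      int when rem(int, 2) == 0 -> div(int, 2)
      int -> int * 3 + 1
    end)
  end

  # Counts how many values appear before the sequence first reaches 1.
  def steps_to_one(starting_integer) do
    starting_integer
    |> sequence()
    |> Enum.take_while(fn int -> int != 1 end)
    |> length()
  end
end

CollatzSketch.steps_to_one(20)
```

Keeping `sequence/1` lazy means callers decide how much of it to consume, whether that is a fixed number of elements with `Enum.take/2` or everything up to a condition with `Enum.take_while/2`.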

Stream.unfold/2

While Stream.iterate/2 treats the accumulator as the value, unfold/2 separates the accumulator and the return value. So you can accumulate, and then generate a separate value from the accumulator.

flowchart LR
  Accumulator --> Function --> Value
  Function --> Accumulator

Stream.unfold(5, fn accumulator ->
  value = "value: #{accumulator}"
  next_accumulator = accumulator + 1
  {value, next_accumulator}
end)
|> Enum.take(5)
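
Separating the accumulator from the emitted value is most useful when the state needed to compute the next element is larger than the element itself. A common illustration, sketched here with illustrative variable names, is the Fibonacci sequence, where the accumulator is a pair of numbers but only one is emitted.

```elixir
fibonacci =
  {0, 1}
  |> Stream.unfold(fn {current, next} ->
    # Emit `current`; carry the next pair forward as the accumulator.
    {current, {next, current + next}}
  end)
  |> Enum.take(10)
```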

You can use Stream.unfold/2 with Enum.to_list/1 so long as you specify when the stream should end. Otherwise, the stream would be infinite and run forever. To specify when the stream should end, return nil.

You could do this with a separate function head, or some other control flow.

Stream.unfold(0, fn
  10 ->
    nil

  accumulator ->
    value = "value: #{accumulator}"
    next_accumulator = accumulator + 1
    {value, next_accumulator}
end)
|> Enum.to_list()
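
Returning nil also works well when the accumulator shrinks toward a natural stopping point. As a small sketch, the example below peels the digits off an arbitrary integer, starting from the right, and ends the stream once nothing remains.

```elixir
digits_from_right =
  1234
  |> Stream.unfold(fn
    # Nothing left to split, so end the stream.
    0 -> nil
    # Emit the last digit and keep the remaining digits as the accumulator.
    remaining -> {rem(remaining, 10), div(remaining, 10)}
  end)
  |> Enum.to_list()
```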

Your Turn

Use Stream.unfold/2 to generate a list of cubed numbers. Use Enum.take/2 to take the first 20 cubed numbers.

For example, $1^3, 2^3, 3^3, 4^3, …$ which would be [1, 8, 27, 64] and so on.

Example Solution

1
|> Stream.unfold(fn int ->
  {int ** 3, int + 1}
end)
|> Enum.take(20)

1
|> Stream.unfold(fn int ->
  {int ** 3, int + 1}
end)
|> Enum.take(10)

Commit Your Progress

DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.

Run git status to ensure there are no undesirable changes. Then run the following in your command line from the curriculum folder to commit your progress.

$ git add .
$ git commit -m "finish Streams reading"
$ git push
