Streams

Mix.install([
  {:kino, github: "livebook-dev/kino", override: true},
  {:kino_lab, "~> 0.1.0-dev", github: "jonatanklosko/kino_lab"},
  {:vega_lite, "~> 0.1.4"},
  {:kino_vega_lite, "~> 0.1.1"},
  {:benchee, "~> 0.1"},
  {:ecto, "~> 3.7"},
  {:math, "~> 0.7.0"},
  {:faker, "~> 0.17.0"},
  {:utils, path: "#{__DIR__}/../utils"}
])

Setup

Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively you can evaluate the Elixir cells as you read.

Enum Vs Stream

Currently, we typically use the Enum module to work with collections. The Enum module uses eager evaluation. That means any instruction we provide is immediately executed. This requires each element in the enumeration to be stored in memory during execution.

1..10
|> Enum.map(fn each -> each * 2 end)
|> Enum.filter(fn each -> each <= 10 end)
|> Enum.take(4)

Each Enum function runs to completion and stores a copy of the enumerable it creates before the next function starts, as seen in this slide.

Utils.animate(:eager_evaluation)
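To make the intermediate copies concrete, here is a small sketch that binds each step of the same pipeline to a variable. The variable names (doubled, filtered, taken) are only illustrative and do not appear elsewhere in this lesson.

```elixir
# Each Enum call finishes completely and returns a brand-new list
# before the next call starts.
doubled = Enum.map(1..10, fn each -> each * 2 end)
# doubled is [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

filtered = Enum.filter(doubled, fn each -> each <= 10 end)
# filtered is [2, 4, 6, 8, 10]

taken = Enum.take(filtered, 4)
# taken is [2, 4, 6, 8]

{doubled, filtered, taken}
```

All ten elements are doubled and filtered even though only four of them end up in the final result.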

While the operation above is contrived to keep the example simple, notice that by the end of the execution the Enum.take/2 function only needs four elements.

It seems like a waste of resources to execute every operation on every element, and it is. As the list grows, this becomes a massive waste of computation. It would be far more performant if we could instead build up a series of functions and run them one by one only when needed.

Utils.animate(:lazy_evaluation)

We can’t accomplish this with the Enum module, but we can with Stream. Streams are composable, lazy enumerables. Lazy means each element passes through the operations one at a time, and only when a result is requested. Composable means that we build up a series of functions to run on each element in the stream.

Notice that the following does not execute. Instead, it builds up future work and returns a Stream data structure.

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
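One way to confirm that no list was produced is to bind the pipeline to a variable and check what kind of value it is. This is a minimal sketch; the variable name stream is illustrative.

```elixir
stream =
  1..10
  |> Stream.map(fn each -> each * 2 end)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)

# The pipeline returns a %Stream{} struct describing the work, not a list.
{is_struct(stream, Stream), is_list(stream)}
```

The tuple should be {true, false}, since none of the functions have run yet.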

The Stream stores the enumerable and the list of functions to call on it. The Stream will only evaluate when it’s passed to an eager function from the Enum module. For example,

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
|> Enum.to_list()
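To see how much work laziness saves, we can count how often the mapping function runs in each version. The sketch below uses Erlang's :counters module purely as an illustrative way to count calls; the counter setup and variable names are assumptions made for this example and are not part of the lesson's code.

```elixir
# Slot 1 counts calls in the eager pipeline, slot 2 in the lazy one.
counter = :counters.new(2, [])

eager =
  1..10
  |> Enum.map(fn each ->
    :counters.add(counter, 1, 1)
    each * 2
  end)
  |> Enum.filter(fn each -> each <= 10 end)
  |> Enum.take(4)

lazy =
  1..10
  |> Stream.map(fn each ->
    :counters.add(counter, 2, 1)
    each * 2
  end)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)
  |> Enum.to_list()

%{
  eager: eager,
  lazy: lazy,
  eager_calls: :counters.get(counter, 1),
  lazy_calls: :counters.get(counter, 2)
}
```

Both pipelines return [2, 4, 6, 8], but the eager version calls the mapping function 10 times while the lazy version stops after 4 calls, as soon as Stream.take/2 has the elements it needs.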

As the collection grows, the performance benefits become more clear. Let’s compare the same functionality from the Stream module and the Enum module.

Performance Comparison

Benchee.run(
  %{
    "enum" => fn ->
      1..200
      |> Enum.map(fn each -> each * 2 end)
      |> Enum.filter(fn each -> each <= 10 end)
      |> Enum.take(4)
    end,
    "stream" => fn ->
      1..200
      |> Stream.map(fn each -> each * 2 end)
      |> Stream.filter(fn each -> each <= 10 end)
      |> Stream.take(4)
      |> Enum.to_list()
    end
  },
  memory_time: 2
)

While the exact results may vary, you should notice that the stream runs faster and consumes less memory.

Name             ips        average  deviation         median         99th %
stream       69.84 K       14.32 μs   ±126.27%       11.30 μs       59.40 μs
enum          8.77 K      114.02 μs    ±75.94%      106.10 μs      227.45 μs

Comparison:
stream       69.84 K
enum          8.77 K - 7.96x slower +99.70 μs

Memory usage statistics:

Name      Memory usage
stream        31.25 KB
enum         191.88 KB - 6.14x memory usage +160.63 KB

Your Turn

In the Elixir cell below, try refactoring this code using the Stream module.

1..100
|> Enum.map(fn each -> div(each, 2) end)
|> Enum.map(fn each -> each * 3 end)
|> Enum.filter(fn each -> rem(each, 2) === 0 end)

Stream Gotchas

While the Stream module can improve performance, it’s worth mentioning that it’s not a silver bullet. For example, a single Stream.map is not faster than Enum.map.

{stream_time, _result} = :timer.tc(fn -> Stream.map(1..1000, &(&1 * 2)) |> Enum.to_list() end)
{enum_time, _result} = :timer.tc(fn -> Enum.map(1..1000, &(&1 * 2)) end)

%{stream_time: stream_time, enum_time: enum_time}

That’s because we only gain performance benefits when sequential execution on each element in the collection reduces the amount of computation and/or memory necessary.

Notice how when we string multiple maps together, the Enum module is still just as fast or even faster than the Stream module.

stream_fn = fn ->
  1..1000
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Enum.to_list()
end

enum_fn = fn -> 1..1000 |> Enum.map(&(&1 * 2)) |> Enum.map(&(&1 * 2)) |> Enum.map(&(&1 * 2)) end

{stream_time, _result} = :timer.tc(stream_fn)
{enum_time, _result} = :timer.tc(enum_fn)

%{stream_time: stream_time, enum_time: enum_time}

In terms of improving performance, Streams will generally provide the greatest benefits when operations reduce the number of elements in the enumerable.
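As a contrast to the chained maps, here is a rough sketch of a case where the pipeline does reduce the number of elements. The range size and variable names are illustrative choices, and the exact timings will differ from machine to machine.

```elixir
range = 1..1_000_000

# Eager: doubles all one million elements, then keeps five.
{enum_time, enum_result} =
  :timer.tc(fn ->
    range
    |> Enum.map(&(&1 * 2))
    |> Enum.take(5)
  end)

# Lazy: stops once the fifth element has been doubled.
{stream_time, stream_result} =
  :timer.tc(fn ->
    range
    |> Stream.map(&(&1 * 2))
    |> Enum.take(5)
  end)

%{
  enum_time: enum_time,
  stream_time: stream_time,
  same_result: enum_result == stream_result
}
```

Both versions return the same five elements, but the lazy version only has to touch the first five elements of the range.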

Stream

As you’ve seen, Streams generate elements one by one and can improve performance. They are often beneficial when working with large data as they only consume the amount of data necessary for each operation. On the other hand, Enum will create an entire copy of the collection for each intermediate step.

flowchart
  subgraph Enum
    direction LR
    E1[Enum] --> EO1[Operation] --> L1[Collection] --> EO2[Operation] --> L2[Collection]
  end
  subgraph Stream
    S1[Stream] --> SE1[Enum] --> SO1[Operation] --> SR[Collection]
  end

Stream provides equivalents for many of the functions found in the Enum module. You’ve already seen map/2, filter/2, and take/2.

We also have access to equivalent functions such as with_index/1, drop/2, chunk_every/2, and each/2. We’ve used pattern matching here to demonstrate each has similar behavior.

[{"a", 0}, {"b", 1}, {"c", 2}] = Enum.with_index(["a", "b", "c"])
[{"a", 0}, {"b", 1}, {"c", 2}] = Stream.with_index(["a", "b", "c"]) |> Enum.to_list()

["b", "c"] = Enum.drop(["a", "b", "c"], 1)
["b", "c"] = Stream.drop(["a", "b", "c"], 1) |> Enum.to_list()

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Enum.chunk_every(1..10, 2)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Stream.chunk_every(1..10, 2) |> Enum.to_list()
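The comparisons above do not cover each/2, so here is a small sketch of how the two versions differ. The printed strings are illustrative. Stream.run/1 is used to consume a stream purely for its side effects.

```elixir
# Enum.each/2 runs the side effect immediately and returns :ok.
:ok = Enum.each(["a", "b", "c"], fn letter -> IO.puts("enum: #{letter}") end)

# Stream.each/2 returns a stream. Nothing is printed yet.
stream = Stream.each(["a", "b", "c"], fn letter -> IO.puts("stream: #{letter}") end)

# The side effect runs only when the stream is consumed.
# The elements pass through unchanged.
["a", "b", "c"] = Enum.to_list(stream)

# Stream.run/1 consumes the stream when only the side effects matter.
:ok = Stream.run(stream)
```

Because the stream is consumed twice in this sketch, the "stream: ..." lines are printed twice.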

Streams As Generators

However, Streams are more than a performant alternative to Enum. Streams are also used to generate a (potentially infinite) amount of data.

Streams only execute necessary computation, so we can define a theoretically infinite stream such as 1, 2, 3, 1, 2, 3, 1, 2, 3, ... that continues forever, while only ever generating the number of elements we need.

flowchart LR
1 --> 2 --> 3 --> 1

Here’s that shown using Stream.cycle/1, which cycles through an enumerable.

Stream.cycle([1, 2, 3])
|> Enum.take(10)
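An infinite cycle is also handy when paired with a finite collection. As a sketch, the example below uses Enum.zip/2, which stops as soon as the shorter enumerable runs out; the colour atoms are made up for illustration.

```elixir
# The range has six elements, so only six elements are pulled from the cycle.
labelled = Enum.zip(1..6, Stream.cycle([:red, :green, :blue]))
```

Here labelled is [{1, :red}, {2, :green}, {3, :blue}, {4, :red}, {5, :green}, {6, :blue}].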

We’ve been using lazy enumerables for a while now! Did you know that ranges are lazy, just like Streams? A range is its own Range struct rather than a Stream struct, but each element in the range is only created as necessary. For example, we can create an incredibly massive range almost instantly.

But when we use that massive range with an Enum operation, then we pay the computational price!

{creation_time, _result} = :timer.tc(fn -> 1..10_000_000_000_000_000_000_000_000_000 end)
# The below range is smaller to avoid crashing your computer!
{execution_time, _result} = :timer.tc(fn -> Enum.to_list(1..10_000_000) end)

%{creation_time: creation_time, execution_time: execution_time}
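The same massive range is safe to use as long as we only ask for a few elements. A minimal sketch, with illustrative variable names:

```elixir
massive_range = 1..10_000_000_000_000_000_000_000_000_000

# A range is a small struct that stores its bounds rather than every element.
is_range = is_struct(massive_range, Range)

# Taking a few elements only generates those elements.
first_three = Enum.take(massive_range, 3)

{is_range, first_three}
```

This returns {true, [1, 2, 3]} without ever building the full list.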

Stream.iterate/2

We can use Stream.iterate/2 to iterate over an accumulator. For example, we could start at 0, and then add 1 to it for every iteration.

flowchart LR
  0 --> 1 --> 2 --> 3 --> 4
  n1[n] --> n2[n + 1] --> n3[n + 2] --> n4[n + 3] --> n5[n + 4]

Stream.iterate(0, fn accumulator -> accumulator + 1 end) |> Enum.take(5)

The initial value of the accumulator will be 0. The next value of the accumulator will be accumulator + 1 which is 0 + 1. This pattern continues to create [0, 1, 2, 3, 4].

flowchart LR
Accumulator --> Function --> A2[Next Accumulator] --> Function
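The function passed to Stream.iterate/2 can be any transformation of the previous value. As a further sketch, doubling the accumulator on each step produces powers of two; the variable name powers_of_two is illustrative.

```elixir
# Start at 1 and double the accumulator on every iteration.
powers_of_two =
  Stream.iterate(1, fn accumulator -> accumulator * 2 end)
  |> Enum.take(8)
```

Here powers_of_two is [1, 2, 4, 8, 16, 32, 64, 128].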

You could build more complicated sequences this way too. For example, consider the Collatz Conjecture, also known as the simplest math problem that no one can solve.

Kino.YouTube.new("https://www.youtube.com/watch?v=094y1Z2wpJg")

The Collatz Conjecture states that if you take any positive integer and repeatedly apply the following two rules:

  1. If the number is odd, multiply the number by three and add 1.
  2. If the number is even, divide it by two.

then eventually, no matter the starting number, the sequence will repeat 4, 2, 1 forever.

flowchart LR
integer --> even
integer --> odd
  even --> a[x / 2]
  odd --> b[3x + 1]
  a --> 4
  b --> 4
  4 --> 2 --> 1

For example, let’s take the number 5 to see how it quickly repeats 4, 2, 1.

flowchart LR
  5 -- 5 * 3 + 1 --> 16 --16 / 2--> 8 -- 8 / 2 --> 4 -- 4 / 2--> 2 -- 2 / 2 --> 1
  -- 1 * 3 + 1--> a[4] --4 / 2--> b[2] --2 / 2--> c[1]

Your Turn

In the Elixir cell below, try changing starting_integer and see how it always eventually repeats 4, 2, 1.

We’re limited to how many elements we can display, so if the integer is too large you might not see the end of the list.

starting_integer = 20

Stream.iterate(starting_integer, fn n -> if rem(n, 2) === 0, do: div(n, 2), else: 3 * n + 1 end)
|> Enum.take(100)
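Instead of taking a fixed 100 elements, we could also stop at the first 1. The sketch below assumes the sequence does reach 1 for the chosen starting integer; if it never did, Enum.take_while/2 would run forever. The names collatz_step and before_one are illustrative.

```elixir
collatz_step = fn n -> if rem(n, 2) === 0, do: div(n, 2), else: 3 * n + 1 end

# Collect every value before the sequence reaches 1 for the first time.
before_one =
  Stream.iterate(20, collatz_step)
  |> Enum.take_while(fn n -> n != 1 end)

{before_one, length(before_one)}
```

Starting from 20 this returns {[20, 10, 5, 16, 8, 4, 2], 7}, so it takes 7 steps to reach 1.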

Stream.unfold/2

While Stream.iterate/2 treats the accumulator as the value, Stream.unfold/2 separates the accumulator from the emitted value. So you can accumulate, and then generate a separate value from the accumulator.

flowchart LR
  Accumulator --> Function --> Value
  Function --> Accumulator

Stream.unfold(5, fn accumulator ->
  value = "value: #{accumulator}"
  next_accumulator = accumulator + 1
  {value, next_accumulator}
end)
|> Enum.take(5)
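Separating the value from the accumulator is useful when the next step needs more state than the value that is emitted. As a sketch, the Fibonacci sequence can keep a pair of numbers as the accumulator while emitting only the first of them; the names fibonacci and first_ten are illustrative.

```elixir
# The accumulator is a {current, next} pair. Only current is emitted.
fibonacci =
  Stream.unfold({0, 1}, fn {current, next} ->
    {current, {next, current + next}}
  end)

first_ten = Enum.take(fibonacci, 10)
```

Here first_ten is [0, 1, 1, 2, 3, 5, 8, 13, 21, 34].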

You can use Stream.unfold/2 with Enum.to_list/1 so long as you specify when the stream should end. Otherwise, the stream would be infinite and run forever. To specify when the stream should end, return nil from the function.

You could do this with a separate function head, or some other control flow.

Stream.unfold(5, fn
  0 ->
    nil

  accumulator ->
    value = "value: #{accumulator}"
    next_accumulator = accumulator - 1
    {value, next_accumulator}
end)
|> Enum.to_list()
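Returning nil makes the stream finite, so it is safe to collect all of it. As another sketch of the same idea, the example below peels the digits off an integer and stops once nothing is left; the starting number 1234 is arbitrary.

```elixir
# Emit the last digit, then continue with the remaining digits.
# Returning nil once the accumulator reaches 0 ends the stream.
digits =
  Stream.unfold(1234, fn
    0 -> nil
    n -> {rem(n, 10), div(n, 10)}
  end)
  |> Enum.to_list()
```

Here digits is [4, 3, 2, 1], the digits of 1234 from right to left.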

For example, we could use Stream.unfold/2 to improve the Collatz Conjecture function from above to simply stop when the element reaches 4. That way, if the stream ever stops, the sequence must have reached 4, which confirms the conjecture for that starting number (though not for every number).

starting_number = 100

Stream.unfold(starting_number, fn
  4 ->
    nil

  accumulator ->
    next_accumulator =
      if rem(accumulator, 2) === 0, do: div(accumulator, 2), else: 3 * accumulator + 1

    {accumulator, next_accumulator}
end)
|> Enum.to_list()
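To try several starting numbers at once, the same stream can be wrapped in an anonymous function. This is only a sketch: collatz_until_four and sequences are hypothetical names, and the call only returns for starting numbers whose sequence actually reaches 4.

```elixir
collatz_until_four = fn starting_number ->
  Stream.unfold(starting_number, fn
    4 -> nil
    n -> {n, if(rem(n, 2) === 0, do: div(n, 2), else: 3 * n + 1)}
  end)
  |> Enum.to_list()
end

# Build a map from each starting number to its sequence up to (not including) 4.
sequences = Map.new([5, 6, 7], fn start -> {start, collatz_until_four.(start)} end)
```

For example, sequences[5] is [5, 16, 8] and sequences[6] is [6, 3, 10, 5, 16, 8].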

Your Turn

In the Elixir cell below, use Stream.unfold/2 to generate a list of cubed numbers. Use Enum.take/2 to evaluate the first 100 numbers. You need to reverse the generated stream before you evaluate the first 100 numbers.

I.e. $1^3, 2^3, 3^3, 4^3, …$ which would be [1, 8, 27, 64] and so on.

Commit Your Progress

Run the following in your command line from the project folder to track and save your progress in a Git commit.

$ git add .
$ git commit -m "finish streams section"