Streams
Mix.install([
{:kino, github: "livebook-dev/kino", override: true},
{:kino_lab, "~> 0.1.0-dev", github: "jonatanklosko/kino_lab"},
{:vega_lite, "~> 0.1.4"},
{:kino_vega_lite, "~> 0.1.1"},
{:benchee, "~> 0.1"},
{:ecto, "~> 3.7"},
{:math, "~> 0.7.0"},
{:faker, "~> 0.17.0"},
{:utils, path: "#{__DIR__}/../utils"}
])
Setup
Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively, you can evaluate the Elixir cells as you read.
Enum Vs Stream
Currently, we typically use the Enum module to work with collections.
The Enum module uses eager evaluation. That means any instruction we provide is executed immediately. Each step builds a complete intermediate collection, so every element of that collection must be stored in memory during execution.
1..10
|> Enum.map(fn each -> each * 2 end)
|> Enum.filter(fn each -> each <= 10 end)
|> Enum.take(4)
Each Enum function builds a complete copy of the enumerable it produces. The functions then execute one after another, as seen in this slide.
Utils.animate(:eager_evaluation)
The operation above is a bit contrived to keep the example simple. Still, notice that by the end of the execution the Enum.take/2 function only needs four elements, even though the map and filter steps both ran on all ten.
It seems like a waste of resources to execute every operation on every element, and it is. As the list grows, this becomes a massive waste of computation. It would be far more performant to build up a series of functions and run them on one element at a time, only as needed.
Utils.animate(:lazy_evaluation)
We can’t accomplish this with the Enum module. However, we can with the Stream module.
Streams are composable, lazy enumerables.
Lazy means they compute elements one at a time, only when those elements are requested. Composable means that we can build up a pipeline of functions to execute on each element in the stream.
Notice that the following does not execute any of the functions. Instead, it builds up future work and returns a Stream data structure.
1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
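You can check that a stream pipeline really defers its work by counting how often the mapping function runs. The sketch below uses Erlang's :counters module (available since OTP 21.2) purely to observe the calls. The variable names are illustrative.

```elixir
# A mutable counter from Erlang's :counters module, used only to observe calls.
calls = :counters.new(1, [])

stream =
  1..3
  |> Stream.map(fn each ->
    # Record every call to the mapping function.
    :counters.add(calls, 1, 1)
    each * 2
  end)

# Building the stream ran nothing, so the counter is still 0.
calls_before = :counters.get(calls, 1)

# Enum.to_list/1 forces the stream, which runs the mapping function three times.
result = Enum.to_list(stream)
calls_after = :counters.get(calls, 1)

%{calls_before: calls_before, result: result, calls_after: calls_after}
```

Until Enum.to_list/1 consumes it, the stream is just a %Stream{} struct holding the range and the pending function.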
The Stream stores the enumerable and the list of functions to call on it. It only evaluates when it’s passed to an eager function from the Enum module, such as Enum.to_list/1. For example,
1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
|> Enum.to_list()
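Here is a sketch of the same pipeline in both styles that counts how many times the mapping function runs. It uses Erlang's :counters module (OTP 21.2+) only as an observation tool. With Enum, the map runs on all ten elements. With Stream, each element flows through map, filter and take before the next one is produced, and Stream.take/2 halts after the fourth match.

```elixir
# Separate counters for the eager and lazy pipelines.
enum_calls = :counters.new(1, [])
stream_calls = :counters.new(1, [])

enum_result =
  1..10
  |> Enum.map(fn each ->
    :counters.add(enum_calls, 1, 1)
    each * 2
  end)
  |> Enum.filter(fn each -> each <= 10 end)
  |> Enum.take(4)

stream_result =
  1..10
  |> Stream.map(fn each ->
    :counters.add(stream_calls, 1, 1)
    each * 2
  end)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)
  |> Enum.to_list()

%{
  enum_result: enum_result,
  stream_result: stream_result,
  enum_map_calls: :counters.get(enum_calls, 1),
  stream_map_calls: :counters.get(stream_calls, 1)
}
```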
As the collection grows, the performance benefits become clearer. Let’s compare the same functionality from the Stream module and the Enum module.
Performance Comparison
Benchee.run(
%{
"enum" => fn ->
1..200
|> Enum.map(fn each -> each * 2 end)
|> Enum.filter(fn each -> each <= 10 end)
|> Enum.take(4)
end,
"stream" => fn ->
1..200
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
|> Enum.to_list()
end
},
memory_time: 2
)
While the exact results may vary, you should notice that the stream runs faster and consumes less memory. For example, a run might produce output like this:

Name             ips        average  deviation         median         99th %
stream       69.84 K       14.32 μs   ±126.27%       11.30 μs       59.40 μs
enum          8.77 K      114.02 μs    ±75.94%      106.10 μs      227.45 μs

Comparison:
stream       69.84 K
enum          8.77 K - 7.96x slower +99.70 μs

Memory usage statistics:

Name    Memory usage
stream      31.25 KB
enum       191.88 KB - 6.14x memory usage +160.63 KB
Your Turn
In the Elixir cell below, try refactoring this code using the Stream module.
1..100
|> Enum.map(fn each -> div(each, 2) end)
|> Enum.map(fn each -> each * 3 end)
|> Enum.filter(fn each -> rem(each, 2) === 0 end)
Stream Gotchas
While the Stream module can improve performance, it’s worth mentioning that it’s not a silver bullet.
For example, a single Stream.map/2 is not faster than Enum.map/2.
{stream_time, _result} = :timer.tc(fn -> Stream.map(1..1000, &(&1 * 2)) |> Enum.to_list() end)
{enum_time, _result} = :timer.tc(fn -> Enum.map(1..1000, &(&1 * 2)) end)
%{stream_time: stream_time, enum_time: enum_time}
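That’s because we only gain performance benefits when processing elements one at a time lets us skip computation or avoid building large intermediate collections. Otherwise, the extra bookkeeping that streams perform makes them no faster, and sometimes slower, than the equivalent Enum code.
Notice that even when we chain multiple maps together, the Enum module is just as fast as, or even faster than, the Stream module. Every element still passes through every map, so laziness saves no work.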
stream_fn = fn ->
1..1000
|> Stream.map(&(&1 * 2))
|> Stream.map(&(&1 * 2))
|> Stream.map(&(&1 * 2))
|> Enum.to_list()
end
enum_fn = fn ->
1..1000
|> Enum.map(&(&1 * 2))
|> Enum.map(&(&1 * 2))
|> Enum.map(&(&1 * 2))
end
{stream_time, _result} = :timer.tc(stream_fn)
{enum_time, _result} = :timer.tc(enum_fn)
%{stream_time: stream_time, enum_time: enum_time}
In terms of improving performance, Streams will generally provide the greatest benefits when operations reduce the number of elements in the enumerable.
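The sketch below shows a case where a stream pays off. It searches for the first square greater than 1_000 among the squares of 1..1_000_000. Enum.find/2 stops at the first match, so the lazy version only computes 32 squares, while the eager version squares all one million numbers first. Timings vary by machine; only the result is fixed.

```elixir
# Lazy: squares are produced one at a time, and Enum.find/2 stops at 32 * 32 = 1024.
{stream_time, stream_result} =
  :timer.tc(fn ->
    1..1_000_000
    |> Stream.map(&(&1 * &1))
    |> Enum.find(&(&1 > 1_000))
  end)

# Eager: Enum.map/2 builds a list of all 1_000_000 squares before the search starts.
{enum_time, enum_result} =
  :timer.tc(fn ->
    1..1_000_000
    |> Enum.map(&(&1 * &1))
    |> Enum.find(&(&1 > 1_000))
  end)

%{stream_time: stream_time, enum_time: enum_time, result: stream_result}
```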
Stream
As you’ve seen, Streams generate elements one by one and can improve performance.
They are often beneficial when working with large amounts of data, as they only consume the data necessary for each operation. On the other hand, Enum creates an entire copy of the collection for each intermediate step.
flowchart
subgraph Enum
direction LR
E1[Enum] --> EO1[Operation] --> L1[Collection] --> EO2[Operation] --> L2[Collection]
end
subgraph Stream
S1[Stream] --> SE1[Enum] --> SO1[Operation] --> SR[Collection]
end
Stream provides many functions equivalent to those we find in the Enum module. You’ve already seen map/2, filter/2, and take/2.
We also have access to equivalent functions such as with_index/1, drop/2, chunk_every/2, and each/2.
We’ve used pattern matching here to demonstrate that each Stream function behaves like its Enum counterpart once the stream is converted to a list.
[{"a", 0}, {"b", 1}, {"c", 2}] = Enum.with_index(["a", "b", "c"])
[{"a", 0}, {"b", 1}, {"c", 2}] = Stream.with_index(["a", "b", "c"]) |> Enum.to_list()
["b", "c"] = Enum.drop(["a", "b", "c"], 1)
["b", "c"] = Stream.drop(["a", "b", "c"], 1) |> Enum.to_list()
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Enum.chunk_every(1..10, 2)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Stream.chunk_every(1..10, 2) |> Enum.to_list()
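each/2 differs slightly between the two modules. Enum.each/2 runs its side effect immediately and returns :ok. Stream.each/2 only attaches the side effect, then passes each element through unchanged when the stream is consumed. The sketch below counts the side effects with Erlang's :counters module (OTP 21.2+), used here only for observation.

```elixir
counter = :counters.new(1, [])

# Enum.each/2 runs the side effect for all three elements right away.
:ok = Enum.each(["a", "b", "c"], fn _each -> :counters.add(counter, 1, 1) end)

# Stream.each/2 only records the side effect; nothing runs yet.
stream = Stream.each(["a", "b", "c"], fn _each -> :counters.add(counter, 1, 1) end)
count_before = :counters.get(counter, 1)

# Consuming the stream runs the side effect and keeps the elements unchanged.
["a", "b", "c"] = Enum.to_list(stream)
count_after = :counters.get(counter, 1)

%{count_before: count_before, count_after: count_after}
```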
Streams As Generators
However, Streams are more than a performant alternative to Enum. Streams are also used to generate a (potentially infinite) amount of data.
Streams only execute necessary computation, so we can have a theoretically infinite stream, such as 1, 2, 3, 1, 2, 3, 1, 2, 3, ..., that continues forever. However, we’ll only ever generate the necessary number of elements.
flowchart LR
1 --> 2 --> 3 --> 1
Here’s that shown using Stream.cycle/1, which cycles through an enumerable forever. Enum.take/2 then generates only the first 10 elements.
Stream.cycle([1, 2, 3])
|> Enum.take(10)
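An infinite stream is often useful when paired with a finite collection. In this sketch, Stream.zip/2 stops as soon as the shorter input (the list of players) runs out, so zipping with an infinite Stream.cycle/1 is safe. The player names and team atoms are made up for illustration.

```elixir
# Hypothetical players, assigned to alternating teams.
players = ["Ana", "Ben", "Cy", "Dee", "Eli"]

assignments =
  players
  |> Stream.zip(Stream.cycle([:red, :blue]))
  |> Enum.to_list()
```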
We’ve been using lazy enumerables for a while now! Did you know that ranges are lazy, just like Streams? A range only stores its first value, last value, and step, and each element is only created as necessary. For example, we can create an incredibly massive range in a single moment.
But when we use that massive range with an Enum operation, we pay the computational price!
{creation_time, _result} = :timer.tc(fn -> 1..10_000_000_000_000_000_000_000_000_000 end)
# The below range is smaller to avoid crashing your computer!
{execution_time, _result} = :timer.tc(fn -> Enum.to_list(1..10_000_000) end)
%{creation_time: creation_time, execution_time: execution_time}
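Here is a small sketch showing that a huge range is cheap as long as we only touch a few elements. Reading its boundaries is instant. Enum.take/2 walks only the first three elements. Enum.count/1 is computed from the boundaries rather than by iterating, because ranges implement counting directly.

```elixir
huge_range = 1..10_000_000_000_000_000_000_000_000_000

# A range is a struct that stores its boundaries.
first = huge_range.first
last = huge_range.last

# Only the first three elements are ever generated.
first_three = Enum.take(huge_range, 3)

# Counting uses the boundaries, so it does not iterate over every element.
size = Enum.count(huge_range)
```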
Stream.iterate/2
We can use Stream.iterate/2 to iterate over an accumulator.
For example, we could start at 0 and then add 1 to it on every iteration.
flowchart LR
1 --> 2 --> 3 --> 4 --> 5
n1[n] --> n2[n + 1] --> n3[n + 2] --> n4[n + 3] --> n5[n + 4]
Stream.iterate(0, fn accumulator -> accumulator + 1 end) |> Enum.take(5)
The initial value of the accumulator is 0. The next value of the accumulator is accumulator + 1, which is 0 + 1. This pattern continues, and Enum.take/2 stops after five elements to create [0, 1, 2, 3, 4].
flowchart LR
Accumulator --> Function --> A2[Next Accumulator] --> Function
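Any function of the previous accumulator works here. As a minimal sketch, doubling the accumulator produces the powers of two.

```elixir
# Start with 1 and double the accumulator on every iteration.
powers_of_two =
  Stream.iterate(1, fn accumulator -> accumulator * 2 end)
  |> Enum.take(6)
```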
You could build more complicated sequences this way too. For example, consider the Collatz Conjecture, also known as the simplest math problem that no one can solve.
Kino.YouTube.new("https://www.youtube.com/watch?v=094y1Z2wpJg")
The Collatz Conjecture states that you can take any positive integer and repeatedly apply the following two rules:
- If the number is odd, multiply it by three and add one.
- If the number is even, divide it by two.
Eventually, no matter the starting number, the sequence reaches 1 and then repeats 4, 2, 1 forever.
flowchart LR
integer --> even
integer --> odd
even --> a[x / 2]
odd --> b[3x + 1]
a --> 4
b --> 4
4 --> 2 --> 1
For example, let’s take the number 5 to see how quickly it starts repeating 4, 2, 1.
flowchart LR
5 -- 5 * 3 + 1 --> 16 --16 / 2--> 8 -- 8 / 2 --> 4 -- 4 / 2--> 2 -- 2 / 2 --> 1
-- 1 * 3 + 1--> a[4] --4 / 2--> b[2] --2 / 2--> c[1]
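The two rules map directly onto a step function with guards. CollatzSketch is a hypothetical module name used only for this example. Passing its step function to Stream.iterate/2 reproduces the sequence for 5 shown above.

```elixir
# Hypothetical module holding a single Collatz step.
defmodule CollatzSketch do
  # Even numbers are halved.
  def next(n) when rem(n, 2) == 0, do: div(n, 2)
  # Odd numbers become 3n + 1.
  def next(n), do: 3 * n + 1
end

sequence = Stream.iterate(5, &CollatzSketch.next/1) |> Enum.take(9)
```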
Your Turn
In the Elixir cell below, try changing starting_integer and see how the sequence always eventually repeats 4, 2, 1.
The cell only takes the first 100 elements. If the sequence for your starting integer is longer than that, you might not see it reach 4, 2, 1.
starting_integer = 20
Stream.iterate(starting_integer, fn n -> if rem(n, 2) === 0, do: div(n, 2), else: 3 * n + 1 end)
|> Enum.take(100)
Stream.unfold/2
While Stream.iterate/2 treats the accumulator as the value, Stream.unfold/2 separates the accumulator from the emitted value. The function returns a {value, next_accumulator} tuple, so you can accumulate and then generate a separate value from the accumulator.
flowchart LR
Accumulator --> Function --> Value
Function --> Accumulator
Stream.unfold(5, fn accumulator ->
value = "value: #{accumulator}"
next_accumulator = accumulator + 1
{value, next_accumulator}
end)
|> Enum.take(5)
You can use Stream.unfold/2 with Enum.to_list/1 as long as you specify when the stream should end.
Otherwise, the stream would be infinite and run forever. To end the stream, return nil instead of a {value, next_accumulator} tuple.
You could do this with a separate function head or some other control flow.
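Stream.unfold(5, fn
# Returning nil ends the stream.
0 ->
nil
accumulator ->
value = "value: #{accumulator}"
# Count down so the accumulator eventually reaches 0 and the stream stops.
next_accumulator = accumulator - 1
{value, next_accumulator}
end)
|> Enum.to_list()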
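Here is another small sketch where the emitted value differs from the accumulator. It splits a positive integer into its decimal digits, least significant first. The accumulator is the remaining number, the emitted value is its last digit, and returning nil at 0 ends the stream.

```elixir
digits =
  Stream.unfold(1234, fn
    # Nothing left to split, so end the stream.
    0 -> nil
    # Emit the last digit and keep the rest of the number as the next accumulator.
    n -> {rem(n, 10), div(n, 10)}
  end)
  |> Enum.to_list()
```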
For example, we could use Stream.unfold/2 to improve the Collatz Conjecture function from above so that it simply stops when the element reaches 4. That way, if it ever stops, it must have reached 4, and we’ve confirmed the conjecture for that starting number.
starting_number = 100
Stream.unfold(starting_number, fn
4 ->
nil
accumulator ->
next_accumulator =
if rem(accumulator, 2) === 0, do: div(accumulator, 2), else: 3 * accumulator + 1
{accumulator, next_accumulator}
end)
|> Enum.to_list()
Your Turn
In the Elixir cell below, use Stream.unfold/2 to generate a stream of cubed numbers. Use Enum.take/2 to evaluate the first 100 numbers.
That is, $1^3, 2^3, 3^3, 4^3, \dots$, which would be [1, 8, 27, 64] and so on.
Commit Your Progress
Run the following in your command line from the project folder to track and save your progress in a Git commit.
$ git add .
$ git commit -m "finish streams section"