Enum vs Stream
Learning Objectives
By the end of this checkpoint, you will:
- Know when to use Enum vs Stream
- Write clear pipeline transformations
- Understand lazy evaluation and when it’s triggered
- Refactor imperative loops to functional pipelines
Setup
Mix.install([
{:benchee, "~> 1.3"},
{:vega_lite, "~> 0.1"},
{:kino_vega_lite, "~> 0.1"}
])
Concept: Eager vs Lazy Evaluation
Enum is eager - it processes the entire collection immediately:
# Enum processes everything right away
result =
1..10
|> Enum.map(fn x ->
IO.puts("Doubling #{x}")
x * 2
end)
|> Enum.take(3)
IO.inspect(result, label: "Result")
Notice: It doubled ALL 10 numbers, even though we only needed 3!
Stream is lazy - it only processes what’s needed:
# Stream only processes what's necessary
result =
1..10
|> Stream.map(fn x ->
IO.puts("Doubling #{x}")
x * 2
end)
|> Enum.take(3)
IO.inspect(result, label: "Result")
Notice: It only doubled 3 numbers! Stream waited until Enum.take/2 forced evaluation.
Interactive Exercise 3.1: Identify Eager vs Lazy
Let’s test your understanding:
operations = [
{"1..10 |> Enum.map(&(&1 * 2))", fn -> 1..10 |> Enum.map(&(&1 * 2)) end, "Eager"},
{"1..10 |> Stream.map(&(&1 * 2))", fn -> 1..10 |> Stream.map(&(&1 * 2)) end, "Lazy"},
{"File.stream!(\"test.txt\") |> Stream.take(5)", fn -> :example end, "Lazy"},
{"File.stream!(\"test.txt\") |> Enum.take(5)", fn -> :example end, "Lazy source, forced by Enum.take/2"}
]
for {code, _func, evaluation_type} <- operations do
IO.puts("#{code}")
IO.puts(" → #{evaluation_type}\n")
end
:ok
Key insight: Even if you start with File.stream!() (lazy), calling Enum.take/2 triggers evaluation!
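A lazy pipeline is just a recipe; nothing runs until something consumes it. Any Enum function forces evaluation, as does Stream.run/1 when you only care about side effects. A minimal sketch:
# Nothing is printed yet - this only builds a lazy recipe
lazy =
  1..5
  |> Stream.map(fn x ->
    IO.puts("Touching #{x}")
    x
  end)

# Each of these forces the stream to actually run:
Enum.to_list(lazy)  # realizes every element into a list
Enum.take(lazy, 2)  # realizes only the first two elements
Stream.run(lazy)    # runs for side effects and discards the results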
Interactive Exercise 3.2: Choose the Right Tool
For each scenario, let’s implement it with the correct approach:
defmodule ScenarioExamples do
# Scenario 1: Processing a large file to find errors
# Choice: Stream (we don't want to load the entire file)
def find_errors_in_large_file(path) do
path
|> File.stream!()
|> Stream.filter(&String.contains?(&1, "ERROR"))
|> Enum.take(10)
end
# Scenario 2: Transform a list of 100 items for display
# Choice: Enum (small collection, eager is fine)
def format_users(users) do
users
|> Enum.map(fn user -> "#{user.name} (#{user.email})" end)
end
# Scenario 3: Generate infinite Fibonacci sequence
# Choice: Stream (MUST be lazy for infinite sequences!)
def fibonacci do
Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end)
end
# Scenario 4: Sort a list of 1,000 records
# Choice: Enum (sorting requires the full collection anyway)
def sort_users(users) do
Enum.sort_by(users, & &1.created_at)
end
end
# Test Scenario 3: Infinite Fibonacci
IO.puts("First 10 Fibonacci numbers:")
ScenarioExamples.fibonacci()
|> Enum.take(10)
|> IO.inspect()
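For completeness, here is a quick check of Scenarios 2 and 4 with a couple of hand-written maps; the field names (name, email, created_at) are simply the ones the module above expects:
users = [
  # ISO-8601 date strings sort correctly with the default comparator
  %{name: "Alice", email: "alice@example.com", created_at: "2024-02-01"},
  %{name: "Bob", email: "bob@example.com", created_at: "2024-01-15"}
]

IO.inspect(ScenarioExamples.format_users(users), label: "Formatted")
IO.inspect(ScenarioExamples.sort_users(users), label: "Sorted by created_at")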
Interactive Exercise 3.3: Pipeline Transformation
Refactor imperative code to functional pipelines:
# Imperative-style version (avoid this!)
# Note: rebinding `result` inside a `for` comprehension does not accumulate
# in Elixir, so the manual approach needs explicit recursion with an accumulator.
defmodule ImperativeStyle do
  def process_numbers(nums), do: do_process(nums, [])

  defp do_process([], acc), do: Enum.reverse(acc)

  defp do_process([n | rest], acc) do
    if rem(n, 2) == 0 do
      do_process(rest, [n * n | acc])
    else
      do_process(rest, acc)
    end
  end
end
# Functional pipeline version (much better!)
defmodule FunctionalStyle do
def process_numbers(nums) do
nums
|> Enum.filter(&(rem(&1, 2) == 0))
|> Enum.map(&(&1 * &1))
end
end
# Compare both approaches
test_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
IO.inspect(ImperativeStyle.process_numbers(test_data), label: "Imperative")
IO.inspect(FunctionalStyle.process_numbers(test_data), label: "Functional")
The functional version is:
- More concise
- Easier to read
- Easier to test
- Composable (you can add more transformations, as shown below)
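Composability in practice: because each step returns a plain value, you can append another transformation without touching the existing ones. A small sketch, reusing the test_data bound above:
# Same filter/map pipeline, with a reduction bolted onto the end
sum_of_even_squares =
  test_data
  |> Enum.filter(&(rem(&1, 2) == 0))
  |> Enum.map(&(&1 * &1))
  |> Enum.sum()

IO.inspect(sum_of_even_squares, label: "Sum of even squares")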
Interactive Exercise 3.4: Stream Efficiency Challenge
Fix this code to use streaming:
defmodule LogAnalyzer do
# BAD: Loads entire file into memory
def count_errors_bad(path) do
path
|> File.read!()
|> String.split("\n")
|> Enum.filter(&String.contains?(&1, "ERROR"))
|> Enum.count()
end
# GOOD: Streams the file
def count_errors_good(path) do
path
|> File.stream!()
|> Stream.filter(&String.contains?(&1, "ERROR"))
|> Enum.count()
end
end
# For demonstration, let's create a test file
test_file = "/tmp/test_log.txt"
File.write!(test_file, """
INFO: Application started
ERROR: Database connection failed
INFO: Retrying connection
ERROR: Max retries exceeded
INFO: Application stopped
""")
IO.puts("Error count: #{LogAnalyzer.count_errors_good(test_file)}")
Performance Comparison: Enum vs Stream
Let’s benchmark the difference with Benchee:
# Create test data
large_list = 1..100_000 |> Enum.to_list()
benchmark_results =
Benchee.run(
%{
"Enum - take after map" => fn ->
large_list
|> Enum.map(&(&1 * 2))
|> Enum.take(100)
end,
"Stream - take after map" => fn ->
large_list
|> Stream.map(&(&1 * 2))
|> Enum.take(100)
end,
"Enum - filter then map" => fn ->
large_list
|> Enum.filter(&(rem(&1, 2) == 0))
|> Enum.map(&(&1 * 3))
end,
"Stream - filter then map" => fn ->
large_list
|> Stream.filter(&(rem(&1, 2) == 0))
|> Stream.map(&(&1 * 3))
|> Enum.to_list()
end
},
time: 2,
memory_time: 1,
print: [fast_warning: false]
)
:ok
Visualization: Performance Comparison
Let’s visualize the benchmark results:
alias VegaLite, as: Vl
# Illustrative values (time in ms, memory in KB) - your own Benchee numbers will differ
scenarios = [
%{name: "Enum (take 100)", time: 2.5, memory: 800},
%{name: "Stream (take 100)", time: 0.01, memory: 5},
%{name: "Enum (filter+map all)", time: 5.2, memory: 1600},
%{name: "Stream (filter+map all)", time: 5.4, memory: 1600}
]
# Create performance comparison chart
Vl.new(width: 600, height: 400, title: "Enum vs Stream Performance")
|> Vl.data_from_values(scenarios)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "name", type: :nominal, title: "Scenario")
|> Vl.encode_field(:y, "time", type: :quantitative, title: "Time (ms)")
|> Vl.encode(:color, field: "name", type: :nominal)
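The scenario values above are hardcoded for illustration. If you want to chart the actual run, the suite returned by Benchee.run/2 carries the measurements; this sketch assumes the Benchee 1.x struct layout (a scenarios list whose entries expose run_time_data.statistics.average in nanoseconds):
# Pull average run times out of the suite bound earlier as benchmark_results
measured =
  Enum.map(benchmark_results.scenarios, fn scenario ->
    %{
      name: scenario.name,
      # Benchee reports run times in nanoseconds; convert to milliseconds
      time: scenario.run_time_data.statistics.average / 1_000_000
    }
  end)

Vl.new(width: 600, height: 400, title: "Measured: Enum vs Stream")
|> Vl.data_from_values(measured)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "name", type: :nominal, title: "Scenario")
|> Vl.encode_field(:y, "time", type: :quantitative, title: "Avg time (ms)")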
Key insight: Stream shines when you don’t need all the data! But if you process everything, Enum and Stream are similar.
Visualization: Lazy Evaluation Demo
Let’s visualize how lazy evaluation works:
# Track which elements get processed
defmodule LazyTracker do
  def track_processing do
    result =
      1..10
      |> Stream.map(fn x ->
        IO.puts("Processing #{x}")
        x * 2
      end)
      |> Stream.filter(fn x ->
        IO.puts("Filtering #{x}")
        x > 10
      end)
      |> Enum.take(3)

    IO.puts("\nFinal result:")
    IO.inspect(result)
  end
end
LazyTracker.track_processing()
Notice the interleaving: each element flows through the whole pipeline (map, then filter) before the next one starts, and Enum.take/2 stops the stream as soon as it has three results - here only elements 1 through 8 are ever touched.
Advanced: Stream Composition
Streams compose beautifully:
# Create reusable stream transformations as anonymous functions
# (Stream.map/2 and friends need the enumerable, so we wrap them)
double = fn enum -> Stream.map(enum, &(&1 * 2)) end
evens_only = fn enum -> Stream.filter(enum, &(rem(&1, 2) == 0)) end
first_five = fn enum -> Stream.take(enum, 5) end

# Compose them
result =
  1..100
  |> double.()
  |> evens_only.()
  |> first_five.()
  |> Enum.to_list()
IO.inspect(result, label: "Composed streams")
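If you end up threading many reusable steps like these, you can also fold a whole list of them over the input; a small helper, purely as a sketch:
# Apply a list of stream transformations in order, then realize the result
apply_pipeline = fn enum, steps ->
  steps
  |> Enum.reduce(enum, fn step, acc -> step.(acc) end)
  |> Enum.to_list()
end

IO.inspect(apply_pipeline.(1..100, [double, evens_only, first_five]),
  label: "Via apply_pipeline"
)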
Real-World Example: CSV Processing
Process a large CSV file efficiently:
defmodule CSVProcessor do
  def process_large_csv(path) do
    path
    |> File.stream!()
    # Skip the header row
    |> Stream.drop(1)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))
    # Guard the three-element match below
    |> Stream.filter(fn row -> length(row) == 3 end)
    |> Stream.map(fn [name, age, city] -> %{name: name, age: age, city: city} end)
    |> Stream.filter(fn user -> user.city == "NYC" end)
    # Only read the file until we have 100 NYC users!
    |> Enum.take(100)
  end
end
# Create test CSV
test_csv = "/tmp/users.csv"
File.write!(test_csv, """
name,age,city
Alice,30,NYC
Bob,25,LA
Charlie,35,NYC
Diana,28,SF
Eve,32,NYC
""")
result = CSVProcessor.process_large_csv(test_csv)
IO.inspect(result, label: "NYC users")
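Note that age is still a string in the parsed maps. If you need an integer, convert it after the fact (or add another Stream.map/2 step inside the pipeline); a sketch that assumes the age column is always a well-formed number:
# Convert the :age field once the rows have been realized
parsed =
  CSVProcessor.process_large_csv(test_csv)
  |> Enum.map(fn user -> %{user | age: String.to_integer(user.age)} end)

IO.inspect(parsed, label: "NYC users with integer ages")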
Self-Assessment
form =
  Kino.Control.form(
    [
      enum_vs_stream: Kino.Input.checkbox("I know when to use Enum vs Stream"),
      pipelines: Kino.Input.checkbox("I can write clear pipeline transformations"),
      lazy_eval: Kino.Input.checkbox("I understand lazy evaluation and when it's triggered"),
      refactor: Kino.Input.checkbox("I can refactor imperative loops to functional pipelines"),
      composition: Kino.Input.checkbox("I can compose streams for reusable transformations")
    ],
    submit: "Check Progress"
  )
Kino.render(form)
Kino.listen(form, fn event ->
completed = event.data |> Map.values() |> Enum.count(& &1)
total = map_size(event.data)
progress_message =
if completed == total do
"🎉 Excellent! You've mastered Checkpoint 3!"
else
"Keep going! #{completed}/#{total} objectives complete"
end
Kino.Markdown.new("### Progress: #{progress_message}") |> Kino.render()
end)
Key Takeaways
- Enum is eager - processes immediately
- Stream is lazy - processes only when forced
- Use Stream for:
  - Large files
  - Infinite sequences
  - When you don't need all results
- Use Enum for:
  - Small collections
  - When you need all results
  - Operations that require the full collection (sort, group_by)
- Pipelines make code readable and composable
- Stream operations interleave - one element at a time through all steps
Next Steps
Excellent work! Continue to the next checkpoint:
Continue to Checkpoint 4: Error Handling →
Or return to Checkpoint 2: Recursion