Enum vs Stream

03-enum-stream.livemd

Learning Objectives

By the end of this checkpoint, you will:

  • Know when to use Enum vs Stream
  • Write clear pipeline transformations
  • Understand lazy evaluation and when it’s triggered
  • Refactor imperative loops to functional pipelines

Setup

Mix.install([
  {:benchee, "~> 1.3"},
  {:vega_lite, "~> 0.1"},
  {:kino_vega_lite, "~> 0.1"}
])

Concept: Eager vs Lazy Evaluation

Enum is eager - it processes the entire collection immediately:

# Enum processes everything right away
result =
  1..10
  |> Enum.map(fn x ->
    IO.puts("Doubling #{x}")
    x * 2
  end)
  |> Enum.take(3)

IO.inspect(result, label: "Result")

Notice: It doubled ALL 10 numbers, even though we only needed 3!

Stream is lazy - it only processes what’s needed:

# Stream only processes what's necessary
result =
  1..10
  |> Stream.map(fn x ->
    IO.puts("Doubling #{x}")
    x * 2
  end)
  |> Enum.take(3)

IO.inspect(result, label: "Result")

Notice: It only doubled 3 numbers! Stream waited until Enum.take/2 forced evaluation.

Interactive Exercise 3.1: Identify Eager vs Lazy

Let’s test your understanding:

operations = [
  {"1..10 |> Enum.map(&(&1 * 2))", fn -> 1..10 |> Enum.map(&(&1 * 2)) end, "Eager"},
  {"1..10 |> Stream.map(&(&1 * 2))", fn -> 1..10 |> Stream.map(&(&1 * 2)) end, "Lazy"},
  {"File.stream!(\"test.txt\") |> Stream.take(5)", fn -> :example end, "Lazy"},
  {"File.stream!(\"test.txt\") |> Enum.take(5)", fn -> :example end, "Eager (Enum.take/2 forces the stream)"}
]

for {code, _func, evaluation_type} <- operations do
  IO.puts("#{code}")
  IO.puts("  → #{evaluation_type}\n")
end

:ok

Key insight: Even if you start with File.stream!() (lazy), calling Enum.take/2 triggers evaluation!
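A quick way to see this for yourself: a stream is just a recipe until something forces it. Any `Enum` function forces evaluation, and `Stream.run/1` forces it purely for side effects. A minimal sketch:

```elixir
# Building a stream computes nothing yet
lazy = 1..5 |> Stream.map(&(&1 * 2))

# Inspecting it shows a #Stream<...> struct, not a list
IO.inspect(lazy)

# Any Enum function forces evaluation (and the stream can be re-run)
IO.inspect(Enum.to_list(lazy), label: "to_list")
IO.inspect(Enum.sum(lazy), label: "sum")

# Stream.run/1 forces evaluation only for side effects, discarding results
1..3
|> Stream.each(&IO.puts("side effect #{&1}"))
|> Stream.run()
```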

Interactive Exercise 3.2: Choose the Right Tool

For each scenario, let’s implement it with the correct approach:

defmodule ScenarioExamples do
  # Scenario 1: Processing a large file to find errors
  # Choice: Stream (we don't want to load the entire file)
  def find_errors_in_large_file(path) do
    path
    |> File.stream!()
    |> Stream.filter(&String.contains?(&1, "ERROR"))
    |> Enum.take(10)
  end

  # Scenario 2: Transform a list of 100 items for display
  # Choice: Enum (small collection, eager is fine)
  def format_users(users) do
    users
    |> Enum.map(fn user -> "#{user.name} (#{user.email})" end)
  end

  # Scenario 3: Generate infinite Fibonacci sequence
  # Choice: Stream (MUST be lazy for infinite sequences!)
  def fibonacci do
    Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end)
  end

  # Scenario 4: Sort a list of 1,000 records
  # Choice: Enum (sorting requires the full collection anyway)
  def sort_users(users) do
    Enum.sort_by(users, & &1.created_at)
  end
end

# Test Scenario 3: Infinite Fibonacci
IO.puts("First 10 Fibonacci numbers:")

ScenarioExamples.fibonacci()
|> Enum.take(10)
|> IO.inspect()
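Because the sequence is lazy, you can also bound it by a condition instead of a count -- for example, every Fibonacci number below 100, using `Stream.take_while/2` (a small sketch):

```elixir
# Same unfold-based generator, cut off by a predicate rather than a count
Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end)
|> Stream.take_while(&(&1 < 100))
|> Enum.to_list()
|> IO.inspect(label: "Fibonacci < 100")
```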

Interactive Exercise 3.3: Pipeline Transformation

Refactor imperative code to functional pipelines:

# Imperative-style version (avoid this!)
# Note: Elixir has no mutable loop variables. Rebinding `result` inside a
# `for` block is lost after each iteration, so a naive "build a list in a
# loop" translation silently returns []. Emulating mutation with an explicit
# accumulator works, but the intent is buried in bookkeeping:
defmodule ImperativeStyle do
  def process_numbers(nums) do
    result =
      Enum.reduce(nums, [], fn n, acc ->
        if rem(n, 2) == 0 do
          squared = n * n
          [squared | acc]
        else
          acc
        end
      end)

    Enum.reverse(result)
  end
end

# Functional pipeline version (much better!)
defmodule FunctionalStyle do
  def process_numbers(nums) do
    nums
    |> Enum.filter(&(rem(&1, 2) == 0))
    |> Enum.map(&(&1 * &1))
  end
end

# Compare both approaches
test_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
IO.inspect(ImperativeStyle.process_numbers(test_data), label: "Imperative")
IO.inspect(FunctionalStyle.process_numbers(test_data), label: "Functional")

The functional version is:

  • More concise
  • Easier to read
  • Easier to test
  • Composable (you can add more transformations)
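Composability in practice: because the pipeline is just a sequence of transformations, extending it is a one-line change rather than a restructuring. A minimal sketch that adds a summing step to the same filter-then-square pipeline:

```elixir
nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

sum_of_even_squares =
  nums
  |> Enum.filter(&(rem(&1, 2) == 0))
  |> Enum.map(&(&1 * &1))
  # One extra step; nothing above needed to change
  |> Enum.sum()

IO.inspect(sum_of_even_squares, label: "Sum of even squares")
```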

Interactive Exercise 3.4: Stream Efficiency Challenge

Fix this code to use streaming:

defmodule LogAnalyzer do
  # BAD: Loads entire file into memory
  def count_errors_bad(path) do
    path
    |> File.read!()
    |> String.split("\n")
    |> Enum.filter(&String.contains?(&1, "ERROR"))
    |> Enum.count()
  end

  # GOOD: Streams the file
  def count_errors_good(path) do
    path
    |> File.stream!()
    |> Stream.filter(&String.contains?(&1, "ERROR"))
    |> Enum.count()
  end
end

# For demonstration, let's create a test file
test_file = "/tmp/test_log.txt"

File.write!(test_file, """
INFO: Application started
ERROR: Database connection failed
INFO: Retrying connection
ERROR: Max retries exceeded
INFO: Application stopped
""")

IO.puts("Error count: #{LogAnalyzer.count_errors_good(test_file)}")

Performance Comparison: Enum vs Stream

Let’s benchmark the difference with Benchee:

# Create test data
large_list = 1..100_000 |> Enum.to_list()

benchmark_results =
  Benchee.run(
    %{
      "Enum - take after map" => fn ->
        large_list
        |> Enum.map(&(&1 * 2))
        |> Enum.take(100)
      end,
      "Stream - take after map" => fn ->
        large_list
        |> Stream.map(&(&1 * 2))
        |> Enum.take(100)
      end,
      "Enum - filter then map" => fn ->
        large_list
        |> Enum.filter(&(rem(&1, 2) == 0))
        |> Enum.map(&(&1 * 3))
      end,
      "Stream - filter then map" => fn ->
        large_list
        |> Stream.filter(&(rem(&1, 2) == 0))
        |> Stream.map(&(&1 * 3))
        |> Enum.to_list()
      end
    },
    time: 2,
    memory_time: 1,
    print: [fast_warning: false]
  )

:ok

Visualization: Performance Comparison

Let’s visualize the benchmark results:

alias VegaLite, as: Vl

# Illustrative numbers for charting -- substitute values from your own Benchee run
scenarios = [
  %{name: "Enum (take 100)", time: 2.5, memory: 800},
  %{name: "Stream (take 100)", time: 0.01, memory: 5},
  %{name: "Enum (filter+map all)", time: 5.2, memory: 1600},
  %{name: "Stream (filter+map all)", time: 5.4, memory: 1600}
]

# Create performance comparison chart
Vl.new(width: 600, height: 400, title: "Enum vs Stream Performance")
|> Vl.data_from_values(scenarios)
|> Vl.mark(:bar)
|> Vl.encode_field(:x, "name", type: :nominal, title: "Scenario")
|> Vl.encode_field(:y, "time", type: :quantitative, title: "Time (ms)")
|> Vl.encode(:color, field: "name", type: :nominal)

Key insight: Stream shines when you don’t need all the data! But if you process everything, Enum and Stream are similar.

Visualization: Lazy Evaluation Demo

Let’s visualize how lazy evaluation works:

# Track which elements get processed
defmodule LazyTracker do
  def track_processing do
    processed = []

    result =
      1..10
      |> Stream.map(fn x ->
        IO.puts("Processing #{x}")
        x * 2
      end)
      |> Stream.filter(fn x ->
        IO.puts("Filtering #{x}")
        x > 10
      end)
      |> Enum.take(3)

    IO.puts("\nFinal result:")
    IO.inspect(result)
  end
end

LazyTracker.track_processing()

Notice the interleaving! Stream processes elements one at a time through the entire pipeline.
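For contrast, the same pipeline built with Enum completes each stage over the whole range before the next stage begins -- every "Processing" line prints before any "Filtering" line (a minimal sketch):

```elixir
# Eager version: each stage runs to completion over all 10 elements
1..10
|> Enum.map(fn x ->
  IO.puts("Processing #{x}")
  x * 2
end)
|> Enum.filter(fn x ->
  IO.puts("Filtering #{x}")
  x > 10
end)
|> Enum.take(3)
|> IO.inspect(label: "Eager result")
```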

Advanced: Stream Composition

Streams compose beautifully:

# Create reusable stream transformations. Stream.map/2 and friends take the
# enumerable as the first argument, so we capture one-argument functions:
double = &Stream.map(&1, fn x -> x * 2 end)
evens_only = &Stream.filter(&1, fn x -> rem(x, 2) == 0 end)
first_five = &Stream.take(&1, 5)

# Compose them
result =
  1..100
  |> double.()
  |> evens_only.()
  |> first_five.()
  |> Enum.to_list()

IO.inspect(result, label: "Composed streams")

Real-World Example: CSV Processing

Process a large CSV file efficiently:

defmodule CSVProcessor do
  def process_large_csv(path) do
    path
    |> File.stream!()
    # Skip the header row
    |> Stream.drop(1)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))
    |> Stream.filter(fn row -> length(row) == 3 end)
    |> Stream.map(fn [name, age, city] -> %{name: name, age: age, city: city} end)
    |> Stream.filter(fn user -> user.city == "NYC" end)
    |> Enum.take(100)

    # Only process until we have 100 NYC users!
  end
end

# Create test CSV
test_csv = "/tmp/users.csv"

File.write!(test_csv, """
name,age,city
Alice,30,NYC
Bob,25,LA
Charlie,35,NYC
Diana,28,SF
Eve,32,NYC
""")

result = CSVProcessor.process_large_csv(test_csv)
IO.inspect(result, label: "NYC users")
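Streams can also write lazily: `Stream.into/2` pipes elements into any collectable, including an output file stream, so neither the input nor the output file is held in memory at once. A small sketch (the `/tmp` paths are illustrative):

```elixir
# Create a tiny input file for the demo
File.write!("/tmp/cities.csv", "Alice,NYC\nBob,LA\nEve,NYC\n")

# Filter line-by-line from one file into another; Stream.run/1 drives it
"/tmp/cities.csv"
|> File.stream!()
|> Stream.filter(&String.contains?(&1, "NYC"))
|> Stream.into(File.stream!("/tmp/nyc_only.csv"))
|> Stream.run()

IO.puts(File.read!("/tmp/nyc_only.csv"))
```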

Self-Assessment

form = Kino.Control.form(
  [
    enum_vs_stream: {:checkbox, "I know when to use Enum vs Stream"},
    pipelines: {:checkbox, "I can write clear pipeline transformations"},
    lazy_eval: {:checkbox, "I understand lazy evaluation and when it's triggered"},
    refactor: {:checkbox, "I can refactor imperative loops to functional pipelines"},
    composition: {:checkbox, "I can compose streams for reusable transformations"}
  ],
  submit: "Check Progress"
)

Kino.render(form)

Kino.listen(form, fn event ->
  completed = event.data |> Map.values() |> Enum.count(& &1)
  total = map_size(event.data)

  progress_message =
    if completed == total do
      "🎉 Excellent! You've mastered Checkpoint 3!"
    else
      "Keep going! #{completed}/#{total} objectives complete"
    end

  Kino.Markdown.new("### Progress: #{progress_message}") |> Kino.render()
end)

Key Takeaways

  • Enum is eager - processes immediately
  • Stream is lazy - processes only when forced
  • Use Stream for:
    • Large files
    • Infinite sequences
    • When you don’t need all results
  • Use Enum for:
    • Small collections
    • When you need all results
    • Operations that require full collection (sort, group_by)
  • Pipelines make code readable and composable
  • Stream operations interleave - one element at a time through all steps

Next Steps

Excellent work! Continue to the next checkpoint:

Continue to Checkpoint 4: Error Handling →

Or return to Checkpoint 2: Recursion