Powered by AppSignal & Oban Pro

Enum & Stream Mastery

04-enum-stream.livemd

Enum & Stream Mastery

Mix.install([])

Introduction

Master the Enum and Stream modules - the workhorses of Elixir data processing. Perfect for platform engineering tasks like log processing, metrics aggregation, data transformation, and platform observability.

Part 1: Enum Basics

map - Transform Each Element

# Exercise 1.1: Transform server names
servers = ["web-1", "web-2", "web-3"]

# Add the environment prefix using a comprehension
prod_servers = for server <- servers, do: "prod-#{server}"
IO.inspect(prod_servers)

# Same transformation with an explicit anonymous function
prod_servers_short = Enum.map(servers, fn name -> "prod-" <> name end)
IO.inspect(prod_servers_short)
# Exercise 1.2: Extract specific fields
services = [
  %{name: "api", port: 8080, status: :healthy},
  %{name: "web", port: 8081, status: :degraded},
  %{name: "db", port: 5432, status: :healthy}
]

# Pull a single field out of every map
names = for service <- services, do: service.name
IO.inspect(names, label: "Service names")

# Destructure each map straight into a {name, port} tuple
name_ports = for %{name: name, port: port} <- services, do: {name, port}
IO.inspect(name_ports, label: "Name-port pairs")

filter - Keep Matching Elements

# Exercise 1.3: Filter by condition
numbers = Enum.to_list(1..100)

# Even numbers via a comprehension filter
evens = for n <- numbers, rem(n, 2) == 0, do: n
IO.inspect(Enum.take(evens, 10), label: "First 10 evens")

# Numbers divisible by both 3 and 5
fizzbuzz = Enum.filter(numbers, fn n -> rem(n, 3) == 0 and rem(n, 5) == 0 end)
IO.inspect(fizzbuzz, label: "FizzBuzz numbers")
# Exercise 1.4: Filter services
services = [
  %{name: "api-1", status: :healthy, memory_mb: 512},
  %{name: "api-2", status: :degraded, memory_mb: 256},
  %{name: "db-1", status: :healthy, memory_mb: 2048},
  %{name: "cache-1", status: :healthy, memory_mb: 128}
]

# Keep only the services reporting :healthy
healthy = Enum.filter(services, fn svc -> svc.status == :healthy end)
IO.inspect(healthy, label: "Healthy services")

# Keep only services above the 500 MB mark
high_memory = for svc <- services, svc.memory_mb > 500, do: svc
IO.inspect(high_memory, label: "High memory services")

reduce - Accumulate Results

# Exercise 1.5: Sum and count
numbers = Enum.to_list(1..10)

# Sum using the capture shorthand for the accumulator function
sum = Enum.reduce(numbers, 0, &(&1 + &2))
IO.puts("Sum: #{sum}")

# The same fold spelled out with a full fn
sum_short = Enum.reduce(numbers, 0, fn n, total -> total + n end)
IO.puts("Sum (short): #{sum_short}")

# Maximum: reduce/2 seeds the accumulator with the first element
max = Enum.reduce(numbers, fn n, best -> Kernel.max(n, best) end)
IO.puts("Max: #{max}")
# Exercise 1.6: Aggregate service metrics
services = [
  %{name: "api", requests: 1000, errors: 10},
  %{name: "web", requests: 5000, errors: 50},
  %{name: "db", requests: 2000, errors: 5}
]

# Fold every service's counters into one totals map,
# destructuring each service in the fn head
totals =
  Enum.reduce(services, %{requests: 0, errors: 0}, fn %{requests: r, errors: e}, acc ->
    %{requests: acc.requests + r, errors: acc.errors + e}
  end)

IO.inspect(totals, label: "Totals")

# Overall error percentage, rounded to two decimals
error_rate = Float.round(totals.errors / totals.requests * 100, 2)
IO.puts("Error rate: #{error_rate}%")

Part 2: Advanced Enum Functions

group_by - Organize into Groups

# Exercise 2.1: Group by status
services = [
  %{name: "api-1", status: :healthy, region: "us-east"},
  %{name: "api-2", status: :degraded, region: "us-west"},
  %{name: "db-1", status: :healthy, region: "us-east"},
  %{name: "cache-1", status: :healthy, region: "us-west"}
]

# Bucket services under their status atom
by_status = Enum.group_by(services, fn svc -> svc.status end)
IO.inspect(by_status, label: "By status")

# Bucket services under their region string
by_region = Enum.group_by(services, fn svc -> svc.region end)
IO.inspect(by_region, label: "By region")
# Exercise 2.2: Group logs by hour
logs = [
  %{timestamp: "2024-11-01 10:15:30", level: :error},
  %{timestamp: "2024-11-01 10:45:12", level: :warn},
  %{timestamp: "2024-11-01 11:20:45", level: :error},
  %{timestamp: "2024-11-01 11:50:30", level: :info}
]

# The hour is the two characters at index 11 of "YYYY-MM-DD HH:MM:SS"
by_hour = Enum.group_by(logs, fn log -> String.slice(log.timestamp, 11, 2) end)

IO.inspect(by_hour, label: "Logs by hour")

sort_by - Custom Sorting

# Exercise 2.3: Sort services
services = [
  %{name: "cache", memory_mb: 128, priority: 3},
  %{name: "api", memory_mb: 512, priority: 1},
  %{name: "db", memory_mb: 2048, priority: 2}
]

# Ascending memory (ascending is sort_by's default direction)
by_memory = Enum.sort_by(services, fn svc -> svc.memory_mb end)
IO.inspect(by_memory, label: "By memory")

# Ascending priority, with the direction spelled out explicitly
by_priority = Enum.sort_by(services, & &1.priority, :asc)
IO.inspect(by_priority, label: "By priority")

# Descending memory
by_memory_desc = Enum.sort_by(services, fn svc -> svc.memory_mb end, :desc)
IO.inspect(by_memory_desc, label: "By memory desc")

chunk_every - Split into Batches

# Exercise 2.4: Batch processing
servers = Enum.to_list(1..20)

# Partition the fleet into fixed-size deployment waves
batches = Enum.chunk_every(servers, 5)
IO.inspect(batches, label: "Batches of 5")

# Simulate rolling out one wave at a time
Enum.each(batches, fn wave ->
  IO.puts("Deploying batch: #{inspect(wave)}")
  # Process.sleep(1000)  # Uncomment to see delay
end)

take / drop - Limit Results

# Exercise 2.5: Pagination
all_servers = for i <- 1..100, do: "server-#{i}"

# First page: the leading 10 entries
page_1 = Enum.slice(all_servers, 0, 10)
IO.inspect(page_1, label: "Page 1")

# Second page: offset 10, length 10 (equivalent to drop 10 / take 10)
page_2 = Enum.slice(all_servers, 10, 10)
IO.inspect(page_2, label: "Page 2")

# Trailing 5 entries via a negative count
last_5 = Enum.take(all_servers, -5)
IO.inspect(last_5, label: "Last 5")

Part 3: Chaining Operations

# Exercise 3.1: Complex pipeline
logs = [
  %{level: :info, service: "api", duration_ms: 45},
  %{level: :error, service: "api", duration_ms: 120},
  %{level: :warn, service: "db", duration_ms: 250},
  %{level: :error, service: "db", duration_ms: 300},
  %{level: :info, service: "web", duration_ms: 30},
  %{level: :error, service: "web", duration_ms: 150}
]

# Average duration of error logs.
# Fixed: the divisor was hard-coded to 3 (`|> Kernel./(3)`), which silently
# produces a wrong average whenever the log list changes. Derive the count
# from the filtered data instead.
error_durations =
  logs
  |> Enum.filter(&(&1.level == :error))
  |> Enum.map(& &1.duration_ms)

error_avg = Enum.sum(error_durations) / length(error_durations)

IO.puts("Average error duration: #{error_avg}ms")

# Count errors by service. Enum.frequencies_by/2 replaces the
# group_by + length round-trip and returns the map directly.
error_counts =
  logs
  |> Enum.filter(&(&1.level == :error))
  |> Enum.frequencies_by(& &1.service)

IO.inspect(error_counts, label: "Errors by service")
# Exercise 3.2: Process deployment data
deployments = [
  %{service: "api", version: "1.0", success: true, duration: 120},
  %{service: "api", version: "1.1", success: false, duration: 45},
  %{service: "web", version: "2.0", success: true, duration: 90},
  %{service: "web", version: "2.1", success: true, duration: 95},
  %{service: "db", version: "3.0", success: true, duration: 200}
]

# Success rate and average duration per service; Map.new/2 builds
# the result map directly from the grouped pairs
stats =
  deployments
  |> Enum.group_by(& &1.service)
  |> Map.new(fn {service, deploys} ->
    total = length(deploys)
    successful = Enum.count(deploys, fn d -> d.success end)
    avg_duration = deploys |> Enum.map(& &1.duration) |> Enum.sum() |> Kernel./(total)

    summary = %{
      total: total,
      success_rate: Float.round(successful / total * 100, 1),
      avg_duration: Float.round(avg_duration, 1)
    }

    {service, summary}
  end)

IO.inspect(stats, label: "Deployment stats")

Part 4: Stream - Lazy Evaluation

# Exercise 4.1: Infinite sequences
# Stream doesn't compute until needed

# Unbounded counter: each element is the previous one plus one
infinite = Stream.iterate(1, fn n -> n + 1 end)

# Realize only the first 10 elements
first_10 = Enum.take(infinite, 10)
IO.inspect(first_10, label: "First 10 from infinite stream")

# Fibonacci: thread the {current, next} pair through unfold
fib = Stream.unfold({0, 1}, fn {current, next} -> {current, {next, current + next}} end)

first_fib = Enum.take(fib, 10)
IO.inspect(first_fib, label: "First 10 Fibonacci")
# Exercise 4.2: Lazy processing (memory efficient)
# Process large range without creating intermediate lists

# Enum.take/2 is the eager terminal step; everything before it stays lazy
result =
  1..1_000_000
  |> Stream.map(fn n -> n * 2 end)
  |> Stream.filter(fn n -> rem(n, 3) == 0 end)
  |> Enum.take(10)

IO.inspect(result, label: "Lazy result")
# Exercise 4.3: File processing (simulated)
# In real world, use File.stream! for large files

log_lines = [
  "2024-11-01 ERROR: Connection failed",
  "2024-11-01 INFO: Server started",
  "2024-11-01 ERROR: Timeout",
  "2024-11-01 WARN: High memory",
  "2024-11-01 ERROR: Database down"
]

# Lazily keep ERROR lines and strip everything before the first ": "
error_logs =
  log_lines
  |> Stream.filter(fn line -> String.contains?(line, "ERROR") end)
  |> Stream.map(fn line ->
    [_prefix, message] = String.split(line, ": ", parts: 2)
    message
  end)
  |> Enum.to_list()

IO.inspect(error_logs, label: "Error messages")

Challenge Exercises

Challenge 1: Metrics Aggregator

defmodule MetricsAggregator do
  @moduledoc """
  Aggregates raw metric samples into per-service summary statistics.
  """

  @doc """
  Groups `metrics` (maps with `:service` and `:value` keys) by service and
  returns `%{service => %{count: n, min: lo, max: hi, avg: mean}}`, where
  `avg` is rounded to 2 decimal places.
  """
  def process(metrics) do
    metrics
    |> Enum.group_by(& &1.service)
    # Map.new/2 builds the result map directly instead of map + Map.new.
    |> Map.new(fn {service, service_metrics} ->
      values = Enum.map(service_metrics, & &1.value)
      # Single pass for both extremes instead of separate min/max scans.
      {min, max} = Enum.min_max(values)
      count = length(values)

      {service,
       %{
         count: count,
         min: min,
         max: max,
         avg: Float.round(Enum.sum(values) / count, 2)
       }}
    end)
  end
end

# Test:
metrics = [
  %{service: "api", value: 45, timestamp: 1},
  %{service: "api", value: 52, timestamp: 2},
  %{service: "api", value: 48, timestamp: 3},
  %{service: "db", value: 120, timestamp: 1},
  %{service: "db", value: 150, timestamp: 2}
]

# IO.inspect returns its argument, so the binding still holds the stats
aggregated = metrics |> MetricsAggregator.process() |> IO.inspect(label: "Aggregated metrics")

Challenge 2: Top N Services

defmodule TopServices do
  @moduledoc """
  Ranks services by error volume or by average response time.
  """

  @doc """
  Returns the top `n` (default 5) services by error-log count as
  `{service, count}` tuples, highest count first.
  """
  def by_errors(logs, n \\ 5) do
    logs
    |> Enum.filter(&(&1.level == :error))
    # frequencies_by/2 replaces the group_by + length round-trip.
    |> Enum.frequencies_by(& &1.service)
    |> Enum.sort_by(fn {_service, count} -> count end, :desc)
    |> Enum.take(n)
  end

  @doc """
  Returns the top `n` (default 5) services by average request duration as
  `{service, avg}` tuples (avg rounded to 2 decimals), slowest first.
  """
  def by_response_time(requests, n \\ 5) do
    requests
    |> Enum.group_by(& &1.service)
    |> Enum.map(fn {service, reqs} ->
      avg = Enum.sum(Enum.map(reqs, & &1.duration)) / length(reqs)
      {service, Float.round(avg, 2)}
    end)
    |> Enum.sort_by(fn {_service, avg} -> avg end, :desc)
    |> Enum.take(n)
  end
end

# Test:
logs = [
  %{level: :error, service: "api"},
  %{level: :error, service: "api"},
  %{level: :error, service: "db"},
  %{level: :info, service: "web"}
]

# IO.inspect passes the list through, so the binding keeps the ranking
top_errors = logs |> TopServices.by_errors(2) |> IO.inspect(label: "Top error services")

Challenge 3: Data Pipeline

defmodule DataPipeline do
  @moduledoc """
  Lazily filters, enriches, validates, and reshapes event maps.
  """

  @doc """
  Drops heartbeat events, stamps each remaining event with `:processed_at`,
  discards events missing `:service` or `:value`, and emits
  `%{service: s, value: v * 1.1, timestamp: t}` maps as a list.
  """
  def process(events) do
    events
    |> Stream.reject(fn event -> event.type == :heartbeat end)
    |> Stream.map(&enrich/1)
    |> Stream.filter(&valid?/1)
    |> Stream.map(&transform/1)
    |> Enum.to_list()
  end

  # Record when the pipeline saw this event.
  defp enrich(event), do: Map.put(event, :processed_at, DateTime.utc_now())

  # An event is usable only if it carries both a service and a value.
  defp valid?(event), do: Map.has_key?(event, :service) and Map.has_key?(event, :value)

  # Shape the event for downstream consumers, applying the 10% adjustment.
  defp transform(%{service: service, value: value, processed_at: at}) do
    %{service: service, value: value * 1.1, timestamp: at}
  end
end

# Test:
events = [
  %{type: :metric, service: "api", value: 100},
  %{type: :heartbeat, service: "api"},
  %{type: :metric, service: "db", value: 200},
  %{type: :metric, value: 150}
]

# IO.inspect passes the list through, so the binding keeps the result
processed = events |> DataPipeline.process() |> IO.inspect(label: "Processed events")

Practice Problems

Problem 1: Server Health Report

defmodule HealthReport do
  @moduledoc """
  Builds a fleet summary over a list of server maps.
  """

  # Servers above this memory figure (in MB) are flagged in the report.
  @high_memory_threshold_mb 1000

  @doc """
  Returns a report map for `servers` (maps with `:name`, `:status`,
  `:region`, and `:memory_mb` keys): total count, counts per status,
  counts per region, and the names of servers over 1000 MB.
  """
  def generate(servers) do
    %{
      total: length(servers),
      # frequencies_by/2 replaces the group_by + length round-trips the
      # two counting helpers used to hand-roll.
      by_status: Enum.frequencies_by(servers, & &1.status),
      by_region: Enum.frequencies_by(servers, & &1.region),
      high_memory: high_memory_servers(servers)
    }
  end

  # Names of servers whose memory exceeds the threshold.
  defp high_memory_servers(servers) do
    for %{name: name, memory_mb: mb} <- servers, mb > @high_memory_threshold_mb, do: name
  end
end

# Test:
servers = [
  %{name: "api-1", status: :healthy, region: "us-east", memory_mb: 512},
  %{name: "api-2", status: :degraded, region: "us-west", memory_mb: 256},
  %{name: "db-1", status: :healthy, region: "us-east", memory_mb: 2048}
]

# IO.inspect passes the map through, so the binding keeps the report
report = servers |> HealthReport.generate() |> IO.inspect(label: "Health report")

Key Takeaways

✓ Enum: eager evaluation, processes immediately. ✓ Stream: lazy evaluation, memory efficient. ✓ Chaining: pipe operations for clarity. ✓ map/filter/reduce: core transformations. ✓ group_by/sort_by: organization and ordering. ✓ Batch processing: chunk_every for large datasets.

Next Steps

  1. Practice with real log files
  2. Build data processing pipelines
  3. Optimize performance with Stream
  4. Combine with pattern matching

Excellent work mastering Enum and Stream! 🎯