Enum & Stream Mastery
Mix.install([])
Introduction
Master the Enum and Stream modules, the workhorses of Elixir data processing. They fit platform engineering tasks like log processing, metrics aggregation, data transformation, and observability.
Part 1: Enum Basics
map - Transform Each Element
# Exercise 1.1: Transform server names
servers = ["web-1", "web-2", "web-3"]
# TODO: Add environment prefix
prod_servers = Enum.map(servers, fn server -> "prod-#{server}" end)
IO.inspect(prod_servers)
# Using capture operator
prod_servers_short = Enum.map(servers, &"prod-#{&1}")
IO.inspect(prod_servers_short)
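A quick aside: Enum.map/2 accepts any enumerable (lists, ranges, maps, streams) but always returns a list, regardless of the input type:
# Ranges and maps are enumerable too; the result is always a list
Enum.map(1..3, &(&1 * 10)) |> IO.inspect(label: "From a range")
Enum.map(%{a: 1, b: 2}, fn {k, v} -> {k, v * 2} end) |> IO.inspect(label: "From a map")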
# Exercise 1.2: Extract specific fields
services = [
%{name: "api", port: 8080, status: :healthy},
%{name: "web", port: 8081, status: :degraded},
%{name: "db", port: 5432, status: :healthy}
]
# TODO: Extract just the names
names = Enum.map(services, & &1.name)
IO.inspect(names, label: "Service names")
# TODO: Create name-port pairs
name_ports = Enum.map(services, &{&1.name, &1.port})
IO.inspect(name_ports, label: "Name-port pairs")
filter - Keep Matching Elements
# Exercise 1.3: Filter by condition
numbers = 1..100 |> Enum.to_list()
# TODO: Keep only even numbers
evens = Enum.filter(numbers, &(rem(&1, 2) == 0))
IO.inspect(Enum.take(evens, 10), label: "First 10 evens")
# TODO: Keep numbers divisible by both 3 and 5
fizzbuzz = Enum.filter(numbers, &(rem(&1, 3) == 0 and rem(&1, 5) == 0))
IO.inspect(fizzbuzz, label: "FizzBuzz numbers")
# Exercise 1.4: Filter services
services = [
%{name: "api-1", status: :healthy, memory_mb: 512},
%{name: "api-2", status: :degraded, memory_mb: 256},
%{name: "db-1", status: :healthy, memory_mb: 2048},
%{name: "cache-1", status: :healthy, memory_mb: 128}
]
# TODO: Get only healthy services
healthy = Enum.filter(services, &(&1.status == :healthy))
IO.inspect(healthy, label: "Healthy services")
# TODO: Get services with memory > 500MB
high_memory = Enum.filter(services, &(&1.memory_mb > 500))
IO.inspect(high_memory, label: "High memory services")
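Enum.reject/2 is the mirror of filter: it drops the matching elements instead of keeping them, which sometimes reads more naturally:
# reject drops elements for which the function returns a truthy value
unhealthy = Enum.reject(services, &(&1.status == :healthy))
IO.inspect(unhealthy, label: "Unhealthy services")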
reduce - Accumulate Results
# Exercise 1.5: Sum and count
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# TODO: Calculate sum
sum = Enum.reduce(numbers, 0, fn x, acc -> x + acc end)
IO.puts("Sum: #{sum}")
# Using shorter syntax
sum_short = Enum.reduce(numbers, 0, &(&1 + &2))
IO.puts("Sum (short): #{sum_short}")
# TODO: Find maximum
max = Enum.reduce(numbers, fn x, acc -> if x > acc, do: x, else: acc end)
IO.puts("Max: #{max}")
# Exercise 1.6: Aggregate service metrics
services = [
%{name: "api", requests: 1000, errors: 10},
%{name: "web", requests: 5000, errors: 50},
%{name: "db", requests: 2000, errors: 5}
]
# TODO: Calculate total requests and errors
totals =
Enum.reduce(services, %{requests: 0, errors: 0}, fn service, acc ->
%{
requests: acc.requests + service.requests,
errors: acc.errors + service.errors
}
end)
IO.inspect(totals, label: "Totals")
# TODO: Calculate error rate
error_rate = Float.round(totals.errors / totals.requests * 100, 2)
IO.puts("Error rate: #{error_rate}%")
Part 2: Advanced Enum Functions
group_by - Organize into Groups
# Exercise 2.1: Group by status
services = [
%{name: "api-1", status: :healthy, region: "us-east"},
%{name: "api-2", status: :degraded, region: "us-west"},
%{name: "db-1", status: :healthy, region: "us-east"},
%{name: "cache-1", status: :healthy, region: "us-west"}
]
# TODO: Group by status
by_status = Enum.group_by(services, & &1.status)
IO.inspect(by_status, label: "By status")
# TODO: Group by region
by_region = Enum.group_by(services, & &1.region)
IO.inspect(by_region, label: "By region")
# Exercise 2.2: Group logs by hour
logs = [
%{timestamp: "2024-11-01 10:15:30", level: :error},
%{timestamp: "2024-11-01 10:45:12", level: :warn},
%{timestamp: "2024-11-01 11:20:45", level: :error},
%{timestamp: "2024-11-01 11:50:30", level: :info}
]
# TODO: Group by hour
by_hour =
Enum.group_by(logs, fn log ->
log.timestamp |> String.slice(11, 2)
end)
IO.inspect(by_hour, label: "Logs by hour")
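group_by also accepts a value function as a third argument, useful when you only need part of each element inside the groups:
# Keep only the log level in each hourly bucket
levels_by_hour =
  Enum.group_by(logs, &String.slice(&1.timestamp, 11, 2), & &1.level)
IO.inspect(levels_by_hour, label: "Levels by hour")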
sort_by - Custom Sorting
# Exercise 2.3: Sort services
services = [
%{name: "cache", memory_mb: 128, priority: 3},
%{name: "api", memory_mb: 512, priority: 1},
%{name: "db", memory_mb: 2048, priority: 2}
]
# TODO: Sort by memory (ascending)
by_memory = Enum.sort_by(services, & &1.memory_mb)
IO.inspect(by_memory, label: "By memory")
# TODO: Sort by priority (ascending)
by_priority = Enum.sort_by(services, & &1.priority)
IO.inspect(by_priority, label: "By priority")
# TODO: Sort by memory (descending)
by_memory_desc = Enum.sort_by(services, & &1.memory_mb, :desc)
IO.inspect(by_memory_desc, label: "By memory desc")
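For multi-level sorting, return a tuple from the key function; tuples compare element by element, so this sorts by priority first and breaks ties by name:
# Tuples compare left to right, giving a two-level sort
by_priority_then_name = Enum.sort_by(services, &{&1.priority, &1.name})
IO.inspect(by_priority_then_name, label: "By priority, then name")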
chunk_every - Split into Batches
# Exercise 2.4: Batch processing
servers = Enum.to_list(1..20)
# TODO: Split into batches of 5
batches = Enum.chunk_every(servers, 5)
IO.inspect(batches, label: "Batches of 5")
# Simulate batch deployment
Enum.each(batches, fn batch ->
IO.puts("Deploying batch: #{inspect(batch)}")
# Process.sleep(1000) # Uncomment to see delay
end)
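chunk_every/4 adds a step and a leftover policy, which turns chunking into a sliding window, handy for deltas between consecutive metric samples:
# Overlapping windows of 2 with step 1; :discard drops the incomplete tail
samples = [100, 120, 90, 150]
deltas =
  samples
  |> Enum.chunk_every(2, 1, :discard)
  |> Enum.map(fn [a, b] -> b - a end)
IO.inspect(deltas, label: "Deltas between samples")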
take / drop - Limit Results
# Exercise 2.5: Pagination
all_servers = Enum.map(1..100, &"server-#{&1}")
# TODO: Get first page (10 items)
page_1 = Enum.take(all_servers, 10)
IO.inspect(page_1, label: "Page 1")
# TODO: Get second page (skip 10, take 10)
page_2 = all_servers |> Enum.drop(10) |> Enum.take(10)
IO.inspect(page_2, label: "Page 2")
# TODO: Get last 5 servers
last_5 = Enum.take(all_servers, -5)
IO.inspect(last_5, label: "Last 5")
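The drop-then-take pattern is common enough that Enum.slice/3 expresses it in one call:
# slice(enumerable, start_index, amount) is equivalent to drop |> take
page_2_slice = Enum.slice(all_servers, 10, 10)
IO.inspect(page_2_slice == page_2, label: "slice matches drop + take")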
Part 3: Chaining Operations
# Exercise 3.1: Complex pipeline
logs = [
%{level: :info, service: "api", duration_ms: 45},
%{level: :error, service: "api", duration_ms: 120},
%{level: :warn, service: "db", duration_ms: 250},
%{level: :error, service: "db", duration_ms: 300},
%{level: :info, service: "web", duration_ms: 30},
%{level: :error, service: "web", duration_ms: 150}
]
# TODO: Find average duration of error logs
error_durations =
  logs
  |> Enum.filter(&(&1.level == :error))
  |> Enum.map(& &1.duration_ms)

# Divide by the actual count rather than hard-coding 3
error_avg = Enum.sum(error_durations) / length(error_durations)
IO.puts("Average error duration: #{error_avg}ms")
# TODO: Count errors by service
error_counts =
logs
|> Enum.filter(&(&1.level == :error))
|> Enum.group_by(& &1.service)
|> Enum.map(fn {service, errors} -> {service, length(errors)} end)
|> Map.new()
IO.inspect(error_counts, label: "Errors by service")
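A small idiom: the Enum.map-then-Map.new pair above can collapse into one step, because Map.new/2 takes a transform function:
# Map.new/2 applies the transform while building the map
error_counts_v2 =
  logs
  |> Enum.filter(&(&1.level == :error))
  |> Enum.group_by(& &1.service)
  |> Map.new(fn {service, errors} -> {service, length(errors)} end)
IO.inspect(error_counts_v2, label: "Errors by service (Map.new/2)")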
# Exercise 3.2: Process deployment data
deployments = [
%{service: "api", version: "1.0", success: true, duration: 120},
%{service: "api", version: "1.1", success: false, duration: 45},
%{service: "web", version: "2.0", success: true, duration: 90},
%{service: "web", version: "2.1", success: true, duration: 95},
%{service: "db", version: "3.0", success: true, duration: 200}
]
# TODO: Calculate success rate and average duration per service
stats =
deployments
|> Enum.group_by(& &1.service)
|> Enum.map(fn {service, deploys} ->
total = length(deploys)
successful = Enum.count(deploys, & &1.success)
avg_duration = Enum.sum(Enum.map(deploys, & &1.duration)) / total
{service, %{
total: total,
success_rate: Float.round(successful / total * 100, 1),
avg_duration: Float.round(avg_duration, 1)
}}
end)
|> Map.new()
IO.inspect(stats, label: "Deployment stats")
Part 4: Stream - Lazy Evaluation
# Exercise 4.1: Infinite sequences
# Stream doesn't compute until needed
# TODO: Generate infinite sequence
infinite = Stream.iterate(1, &(&1 + 1))
# Take first 10
first_10 = infinite |> Enum.take(10)
IO.inspect(first_10, label: "First 10 from infinite stream")
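To see the laziness concretely, attach a side effect: building the stream prints nothing, and the work happens only when an Enum function consumes it:
# Building the stream is just describing a recipe
lazy =
  Stream.map([1, 2, 3], fn x ->
    IO.puts("processing #{x}")
    x * 2
  end)
IO.puts("Stream built, nothing processed yet")
# Consuming it runs the side effects
IO.inspect(Enum.to_list(lazy), label: "Consumed")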
# TODO: Generate fibonacci
# Stream.unfold emits the first tuple element and threads the
# second along as the accumulator for the next step
fib =
  Stream.unfold({0, 1}, fn {a, b} ->
    {a, {b, a + b}}
  end)
first_fib = fib |> Enum.take(10)
IO.inspect(first_fib, label: "First 10 Fibonacci")
# Exercise 4.2: Lazy processing (memory efficient)
# Process large range without creating intermediate lists
result =
1..1_000_000
|> Stream.map(&(&1 * 2))
|> Stream.filter(&(rem(&1, 3) == 0))
|> Stream.take(10)
|> Enum.to_list()
IO.inspect(result, label: "Lazy result")
# Exercise 4.3: File processing (simulated)
# In the real world, use File.stream! for large files
log_lines = [
"2024-11-01 ERROR: Connection failed",
"2024-11-01 INFO: Server started",
"2024-11-01 ERROR: Timeout",
"2024-11-01 WARN: High memory",
"2024-11-01 ERROR: Database down"
]
# TODO: Process with Stream
error_logs =
log_lines
|> Stream.filter(&String.contains?(&1, "ERROR"))
|> Stream.map(&String.split(&1, ": ", parts: 2))
|> Stream.map(fn [_, message] -> message end)
|> Enum.to_list()
IO.inspect(error_logs, label: "Error messages")
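For reference, the real-file version looks like the sketch below; it is commented out because priv/app.log is a placeholder path, not a file this notebook ships with:
# Hypothetical: lazily scan a large log file line by line.
# File.stream! reads lazily, so lines are processed one at a time.
# Uncomment and point at a real file to run.
# errors =
#   "priv/app.log"
#   |> File.stream!()
#   |> Stream.filter(&String.contains?(&1, "ERROR"))
#   |> Enum.take(100)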
Challenge Exercises
Challenge 1: Metrics Aggregator
defmodule MetricsAggregator do
def process(metrics) do
# Aggregate by service, calculate min/max/avg
metrics
|> Enum.group_by(& &1.service)
|> Enum.map(fn {service, service_metrics} ->
values = Enum.map(service_metrics, & &1.value)
{service, %{
count: length(values),
min: Enum.min(values),
max: Enum.max(values),
avg: Float.round(Enum.sum(values) / length(values), 2)
}}
end)
|> Map.new()
end
end
# Test:
metrics = [
%{service: "api", value: 45, timestamp: 1},
%{service: "api", value: 52, timestamp: 2},
%{service: "api", value: 48, timestamp: 3},
%{service: "db", value: 120, timestamp: 1},
%{service: "db", value: 150, timestamp: 2}
]
aggregated = MetricsAggregator.process(metrics)
IO.inspect(aggregated, label: "Aggregated metrics")
Challenge 2: Top N Services
defmodule TopServices do
def by_errors(logs, n \\ 5) do
logs
|> Enum.filter(&(&1.level == :error))
|> Enum.group_by(& &1.service)
|> Enum.map(fn {service, errors} -> {service, length(errors)} end)
|> Enum.sort_by(fn {_, count} -> count end, :desc)
|> Enum.take(n)
end
def by_response_time(requests, n \\ 5) do
requests
|> Enum.group_by(& &1.service)
|> Enum.map(fn {service, reqs} ->
avg = Enum.sum(Enum.map(reqs, & &1.duration)) / length(reqs)
{service, Float.round(avg, 2)}
end)
|> Enum.sort_by(fn {_, avg} -> avg end, :desc)
|> Enum.take(n)
end
end
# Test:
logs = [
%{level: :error, service: "api"},
%{level: :error, service: "api"},
%{level: :error, service: "db"},
%{level: :info, service: "web"}
]
top_errors = TopServices.by_errors(logs, 2)
IO.inspect(top_errors, label: "Top error services")
Challenge 3: Data Pipeline
defmodule DataPipeline do
def process(events) do
events
|> Stream.reject(&(&1.type == :heartbeat))
|> Stream.map(&enrich/1)
|> Stream.filter(&valid?/1)
|> Stream.map(&transform/1)
|> Enum.to_list()
end
defp enrich(event) do
Map.put(event, :processed_at, DateTime.utc_now())
end
defp valid?(event) do
Map.has_key?(event, :service) and Map.has_key?(event, :value)
end
defp transform(event) do
%{
service: event.service,
value: event.value * 1.1,
timestamp: event.processed_at
}
end
end
# Test:
events = [
%{type: :metric, service: "api", value: 100},
%{type: :heartbeat, service: "api"},
%{type: :metric, service: "db", value: 200},
%{type: :metric, value: 150}
]
processed = DataPipeline.process(events)
IO.inspect(processed, label: "Processed events")
Practice Problems
Problem 1: Server Health Report
defmodule HealthReport do
def generate(servers) do
%{
total: length(servers),
by_status: count_by_status(servers),
by_region: count_by_region(servers),
high_memory: high_memory_servers(servers)
}
end
defp count_by_status(servers) do
servers
|> Enum.group_by(& &1.status)
|> Enum.map(fn {status, srvs} -> {status, length(srvs)} end)
|> Map.new()
end
defp count_by_region(servers) do
servers
|> Enum.group_by(& &1.region)
|> Enum.map(fn {region, srvs} -> {region, length(srvs)} end)
|> Map.new()
end
defp high_memory_servers(servers) do
servers
|> Enum.filter(&(&1.memory_mb > 1000))
|> Enum.map(& &1.name)
end
end
# Test:
servers = [
%{name: "api-1", status: :healthy, region: "us-east", memory_mb: 512},
%{name: "api-2", status: :degraded, region: "us-west", memory_mb: 256},
%{name: "db-1", status: :healthy, region: "us-east", memory_mb: 2048}
]
report = HealthReport.generate(servers)
IO.inspect(report, label: "Health report")
Key Takeaways
✓ Enum: Eager evaluation, processes immediately
✓ Stream: Lazy evaluation, memory efficient
✓ Chaining: Pipe operations for clarity
✓ map/filter/reduce: Core transformations
✓ group_by/sort_by: Organization and ordering
✓ Batch processing: chunk_every for large datasets
Next Steps
- Practice with real log files
- Build data processing pipelines
- Optimize performance with Stream
- Combine with pattern matching
Excellent work mastering Enum and Stream! 🎯