GorillaStream: GenStage Pipeline Integration
Mix.install([
{:gorilla_stream, "~> 1.1"},
{:gen_stage, "~> 1.2"}
])
Introduction
This notebook demonstrates how to integrate GorillaStream with GenStage to build robust, backpressure-aware streaming data pipelines. You’ll learn how to:
- Create producer stages that generate time series data
- Build producer-consumer stages for compression processing
- Implement consumer stages for data storage and analytics
- Handle backpressure and demand-driven processing
- Scale processing with multiple concurrent stages
- Implement error handling and recovery
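Before wiring up stages, here is a minimal round-trip sketch of the GorillaStream calls the pipeline relies on. The compress/2 usage matches the rest of this notebook; the decompress/2 call is an assumption that the library exposes a mirror-image counterpart, so adjust it to your installed version.
# Quick sanity check of the GorillaStream API used throughout this notebook
points = [{1_700_000_000, 21.5}, {1_700_000_060, 21.7}, {1_700_000_120, 21.6}]

# Second argument toggles the optional extra zlib pass
{:ok, compressed} = GorillaStream.compress(points, false)
IO.puts("#{length(points)} points -> #{byte_size(compressed)} bytes")

# Assumption: decompress/2 mirrors compress/2 and returns the original {timestamp, value} pairs
{:ok, restored} = GorillaStream.decompress(compressed, false)
IO.inspect(restored, label: "round-tripped")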
1. Basic GenStage Pipeline
Let’s start with a simple three-stage pipeline: Producer → Compressor → Consumer
defmodule DataProducer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
sensor_type = Keyword.get(opts, :sensor_type, :temperature)
base_value = Keyword.get(opts, :base_value, 20.0)
state = %{
counter: 0,
sensor_type: sensor_type,
base_value: base_value,
start_time: System.system_time(:second)
}
IO.puts("🏭 DataProducer started for #{sensor_type} sensor")
{:producer, state}
end
def handle_demand(demand, state) when demand > 0 do
IO.puts("📊 DataProducer: generating #{demand} data points")
# Generate data points based on demand
events = for i <- state.counter..(state.counter + demand - 1) do
timestamp = state.start_time + i * 60 # Every minute
value = generate_sensor_value(state.sensor_type, state.base_value, i)
%{
timestamp: timestamp,
value: value,
sensor_type: state.sensor_type,
sequence: i,
generated_at: System.system_time(:millisecond)
}
end
new_state = %{state | counter: state.counter + demand}
{:noreply, events, new_state}
end
defp generate_sensor_value(:temperature, base, i) do
# Simulate daily temperature cycle with noise
daily_cycle = 5 * :math.sin(i / 144) # slow daily-style cycle (divisor 144 ≈ 1440 minutes / 10)
noise = (:rand.uniform() - 0.5) * 1.0
Float.round(base + daily_cycle + noise, 2)
end
defp generate_sensor_value(:humidity, base, i) do
# Simulate humidity variations
variation = 10 * :math.cos(i / 100) + (:rand.uniform() - 0.5) * 5
Float.round(base + variation, 2)
end
defp generate_sensor_value(:pressure, base, i) do
# Simulate atmospheric pressure changes
variation = 20 * :math.sin(i / 200) + (:rand.uniform() - 0.5) * 5
Float.round(base + variation, 2)
end
end
defmodule CompressionProcessor do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
compression_mode = Keyword.get(opts, :compression_mode, :individual)
use_zlib = Keyword.get(opts, :use_zlib, false)
state = %{
compression_mode: compression_mode,
use_zlib: use_zlib,
processed_count: 0,
total_original_size: 0,
total_compressed_size: 0,
errors: 0
}
IO.puts("🗜️ CompressionProcessor started (mode: #{compression_mode}, zlib: #{use_zlib})")
{:producer_consumer, state, subscribe_to: [DataProducer]}
end
def handle_events(events, _from, state) do
IO.puts("🗜️ CompressionProcessor: processing #{length(events)} events")
{compressed_events, new_state} = case state.compression_mode do
:individual -> compress_individual_events(events, state)
:batch -> compress_batch_events(events, state)
end
{:noreply, compressed_events, new_state}
end
defp compress_individual_events(events, state) do
{compressed_events, stats} = Enum.map_reduce(events, %{processed: 0, compressed_size: 0, errors: 0}, fn event, acc ->
# Convert to GorillaStream format
data_point = {event.timestamp, event.value}
case GorillaStream.compress([data_point], state.use_zlib) do
{:ok, compressed} ->
compressed_event = %{
original_event: event,
compressed_data: compressed,
compressed_size: byte_size(compressed),
compression_ratio: 16 / byte_size(compressed), # 16 bytes ≈ uncompressed size of one {timestamp, value} pair
compression_mode: :individual,
compressed_at: System.system_time(:millisecond),
status: :success
}
new_acc = %{
processed: acc.processed + 1,
compressed_size: acc.compressed_size + byte_size(compressed),
errors: acc.errors
}
{compressed_event, new_acc}
{:error, reason} ->
error_event = %{
original_event: event,
error: reason,
status: :error,
compressed_at: System.system_time(:millisecond)
}
new_acc = %{acc | errors: acc.errors + 1}
{error_event, new_acc}
end
end)
new_state = %{
state |
processed_count: state.processed_count + stats.processed,
total_original_size: state.total_original_size + length(events) * 16,
total_compressed_size: state.total_compressed_size + stats.compressed_size,
errors: state.errors + stats.errors
}
{compressed_events, new_state}
end
defp compress_batch_events(events, state) do
# Convert all events to GorillaStream format
data_points = Enum.map(events, fn event -> {event.timestamp, event.value} end)
case GorillaStream.compress(data_points, state.use_zlib) do
{:ok, compressed} ->
compressed_event = %{
original_events: events,
compressed_data: compressed,
compressed_size: byte_size(compressed),
points_count: length(events),
compression_ratio: (length(events) * 16) / byte_size(compressed),
compression_mode: :batch,
compressed_at: System.system_time(:millisecond),
status: :success
}
new_state = %{
state |
processed_count: state.processed_count + length(events),
total_original_size: state.total_original_size + length(events) * 16,
total_compressed_size: state.total_compressed_size + byte_size(compressed)
}
{[compressed_event], new_state}
{:error, reason} ->
error_event = %{
original_events: events,
error: reason,
status: :error,
compressed_at: System.system_time(:millisecond)
}
new_state = %{state | errors: state.errors + 1}
{[error_event], new_state}
end
end
end
defmodule DataConsumer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
storage_type = Keyword.get(opts, :storage_type, :memory)
state = %{
storage_type: storage_type,
stored_data: [],
processed_count: 0,
successful_compressions: 0,
failed_compressions: 0,
total_compressed_size: 0,
start_time: System.monotonic_time(:millisecond)
}
IO.puts("💾 DataConsumer started (storage: #{storage_type})")
{:consumer, state, subscribe_to: [CompressionProcessor]}
end
def handle_events(events, _from, state) do
IO.puts("💾 DataConsumer: consuming #{length(events)} compressed events")
# Process each compressed event
{new_stored_data, stats} = Enum.map_reduce(events, state, fn event, acc ->
case event.status do
:success ->
stored_item = store_compressed_data(event, acc.storage_type)
new_acc = %{
acc |
successful_compressions: acc.successful_compressions + get_points_count(event),
total_compressed_size: acc.total_compressed_size + event.compressed_size
}
{stored_item, new_acc}
:error ->
IO.puts("❌ Compression error: #{event.error}")
new_acc = %{
acc |
failed_compressions: acc.failed_compressions + get_points_count(event)
}
{nil, new_acc}
end
end)
# Update state with new data
valid_items = Enum.reject(new_stored_data, &is_nil/1)
new_state = %{
stats |
stored_data: stats.stored_data ++ valid_items,
processed_count: stats.processed_count + length(events)
}
# Periodically report statistics
current_time = System.monotonic_time(:millisecond)
if rem(new_state.processed_count, 10) == 0 do
report_statistics(new_state, current_time)
end
{:noreply, [], new_state}
end
defp get_points_count(%{points_count: count}), do: count
defp get_points_count(%{original_event: _}), do: 1
defp get_points_count(%{original_events: events}), do: length(events)
defp get_points_count(_), do: 0
defp store_compressed_data(event, :memory) do
# Store in memory (for demo)
%{
stored_at: System.system_time(:millisecond),
compressed_size: event.compressed_size,
compression_ratio: event.compression_ratio,
compression_mode: event.compression_mode,
points_count: get_points_count(event)
}
end
defp store_compressed_data(event, :file) do
# Simulate file storage
filename = "compressed_#{System.system_time(:millisecond)}.gorilla"
# In reality: File.write!(filename, event.compressed_data)
%{
stored_at: System.system_time(:millisecond),
filename: filename,
compressed_size: event.compressed_size,
compression_ratio: event.compression_ratio,
compression_mode: event.compression_mode,
points_count: get_points_count(event)
}
end
defp report_statistics(state, current_time) do
elapsed_ms = current_time - state.start_time
elapsed_seconds = elapsed_ms / 1000
total_points = state.successful_compressions + state.failed_compressions
processing_rate = if elapsed_seconds > 0, do: total_points / elapsed_seconds, else: 0
success_rate = if total_points > 0, do: state.successful_compressions / total_points * 100, else: 0
avg_compression_ratio = if state.successful_compressions > 0 do
# Calculate average compression ratio from stored data
total_ratio =
state.stored_data
|> Enum.map(& &1.compression_ratio)
|> Enum.sum()
total_ratio / length(state.stored_data)
else
0
end
IO.puts("\n📈 === Pipeline Statistics ===")
IO.puts(" Elapsed time: #{Float.round(elapsed_seconds, 1)}s")
IO.puts(" Processing rate: #{Float.round(processing_rate, 1)} points/sec")
IO.puts(" Successful compressions: #{state.successful_compressions}")
IO.puts(" Failed compressions: #{state.failed_compressions}")
IO.puts(" Success rate: #{Float.round(success_rate, 1)}%")
IO.puts(" Total compressed size: #{state.total_compressed_size} bytes")
IO.puts(" Average compression ratio: #{Float.round(avg_compression_ratio, 2)}:1")
IO.puts(" Items stored: #{length(state.stored_data)}")
end
def get_stats() do
GenStage.call(__MODULE__, :get_stats)
end
def handle_call(:get_stats, _from, state) do
{:reply, state, [], state}
end
end
Now let’s start the basic pipeline:
# Start the GenStage pipeline
{:ok, _producer} = DataProducer.start_link(sensor_type: :temperature, base_value: 22.0)
{:ok, _processor} = CompressionProcessor.start_link(compression_mode: :individual, use_zlib: false)
{:ok, _consumer} = DataConsumer.start_link(storage_type: :memory)
IO.puts("🚀 Basic GenStage pipeline started!")
IO.puts(" Producer generates temperature sensor data")
IO.puts(" Processor compresses individual points")
IO.puts(" Consumer stores compressed data in memory")
# Let the pipeline run for a bit
Process.sleep(5000)
# Get final statistics
final_stats = DataConsumer.get_stats()
IO.puts("\n🏁 === Final Pipeline Results ===")
IO.puts(" Total items processed: #{final_stats.processed_count}")
IO.puts(" Successful compressions: #{final_stats.successful_compressions}")
IO.puts(" Failed compressions: #{final_stats.failed_compressions}")
IO.puts(" Total compressed size: #{final_stats.total_compressed_size} bytes")
IO.puts(" Items in storage: #{length(final_stats.stored_data)}")
2. Multi-Stage Pipeline with Different Compression Strategies
Let’s build a more complex pipeline that handles multiple data sources and compression strategies:
defmodule MultiSensorProducer do
use GenStage
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, opts, name: name)
end
def init(opts) do
sensors = Keyword.get(opts, :sensors, [])
rate_limit = Keyword.get(opts, :rate_limit, 1000) # Max points per second
state = %{
sensors: sensors,
counter: 0,
rate_limit: rate_limit,
last_batch_time: System.monotonic_time(:millisecond)
}
IO.puts("🏭 MultiSensorProducer started with #{length(sensors)} sensors")
{:producer, state}
end
def handle_demand(demand, state) when demand > 0 do
current_time = System.monotonic_time(:millisecond)
time_since_last = current_time - state.last_batch_time
# Apply rate limiting
max_points = min(demand, div(state.rate_limit * time_since_last, 1000) + 1)
actual_demand = max(1, max_points)
# Spread the demand across sensors so the total emitted never exceeds what was asked for;
# otherwise the excess events just pile up in the producer's internal buffer
demand_per_sensor = max(1, div(actual_demand, length(state.sensors)))
events =
  state.sensors
  |> generate_multi_sensor_data(state.counter, demand_per_sensor)
  |> Enum.take(actual_demand)
new_state = %{
state |
counter: state.counter + demand_per_sensor,
last_batch_time: current_time
}
IO.puts("🏭 MultiSensorProducer: generated #{length(events)} data points from #{length(state.sensors)} sensors")
{:noreply, events, new_state}
end
defp generate_multi_sensor_data(sensors, base_counter, demand_per_sensor) do
sensors
|> Enum.with_index()
|> Enum.flat_map(fn {{sensor_id, sensor_config}, sensor_index} ->
for i <- 0..(demand_per_sensor - 1) do
counter = base_counter + i
timestamp = System.system_time(:second) + counter * 60
value = case sensor_config.type do
:temperature ->
base = sensor_config.base_value
base + 5 * :math.sin((counter + sensor_index * 100) / 144) + (:rand.uniform() - 0.5) * 1.0
:humidity ->
base = sensor_config.base_value
base + 10 * :math.cos((counter + sensor_index * 150) / 100) + (:rand.uniform() - 0.5) * 5.0
:pressure ->
base = sensor_config.base_value
base + 20 * :math.sin((counter + sensor_index * 200) / 200) + (:rand.uniform() - 0.5) * 3.0
end
%{
sensor_id: sensor_id,
sensor_type: sensor_config.type,
timestamp: timestamp,
value: Float.round(value, 2),
sequence: counter,
generated_at: System.system_time(:millisecond)
}
end
end)
end
end
defmodule AdaptiveCompressionProcessor do
use GenStage
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, opts, name: name)
end
def init(opts) do
producer = Keyword.get(opts, :subscribe_to, [])
batch_threshold = Keyword.get(opts, :batch_threshold, 5)
use_zlib = Keyword.get(opts, :use_zlib, false)
state = %{
batch_threshold: batch_threshold,
use_zlib: use_zlib,
buffer: %{}, # Buffer per sensor
processed_count: 0,
total_compressed_size: 0,
compression_stats: %{}
}
IO.puts("🗜️ AdaptiveCompressionProcessor started (batch_threshold: #{batch_threshold}, zlib: #{use_zlib})")
{:producer_consumer, state, subscribe_to: producer}
end
def handle_events(events, _from, state) do
IO.puts("🗜️ AdaptiveCompressionProcessor: processing #{length(events)} events from multiple sensors")
# Group events by sensor
events_by_sensor = Enum.group_by(events, & &1.sensor_id)
# Process each sensor's data adaptively
{compressed_events, new_state} =
Enum.map_reduce(events_by_sensor, state, fn {sensor_id, sensor_events}, acc ->
process_sensor_events(sensor_id, sensor_events, acc)
end)
all_compressed_events = List.flatten(compressed_events)
{:noreply, all_compressed_events, new_state}
end
defp process_sensor_events(sensor_id, events, state) do
# Add events to sensor buffer
current_buffer = Map.get(state.buffer, sensor_id, [])
new_buffer = current_buffer ++ events
if length(new_buffer) >= state.batch_threshold do
# Process as batch
case compress_sensor_batch(sensor_id, new_buffer, state.use_zlib) do
{:ok, compressed_event} ->
# Update compression stats
sensor_stats = Map.get(state.compression_stats, sensor_id, %{batches: 0, individuals: 0, total_size: 0})
new_sensor_stats = %{
sensor_stats |
batches: sensor_stats.batches + 1,
total_size: sensor_stats.total_size + compressed_event.compressed_size
}
new_state = %{
state |
buffer: Map.put(state.buffer, sensor_id, []), # Clear buffer
processed_count: state.processed_count + length(new_buffer),
total_compressed_size: state.total_compressed_size + compressed_event.compressed_size,
compression_stats: Map.put(state.compression_stats, sensor_id, new_sensor_stats)
}
{[compressed_event], new_state}
{:error, reason} ->
IO.puts("❌ Batch compression failed for #{sensor_id}: #{reason}")
# Process individually as fallback
{individual_events, fallback_state} = compress_individually(sensor_id, new_buffer, state)
fallback_state = %{fallback_state | buffer: Map.put(fallback_state.buffer, sensor_id, [])}
{individual_events, fallback_state}
end
else
# Keep buffering
new_state = %{state | buffer: Map.put(state.buffer, sensor_id, new_buffer)}
{[], new_state}
end
end
defp compress_sensor_batch(sensor_id, events, use_zlib) do
data_points = Enum.map(events, fn event -> {event.timestamp, event.value} end)
case GorillaStream.compress(data_points, use_zlib) do
{:ok, compressed} ->
compressed_event = %{
sensor_id: sensor_id,
sensor_type: (List.first(events)).sensor_type,
original_events: events,
compressed_data: compressed,
compressed_size: byte_size(compressed),
points_count: length(events),
compression_ratio: (length(events) * 16) / byte_size(compressed),
compression_mode: :batch,
compressed_at: System.system_time(:millisecond),
status: :success
}
{:ok, compressed_event}
{:error, reason} ->
{:error, reason}
end
end
defp compress_individually(sensor_id, events, state) do
{compressed_events, stats} = Enum.map_reduce(events, %{count: 0, size: 0}, fn event, acc ->
data_point = {event.timestamp, event.value}
case GorillaStream.compress([data_point], state.use_zlib) do
{:ok, compressed} ->
compressed_event = %{
sensor_id: sensor_id,
sensor_type: event.sensor_type,
original_event: event,
compressed_data: compressed,
compressed_size: byte_size(compressed),
points_count: 1,
compression_ratio: 16 / byte_size(compressed),
compression_mode: :individual,
compressed_at: System.system_time(:millisecond),
status: :success
}
new_acc = %{count: acc.count + 1, size: acc.size + byte_size(compressed)}
{compressed_event, new_acc}
{:error, reason} ->
error_event = %{
sensor_id: sensor_id,
original_event: event,
error: reason,
status: :error,
compressed_at: System.system_time(:millisecond)
}
{error_event, acc}
end
end)
# Update sensor stats for individual processing
sensor_stats = Map.get(state.compression_stats, sensor_id, %{batches: 0, individuals: 0, total_size: 0})
new_sensor_stats = %{
sensor_stats |
individuals: sensor_stats.individuals + stats.count,
total_size: sensor_stats.total_size + stats.size
}
new_state = %{
state |
processed_count: state.processed_count + length(events),
total_compressed_size: state.total_compressed_size + stats.size,
compression_stats: Map.put(state.compression_stats, sensor_id, new_sensor_stats)
}
{compressed_events, new_state}
end
end
defmodule AnalyticsConsumer do
use GenStage
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, opts, name: name)
end
def init(opts) do
producer = Keyword.get(opts, :subscribe_to, [])
report_interval = Keyword.get(opts, :report_interval, 5000)
# Schedule periodic analytics reports
:timer.send_interval(report_interval, self(), :analytics_report)
state = %{
sensor_analytics: %{},
processed_count: 0,
start_time: System.monotonic_time(:millisecond),
last_report_time: System.monotonic_time(:millisecond)
}
IO.puts("📊 AnalyticsConsumer started (reports every #{report_interval}ms)")
{:consumer, state, subscribe_to: producer}
end
def handle_events(events, _from, state) do
IO.puts("📊 AnalyticsConsumer: analyzing #{length(events)} compressed events")
# Update analytics for each sensor
new_sensor_analytics =
Enum.reduce(events, state.sensor_analytics, fn event, analytics ->
case event.status do
:success ->
sensor_id = event.sensor_id
current_stats = Map.get(analytics, sensor_id, %{
total_points: 0,
total_compressed_size: 0,
batch_compressions: 0,
individual_compressions: 0,
compression_ratios: [],
sensor_type: event.sensor_type
})
new_stats = %{
current_stats |
total_points: current_stats.total_points + event.points_count,
total_compressed_size: current_stats.total_compressed_size + event.compressed_size,
batch_compressions: current_stats.batch_compressions + if(event.compression_mode == :batch, do: 1, else: 0),
individual_compressions: current_stats.individual_compressions + if(event.compression_mode == :individual, do: 1, else: 0),
compression_ratios: [event.compression_ratio | current_stats.compression_ratios]
}
Map.put(analytics, sensor_id, new_stats)
:error ->
# Log error but don't update stats
IO.puts("⚠️ Analytics: Skipping failed compression for #{event.sensor_id}")
analytics
end
end)
new_state = %{
state |
sensor_analytics: new_sensor_analytics,
processed_count: state.processed_count + length(events)
}
{:noreply, [], new_state}
end
def handle_info(:analytics_report, state) do
current_time = System.monotonic_time(:millisecond)
report_analytics(state, current_time)
new_state = %{state | last_report_time: current_time}
{:noreply, [], new_state}
end
defp report_analytics(state, current_time) do
elapsed_seconds = (current_time - state.start_time) / 1000
report_elapsed = (current_time - state.last_report_time) / 1000
IO.puts("\n📈 === Multi-Sensor Analytics Report ===")
IO.puts(" Report interval: #{Float.round(report_elapsed, 1)}s")
IO.puts(" Total elapsed: #{Float.round(elapsed_seconds, 1)}s")
IO.puts(" Events processed: #{state.processed_count}")
if map_size(state.sensor_analytics) > 0 do
IO.puts("\n 📊 Per-Sensor Statistics:")
Enum.each(state.sensor_analytics, fn {sensor_id, stats} ->
avg_compression_ratio = if length(stats.compression_ratios) > 0 do
Enum.sum(stats.compression_ratios) / length(stats.compression_ratios)
else
0
end
processing_rate = if elapsed_seconds > 0, do: stats.total_points / elapsed_seconds, else: 0
IO.puts(" #{sensor_id} (#{stats.sensor_type}):")
IO.puts(" • Points: #{stats.total_points} @ #{Float.round(processing_rate, 1)} pts/sec")
IO.puts(" • Compressed size: #{stats.total_compressed_size} bytes")
IO.puts(" • Avg compression ratio: #{Float.round(avg_compression_ratio, 2)}:1")
IO.puts(" • Batch compressions: #{stats.batch_compressions}")
IO.puts(" • Individual compressions: #{stats.individual_compressions}")
end)
# Overall statistics
total_points = Enum.sum(Enum.map(state.sensor_analytics, fn {_, stats} -> stats.total_points end))
total_size = Enum.sum(Enum.map(state.sensor_analytics, fn {_, stats} -> stats.total_compressed_size end))
overall_rate = if elapsed_seconds > 0, do: total_points / elapsed_seconds, else: 0
IO.puts("\n 🌍 Overall Statistics:")
IO.puts(" • Total points across all sensors: #{total_points}")
IO.puts(" • Total compressed size: #{total_size} bytes")
IO.puts(" • Overall processing rate: #{Float.round(overall_rate, 1)} points/sec")
end
end
def get_analytics(server \\ __MODULE__) do
GenStage.call(server, :get_analytics)
end
def handle_call(:get_analytics, _from, state) do
{:reply, state.sensor_analytics, [], state}
end
end
Now let’s start the multi-stage pipeline:
# Define multiple sensors with different characteristics
sensors = [
{"temp_sensor_01", %{type: :temperature, base_value: 22.0}},
{"temp_sensor_02", %{type: :temperature, base_value: 18.5}},
{"humidity_sensor_01", %{type: :humidity, base_value: 65.0}},
{"pressure_sensor_01", %{type: :pressure, base_value: 1013.25}}
]
# Start the multi-stage pipeline
{:ok, _multi_producer} = MultiSensorProducer.start_link(
name: :multi_producer,
sensors: sensors,
rate_limit: 2000
)
{:ok, _adaptive_processor} = AdaptiveCompressionProcessor.start_link(
name: :adaptive_processor,
subscribe_to: [:multi_producer],
batch_threshold: 3,
use_zlib: true
)
{:ok, _analytics_consumer} = AnalyticsConsumer.start_link(
name: :analytics_consumer,
subscribe_to: [:adaptive_processor],
report_interval: 3000
)
IO.puts("🚀 Multi-stage GenStage pipeline started!")
IO.puts(" MultiSensorProducer: #{length(sensors)} sensors with rate limiting")
IO.puts(" AdaptiveCompressionProcessor: batch threshold=3, zlib enabled")
IO.puts(" AnalyticsConsumer: reports every 3 seconds")
# Let the pipeline run for a while
Process.sleep(12000)
# Get final analytics
final_analytics = AnalyticsConsumer.get_analytics(:analytics_consumer)
IO.puts("\n🏁 === Final Multi-Sensor Results ===")
Enum.each(final_analytics, fn {sensor_id, stats} ->
avg_ratio = if length(stats.compression_ratios) > 0 do
Enum.sum(stats.compression_ratios) / length(stats.compression_ratios)
else
0
end
batch_pct = if stats.batch_compressions + stats.individual_compressions > 0 do
stats.batch_compressions / (stats.batch_compressions + stats.individual_compressions) * 100
else
0
end
IO.puts("#{sensor_id}:")
IO.puts(" Points: #{stats.total_points}, Size: #{stats.total_compressed_size}B")
IO.puts(" Avg ratio: #{Float.round(avg_ratio, 2)}:1, Batch %: #{Float.round(batch_pct, 1)}%")
end)
3. Error Handling and Recovery
Let’s demonstrate robust error handling in GenStage pipelines:
defmodule FaultyProducer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
error_rate = Keyword.get(opts, :error_rate, 0.1) # 10% error rate
state = %{
counter: 0,
error_rate: error_rate,
errors_generated: 0
}
IO.puts("🏭 FaultyProducer started (error rate: #{Float.round(error_rate * 100, 1)}%)")
{:producer, state}
end
def handle_demand(demand, state) when demand > 0 do
events = for i <- state.counter..(state.counter + demand - 1) do
# Randomly introduce faulty data
if :rand.uniform() < state.error_rate do
# Generate invalid data
%{
timestamp: "invalid_timestamp", # Wrong type
value: nil, # Missing value
sequence: i,
faulty: true,
generated_at: System.system_time(:millisecond)
}
else
# Generate valid data
timestamp = System.system_time(:second) + i * 60
value = 20.0 + :math.sin(i / 100) * 5 + (:rand.uniform() - 0.5) * 1.0
%{
timestamp: timestamp,
value: Float.round(value, 2),
sequence: i,
faulty: false,
generated_at: System.system_time(:millisecond)
}
end
end
errors_in_batch = Enum.count(events, & &1.faulty)
new_state = %{
state |
counter: state.counter + demand,
errors_generated: state.errors_generated + errors_in_batch
}
if errors_in_batch > 0 do
IO.puts("🏭 FaultyProducer: generated #{demand} events (#{errors_in_batch} faulty)")
end
{:noreply, events, new_state}
end
end
defmodule ResilientProcessor do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
state = %{
processed_count: 0,
successful_compressions: 0,
failed_compressions: 0,
validation_errors: 0,
compression_errors: 0,
recovered_errors: 0
}
IO.puts("🛡️ ResilientProcessor started with comprehensive error handling")
{:producer_consumer, state, subscribe_to: [FaultyProducer]}
end
def handle_events(events, _from, state) do
IO.puts("🛡️ ResilientProcessor: processing #{length(events)} events (with error handling)")
{processed_events, new_state} =
Enum.map_reduce(events, state, &process_event_with_recovery/2)
successful_events = Enum.filter(processed_events, &(&1.status in [:success, :recovered]))
IO.puts(" ✅ Successfully processed (including recovered): #{length(successful_events)}/#{length(events)}")
{:noreply, processed_events, new_state}
end
defp process_event_with_recovery(event, state) do
try do
# Step 1: Validate data
case validate_event(event) do
{:ok, validated_event} ->
# Step 2: Attempt compression
case attempt_compression(validated_event) do
{:ok, compressed_result} ->
result = %{
original_event: event,
compressed_data: compressed_result.compressed,
compressed_size: compressed_result.size,
compression_ratio: compressed_result.ratio,
status: :success,
processed_at: System.system_time(:millisecond)
}
new_state = %{
state |
processed_count: state.processed_count + 1,
successful_compressions: state.successful_compressions + 1
}
{result, new_state}
{:error, compression_reason} ->
# Try recovery strategies
case attempt_recovery(validated_event, compression_reason, state) do
{:ok, recovered_result} ->
result = %{
original_event: event,
compressed_data: recovered_result.compressed,
compressed_size: recovered_result.size,
compression_ratio: recovered_result.ratio,
status: :recovered,
recovery_method: recovered_result.method,
original_error: compression_reason,
processed_at: System.system_time(:millisecond)
}
new_state = %{
state |
processed_count: state.processed_count + 1,
successful_compressions: state.successful_compressions + 1,
recovered_errors: state.recovered_errors + 1
}
IO.puts(" 🔧 Recovered from compression error using #{recovered_result.method}")
{result, new_state}
{:error, _recovery_reason} ->
result = %{
original_event: event,
error: compression_reason,
status: :compression_failed,
processed_at: System.system_time(:millisecond)
}
new_state = %{
state |
processed_count: state.processed_count + 1,
compression_errors: state.compression_errors + 1
}
{result, new_state}
end
end
{:error, validation_reason} ->
result = %{
original_event: event,
error: validation_reason,
status: :validation_failed,
processed_at: System.system_time(:millisecond)
}
new_state = %{
state |
processed_count: state.processed_count + 1,
validation_errors: state.validation_errors + 1
}
{result, new_state}
end
rescue
exception ->
# Catch any unexpected errors
result = %{
original_event: event,
error: "Unexpected exception: #{Exception.message(exception)}",
status: :exception,
processed_at: System.system_time(:millisecond)
}
new_state = %{
state |
processed_count: state.processed_count + 1,
failed_compressions: state.failed_compressions + 1
}
IO.puts(" ⚠️ Unexpected exception: #{Exception.message(exception)}")
{result, new_state}
end
end
defp validate_event(event) do
cond do
not is_integer(event.timestamp) ->
{:error, "Invalid timestamp type: #{inspect(event.timestamp)}"}
not is_number(event.value) or is_nil(event.value) ->
{:error, "Invalid value: #{inspect(event.value)}"}
true ->
{:ok, event}
end
end
defp attempt_compression(event) do
data_point = {event.timestamp, event.value}
case GorillaStream.compress([data_point]) do
{:ok, compressed} ->
{:ok, %{
compressed: compressed,
size: byte_size(compressed),
ratio: 16 / byte_size(compressed)
}}
{:error, reason} ->
{:error, reason}
end
end
defp attempt_recovery(event, _compression_reason, _state) do
# Recovery strategy 1: Try with sanitized data
sanitized_timestamp = sanitize_timestamp(event.timestamp)
sanitized_value = sanitize_value(event.value)
if sanitized_timestamp && sanitized_value do
case GorillaStream.compress([{sanitized_timestamp, sanitized_value}]) do
{:ok, compressed} ->
{:ok, %{
compressed: compressed,
size: byte_size(compressed),
ratio: 16 / byte_size(compressed),
method: "data_sanitization"
}}
{:error, _} ->
# Recovery strategy 2: Use default values
attempt_default_recovery(event)
end
else
attempt_default_recovery(event)
end
end
defp attempt_default_recovery(_event) do
# Last resort: use a default data point
default_timestamp = System.system_time(:second)
default_value = 0.0
case GorillaStream.compress([{default_timestamp, default_value}]) do
{:ok, compressed} ->
{:ok, %{
compressed: compressed,
size: byte_size(compressed),
ratio: 16 / byte_size(compressed),
method: "default_values"
}}
{:error, reason} ->
{:error, "All recovery strategies failed: #{reason}"}
end
end
defp sanitize_timestamp(timestamp) when is_integer(timestamp), do: timestamp
defp sanitize_timestamp(_), do: System.system_time(:second)
defp sanitize_value(value) when is_number(value), do: value
defp sanitize_value(_), do: 0.0
end
defmodule ErrorTrackingConsumer do
use GenStage
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
# Schedule periodic error reports
:timer.send_interval(3000, self(), :error_report)
state = %{
total_processed: 0,
successful: 0,
recovered: 0,
validation_failed: 0,
compression_failed: 0,
exceptions: 0,
error_details: [],
start_time: System.monotonic_time(:millisecond)
}
IO.puts("📊 ErrorTrackingConsumer started with detailed error analytics")
{:consumer, state, subscribe_to: [ResilientProcessor]}
end
def handle_events(events, _from, state) do
# Categorize events by status
categorized = Enum.group_by(events, & &1.status)
successful = length(Map.get(categorized, :success, []))
recovered = length(Map.get(categorized, :recovered, []))
validation_failed = length(Map.get(categorized, :validation_failed, []))
compression_failed = length(Map.get(categorized, :compression_failed, []))
exceptions = length(Map.get(categorized, :exception, []))
# Collect error details
error_events = events
|> Enum.filter(&(&1.status != :success))
|> Enum.map(&extract_error_details/1)
new_state = %{
state |
total_processed: state.total_processed + length(events),
successful: state.successful + successful,
recovered: state.recovered + recovered,
validation_failed: state.validation_failed + validation_failed,
compression_failed: state.compression_failed + compression_failed,
exceptions: state.exceptions + exceptions,
error_details: state.error_details ++ error_events
}
IO.puts("📊 ErrorTrackingConsumer: #{length(events)} events (#{successful} ok, #{recovered} recovered, #{length(events) - successful - recovered} errors)")
{:noreply, [], new_state}
end
def handle_info(:error_report, state) do
current_time = System.monotonic_time(:millisecond)
elapsed_seconds = (current_time - state.start_time) / 1000
IO.puts("\n🚨 === Error Tracking Report ===")
IO.puts(" Elapsed time: #{Float.round(elapsed_seconds, 1)}s")
IO.puts(" Total processed: #{state.total_processed}")
if state.total_processed > 0 do
success_rate = state.successful / state.total_processed * 100
recovery_rate = state.recovered / state.total_processed * 100
error_rate = (state.total_processed - state.successful - state.recovered) / state.total_processed * 100
IO.puts(" Success rate: #{Float.round(success_rate, 1)}%")
IO.puts(" Recovery rate: #{Float.round(recovery_rate, 1)}%")
IO.puts(" Error rate: #{Float.round(error_rate, 1)}%")
IO.puts("\n Error Breakdown:")
IO.puts(" • Validation errors: #{state.validation_failed}")
IO.puts(" • Compression errors: #{state.compression_failed}")
IO.puts(" • Exceptions: #{state.exceptions}")
# Show recent error details
recent_errors = Enum.take(state.error_details, -5)
if length(recent_errors) > 0 do
IO.puts("\n Recent Errors:")
Enum.each(recent_errors, fn error ->
IO.puts(" • #{error.type}: #{error.message}")
end)
end
end
{:noreply, [], state}
end
defp extract_error_details(event) do
%{
type: event.status,
message: event.error,
timestamp: event.processed_at,
original_faulty: Map.get(event.original_event, :faulty, false)
}
end
def get_error_stats() do
GenStage.call(__MODULE__, :get_stats)
end
def handle_call(:get_stats, _from, state) do
{:reply, state, [], state}
end
end
Now let’s test the error handling pipeline:
# Start the error handling pipeline
{:ok, _faulty_producer} = FaultyProducer.start_link(error_rate: 0.2) # 20% error rate
{:ok, _resilient_processor} = ResilientProcessor.start_link([])
{:ok, _error_consumer} = ErrorTrackingConsumer.start_link([])
IO.puts("🚀 Error handling pipeline started!")
IO.puts(" FaultyProducer: 20% error rate")
IO.puts(" ResilientProcessor: comprehensive error handling with recovery")
IO.puts(" ErrorTrackingConsumer: detailed error analytics")
# Let the pipeline run and handle errors
Process.sleep(10000)
# Get final error statistics
error_stats = ErrorTrackingConsumer.get_error_stats()
IO.puts("\n🏁 === Final Error Handling Results ===")
IO.puts("Total events: #{error_stats.total_processed}")
IO.puts("Successful: #{error_stats.successful}")
IO.puts("Recovered: #{error_stats.recovered}")
IO.puts("Validation failures: #{error_stats.validation_failed}")
IO.puts("Compression failures: #{error_stats.compression_failed}")
IO.puts("Exceptions: #{error_stats.exceptions}")
if error_stats.total_processed > 0 do
overall_success = (error_stats.successful + error_stats.recovered) / error_stats.total_processed * 100
IO.puts("Overall success rate (including recovery): #{Float.round(overall_success, 1)}%")
end
Summary
This notebook demonstrated advanced GenStage integration patterns with GorillaStream:
- Basic Pipeline: Simple Producer → Processor → Consumer with individual point compression
- Multi-Stage Pipeline: Complex pipeline handling multiple sensors with adaptive batching
- Error Handling: Comprehensive error recovery and tracking mechanisms
Key GenStage Benefits with GorillaStream:
- Backpressure Control: Automatic demand management prevents memory issues
- Fault Tolerance: Isolated stage failures don’t crash the entire pipeline
- Scalability: Easy to add more stages or parallel processing
- Monitoring: Built-in observability for pipeline health and performance
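As a concrete illustration of the fault-tolerance point above, the section 1 stages can be run under a supervisor so a crashed stage (and its downstream subscribers) is restarted automatically. A sketch, assuming a fresh session where those names are not already registered; PipelineSupervisor is a hypothetical name:
# Sketch: supervise the section 1 pipeline; :rest_for_one restarts a crashed
# stage together with the stages started after it (its subscribers)
children = [
  {DataProducer, sensor_type: :temperature, base_value: 22.0},
  {CompressionProcessor, compression_mode: :individual, use_zlib: false},
  {DataConsumer, storage_type: :memory}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :rest_for_one, name: PipelineSupervisor)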
Performance Patterns:
- Individual Compression: Low latency, higher overhead per point
- Batch Compression: Higher throughput, better compression ratios
- Adaptive Processing: Buffers per sensor, compresses in batches once a threshold is reached, and falls back to individual compression when a batch fails
- Error Recovery: Maintains pipeline stability with graceful degradation
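The individual-versus-batch trade-off is easy to measure directly, outside any pipeline. A quick sketch using the same compress/2 call as above (exact sizes depend on your data):
# Sketch: compare batch vs. per-point compression of the same synthetic series
points = for i <- 0..99, do: {1_700_000_000 + i * 60, 20.0 + :math.sin(i / 10)}

{:ok, batch} = GorillaStream.compress(points, false)

individual_bytes =
  points
  |> Enum.map(fn point ->
    {:ok, bin} = GorillaStream.compress([point], false)
    byte_size(bin)
  end)
  |> Enum.sum()

IO.puts("Batch:     #{byte_size(batch)} bytes for #{length(points)} points")
IO.puts("Per-point: #{individual_bytes} bytes (fixed overhead paid once per point)")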
Production Considerations:
- Use appropriate batch sizes based on your latency requirements
- Implement comprehensive error handling and recovery strategies
- Monitor compression ratios and processing rates per sensor
- Consider zlib compression for storage-constrained environments
- Scale processing stages based on data volume and complexity
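For the zlib recommendation in particular, it is worth measuring what the extra pass buys on your own data before enabling it everywhere, since it trades CPU time for size. A small sketch:
# Sketch: Gorilla-only vs. Gorilla + zlib on the same series
points = for i <- 0..999, do: {1_700_000_000 + i * 60, 1013.25 + :math.cos(i / 50)}

{:ok, plain} = GorillaStream.compress(points, false)
{:ok, zlibbed} = GorillaStream.compress(points, true)

IO.puts("Gorilla only:   #{byte_size(plain)} bytes")
IO.puts("Gorilla + zlib: #{byte_size(zlibbed)} bytes")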
Next Steps:
- Explore Broadway for even more advanced streaming patterns
- Consider Flow for CPU-intensive parallel processing scenarios
- Integrate with real message queues (RabbitMQ, Apache Kafka)
- Add metrics collection and alerting for production deployments