
GorillaStream: GenStage Pipeline Integration

notebooks/03_genstage_pipeline.livemd

Mix.install([
  {:gorilla_stream, "~> 1.1"},
  {:gen_stage, "~> 1.2"}
])

Introduction

This notebook demonstrates how to integrate GorillaStream with GenStage to build robust, backpressure-aware streaming data pipelines. You’ll learn how to:

  • Create producer stages that generate time series data
  • Build producer-consumer stages for compression processing
  • Implement consumer stages for data storage and analytics
  • Handle backpressure and demand-driven processing
  • Scale processing with multiple concurrent stages
  • Implement error handling and recovery

1. Basic GenStage Pipeline

Let’s start with a simple three-stage pipeline: Producer → Compressor → Consumer

defmodule DataProducer do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    sensor_type = Keyword.get(opts, :sensor_type, :temperature)
    base_value = Keyword.get(opts, :base_value, 20.0)
    
    state = %{
      counter: 0,
      sensor_type: sensor_type,
      base_value: base_value,
      start_time: System.system_time(:second)
    }
    
    IO.puts("🏭 DataProducer started for #{sensor_type} sensor")
    {:producer, state}
  end

  def handle_demand(demand, state) when demand > 0 do
    IO.puts("📊 DataProducer: generating #{demand} data points")
    
    # Generate data points based on demand
    events = for i <- state.counter..(state.counter + demand - 1) do
      timestamp = state.start_time + i * 60  # Every minute
      value = generate_sensor_value(state.sensor_type, state.base_value, i)
      
      %{
        timestamp: timestamp,
        value: value,
        sensor_type: state.sensor_type,
        sequence: i,
        generated_at: System.system_time(:millisecond)
      }
    end

    new_state = %{state | counter: state.counter + demand}
    {:noreply, events, new_state}
  end

  defp generate_sensor_value(:temperature, base, i) do
    # Simulate daily temperature cycle with noise
    daily_cycle = 5 * :math.sin(i / 144)  # slow, roughly daily cycle (one point per minute)
    noise = (:rand.uniform() - 0.5) * 1.0
    Float.round(base + daily_cycle + noise, 2)
  end

  defp generate_sensor_value(:humidity, base, i) do
    # Simulate humidity variations
    variation = 10 * :math.cos(i / 100) + (:rand.uniform() - 0.5) * 5
    Float.round(base + variation, 2)
  end

  defp generate_sensor_value(:pressure, base, i) do
    # Simulate atmospheric pressure changes
    variation = 20 * :math.sin(i / 200) + (:rand.uniform() - 0.5) * 5
    Float.round(base + variation, 2)
  end
end
defmodule CompressionProcessor do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    compression_mode = Keyword.get(opts, :compression_mode, :individual)
    use_zlib = Keyword.get(opts, :use_zlib, false)
    
    state = %{
      compression_mode: compression_mode,
      use_zlib: use_zlib,
      processed_count: 0,
      total_original_size: 0,
      total_compressed_size: 0,
      errors: 0
    }
    
    IO.puts("🗜️  CompressionProcessor started (mode: #{compression_mode}, zlib: #{use_zlib})")
    {:producer_consumer, state, subscribe_to: [DataProducer]}
  end

  def handle_events(events, _from, state) do
    IO.puts("🗜️  CompressionProcessor: processing #{length(events)} events")
    
    {compressed_events, new_state} = case state.compression_mode do
      :individual -> compress_individual_events(events, state)
      :batch -> compress_batch_events(events, state)
    end

    {:noreply, compressed_events, new_state}
  end

  defp compress_individual_events(events, state) do
    {compressed_events, stats} = Enum.map_reduce(events, %{processed: 0, compressed_size: 0, errors: 0}, fn event, acc ->
      # Convert to GorillaStream format
      data_point = {event.timestamp, event.value}
      
      case GorillaStream.compress([data_point], state.use_zlib) do
        {:ok, compressed} ->
          compressed_event = %{
            original_event: event,
            compressed_data: compressed,
            compressed_size: byte_size(compressed),
            compression_ratio: 16 / byte_size(compressed),  # Approximate original size
            compression_mode: :individual,
            compressed_at: System.system_time(:millisecond),
            status: :success
          }
          
          new_acc = %{
            processed: acc.processed + 1,
            compressed_size: acc.compressed_size + byte_size(compressed),
            errors: acc.errors
          }
          
          {compressed_event, new_acc}
          
        {:error, reason} ->
          error_event = %{
            original_event: event,
            error: reason,
            status: :error,
            compressed_at: System.system_time(:millisecond)
          }
          
          new_acc = %{acc | errors: acc.errors + 1}
          {error_event, new_acc}
      end
    end)
    
    new_state = %{
      state |
      processed_count: state.processed_count + stats.processed,
      total_original_size: state.total_original_size + length(events) * 16,
      total_compressed_size: state.total_compressed_size + stats.compressed_size,
      errors: state.errors + stats.errors
    }
    
    {compressed_events, new_state}
  end

  defp compress_batch_events(events, state) do
    # Convert all events to GorillaStream format
    data_points = Enum.map(events, fn event -> {event.timestamp, event.value} end)
    
    case GorillaStream.compress(data_points, state.use_zlib) do
      {:ok, compressed} ->
        compressed_event = %{
          original_events: events,
          compressed_data: compressed,
          compressed_size: byte_size(compressed),
          points_count: length(events),
          compression_ratio: (length(events) * 16) / byte_size(compressed),
          compression_mode: :batch,
          compressed_at: System.system_time(:millisecond),
          status: :success
        }
        
        new_state = %{
          state |
          processed_count: state.processed_count + length(events),
          total_original_size: state.total_original_size + length(events) * 16,
          total_compressed_size: state.total_compressed_size + byte_size(compressed)
        }
        
        {[compressed_event], new_state}
        
      {:error, reason} ->
        error_event = %{
          original_events: events,
          error: reason,
          status: :error,
          compressed_at: System.system_time(:millisecond)
        }
        
        new_state = %{state | errors: state.errors + 1}
        {[error_event], new_state}
    end
  end
end
defmodule DataConsumer do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    storage_type = Keyword.get(opts, :storage_type, :memory)
    
    state = %{
      storage_type: storage_type,
      stored_data: [],
      processed_count: 0,
      successful_compressions: 0,
      failed_compressions: 0,
      total_compressed_size: 0,
      start_time: System.monotonic_time(:millisecond)
    }
    
    IO.puts("💾 DataConsumer started (storage: #{storage_type})")
    {:consumer, state, subscribe_to: [CompressionProcessor]}
  end

  def handle_events(events, _from, state) do
    IO.puts("💾 DataConsumer: consuming #{length(events)} compressed events")
    
    # Process each compressed event
    {new_stored_data, stats} = Enum.map_reduce(events, state, fn event, acc ->
      case event.status do
        :success ->
          stored_item = store_compressed_data(event, acc.storage_type)
          
          new_acc = %{
            acc |
            successful_compressions: acc.successful_compressions + get_points_count(event),
            total_compressed_size: acc.total_compressed_size + event.compressed_size
          }
          
          {stored_item, new_acc}
          
        :error ->
          IO.puts("❌ Compression error: #{event.error}")
          
          new_acc = %{
            acc |
            failed_compressions: acc.failed_compressions + get_points_count(event)
          }
          
          {nil, new_acc}
      end
    end)

    # Update state with new data
    valid_items = Enum.reject(new_stored_data, &is_nil/1)
    new_state = %{
      stats |
      stored_data: stats.stored_data ++ valid_items,
      processed_count: stats.processed_count + length(events)
    }

    # Periodically report statistics
    current_time = System.monotonic_time(:millisecond)
    if rem(new_state.processed_count, 10) == 0 do
      report_statistics(new_state, current_time)
    end

    {:noreply, [], new_state}
  end

  defp get_points_count(%{points_count: count}), do: count
  defp get_points_count(%{original_event: _}), do: 1
  defp get_points_count(%{original_events: events}), do: length(events)
  defp get_points_count(_), do: 0

  defp store_compressed_data(event, :memory) do
    # Store in memory (for demo)
    %{
      stored_at: System.system_time(:millisecond),
      compressed_size: event.compressed_size,
      compression_ratio: event.compression_ratio,
      compression_mode: event.compression_mode,
      points_count: get_points_count(event)
    }
  end

  defp store_compressed_data(event, :file) do
    # Simulate file storage
    filename = "compressed_#{System.system_time(:millisecond)}.gorilla"
    # In reality: File.write!(filename, event.compressed_data)
    
    %{
      stored_at: System.system_time(:millisecond),
      filename: filename,
      compressed_size: event.compressed_size,
      compression_ratio: event.compression_ratio,
      compression_mode: event.compression_mode,
      points_count: get_points_count(event)
    }
  end

  defp report_statistics(state, current_time) do
    elapsed_ms = current_time - state.start_time
    elapsed_seconds = elapsed_ms / 1000
    
    total_points = state.successful_compressions + state.failed_compressions
    processing_rate = if elapsed_seconds > 0, do: total_points / elapsed_seconds, else: 0
    success_rate = if total_points > 0, do: state.successful_compressions / total_points * 100, else: 0
    avg_compression_ratio = if state.successful_compressions > 0 do
      # Calculate average compression ratio from stored data
      total_ratio = 
        state.stored_data
        |> Enum.map(& &1.compression_ratio)
        |> Enum.sum()
      total_ratio / length(state.stored_data)
    else
      0
    end

    IO.puts("\n📈 === Pipeline Statistics ===")
    IO.puts("   Elapsed time: #{Float.round(elapsed_seconds, 1)}s")
    IO.puts("   Processing rate: #{Float.round(processing_rate, 1)} points/sec")
    IO.puts("   Successful compressions: #{state.successful_compressions}")
    IO.puts("   Failed compressions: #{state.failed_compressions}")
    IO.puts("   Success rate: #{Float.round(success_rate, 1)}%")
    IO.puts("   Total compressed size: #{state.total_compressed_size} bytes")
    IO.puts("   Average compression ratio: #{Float.round(avg_compression_ratio, 2)}:1")
    IO.puts("   Items stored: #{length(state.stored_data)}")
  end
  
  def get_stats() do
    GenStage.call(__MODULE__, :get_stats)
  end
  
  def handle_call(:get_stats, _from, state) do
    {:reply, state, [], state}
  end
end

Now let’s start the basic pipeline:

# Start the GenStage pipeline
{:ok, _producer} = DataProducer.start_link(sensor_type: :temperature, base_value: 22.0)
{:ok, _processor} = CompressionProcessor.start_link(compression_mode: :individual, use_zlib: false)
{:ok, _consumer} = DataConsumer.start_link(storage_type: :memory)

IO.puts("🚀 Basic GenStage pipeline started!")
IO.puts("   Producer generates temperature sensor data")
IO.puts("   Processor compresses individual points")
IO.puts("   Consumer stores compressed data in memory")

# Let the pipeline run for a bit
Process.sleep(5000)

# Get final statistics
final_stats = DataConsumer.get_stats()
IO.puts("\n🏁 === Final Pipeline Results ===")
IO.puts("   Total items processed: #{final_stats.processed_count}")
IO.puts("   Successful compressions: #{final_stats.successful_compressions}")
IO.puts("   Failed compressions: #{final_stats.failed_compressions}")
IO.puts("   Total compressed size: #{final_stats.total_compressed_size} bytes")
IO.puts("   Items in storage: #{length(final_stats.stored_data)}")

2. Multi-Stage Pipeline with Different Compression Strategies

Let’s build a more complex pipeline that handles multiple data sources and compression strategies:

defmodule MultiSensorProducer do
  use GenStage

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenStage.start_link(__MODULE__, opts, name: name)
  end

  def init(opts) do
    sensors = Keyword.get(opts, :sensors, [])
    rate_limit = Keyword.get(opts, :rate_limit, 1000)  # Max points per second
    
    state = %{
      sensors: sensors,
      counter: 0,
      rate_limit: rate_limit,
      last_batch_time: System.monotonic_time(:millisecond)
    }
    
    IO.puts("🏭 MultiSensorProducer started with #{length(sensors)} sensors")
    {:producer, state}
  end

  def handle_demand(demand, state) when demand > 0 do
    current_time = System.monotonic_time(:millisecond)
    time_since_last = current_time - state.last_batch_time
    
    # Apply rate limiting
    max_points = min(demand, div(state.rate_limit * time_since_last, 1000) + 1)
    actual_demand = max(1, max_points)
    
    # Generate data from all sensors
    events = generate_multi_sensor_data(state.sensors, state.counter, actual_demand)
    
    new_state = %{
      state | 
      counter: state.counter + actual_demand,
      last_batch_time: current_time
    }
    
    IO.puts("🏭 MultiSensorProducer: generated #{length(events)} data points from #{length(state.sensors)} sensors")
    {:noreply, events, new_state}
  end

  defp generate_multi_sensor_data(sensors, base_counter, demand_per_sensor) do
    sensors
    |> Enum.with_index()
    |> Enum.flat_map(fn {{sensor_id, sensor_config}, sensor_index} ->
      for i <- 0..(demand_per_sensor - 1) do
        counter = base_counter + i
        timestamp = System.system_time(:second) + counter * 60
        
        value = case sensor_config.type do
          :temperature ->
            base = sensor_config.base_value
            base + 5 * :math.sin((counter + sensor_index * 100) / 144) + (:rand.uniform() - 0.5) * 1.0
            
          :humidity ->
            base = sensor_config.base_value
            base + 10 * :math.cos((counter + sensor_index * 150) / 100) + (:rand.uniform() - 0.5) * 5.0
            
          :pressure ->
            base = sensor_config.base_value
            base + 20 * :math.sin((counter + sensor_index * 200) / 200) + (:rand.uniform() - 0.5) * 3.0
        end
        
        %{
          sensor_id: sensor_id,
          sensor_type: sensor_config.type,
          timestamp: timestamp,
          value: Float.round(value, 2),
          sequence: counter,
          generated_at: System.system_time(:millisecond)
        }
      end
    end)
  end
end
defmodule AdaptiveCompressionProcessor do
  use GenStage

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenStage.start_link(__MODULE__, opts, name: name)
  end

  def init(opts) do
    producer = Keyword.get(opts, :subscribe_to, [])
    batch_threshold = Keyword.get(opts, :batch_threshold, 5)
    use_zlib = Keyword.get(opts, :use_zlib, false)
    
    state = %{
      batch_threshold: batch_threshold,
      use_zlib: use_zlib,
      buffer: %{},  # Buffer per sensor
      processed_count: 0,
      total_compressed_size: 0,
      compression_stats: %{}
    }
    
    IO.puts("🗜️  AdaptiveCompressionProcessor started (batch_threshold: #{batch_threshold}, zlib: #{use_zlib})")
    {:producer_consumer, state, subscribe_to: producer}
  end

  def handle_events(events, _from, state) do
    IO.puts("🗜️  AdaptiveCompressionProcessor: processing #{length(events)} events from multiple sensors")
    
    # Group events by sensor
    events_by_sensor = Enum.group_by(events, & &1.sensor_id)
    
    # Process each sensor's data adaptively
    {compressed_events, new_state} = 
      Enum.map_reduce(events_by_sensor, state, fn {sensor_id, sensor_events}, acc ->
        process_sensor_events(sensor_id, sensor_events, acc)
      end)
    
    all_compressed_events = List.flatten(compressed_events)
    {:noreply, all_compressed_events, new_state}
  end

  defp process_sensor_events(sensor_id, events, state) do
    # Add events to sensor buffer
    current_buffer = Map.get(state.buffer, sensor_id, [])
    new_buffer = current_buffer ++ events
    
    if length(new_buffer) >= state.batch_threshold do
      # Process as batch
      case compress_sensor_batch(sensor_id, new_buffer, state.use_zlib) do
        {:ok, compressed_event} ->
          # Update compression stats
          sensor_stats = Map.get(state.compression_stats, sensor_id, %{batches: 0, individuals: 0, total_size: 0})
          new_sensor_stats = %{
            sensor_stats |
            batches: sensor_stats.batches + 1,
            total_size: sensor_stats.total_size + compressed_event.compressed_size
          }
          
          new_state = %{
            state |
            buffer: Map.put(state.buffer, sensor_id, []),  # Clear buffer
            processed_count: state.processed_count + length(new_buffer),
            total_compressed_size: state.total_compressed_size + compressed_event.compressed_size,
            compression_stats: Map.put(state.compression_stats, sensor_id, new_sensor_stats)
          }
          
          {[compressed_event], new_state}
          
        {:error, reason} ->
          IO.puts("❌ Batch compression failed for #{sensor_id}: #{reason}")
          
          # Process individually as fallback
          {individual_events, fallback_state} = compress_individually(sensor_id, new_buffer, state)
          fallback_state = %{fallback_state | buffer: Map.put(fallback_state.buffer, sensor_id, [])}
          
          {individual_events, fallback_state}
      end
    else
      # Keep buffering
      new_state = %{state | buffer: Map.put(state.buffer, sensor_id, new_buffer)}
      {[], new_state}
    end
  end

  defp compress_sensor_batch(sensor_id, events, use_zlib) do
    data_points = Enum.map(events, fn event -> {event.timestamp, event.value} end)
    
    case GorillaStream.compress(data_points, use_zlib) do
      {:ok, compressed} ->
        compressed_event = %{
          sensor_id: sensor_id,
          sensor_type: (List.first(events)).sensor_type,
          original_events: events,
          compressed_data: compressed,
          compressed_size: byte_size(compressed),
          points_count: length(events),
          compression_ratio: (length(events) * 16) / byte_size(compressed),
          compression_mode: :batch,
          compressed_at: System.system_time(:millisecond),
          status: :success
        }
        
        {:ok, compressed_event}
        
      {:error, reason} ->
        {:error, reason}
    end
  end

  defp compress_individually(sensor_id, events, state) do
    {compressed_events, stats} = Enum.map_reduce(events, %{count: 0, size: 0}, fn event, acc ->
      data_point = {event.timestamp, event.value}
      
      case GorillaStream.compress([data_point], state.use_zlib) do
        {:ok, compressed} ->
          compressed_event = %{
            sensor_id: sensor_id,
            sensor_type: event.sensor_type,
            original_event: event,
            compressed_data: compressed,
            compressed_size: byte_size(compressed),
            points_count: 1,
            compression_ratio: 16 / byte_size(compressed),
            compression_mode: :individual,
            compressed_at: System.system_time(:millisecond),
            status: :success
          }
          
          new_acc = %{count: acc.count + 1, size: acc.size + byte_size(compressed)}
          {compressed_event, new_acc}
          
        {:error, reason} ->
          error_event = %{
            sensor_id: sensor_id,
            original_event: event,
            error: reason,
            status: :error,
            compressed_at: System.system_time(:millisecond)
          }
          
          {error_event, acc}
      end
    end)
    
    # Update sensor stats for individual processing
    sensor_stats = Map.get(state.compression_stats, sensor_id, %{batches: 0, individuals: 0, total_size: 0})
    new_sensor_stats = %{
      sensor_stats |
      individuals: sensor_stats.individuals + stats.count,
      total_size: sensor_stats.total_size + stats.size
    }
    
    new_state = %{
      state |
      processed_count: state.processed_count + length(events),
      total_compressed_size: state.total_compressed_size + stats.size,
      compression_stats: Map.put(state.compression_stats, sensor_id, new_sensor_stats)
    }
    
    {compressed_events, new_state}
  end
end
defmodule AnalyticsConsumer do
  use GenStage

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenStage.start_link(__MODULE__, opts, name: name)
  end

  def init(opts) do
    producer = Keyword.get(opts, :subscribe_to, [])
    report_interval = Keyword.get(opts, :report_interval, 5000)
    
    # Schedule periodic analytics reports
    :timer.send_interval(report_interval, self(), :analytics_report)
    
    state = %{
      sensor_analytics: %{},
      processed_count: 0,
      start_time: System.monotonic_time(:millisecond),
      last_report_time: System.monotonic_time(:millisecond)
    }
    
    IO.puts("📊 AnalyticsConsumer started (reports every #{report_interval}ms)")
    {:consumer, state, subscribe_to: producer}
  end

  def handle_events(events, _from, state) do
    IO.puts("📊 AnalyticsConsumer: analyzing #{length(events)} compressed events")
    
    # Update analytics for each sensor
    new_sensor_analytics = 
      Enum.reduce(events, state.sensor_analytics, fn event, analytics ->
        case event.status do
          :success ->
            sensor_id = event.sensor_id
            current_stats = Map.get(analytics, sensor_id, %{
              total_points: 0,
              total_compressed_size: 0,
              batch_compressions: 0,
              individual_compressions: 0,
              compression_ratios: [],
              sensor_type: event.sensor_type
            })
            
            new_stats = %{
              current_stats |
              total_points: current_stats.total_points + event.points_count,
              total_compressed_size: current_stats.total_compressed_size + event.compressed_size,
              batch_compressions: current_stats.batch_compressions + if(event.compression_mode == :batch, do: 1, else: 0),
              individual_compressions: current_stats.individual_compressions + if(event.compression_mode == :individual, do: 1, else: 0),
              compression_ratios: [event.compression_ratio | current_stats.compression_ratios]
            }
            
            Map.put(analytics, sensor_id, new_stats)
            
          :error ->
            # Log error but don't update stats
            IO.puts("⚠️  Analytics: Skipping failed compression for #{event.sensor_id}")
            analytics
        end
      end)
    
    new_state = %{
      state |
      sensor_analytics: new_sensor_analytics,
      processed_count: state.processed_count + length(events)
    }

    {:noreply, [], new_state}
  end

  def handle_info(:analytics_report, state) do
    current_time = System.monotonic_time(:millisecond)
    report_analytics(state, current_time)
    
    new_state = %{state | last_report_time: current_time}
    {:noreply, [], new_state}
  end

  defp report_analytics(state, current_time) do
    elapsed_seconds = (current_time - state.start_time) / 1000
    report_elapsed = (current_time - state.last_report_time) / 1000
    
    IO.puts("\n📈 === Multi-Sensor Analytics Report ===")
    IO.puts("   Report interval: #{Float.round(report_elapsed, 1)}s")
    IO.puts("   Total elapsed: #{Float.round(elapsed_seconds, 1)}s")
    IO.puts("   Events processed: #{state.processed_count}")
    
    if map_size(state.sensor_analytics) > 0 do
      IO.puts("\n   📊 Per-Sensor Statistics:")
      
      Enum.each(state.sensor_analytics, fn {sensor_id, stats} ->
        avg_compression_ratio = if length(stats.compression_ratios) > 0 do
          Enum.sum(stats.compression_ratios) / length(stats.compression_ratios)
        else
          0
        end
        
        processing_rate = if elapsed_seconds > 0, do: stats.total_points / elapsed_seconds, else: 0
        
        IO.puts("   #{sensor_id} (#{stats.sensor_type}):")
        IO.puts("     • Points: #{stats.total_points} @ #{Float.round(processing_rate, 1)} pts/sec")
        IO.puts("     • Compressed size: #{stats.total_compressed_size} bytes")
        IO.puts("     • Avg compression ratio: #{Float.round(avg_compression_ratio, 2)}:1")
        IO.puts("     • Batch compressions: #{stats.batch_compressions}")
        IO.puts("     • Individual compressions: #{stats.individual_compressions}")
      end)
      
      # Overall statistics
      total_points = Enum.sum(Enum.map(state.sensor_analytics, fn {_, stats} -> stats.total_points end))
      total_size = Enum.sum(Enum.map(state.sensor_analytics, fn {_, stats} -> stats.total_compressed_size end))
      overall_rate = if elapsed_seconds > 0, do: total_points / elapsed_seconds, else: 0
      
      IO.puts("\n   🌍 Overall Statistics:")
      IO.puts("     • Total points across all sensors: #{total_points}")
      IO.puts("     • Total compressed size: #{total_size} bytes")
      IO.puts("     • Overall processing rate: #{Float.round(overall_rate, 1)} points/sec")
    end
  end
  
  def get_analytics() do
    GenStage.call(__MODULE__, :get_analytics)
  end
  
  def handle_call(:get_analytics, _from, state) do
    {:reply, state.sensor_analytics, [], state}
  end
end

Now let’s start the multi-stage pipeline:

# Define multiple sensors with different characteristics
sensors = [
  {"temp_sensor_01", %{type: :temperature, base_value: 22.0}},
  {"temp_sensor_02", %{type: :temperature, base_value: 18.5}},
  {"humidity_sensor_01", %{type: :humidity, base_value: 65.0}},
  {"pressure_sensor_01", %{type: :pressure, base_value: 1013.25}}
]

# Start the multi-stage pipeline
{:ok, _multi_producer} = MultiSensorProducer.start_link(
  name: :multi_producer,
  sensors: sensors,
  rate_limit: 2000
)

{:ok, _adaptive_processor} = AdaptiveCompressionProcessor.start_link(
  name: :adaptive_processor,
  subscribe_to: [:multi_producer],
  batch_threshold: 3,
  use_zlib: true
)

{:ok, _analytics_consumer} = AnalyticsConsumer.start_link(
  name: :analytics_consumer,
  subscribe_to: [:adaptive_processor],
  report_interval: 3000
)

IO.puts("🚀 Multi-stage GenStage pipeline started!")
IO.puts("   MultiSensorProducer: #{length(sensors)} sensors with rate limiting")
IO.puts("   AdaptiveCompressionProcessor: batch threshold=3, zlib enabled")
IO.puts("   AnalyticsConsumer: reports every 3 seconds")

# Let the pipeline run for a while
Process.sleep(12000)

# Get final analytics
final_analytics = AnalyticsConsumer.get_analytics()
IO.puts("\n🏁 === Final Multi-Sensor Results ===")

Enum.each(final_analytics, fn {sensor_id, stats} ->
  avg_ratio = if length(stats.compression_ratios) > 0 do
    Enum.sum(stats.compression_ratios) / length(stats.compression_ratios)
  else
    0
  end
  
  batch_pct = if stats.batch_compressions + stats.individual_compressions > 0 do
    stats.batch_compressions / (stats.batch_compressions + stats.individual_compressions) * 100
  else
    0
  end
  
  IO.puts("#{sensor_id}:")
  IO.puts("  Points: #{stats.total_points}, Size: #{stats.total_compressed_size}B")
  IO.puts("  Avg ratio: #{Float.round(avg_ratio, 2)}:1, Batch %: #{Float.round(batch_pct, 1)}%")
end)
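
To put those per-sensor numbers in context, you can estimate the overall space savings from the same analytics map, reusing the 16-bytes-per-point approximation the processors use for the raw size:

# Rough storage-savings estimate across all sensors
# (16 bytes per point ≈ 8-byte timestamp + 8-byte float value).
total_points =
  final_analytics |> Enum.map(fn {_id, stats} -> stats.total_points end) |> Enum.sum()

total_compressed =
  final_analytics |> Enum.map(fn {_id, stats} -> stats.total_compressed_size end) |> Enum.sum()

estimated_raw = total_points * 16

if total_compressed > 0 do
  IO.puts("Estimated raw size: #{estimated_raw} bytes")
  IO.puts("Compressed size:    #{total_compressed} bytes")
  IO.puts("Overall ratio:      #{Float.round(estimated_raw / total_compressed, 2)}:1")
end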

3. Error Handling and Recovery

Let’s demonstrate robust error handling in GenStage pipelines:

defmodule FaultyProducer do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    error_rate = Keyword.get(opts, :error_rate, 0.1)  # 10% error rate
    
    state = %{
      counter: 0,
      error_rate: error_rate,
      errors_generated: 0
    }
    
    IO.puts("🏭 FaultyProducer started (error rate: #{Float.round(error_rate * 100, 1)}%)")
    {:producer, state}
  end

  def handle_demand(demand, state) when demand > 0 do
    events = for i <- state.counter..(state.counter + demand - 1) do
      # Randomly introduce faulty data
      if :rand.uniform() < state.error_rate do
        # Generate invalid data
        %{
          timestamp: "invalid_timestamp",  # Wrong type
          value: nil,  # Missing value
          sequence: i,
          faulty: true,
          generated_at: System.system_time(:millisecond)
        }
      else
        # Generate valid data
        timestamp = System.system_time(:second) + i * 60
        value = 20.0 + :math.sin(i / 100) * 5 + (:rand.uniform() - 0.5) * 1.0
        
        %{
          timestamp: timestamp,
          value: Float.round(value, 2),
          sequence: i,
          faulty: false,
          generated_at: System.system_time(:millisecond)
        }
      end
    end

    errors_in_batch = Enum.count(events, & &1.faulty)
    
    new_state = %{
      state |
      counter: state.counter + demand,
      errors_generated: state.errors_generated + errors_in_batch
    }

    if errors_in_batch > 0 do
      IO.puts("🏭 FaultyProducer: generated #{demand} events (#{errors_in_batch} faulty)")
    end

    {:noreply, events, new_state}
  end
end
defmodule ResilientProcessor do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    state = %{
      processed_count: 0,
      successful_compressions: 0,
      failed_compressions: 0,
      validation_errors: 0,
      compression_errors: 0,
      recovered_errors: 0
    }
    
    IO.puts("🛡️  ResilientProcessor started with comprehensive error handling")
    {:producer_consumer, state, subscribe_to: [FaultyProducer]}
  end

  def handle_events(events, _from, state) do
    IO.puts("🛡️  ResilientProcessor: processing #{length(events)} events (with error handling)")
    
    {processed_events, new_state} = 
      Enum.map_reduce(events, state, &process_event_with_recovery/2)

    successful_events = Enum.filter(processed_events, &(&1.status == :success))
    
    IO.puts("   ✅ Successfully processed: #{length(successful_events)}/#{length(events)}")
    
    {:noreply, processed_events, new_state}
  end

  defp process_event_with_recovery(event, state) do
    try do
      # Step 1: Validate data
      case validate_event(event) do
        {:ok, validated_event} ->
          # Step 2: Attempt compression
          case attempt_compression(validated_event) do
            {:ok, compressed_result} ->
              result = %{
                original_event: event,
                compressed_data: compressed_result.compressed,
                compressed_size: compressed_result.size,
                compression_ratio: compressed_result.ratio,
                status: :success,
                processed_at: System.system_time(:millisecond)
              }
              
              new_state = %{
                state |
                processed_count: state.processed_count + 1,
                successful_compressions: state.successful_compressions + 1
              }
              
              {result, new_state}
              
            {:error, compression_reason} ->
              # Try recovery strategies
              case attempt_recovery(validated_event, compression_reason, state) do
                {:ok, recovered_result} ->
                  result = %{
                    original_event: event,
                    compressed_data: recovered_result.compressed,
                    compressed_size: recovered_result.size,
                    compression_ratio: recovered_result.ratio,
                    status: :recovered,
                    recovery_method: recovered_result.method,
                    original_error: compression_reason,
                    processed_at: System.system_time(:millisecond)
                  }
                  
                  new_state = %{
                    state |
                    processed_count: state.processed_count + 1,
                    successful_compressions: state.successful_compressions + 1,
                    recovered_errors: state.recovered_errors + 1
                  }
                  
                  IO.puts("   🔧 Recovered from compression error using #{recovered_result.method}")
                  {result, new_state}
                  
                {:error, _recovery_reason} ->
                  result = %{
                    original_event: event,
                    error: compression_reason,
                    status: :compression_failed,
                    processed_at: System.system_time(:millisecond)
                  }
                  
                  new_state = %{
                    state |
                    processed_count: state.processed_count + 1,
                    compression_errors: state.compression_errors + 1
                  }
                  
                  {result, new_state}
              end
          end
          
        {:error, validation_reason} ->
          result = %{
            original_event: event,
            error: validation_reason,
            status: :validation_failed,
            processed_at: System.system_time(:millisecond)
          }
          
          new_state = %{
            state |
            processed_count: state.processed_count + 1,
            validation_errors: state.validation_errors + 1
          }
          
          {result, new_state}
      end
    rescue
      exception ->
        # Catch any unexpected errors
        result = %{
          original_event: event,
          error: "Unexpected exception: #{Exception.message(exception)}",
          status: :exception,
          processed_at: System.system_time(:millisecond)
        }
        
        new_state = %{
          state |
          processed_count: state.processed_count + 1,
          failed_compressions: state.failed_compressions + 1
        }
        
        IO.puts("   ⚠️  Unexpected exception: #{Exception.message(exception)}")
        {result, new_state}
    end
  end

  defp validate_event(event) do
    cond do
      not is_integer(event.timestamp) ->
        {:error, "Invalid timestamp type: #{inspect(event.timestamp)}"}
        
      not is_number(event.value) or is_nil(event.value) ->
        {:error, "Invalid value: #{inspect(event.value)}"}
        
      true ->
        {:ok, event}
    end
  end

  defp attempt_compression(event) do
    data_point = {event.timestamp, event.value}
    
    case GorillaStream.compress([data_point]) do
      {:ok, compressed} ->
        {:ok, %{
          compressed: compressed,
          size: byte_size(compressed),
          ratio: 16 / byte_size(compressed)
        }}
        
      {:error, reason} ->
        {:error, reason}
    end
  end

  defp attempt_recovery(event, _compression_reason, _state) do
    # Recovery strategy 1: Try with sanitized data
    sanitized_timestamp = sanitize_timestamp(event.timestamp)
    sanitized_value = sanitize_value(event.value)
    
    if sanitized_timestamp && sanitized_value do
      case GorillaStream.compress([{sanitized_timestamp, sanitized_value}]) do
        {:ok, compressed} ->
          {:ok, %{
            compressed: compressed,
            size: byte_size(compressed),
            ratio: 16 / byte_size(compressed),
            method: "data_sanitization"
          }}
          
        {:error, _} ->
          # Recovery strategy 2: Use default values
          attempt_default_recovery(event)
      end
    else
      attempt_default_recovery(event)
    end
  end

  defp attempt_default_recovery(_event) do
    # Last resort: use a default data point
    default_timestamp = System.system_time(:second)
    default_value = 0.0
    
    case GorillaStream.compress([{default_timestamp, default_value}]) do
      {:ok, compressed} ->
        {:ok, %{
          compressed: compressed,
          size: byte_size(compressed),
          ratio: 16 / byte_size(compressed),
          method: "default_values"
        }}
        
      {:error, reason} ->
        {:error, "All recovery strategies failed: #{reason}"}
    end
  end

  defp sanitize_timestamp(timestamp) when is_integer(timestamp), do: timestamp
  defp sanitize_timestamp(_), do: System.system_time(:second)

  defp sanitize_value(value) when is_number(value), do: value
  defp sanitize_value(_), do: 0.0
end
defmodule ErrorTrackingConsumer do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    # Schedule periodic error reports
    :timer.send_interval(3000, self(), :error_report)
    
    state = %{
      total_processed: 0,
      successful: 0,
      recovered: 0,
      validation_failed: 0,
      compression_failed: 0,
      exceptions: 0,
      error_details: [],
      start_time: System.monotonic_time(:millisecond)
    }
    
    IO.puts("📊 ErrorTrackingConsumer started with detailed error analytics")
    {:consumer, state, subscribe_to: [ResilientProcessor]}
  end

  def handle_events(events, _from, state) do
    # Categorize events by status
    categorized = Enum.group_by(events, & &1.status)
    
    successful = length(Map.get(categorized, :success, []))
    recovered = length(Map.get(categorized, :recovered, []))
    validation_failed = length(Map.get(categorized, :validation_failed, []))
    compression_failed = length(Map.get(categorized, :compression_failed, []))
    exceptions = length(Map.get(categorized, :exception, []))
    
    # Collect error details
    error_events =
      events
      |> Enum.filter(&(&1.status != :success))
      |> Enum.map(&extract_error_details/1)
    
    new_state = %{
      state |
      total_processed: state.total_processed + length(events),
      successful: state.successful + successful,
      recovered: state.recovered + recovered,
      validation_failed: state.validation_failed + validation_failed,
      compression_failed: state.compression_failed + compression_failed,
      exceptions: state.exceptions + exceptions,
      error_details: state.error_details ++ error_events
    }

    IO.puts("📊 ErrorTrackingConsumer: #{length(events)} events (#{successful} ok, #{recovered} recovered, #{length(events) - successful - recovered} errors)")

    {:noreply, [], new_state}
  end

  def handle_info(:error_report, state) do
    current_time = System.monotonic_time(:millisecond)
    elapsed_seconds = (current_time - state.start_time) / 1000
    
    IO.puts("\n🚨 === Error Tracking Report ===")
    IO.puts("   Elapsed time: #{Float.round(elapsed_seconds, 1)}s")
    IO.puts("   Total processed: #{state.total_processed}")
    
    if state.total_processed > 0 do
      success_rate = state.successful / state.total_processed * 100
      recovery_rate = state.recovered / state.total_processed * 100
      error_rate = (state.total_processed - state.successful - state.recovered) / state.total_processed * 100
      
      IO.puts("   Success rate: #{Float.round(success_rate, 1)}%")
      IO.puts("   Recovery rate: #{Float.round(recovery_rate, 1)}%")
      IO.puts("   Error rate: #{Float.round(error_rate, 1)}%")
      
      IO.puts("\n   Error Breakdown:")
      IO.puts("     • Validation errors: #{state.validation_failed}")
      IO.puts("     • Compression errors: #{state.compression_failed}")
      IO.puts("     • Exceptions: #{state.exceptions}")
      
      # Show recent error details
      recent_errors = Enum.take(state.error_details, -5)
      if length(recent_errors) > 0 do
        IO.puts("\n   Recent Errors:")
        Enum.each(recent_errors, fn error ->
          IO.puts("     • #{error.type}: #{error.message}")
        end)
      end
    end
    
    {:noreply, [], state}
  end

  defp extract_error_details(event) do
    %{
      type: event.status,
      message: event.error,
      timestamp: event.processed_at,
      original_faulty: Map.get(event.original_event, :faulty, false)
    }
  end
  
  def get_error_stats() do
    GenStage.call(__MODULE__, :get_stats)
  end
  
  def handle_call(:get_stats, _from, state) do
    {:reply, state, [], state}
  end
end

Now let’s test the error handling pipeline:

# Start the error handling pipeline
{:ok, _faulty_producer} = FaultyProducer.start_link(error_rate: 0.2)  # 20% error rate
{:ok, _resilient_processor} = ResilientProcessor.start_link([])
{:ok, _error_consumer} = ErrorTrackingConsumer.start_link([])

IO.puts("🚀 Error handling pipeline started!")
IO.puts("   FaultyProducer: 20% error rate")
IO.puts("   ResilientProcessor: comprehensive error handling with recovery")
IO.puts("   ErrorTrackingConsumer: detailed error analytics")

# Let the pipeline run and handle errors
Process.sleep(10000)

# Get final error statistics
error_stats = ErrorTrackingConsumer.get_error_stats()
IO.puts("\n🏁 === Final Error Handling Results ===")
IO.puts("Total events: #{error_stats.total_processed}")
IO.puts("Successful: #{error_stats.successful}")
IO.puts("Recovered: #{error_stats.recovered}")
IO.puts("Validation failures: #{error_stats.validation_failed}")
IO.puts("Compression failures: #{error_stats.compression_failed}")
IO.puts("Exceptions: #{error_stats.exceptions}")

if error_stats.total_processed > 0 do
  overall_success = (error_stats.successful + error_stats.recovered) / error_stats.total_processed * 100
  IO.puts("Overall success rate (including recovery): #{Float.round(overall_success, 1)}%")
end
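
The ResilientProcessor recovers from bad data, but a crash of a stage process would still take down its part of the pipeline. In a real application you would supervise the stages rather than start them ad hoc. A minimal sketch, relying on the default child_spec/1 that use GenStage generates (shown for illustration only, since these names are already registered above, and PipelineSupervisor is a placeholder name):

# Hypothetical supervision layout for the error-handling pipeline.
# :rest_for_one restarts a crashed stage plus everything started after it,
# so a producer crash also restarts its downstream stages.
children = [
  {FaultyProducer, [error_rate: 0.2]},
  {ResilientProcessor, []},
  {ErrorTrackingConsumer, []}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :rest_for_one, name: PipelineSupervisor)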

Summary

This notebook demonstrated advanced GenStage integration patterns with GorillaStream:

  1. Basic Pipeline: Simple Producer → Processor → Consumer with individual point compression
  2. Multi-Stage Pipeline: Complex pipeline handling multiple sensors with adaptive batching
  3. Error Handling: Comprehensive error recovery and tracking mechanisms

Key GenStage Benefits with GorillaStream:

  • Backpressure Control: Automatic demand management prevents memory issues (demand is tunable per subscription; see the sketch after this list)
  • Fault Tolerance: Isolated stage failures don’t crash the entire pipeline
  • Scalability: Easy to add more stages or parallel processing
  • Monitoring: Built-in observability for pipeline health and performance
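
How aggressively a consumer pulls data is controlled per subscription. A minimal sketch of those knobs, assuming a producer such as the DataProducer from section 1 is running; :max_demand caps how many events are requested at once and :min_demand controls when more are requested (the values and module name here are illustrative):

defmodule TunedConsumer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  def init(_opts) do
    # Hypothetical values: ask for at most 100 events per batch and
    # request more once fewer than 50 remain unprocessed.
    {:consumer, :ok, subscribe_to: [{DataProducer, max_demand: 100, min_demand: 50}]}
  end

  def handle_events(events, _from, state) do
    IO.puts("TunedConsumer received #{length(events)} events")
    {:noreply, [], state}
  end
end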

Performance Patterns:

  • Individual Compression: Low latency, higher overhead per point
  • Batch Compression: Higher throughput, better compression ratios (compared with individual compression in the sketch after this list)
  • Adaptive Processing: Automatically switches based on data patterns
  • Error Recovery: Maintains pipeline stability with graceful degradation
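
A quick way to see the individual-versus-batch trade-off outside a pipeline is to compress the same points both ways and compare sizes. A small sketch using synthetic data (exact numbers depend on your data):

# Compress 100 synthetic points one-by-one and as a single batch.
points = for i <- 0..99, do: {1_700_000_000 + i * 60, 20.0 + :math.sin(i / 10)}

individual_bytes =
  points
  |> Enum.map(fn point ->
    {:ok, compressed} = GorillaStream.compress([point])
    byte_size(compressed)
  end)
  |> Enum.sum()

{:ok, batch} = GorillaStream.compress(points)

IO.puts("100 points compressed individually: #{individual_bytes} bytes")
IO.puts("100 points compressed as one batch: #{byte_size(batch)} bytes")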

Production Considerations:

  • Use appropriate batch sizes based on your latency requirements
  • Implement comprehensive error handling and recovery strategies
  • Monitor compression ratios and processing rates per sensor
  • Consider zlib compression for storage-constrained environments
  • Scale processing stages based on data volume and complexity (see the parallel-stage sketch below)
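
Because a GenStage producer serves demand from all of its subscribers, one simple way to scale is to run several compression stages against the same producer. A minimal sketch reusing the named stages from section 2; the stage names here are placeholders, and this adds to (rather than replaces) the pipeline already running above:

# Two parallel compression stages draw demand from :multi_producer;
# the consumer subscribes to both.
{:ok, _} =
  AdaptiveCompressionProcessor.start_link(
    name: :compressor_a, subscribe_to: [:multi_producer], batch_threshold: 3)

{:ok, _} =
  AdaptiveCompressionProcessor.start_link(
    name: :compressor_b, subscribe_to: [:multi_producer], batch_threshold: 3)

{:ok, _} =
  AnalyticsConsumer.start_link(
    name: :scaled_analytics, subscribe_to: [:compressor_a, :compressor_b])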

Next Steps:

  • Explore Broadway for even more advanced streaming patterns
  • Consider Flow for CPU-intensive parallel processing scenarios
  • Integrate with real message queues (RabbitMQ, Apache Kafka)
  • Add metrics collection and alerting for production deployments