Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

GorillaStream: Broadway Processing Pipeline

notebooks/04_broadway_processing.livemd

GorillaStream: Broadway Processing Pipeline

Mix.install([
  {:gorilla_stream, "~> 1.1"},
  {:broadway, "~> 1.0"},
  {:gen_stage, "~> 1.2"},
  {:jason, "~> 1.4"}
])

Introduction

This notebook demonstrates how to build production-grade streaming data pipelines using GorillaStream with Broadway. Broadway provides:

  • Scalable message processing with automatic concurrency management
  • Built-in batching for efficient bulk operations
  • Fault tolerance with configurable retry mechanisms
  • Backpressure handling and rate limiting
  • Comprehensive telemetry and monitoring hooks

You’ll learn to build pipelines for:

  • High-throughput sensor data processing
  • Multi-tenant IoT data streams
  • Real-time analytics with compression
  • Fault-tolerant message processing

1. Basic Broadway Pipeline for Sensor Data

Let’s start with a simple Broadway pipeline that processes sensor data messages:

defmodule SensorDataProducer do
  @behaviour Broadway.Producer
  
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def init(opts) do
    sensor_configs = Keyword.get(opts, :sensors, [])
    message_rate = Keyword.get(opts, :message_rate, 100)  # messages per second
    
    # Schedule message generation
    interval = max(1, div(1000, message_rate))
    :timer.send_interval(interval, self(), :generate_message)
    
    state = %{
      sensors: sensor_configs,
      message_counter: 0,
      current_sensor_index: 0
    }
    
    IO.puts("πŸ“‘ SensorDataProducer started with #{length(sensor_configs)} sensors @ #{message_rate} msgs/sec")
    {:ok, state}
  end
  
  def handle_info(:generate_message, state) do
    # Round-robin through sensors
    sensor_index = rem(state.current_sensor_index, length(state.sensors))
    {sensor_id, sensor_config} = Enum.at(state.sensors, sensor_index)
    
    # Generate sensor reading
    timestamp = System.system_time(:second)
    value = generate_sensor_reading(sensor_config, state.message_counter)
    
    message_data = %{
      sensor_id: sensor_id,
      sensor_type: sensor_config.type,
      timestamp: timestamp,
      value: value,
      message_id: state.message_counter,
      generated_at: System.system_time(:millisecond)
    }
    
    # Send message to Broadway pipeline
    broadway_message = %Broadway.Message{
      data: Jason.encode!(message_data),
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
    
    # In a real scenario, you'd send this to a message queue
    # For demo, we'll simulate by sending to the Broadway process
    send(:sensor_broadway, {:test_message, broadway_message})
    
    new_state = %{
      state |
      message_counter: state.message_counter + 1,
      current_sensor_index: state.current_sensor_index + 1
    }
    
    {:noreply, new_state}
  end
  
  defp generate_sensor_reading(config, counter) do
    base_value = config.base_value
    
    case config.type do
      :temperature ->
        # Simulate daily temperature cycle
        daily_pattern = 5 * :math.sin(counter / 1440)  # 24-hour cycle
        noise = (:rand.uniform() - 0.5) * 2.0
        Float.round(base_value + daily_pattern + noise, 2)
        
      :humidity ->
        # Simulate humidity variations
        pattern = 15 * :math.cos(counter / 720)  # 12-hour cycle
        noise = (:rand.uniform() - 0.5) * 5.0
        value = base_value + pattern + noise
        Float.round(max(0, min(100, value)), 2)  # Clamp to 0-100%
        
      :pressure ->
        # Simulate atmospheric pressure changes
        pattern = 20 * :math.sin(counter / 2880)  # 48-hour cycle
        noise = (:rand.uniform() - 0.5) * 3.0
        Float.round(base_value + pattern + noise, 2)
        
      _ ->
        base_value + (:rand.uniform() - 0.5) * 10.0
    end
  end
  
  # Broadway acknowledger callbacks (simplified for demo)
  def ack(:ack_id, _successful, _failed), do: :ok
end
defmodule SensorDataBroadway do
  use Broadway
  
  def start_link(opts) do
    sensor_configs = Keyword.get(opts, :sensors, [])
    
    # Start the message producer
    {:ok, _producer} = SensorDataProducer.start_link(sensors: sensor_configs, message_rate: 50)
    
    Broadway.start_link(__MODULE__,
      name: :sensor_broadway,
      producer: [
        module: {__MODULE__.TestProducer, []},
        stages: 1
      ],
      processors: [
        default: [stages: 4, min_demand: 5, max_demand: 10]
      ],
      batchers: [
        compressed_storage: [stages: 2, batch_size: 20, batch_timeout: 2000],
        analytics: [stages: 1, batch_size: 50, batch_timeout: 1000]
      ]
    )
  end

  def handle_message(processor, message, _context) do
    case Jason.decode(message.data) do
      {:ok, sensor_data} ->
        # Compress the sensor data point
        data_point = {sensor_data["timestamp"], sensor_data["value"]}
        
        case GorillaStream.compress([data_point]) do
          {:ok, compressed} ->
            # Enrich message with compression information
            enriched_data = Map.merge(sensor_data, %{
              "compressed_data" => Base.encode64(compressed),
              "compressed_size" => byte_size(compressed),
              "compression_ratio" => 16 / byte_size(compressed),
              "processed_at" => System.system_time(:millisecond)
            })
            
            message
            |> Broadway.Message.put_data(enriched_data)
            |> Broadway.Message.put_batcher(:compressed_storage)
            
          {:error, reason} ->
            IO.puts("❌ Compression failed for sensor #{sensor_data["sensor_id"]}: #{reason}")
            Broadway.Message.failed(message, reason)
        end
        
      {:error, reason} ->
        IO.puts("❌ JSON decode failed: #{reason}")
        Broadway.Message.failed(message, reason)
    end
  end

  def handle_batch(:compressed_storage, messages, _batch_info, _context) do
    IO.puts("πŸ’Ύ Storing batch of #{length(messages)} compressed sensor readings")
    
    # Process compressed data storage
    Enum.each(messages, fn message ->
      data = message.data
      
      # Simulate storing compressed data to database/file system
      store_compressed_sensor_data(data)
      
      # Log compression stats occasionally
      if rem(System.system_time(:millisecond), 5000) < 100 do
        IO.puts("   πŸ“Š #{data["sensor_id"]}: #{data["value"]} -> #{data["compressed_size"]}B (#{Float.round(data["compression_ratio"], 2)}:1)")
      end
    end)
    
    messages
  end

  def handle_batch(:analytics, messages, _batch_info, _context) do
    IO.puts("πŸ“ˆ Processing analytics batch of #{length(messages)} sensor readings")
    
    # Aggregate analytics by sensor type
    analytics = 
      messages
      |> Enum.group_by(fn msg -> msg.data["sensor_type"] end)
      |> Enum.map(fn {sensor_type, type_messages} ->
        values = Enum.map(type_messages, fn msg -> msg.data["value"] end)
        compressed_sizes = Enum.map(type_messages, fn msg -> msg.data["compressed_size"] end)
        
        %{
          sensor_type: sensor_type,
          count: length(type_messages),
          avg_value: Enum.sum(values) / length(values),
          avg_compressed_size: Enum.sum(compressed_sizes) / length(compressed_sizes),
          min_value: Enum.min(values),
          max_value: Enum.max(values)
        }
      end)
    
    # Log analytics
    Enum.each(analytics, fn stats ->
      IO.puts("   πŸ“Š #{stats.sensor_type}: #{stats.count} pts, avg=#{Float.round(stats.avg_value, 1)}, size=#{Float.round(stats.avg_compressed_size, 1)}B")
    end)
    
    messages
  end
  
  defp store_compressed_sensor_data(data) do
    # Simulate database storage
    # In reality: Database.insert_compressed_reading(data)
    :ok
  end

  # Test producer for demo purposes
  defmodule TestProducer do
    use GenStage
    
    def init(_opts) do
      {:producer, %{}}
    end
    
    def handle_demand(_demand, state) do
      # Wait for test messages
      {:noreply, [], state}
    end
    
    def handle_info({:test_message, message}, state) do
      {:noreply, [message], state}
    end
  end
end

Let’s start the basic Broadway pipeline:

# Define sensor configuration
basic_sensors = [
  {"temp_01", %{type: :temperature, base_value: 22.0}},
  {"humidity_01", %{type: :humidity, base_value: 65.0}},
  {"pressure_01", %{type: :pressure, base_value: 1013.25}}
]

# Start the Broadway pipeline
{:ok, _broadway_pid} = SensorDataBroadway.start_link(sensors: basic_sensors)

IO.puts("πŸš€ Basic Broadway pipeline started!")
IO.puts("   Producer: 50 messages/sec from 3 sensors")
IO.puts("   Processors: 4 stages processing individual messages")
IO.puts("   Batchers: compressed_storage (20 msgs) + analytics (50 msgs)")

# Let the pipeline process messages
Process.sleep(8000)

IO.puts("\nβœ… Basic Broadway pipeline demonstration completed")

2. Advanced Multi-Tenant IoT Pipeline

Now let’s build a more sophisticated pipeline that handles multiple IoT tenants with different processing requirements:

defmodule MultiTenantBroadway do
  use Broadway
  
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: :multitenant_broadway,
      producer: [
        module: {__MODULE__.IoTMessageProducer, opts},
        stages: 2
      ],
      processors: [
        default: [stages: 8, min_demand: 3, max_demand: 15]
      ],
      batchers: [
        tenant_a_storage: [stages: 2, batch_size: 25, batch_timeout: 1500],
        tenant_b_storage: [stages: 2, batch_size: 15, batch_timeout: 3000],
        tenant_c_storage: [stages: 1, batch_size: 100, batch_timeout: 5000],
        real_time_alerts: [stages: 3, batch_size: 1, batch_timeout: 100],  # Individual processing
        analytics_aggregation: [stages: 2, batch_size: 200, batch_timeout: 10000]
      ]
    )
  end

  def handle_message(processor, message, _context) do
    case Jason.decode(message.data) do
      {:ok, iot_data} ->
        process_iot_message(message, iot_data)
        
      {:error, reason} ->
        IO.puts("❌ Failed to decode IoT message: #{reason}")
        Broadway.Message.failed(message, reason)
    end
  end

  defp process_iot_message(message, iot_data) do
    tenant_id = iot_data["tenant_id"]
    device_id = iot_data["device_id"]
    measurements = iot_data["measurements"]
    
    # Compress all measurements for this device
    compressed_measurements = compress_device_measurements(measurements)
    
    case compressed_measurements do
      {:ok, compression_results} ->
        # Enrich with tenant-specific processing
        enriched_data = %{
          "tenant_id" => tenant_id,
          "device_id" => device_id,
          "original_measurements" => measurements,
          "compressed_results" => compression_results,
          "processing_timestamp" => System.system_time(:millisecond),
          "total_points" => length(measurements),
          "total_compressed_size" => Enum.sum(Enum.map(compression_results, &amp; &amp;1["compressed_size"]))
        }
        
        # Route to appropriate batcher based on tenant and alert conditions
        message_with_data = Broadway.Message.put_data(message, enriched_data)
        
        cond do
          # Check for alert conditions
          has_alert_conditions?(measurements) ->
            Broadway.Message.put_batcher(message_with_data, :real_time_alerts)
            
          # Route by tenant storage preferences
          tenant_id == "tenant_a" ->
            Broadway.Message.put_batcher(message_with_data, :tenant_a_storage)
            
          tenant_id == "tenant_b" ->
            Broadway.Message.put_batcher(message_with_data, :tenant_b_storage)
            
          tenant_id == "tenant_c" ->
            Broadway.Message.put_batcher(message_with_data, :tenant_c_storage)
            
          true ->
            Broadway.Message.put_batcher(message_with_data, :analytics_aggregation)
        end
        
      {:error, reason} ->
        IO.puts("❌ Failed to compress measurements for #{device_id}: #{reason}")
        Broadway.Message.failed(message, reason)
    end
  end

  defp compress_device_measurements(measurements) do
    # Group measurements by sensor type for better compression
    grouped_measurements = Enum.group_by(measurements, &amp; &amp;1["sensor_type"])
    
    compression_results = 
      Enum.map(grouped_measurements, fn {sensor_type, sensor_measurements} ->
        # Convert to GorillaStream format
        data_points = Enum.map(sensor_measurements, fn m ->
          {m["timestamp"], m["value"]}
        end)
        
        case GorillaStream.compress(data_points, true) do  # Use zlib for better compression
          {:ok, compressed} ->
            original_size = length(data_points) * 16
            
            %{
              "sensor_type" => sensor_type,
              "points_count" => length(data_points),
              "compressed_data" => Base.encode64(compressed),
              "compressed_size" => byte_size(compressed),
              "original_size" => original_size,
              "compression_ratio" => original_size / byte_size(compressed)
            }
            
          {:error, reason} ->
            %{"sensor_type" => sensor_type, "error" => reason}
        end
      end)
    
    # Check if all compressions succeeded
    failed_compressions = Enum.filter(compression_results, &amp;Map.has_key?(&amp;1, "error"))
    
    if length(failed_compressions) == 0 do
      {:ok, compression_results}
    else
      {:error, "Some sensor compressions failed: #{inspect(failed_compressions)}"}
    end
  end

  defp has_alert_conditions?(measurements) do
    # Check for anomalous values that require immediate attention
    Enum.any?(measurements, fn measurement ->
      case measurement["sensor_type"] do
        "temperature" -> measurement["value"] > 80.0 or measurement["value"] < -20.0
        "humidity" -> measurement["value"] > 95.0 or measurement["value"] < 5.0
        "pressure" -> measurement["value"] > 1050.0 or measurement["value"] < 950.0
        _ -> false
      end
    end)
  end

  # Tenant-specific batch handlers
  def handle_batch(:tenant_a_storage, messages, batch_info, _context) do
    IO.puts("🏒 Tenant A storage: #{length(messages)} devices (#{batch_info.trigger})")
    
    # Tenant A prefers individual device files with high compression
    Enum.each(messages, fn message ->
      data = message.data
      tenant_id = data["tenant_id"]
      device_id = data["device_id"]
      
      # Store each device's compressed data separately
      filename = "#{tenant_id}/#{device_id}/#{System.system_time(:second)}.gorilla"
      total_size = data["total_compressed_size"]
      points = data["total_points"]
      
      # Simulate file storage
      # File.write!(filename, compressed_data)
      
      IO.puts("   πŸ“ Stored #{device_id}: #{points} points -> #{total_size}B in #{filename}")
    end)
    
    messages
  end

  def handle_batch(:tenant_b_storage, messages, batch_info, _context) do
    IO.puts("🏒 Tenant B storage: #{length(messages)} devices (#{batch_info.trigger})")
    
    # Tenant B prefers batch database inserts
    total_devices = length(messages)
    total_points = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_points"] end))
    total_size = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_compressed_size"] end))
    
    # Simulate database batch insert
    # Database.batch_insert_compressed_data(messages)
    
    IO.puts("   πŸ’Ύ Batch DB insert: #{total_devices} devices, #{total_points} points, #{total_size}B total")
    
    messages
  end

  def handle_batch(:tenant_c_storage, messages, batch_info, _context) do
    IO.puts("🏒 Tenant C storage: #{length(messages)} devices (#{batch_info.trigger})")
    
    # Tenant C uses time-series database with custom aggregation
    by_hour = Enum.group_by(messages, fn msg ->
      # Group by hour for time-series optimization
      timestamp = div(msg.data["processing_timestamp"], 3600000)  # Hour buckets
      timestamp
    end)
    
    Enum.each(by_hour, fn {hour_bucket, hour_messages} ->
      hour_points = Enum.sum(Enum.map(hour_messages, fn msg -> msg.data["total_points"] end))
      hour_size = Enum.sum(Enum.map(hour_messages, fn msg -> msg.data["total_compressed_size"] end))
      
      # Simulate time-series database insert
      # TimeSeriesDB.insert_hour_bucket(hour_bucket, hour_messages)
      
      IO.puts("   ⏰ Hour #{hour_bucket}: #{length(hour_messages)} devices, #{hour_points} points, #{hour_size}B")
    end)
    
    messages
  end

  def handle_batch(:real_time_alerts, messages, _batch_info, _context) do
    IO.puts("🚨 Real-time alerts: #{length(messages)} alert conditions detected!")
    
    Enum.each(messages, fn message ->
      data = message.data
      alert_measurements = Enum.filter(data["original_measurements"], &amp;has_alert_value?/1)
      
      Enum.each(alert_measurements, fn measurement ->
        IO.puts("   ⚠️  ALERT: #{data["device_id"]} #{measurement["sensor_type"]} = #{measurement["value"]}")
        
        # Simulate alert notification
        # AlertService.send_alert(data["tenant_id"], data["device_id"], measurement)
      end)
    end)
    
    messages
  end

  def handle_batch(:analytics_aggregation, messages, batch_info, _context) do
    IO.puts("πŸ“Š Analytics aggregation: #{length(messages)} devices (#{batch_info.trigger})")
    
    # Aggregate analytics across all tenants and devices
    analytics = calculate_batch_analytics(messages)
    
    IO.puts("   πŸ“ˆ Batch analytics:")
    IO.puts("     β€’ Total devices: #{analytics.total_devices}")
    IO.puts("     β€’ Total data points: #{analytics.total_points}")
    IO.puts("     β€’ Average compression ratio: #{Float.round(analytics.avg_compression_ratio, 2)}:1")
    IO.puts("     β€’ Storage efficiency: #{Float.round(analytics.storage_efficiency, 1)}%")
    
    # Group by tenant for tenant-specific analytics
    tenant_analytics = 
      messages
      |> Enum.group_by(fn msg -> msg.data["tenant_id"] end)
      |> Enum.map(fn {tenant_id, tenant_messages} ->
        tenant_points = Enum.sum(Enum.map(tenant_messages, fn msg -> msg.data["total_points"] end))
        tenant_size = Enum.sum(Enum.map(tenant_messages, fn msg -> msg.data["total_compressed_size"] end))
        
        {tenant_id, %{devices: length(tenant_messages), points: tenant_points, size: tenant_size}}
      end)
    
    Enum.each(tenant_analytics, fn {tenant_id, stats} ->
      IO.puts("     β€’ #{tenant_id}: #{stats.devices} devices, #{stats.points} pts, #{stats.size}B")
    end)
    
    messages
  end

  defp has_alert_value?(measurement) do
    case measurement["sensor_type"] do
      "temperature" -> measurement["value"] > 80.0 or measurement["value"] < -20.0
      "humidity" -> measurement["value"] > 95.0 or measurement["value"] < 5.0
      "pressure" -> measurement["value"] > 1050.0 or measurement["value"] < 950.0
      _ -> false
    end
  end

  defp calculate_batch_analytics(messages) do
    total_devices = length(messages)
    total_points = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_points"] end))
    total_compressed = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_compressed_size"] end))
    total_original = total_points * 16  # Approximate original size
    
    %{
      total_devices: total_devices,
      total_points: total_points,
      total_compressed_size: total_compressed,
      avg_compression_ratio: if(total_compressed > 0, do: total_original / total_compressed, else: 0),
      storage_efficiency: if(total_original > 0, do: (total_original - total_compressed) / total_original * 100, else: 0)
    }
  end

  # IoT Message Producer
  defmodule IoTMessageProducer do
    use GenStage
    
    def init(opts) do
      tenants = Keyword.get(opts, :tenants, [])
      message_rate = Keyword.get(opts, :message_rate, 30)
      
      # Schedule IoT message generation
      interval = max(50, div(1000, message_rate))
      :timer.send_interval(interval, self(), :generate_iot_message)
      
      state = %{
        tenants: tenants,
        message_counter: 0
      }
      
      IO.puts("πŸ“‘ IoTMessageProducer started with #{length(tenants)} tenants @ #{message_rate} msgs/sec")
      Process.register(self(), :iot_producer)
      {:producer, state}
    end
    
    def handle_demand(_demand, state) do
      {:noreply, [], state}
    end
    
    def handle_info(:generate_iot_message, state) do
      # Generate IoT message for random tenant
      tenant = Enum.random(state.tenants)
      device = Enum.random(tenant.devices)
      
      # Generate multiple sensor measurements per device message
      measurements = generate_device_measurements(device, state.message_counter)
      
      message_data = %{
        tenant_id: tenant.id,
        device_id: device.id,
        device_type: device.type,
        measurements: measurements,
        message_id: state.message_counter,
        generated_at: System.system_time(:millisecond)
      }
      
      broadway_message = %Broadway.Message{
        data: Jason.encode!(message_data),
        acknowledger: {__MODULE__, :ack_id, :ack_data}
      }
      
      send(:multitenant_broadway, {:test_message, broadway_message})
      
      new_state = %{state | message_counter: state.message_counter + 1}
      {:noreply, [], new_state}
    end
    
    defp generate_device_measurements(device, counter) do
      # Generate 3-5 measurements per device message
      num_measurements = 3 + rem(counter, 3)
      base_timestamp = System.system_time(:second)
      
      for i <- 0..(num_measurements - 1) do
        sensor_type = Enum.random(device.sensors)
        timestamp = base_timestamp - i * 60  # Past measurements
        value = generate_sensor_value(sensor_type, counter + i)
        
        %{
          "sensor_type" => Atom.to_string(sensor_type),
          "timestamp" => timestamp,
          "value" => value,
          "measurement_id" => "#{device.id}_#{sensor_type}_#{timestamp}"
        }
      end
    end
    
    defp generate_sensor_value(:temperature, counter) do
      base = 25.0 + 10 * :math.sin(counter / 100)
      noise = (:rand.uniform() - 0.5) * 5.0
      # Occasionally generate alert conditions
      if rem(counter, 50) == 0, do: 85.0, else: Float.round(base + noise, 2)
    end
    
    defp generate_sensor_value(:humidity, counter) do
      base = 60.0 + 20 * :math.cos(counter / 80)
      noise = (:rand.uniform() - 0.5) * 10.0
      value = base + noise
      # Occasionally generate alert conditions
      if rem(counter, 75) == 0, do: 98.0, else: Float.round(max(0, min(100, value)), 2)
    end
    
    defp generate_sensor_value(:pressure, counter) do
      base = 1013.25 + 15 * :math.sin(counter / 200)
      noise = (:rand.uniform() - 0.5) * 5.0
      # Occasionally generate alert conditions
      if rem(counter, 100) == 0, do: 1055.0, else: Float.round(base + noise, 2)
    end
    
    defp generate_sensor_value(:vibration, counter) do
      base = 0.5 + 0.3 * :math.sin(counter / 50)
      noise = (:rand.uniform() - 0.5) * 0.2
      Float.round(max(0, base + noise), 3)
    end
    
    # Broadway acknowledger callbacks
    def ack(:ack_id, _successful, _failed), do: :ok
  end
  
  # Handle test messages from producer
  def handle_info({:test_message, message}, state) do
    {:noreply, [message], state}
  end
end

Let’s configure and start the multi-tenant pipeline:

# Define multi-tenant IoT configuration
tenants = [
  %{
    id: "tenant_a",
    name: "Manufacturing Corp",
    devices: [
      %{id: "factory_01", type: :industrial, sensors: [:temperature, :pressure, :vibration]},
      %{id: "factory_02", type: :industrial, sensors: [:temperature, :pressure, :vibration]},
      %{id: "warehouse_01", type: :environmental, sensors: [:temperature, :humidity]}
    ]
  },
  %{
    id: "tenant_b", 
    name: "Smart Building Solutions",
    devices: [
      %{id: "building_a_floor_1", type: :hvac, sensors: [:temperature, :humidity, :pressure]},
      %{id: "building_a_floor_2", type: :hvac, sensors: [:temperature, :humidity, :pressure]},
      %{id: "building_b_lobby", type: :environmental, sensors: [:temperature, :humidity]}
    ]
  },
  %{
    id: "tenant_c",
    name: "Agricultural Monitoring",
    devices: [
      %{id: "greenhouse_01", type: :agricultural, sensors: [:temperature, :humidity]},
      %{id: "greenhouse_02", type: :agricultural, sensors: [:temperature, :humidity]},
      %{id: "field_station_01", type: :weather, sensors: [:temperature, :humidity, :pressure]}
    ]
  }
]

# Start the multi-tenant Broadway pipeline
{:ok, _multitenant_pid} = MultiTenantBroadway.start_link(tenants: tenants, message_rate: 20)

IO.puts("πŸš€ Multi-tenant Broadway pipeline started!")
IO.puts("   Tenants: #{length(tenants)} with #{Enum.sum(Enum.map(tenants, fn t -> length(t.devices) end))} total devices")
IO.puts("   Producer: 20 IoT messages/sec with multi-sensor measurements")
IO.puts("   Processors: 8 stages with adaptive demand (3-15 messages)")
IO.puts("   Batchers: Tenant-specific storage + real-time alerts + analytics")

# Let the multi-tenant pipeline process messages
Process.sleep(15000)

IO.puts("\nβœ… Multi-tenant Broadway pipeline demonstration completed")

3. Fault-Tolerant Pipeline with Retry Logic

Let’s build a pipeline that demonstrates comprehensive fault tolerance and retry mechanisms:

defmodule FaultTolerantBroadway do
  use Broadway
  
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: :fault_tolerant_broadway,
      producer: [
        module: {__MODULE__.FaultyMessageProducer, opts},
        stages: 1
      ],
      processors: [
        default: [stages: 6, min_demand: 2, max_demand: 8]
      ],
      batchers: [
        successful_storage: [stages: 2, batch_size: 15, batch_timeout: 2000],
        failed_retry_queue: [stages: 1, batch_size: 5, batch_timeout: 1000],
        permanent_failures: [stages: 1, batch_size: 10, batch_timeout: 5000]
      ],
      # Configure fault tolerance
      context: %{
        retry_attempts: 3,
        compression_failures: 0,
        recovery_successes: 0
      }
    )
  end

  def handle_message(processor, message, context) do
    case Jason.decode(message.data) do
      {:ok, sensor_data} ->
        process_with_fault_tolerance(message, sensor_data, context)
        
      {:error, reason} ->
        IO.puts("❌ JSON decode failed: #{reason}")
        # Route to permanent failures - can't recover from invalid JSON
        message
        |> Broadway.Message.put_data(%{"error" => "json_decode_failed", "reason" => reason})
        |> Broadway.Message.put_batcher(:permanent_failures)
    end
  end

  defp process_with_fault_tolerance(message, sensor_data, context) do
    retry_count = get_retry_count(message)
    max_retries = context.retry_attempts
    
    # Simulate some messages being inherently faulty
    is_faulty_data = Map.get(sensor_data, "faulty", false)
    
    if is_faulty_data and retry_count == 0 do
      IO.puts("⚠️  Processing faulty sensor data (will attempt recovery)")
    end
    
    case attempt_compression(sensor_data, retry_count) do
      {:ok, compression_result} ->
        # Success - route to storage
        enriched_data = Map.merge(sensor_data, %{
          "compressed_data" => Base.encode64(compression_result.compressed),
          "compressed_size" => compression_result.size,
          "compression_ratio" => compression_result.ratio,
          "retry_count" => retry_count,
          "processing_timestamp" => System.system_time(:millisecond)
        })
        
        if retry_count > 0 do
          IO.puts("βœ… Recovery success after #{retry_count} retries for sensor #{sensor_data["sensor_id"]}")
        end
        
        message
        |> Broadway.Message.put_data(enriched_data)
        |> Broadway.Message.put_batcher(:successful_storage)
        
      {:error, :temporary_failure, reason} when retry_count < max_retries ->
        # Temporary failure - retry
        IO.puts("πŸ”„ Temporary failure (attempt #{retry_count + 1}/#{max_retries}): #{reason}")
        
        retry_data = Map.merge(sensor_data, %{
          "retry_count" => retry_count + 1,
          "last_error" => reason,
          "retry_timestamp" => System.system_time(:millisecond)
        })
        
        message
        |> Broadway.Message.put_data(retry_data)
        |> Broadway.Message.put_batcher(:failed_retry_queue)
        
      {:error, :permanent_failure, reason} ->
        # Permanent failure - no point in retrying
        IO.puts("πŸ’€ Permanent failure for sensor #{sensor_data["sensor_id"]}: #{reason}")
        
        failure_data = Map.merge(sensor_data, %{
          "error_type" => "permanent_failure",
          "error_reason" => reason,
          "retry_count" => retry_count,
          "failed_timestamp" => System.system_time(:millisecond)
        })
        
        message
        |> Broadway.Message.put_data(failure_data)
        |> Broadway.Message.put_batcher(:permanent_failures)
        
      {:error, :temporary_failure, reason} ->
        # Exceeded max retries - treat as permanent failure
        IO.puts("πŸ’€ Max retries exceeded for sensor #{sensor_data["sensor_id"]}: #{reason}")
        
        failure_data = Map.merge(sensor_data, %{
          "error_type" => "max_retries_exceeded",
          "error_reason" => reason,
          "retry_count" => retry_count,
          "failed_timestamp" => System.system_time(:millisecond)
        })
        
        message
        |> Broadway.Message.put_data(failure_data)
        |> Broadway.Message.put_batcher(:permanent_failures)
    end
  end

  defp attempt_compression(sensor_data, retry_count) do
    # Simulate different failure modes
    sensor_id = sensor_data["sensor_id"]
    is_faulty = Map.get(sensor_data, "faulty", false)
    
    cond do
      # Permanent failure cases
      is_nil(sensor_data["timestamp"]) ->
        {:error, :permanent_failure, "missing_timestamp"}
        
      is_nil(sensor_data["value"]) ->
        {:error, :permanent_failure, "missing_value"}
        
      not is_number(sensor_data["value"]) ->
        {:error, :permanent_failure, "invalid_value_type"}
        
      # Temporary failure that can be recovered with retry
      is_faulty and retry_count < 2 ->
        {:error, :temporary_failure, "sensor_interference"}
        
      # Network simulation failure (recoverable)
      String.contains?(sensor_id, "unstable") and retry_count == 0 ->
        {:error, :temporary_failure, "network_timeout"}
        
      # Success case (including recovery after retries)
      true ->
        data_point = {sensor_data["timestamp"], sensor_data["value"]}
        
        case GorillaStream.compress([data_point], true) do
          {:ok, compressed} ->
            {:ok, %{
              compressed: compressed,
              size: byte_size(compressed),
              ratio: 16 / byte_size(compressed)
            }}
            
          {:error, reason} ->
            # GorillaStream errors are typically permanent
            {:error, :permanent_failure, "compression_failed: #{reason}"}
        end
    end
  end

  defp get_retry_count(message) do
    case message.data do
      %{"retry_count" => count} when is_integer(count) -> count
      _ -> 0
    end
  end

  def handle_batch(:successful_storage, messages, batch_info, _context) do
    successful_count = length(messages)
    retry_recoveries = Enum.count(messages, fn msg -> msg.data["retry_count"] > 0 end)
    
    IO.puts("πŸ’Ύ Successful storage: #{successful_count} messages (#{retry_recoveries} recovered) [#{batch_info.trigger}]")
    
    # Calculate storage statistics
    total_points = successful_count
    total_size = Enum.sum(Enum.map(messages, fn msg -> msg.data["compressed_size"] end))
    avg_compression = if successful_count > 0, do: total_size / successful_count, else: 0
    
    IO.puts("   πŸ“Š Storage stats: #{total_points} points, #{total_size}B total, #{Float.round(avg_compression, 1)}B avg")
    
    if retry_recoveries > 0 do
      IO.puts("   🎯 Recovery rate: #{retry_recoveries}/#{successful_count} (#{Float.round(retry_recoveries/successful_count*100, 1)}%)")
    end
    
    messages
  end

  def handle_batch(:failed_retry_queue, messages, batch_info, context) do
    IO.puts("πŸ”„ Retry queue: #{length(messages)} messages for retry processing [#{batch_info.trigger}]")
    
    # Re-inject messages back into the pipeline for retry
    Enum.each(messages, fn message ->
      # Create new message for retry
      retry_message = %Broadway.Message{
        data: Jason.encode!(message.data),
        acknowledger: {__MODULE__, :ack_id, :ack_data}
      }
      
      # Send back to pipeline after a brief delay
      Process.send_after(:fault_tolerant_broadway, {:retry_message, retry_message}, 500)
    end)
    
    IO.puts("   ⏰ Scheduled #{length(messages)} messages for retry in 500ms")
    
    messages
  end

  def handle_batch(:permanent_failures, messages, batch_info, _context) do
    IO.puts("πŸ’€ Permanent failures: #{length(messages)} messages [#{batch_info.trigger}]")
    
    # Categorize failure types
    failure_types = 
      messages
      |> Enum.group_by(fn msg -> msg.data["error_type"] || "unknown" end)
      |> Enum.map(fn {type, type_messages} -> {type, length(type_messages)} end)
    
    IO.puts("   πŸ“Š Failure breakdown:")
    Enum.each(failure_types, fn {type, count} ->
      IO.puts("     β€’ #{type}: #{count} messages")
    end)
    
    # In a real system, you might:
    # - Store failures in a dead letter queue
    # - Send alerts to operations team
    # - Generate failure reports
    
    messages
  end

  # Handle retry messages
  def handle_info({:retry_message, message}, state) do
    {:noreply, [message], state}
  end

  # Faulty Message Producer for testing fault tolerance
  defmodule FaultyMessageProducer do
    use GenStage
    
    def init(opts) do
      failure_rate = Keyword.get(opts, :failure_rate, 0.3)  # 30% failure rate
      message_rate = Keyword.get(opts, :message_rate, 15)
      
      # Schedule message generation
      interval = max(50, div(1000, message_rate))
      :timer.send_interval(interval, self(), :generate_message)
      
      state = %{
        message_counter: 0,
        failure_rate: failure_rate
      }
      
      IO.puts("πŸ“‘ FaultyMessageProducer started (failure rate: #{Float.round(failure_rate * 100, 1)}%)")
      {:producer, state}
    end
    
    def handle_demand(_demand, state) do
      {:noreply, [], state}
    end
    
    def handle_info(:generate_message, state) do
      # Generate message with potential faults
      is_faulty = :rand.uniform() < state.failure_rate
      
      sensor_id = if is_faulty do
        # Create different types of faulty sensors
        case rem(state.message_counter, 4) do
          0 -> "unstable_sensor_#{rem(state.message_counter, 3)}"
          1 -> "faulty_sensor_#{rem(state.message_counter, 2)}"
          2 -> "interference_sensor_#{rem(state.message_counter, 3)}"  
          _ -> "temp_failure_sensor_#{rem(state.message_counter, 2)}"
        end
      else
        "stable_sensor_#{rem(state.message_counter, 5)}"
      end
      
      message_data = case is_faulty do
        true ->
          case rem(state.message_counter, 5) do
            0 -> %{  # Missing timestamp
              "sensor_id" => sensor_id,
              "value" => 25.0 + (:rand.uniform() - 0.5) * 10,
              "faulty" => true,
              "fault_type" => "missing_timestamp"
            }
            1 -> %{  # Missing value
              "sensor_id" => sensor_id,
              "timestamp" => System.system_time(:second),
              "faulty" => true,
              "fault_type" => "missing_value"
            }
            2 -> %{  # Invalid value type
              "sensor_id" => sensor_id,
              "timestamp" => System.system_time(:second),
              "value" => "not_a_number",
              "faulty" => true,
              "fault_type" => "invalid_value"
            }
            _ -> %{  # Temporary network/sensor interference
              "sensor_id" => sensor_id,
              "timestamp" => System.system_time(:second),
              "value" => 25.0 + (:rand.uniform() - 0.5) * 10,
              "faulty" => true,
              "fault_type" => "temporary_interference"
            }
          end
          
        false -> %{
          "sensor_id" => sensor_id,
          "timestamp" => System.system_time(:second),
          "value" => Float.round(25.0 + (:rand.uniform() - 0.5) * 10, 2),
          "faulty" => false
        }
      end
      
      broadway_message = %Broadway.Message{
        data: Jason.encode!(message_data),
        acknowledger: {__MODULE__, :ack_id, :ack_data}
      }
      
      send(:fault_tolerant_broadway, {:test_message, broadway_message})
      
      new_state = %{state | message_counter: state.message_counter + 1}
      {:noreply, [], new_state}
    end
    
    # Broadway acknowledger callbacks
    def ack(:ack_id, _successful, _failed), do: :ok
  end
  
  # Handle test messages from producer
  def handle_info({:test_message, message}, state) do
    {:noreply, [message], state}
  end
end

Let’s start the fault-tolerant pipeline:

# Start the fault-tolerant Broadway pipeline
{:ok, _fault_tolerant_pid} = FaultTolerantBroadway.start_link(failure_rate: 0.4, message_rate: 12)

IO.puts("πŸš€ Fault-tolerant Broadway pipeline started!")
IO.puts("   Producer: 40% failure rate, 12 messages/sec")
IO.puts("   Processors: 6 stages with retry logic (max 3 attempts)")
IO.puts("   Batchers: successful_storage, failed_retry_queue, permanent_failures")
IO.puts("   Fault tolerance: Automatic retry for temporary failures")

# Let the fault-tolerant pipeline demonstrate recovery
Process.sleep(20000)

IO.puts("\nβœ… Fault-tolerant Broadway pipeline demonstration completed")

Summary

This notebook demonstrated comprehensive Broadway integration patterns with GorillaStream:

  1. Basic Pipeline: Simple sensor data processing with compression and analytics batching
  2. Multi-Tenant Pipeline: Complex routing and processing for different tenant requirements
  3. Fault-Tolerant Pipeline: Comprehensive error handling with retry mechanisms and recovery strategies

Key Broadway Benefits with GorillaStream:

  • Scalable Processing: Automatic concurrency management across multiple processor stages
  • Intelligent Batching: Route messages to different batchers based on content, tenant, or urgency
  • Fault Tolerance: Built-in retry mechanisms with configurable backoff strategies
  • Backpressure Control: Automatic demand management prevents system overload
  • Observability: Rich telemetry and monitoring hooks for production visibility

Production Patterns Demonstrated:

  • Tenant Isolation: Route data to tenant-specific storage and processing pipelines
  • Alert Processing: Immediate routing of urgent data through real-time alert batchers
  • Compression Optimization: Bulk compression of grouped sensor data for efficiency
  • Error Recovery: Multi-stage retry logic with temporary vs permanent failure classification
  • Analytics Aggregation: Batch processing for metrics and monitoring data

Performance Characteristics:

  • Throughput: Handles hundreds of messages per second with compression
  • Latency: Sub-second processing for individual messages, configurable batching delays
  • Memory Usage: Efficient buffering with controlled batch sizes
  • Error Rate: Graceful degradation with comprehensive error categorization

Next Steps for Production:

  • Integrate with real message queues (RabbitMQ, Apache Kafka, AWS SQS)
  • Add comprehensive telemetry and metrics collection
  • Implement database storage and time-series optimizations
  • Add authentication and authorization for multi-tenant security
  • Scale horizontally with Broadway cluster coordination
  • Monitor compression ratios and storage efficiency over time

Broadway + GorillaStream provides a powerful foundation for building production-grade streaming data pipelines that can handle high-volume IoT data with excellent compression efficiency and fault tolerance.