GorillaStream: Broadway Processing Pipeline
Mix.install([
{:gorilla_stream, "~> 1.1"},
{:broadway, "~> 1.0"},
{:gen_stage, "~> 1.2"},
{:jason, "~> 1.4"}
])
Introduction
This notebook demonstrates how to build production-grade streaming data pipelines using GorillaStream with Broadway. Broadway provides:
- Scalable message processing with automatic concurrency management
- Built-in batching for efficient bulk operations
- Fault tolerance with configurable retry mechanisms
- Backpressure handling and rate limiting
- Comprehensive telemetry and monitoring hooks
You'll learn to build pipelines for:
- High-throughput sensor data processing
- Multi-tenant IoT data streams
- Real-time analytics with compression
- Fault-tolerant message processing
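All three pipelines below share the same shape: a producer feeding concurrent processors, which route messages into named batchers. As a point of reference, here is a minimal, hedged sketch of that topology using Broadway's built-in Broadway.DummyProducer (normally used in tests), so it runs with no external queue:

defmodule SkeletonPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # DummyProducer emits nothing on its own; push messages in with
      # Broadway.test_message/2 from IEx or a test
      producer: [module: {Broadway.DummyProducer, []}, concurrency: 1],
      processors: [default: [concurrency: 2]],
      batchers: [default: [batch_size: 10, batch_timeout: 1_000]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Per-message work happens here; then pick a batcher
    Broadway.Message.put_batcher(message, :default)
  end

  @impl true
  def handle_batch(:default, messages, _batch_info, _context) do
    IO.puts("Flushed a batch of #{length(messages)} messages")
    messages
  end
end

# Usage: {:ok, _} = SkeletonPipeline.start_link([])
#        Broadway.test_message(SkeletonPipeline, "hello")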
1. Basic Broadway Pipeline for Sensor Data
Let's start with a simple Broadway pipeline that processes sensor data messages:
defmodule SensorDataProducer do
# A plain GenServer that generates readings on a timer and pushes them
# into the pipeline; it also serves as a no-op Broadway acknowledger
use GenServer
@behaviour Broadway.Acknowledger
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
sensor_configs = Keyword.get(opts, :sensors, [])
message_rate = Keyword.get(opts, :message_rate, 100) # messages per second
# Schedule message generation
interval = max(1, div(1000, message_rate))
:timer.send_interval(interval, self(), :generate_message)
state = %{
sensors: sensor_configs,
message_counter: 0,
current_sensor_index: 0
}
IO.puts("π‘ SensorDataProducer started with #{length(sensor_configs)} sensors @ #{message_rate} msgs/sec")
{:ok, state}
end
def handle_info(:generate_message, state) do
# Round-robin through sensors
sensor_index = rem(state.current_sensor_index, length(state.sensors))
{sensor_id, sensor_config} = Enum.at(state.sensors, sensor_index)
# Generate sensor reading
timestamp = System.system_time(:second)
value = generate_sensor_reading(sensor_config, state.message_counter)
message_data = %{
sensor_id: sensor_id,
sensor_type: sensor_config.type,
timestamp: timestamp,
value: value,
message_id: state.message_counter,
generated_at: System.system_time(:millisecond)
}
# Send message to Broadway pipeline. In a real scenario this would go
# through a message queue; for the demo we push it straight to one of
# the pipeline's producer stages (sending to the top-level Broadway
# supervisor would silently drop it)
broadway_message = %Broadway.Message{
data: Jason.encode!(message_data),
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
:sensor_broadway
|> Broadway.producer_names()
|> Enum.random()
|> send({:test_message, broadway_message})
new_state = %{
state |
message_counter: state.message_counter + 1,
current_sensor_index: state.current_sensor_index + 1
}
{:noreply, new_state}
end
defp generate_sensor_reading(config, counter) do
base_value = config.base_value
case config.type do
:temperature ->
# Simulate daily temperature cycle
daily_pattern = 5 * :math.sin(counter / 1440) # 24-hour cycle
noise = (:rand.uniform() - 0.5) * 2.0
Float.round(base_value + daily_pattern + noise, 2)
:humidity ->
# Simulate humidity variations
pattern = 15 * :math.cos(counter / 720) # 12-hour cycle
noise = (:rand.uniform() - 0.5) * 5.0
value = base_value + pattern + noise
Float.round(max(0, min(100, value)), 2) # Clamp to 0-100%
:pressure ->
# Simulate atmospheric pressure changes
pattern = 20 * :math.sin(counter / 2880) # 48-hour cycle
noise = (:rand.uniform() - 0.5) * 3.0
Float.round(base_value + pattern + noise, 2)
_ ->
base_value + (:rand.uniform() - 0.5) * 10.0
end
end
# Broadway acknowledger callback (no-op for the demo)
@impl Broadway.Acknowledger
def ack(:ack_id, _successful, _failed), do: :ok
end
defmodule SensorDataBroadway do
use Broadway
def start_link(opts) do
sensor_configs = Keyword.get(opts, :sensors, [])
result =
Broadway.start_link(__MODULE__,
name: :sensor_broadway,
producer: [
module: {__MODULE__.TestProducer, []},
concurrency: 1
],
processors: [
default: [concurrency: 4, min_demand: 5, max_demand: 10]
],
batchers: [
compressed_storage: [concurrency: 2, batch_size: 20, batch_timeout: 2000],
analytics: [concurrency: 1, batch_size: 50, batch_timeout: 1000]
]
)
# Start the message generator only after the pipeline is up, so its
# timer never fires before the Broadway producer stages are registered
{:ok, _producer} = SensorDataProducer.start_link(sensors: sensor_configs, message_rate: 50)
result
end
def handle_message(_processor, message, _context) do
case Jason.decode(message.data) do
{:ok, sensor_data} ->
# Compress the sensor data point
data_point = {sensor_data["timestamp"], sensor_data["value"]}
case GorillaStream.compress([data_point]) do
{:ok, compressed} ->
# Enrich message with compression information
enriched_data = Map.merge(sensor_data, %{
"compressed_data" => Base.encode64(compressed),
"compressed_size" => byte_size(compressed),
# Each raw point is 16 bytes: a 64-bit timestamp plus a 64-bit float
"compression_ratio" => 16 / byte_size(compressed),
"processed_at" => System.system_time(:millisecond)
})
# Route every fifth message to :analytics so both batchers defined
# above actually receive traffic; the rest go to storage
batcher = if rem(sensor_data["message_id"], 5) == 0, do: :analytics, else: :compressed_storage
message
|> Broadway.Message.put_data(enriched_data)
|> Broadway.Message.put_batcher(batcher)
{:error, reason} ->
IO.puts("❌ Compression failed for sensor #{sensor_data["sensor_id"]}: #{inspect(reason)}")
Broadway.Message.failed(message, reason)
end
{:error, reason} ->
IO.puts("❌ JSON decode failed: #{inspect(reason)}")
Broadway.Message.failed(message, reason)
end
end
def handle_batch(:compressed_storage, messages, _batch_info, _context) do
IO.puts("πΎ Storing batch of #{length(messages)} compressed sensor readings")
# Process compressed data storage
Enum.each(messages, fn message ->
data = message.data
# Simulate storing compressed data to database/file system
store_compressed_sensor_data(data)
# Log compression stats occasionally
if rem(System.system_time(:millisecond), 5000) < 100 do
IO.puts(" π #{data["sensor_id"]}: #{data["value"]} -> #{data["compressed_size"]}B (#{Float.round(data["compression_ratio"], 2)}:1)")
end
end)
messages
end
def handle_batch(:analytics, messages, _batch_info, _context) do
IO.puts("π Processing analytics batch of #{length(messages)} sensor readings")
# Aggregate analytics by sensor type
analytics =
messages
|> Enum.group_by(fn msg -> msg.data["sensor_type"] end)
|> Enum.map(fn {sensor_type, type_messages} ->
values = Enum.map(type_messages, fn msg -> msg.data["value"] end)
compressed_sizes = Enum.map(type_messages, fn msg -> msg.data["compressed_size"] end)
%{
sensor_type: sensor_type,
count: length(type_messages),
avg_value: Enum.sum(values) / length(values),
avg_compressed_size: Enum.sum(compressed_sizes) / length(compressed_sizes),
min_value: Enum.min(values),
max_value: Enum.max(values)
}
end)
# Log analytics
Enum.each(analytics, fn stats ->
IO.puts(" π #{stats.sensor_type}: #{stats.count} pts, avg=#{Float.round(stats.avg_value, 1)}, size=#{Float.round(stats.avg_compressed_size, 1)}B")
end)
messages
end
defp store_compressed_sensor_data(_data) do
# Simulate database storage
# In reality: Database.insert_compressed_reading(data)
:ok
end
# Test producer for demo purposes
defmodule TestProducer do
use GenStage
def init(_opts) do
{:producer, %{}}
end
def handle_demand(_demand, state) do
# Wait for test messages
{:noreply, [], state}
end
def handle_info({:test_message, message}, state) do
{:noreply, [message], state}
end
end
end
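Before starting the pipeline, it is worth sanity-checking the GorillaStream API in isolation. A quick round-trip check; this assumes GorillaStream.decompress/1 mirrors compress/1 (consult the gorilla_stream docs for the exact signature):

# Compress a few {timestamp, value} points and verify the round trip
points = [
  {1_700_000_000, 22.5},
  {1_700_000_060, 22.6},
  {1_700_000_120, 22.4}
]

{:ok, compressed} = GorillaStream.compress(points)
IO.puts("#{length(points)} points (#{length(points) * 16}B raw) -> #{byte_size(compressed)}B")

# Assumption: decompress/1 is the inverse of compress/1
{:ok, decompressed} = GorillaStream.decompress(compressed)
IO.inspect(decompressed == points, label: "lossless round trip")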
Let's start the basic Broadway pipeline:
# Define sensor configuration
basic_sensors = [
{"temp_01", %{type: :temperature, base_value: 22.0}},
{"humidity_01", %{type: :humidity, base_value: 65.0}},
{"pressure_01", %{type: :pressure, base_value: 1013.25}}
]
# Start the Broadway pipeline
{:ok, _broadway_pid} = SensorDataBroadway.start_link(sensors: basic_sensors)
IO.puts("π Basic Broadway pipeline started!")
IO.puts(" Producer: 50 messages/sec from 3 sensors")
IO.puts(" Processors: 4 stages processing individual messages")
IO.puts(" Batchers: compressed_storage (20 msgs) + analytics (50 msgs)")
# Let the pipeline process messages
Process.sleep(8000)
IO.puts("\nβ
Basic Broadway pipeline demonstration completed")
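If you want to tear this pipeline down cleanly before moving on (for example, before re-evaluating the cell), Broadway drains and acknowledges in-flight messages on shutdown. A short optional cleanup, assuming Broadway.stop/1 with default reason and timeout:

# Optional cleanup: stop the demo generator first so it stops pushing,
# then stop the pipeline (Broadway.stop/1 blocks until in-flight
# messages are drained)
GenServer.stop(SensorDataProducer)
Broadway.stop(:sensor_broadway)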
2. Advanced Multi-Tenant IoT Pipeline
Now let's build a more sophisticated pipeline that handles multiple IoT tenants with different processing requirements:
defmodule MultiTenantBroadway do
use Broadway
def start_link(opts) do
Broadway.start_link(__MODULE__,
name: :multitenant_broadway,
producer: [
module: {__MODULE__.IoTMessageProducer, opts},
# A single producer stage, so the generation timer runs exactly once
concurrency: 1
],
processors: [
default: [concurrency: 8, min_demand: 3, max_demand: 15]
],
batchers: [
tenant_a_storage: [concurrency: 2, batch_size: 25, batch_timeout: 1500],
tenant_b_storage: [concurrency: 2, batch_size: 15, batch_timeout: 3000],
tenant_c_storage: [concurrency: 1, batch_size: 100, batch_timeout: 5000],
real_time_alerts: [concurrency: 3, batch_size: 1, batch_timeout: 100], # Individual processing
analytics_aggregation: [concurrency: 2, batch_size: 200, batch_timeout: 10000]
]
)
end
def handle_message(_processor, message, _context) do
case Jason.decode(message.data) do
{:ok, iot_data} ->
process_iot_message(message, iot_data)
{:error, reason} ->
IO.puts("❌ Failed to decode IoT message: #{inspect(reason)}")
Broadway.Message.failed(message, reason)
end
end
defp process_iot_message(message, iot_data) do
tenant_id = iot_data["tenant_id"]
device_id = iot_data["device_id"]
measurements = iot_data["measurements"]
# Compress all measurements for this device
compressed_measurements = compress_device_measurements(measurements)
case compressed_measurements do
{:ok, compression_results} ->
# Enrich with tenant-specific processing
enriched_data = %{
"tenant_id" => tenant_id,
"device_id" => device_id,
"original_measurements" => measurements,
"compressed_results" => compression_results,
"processing_timestamp" => System.system_time(:millisecond),
"total_points" => length(measurements),
"total_compressed_size" => Enum.sum(Enum.map(compression_results, & &1["compressed_size"]))
}
# Route to appropriate batcher based on tenant and alert conditions
message_with_data = Broadway.Message.put_data(message, enriched_data)
cond do
# Check for alert conditions
has_alert_conditions?(measurements) ->
Broadway.Message.put_batcher(message_with_data, :real_time_alerts)
# Route by tenant storage preferences
tenant_id == "tenant_a" ->
Broadway.Message.put_batcher(message_with_data, :tenant_a_storage)
tenant_id == "tenant_b" ->
Broadway.Message.put_batcher(message_with_data, :tenant_b_storage)
tenant_id == "tenant_c" ->
Broadway.Message.put_batcher(message_with_data, :tenant_c_storage)
true ->
Broadway.Message.put_batcher(message_with_data, :analytics_aggregation)
end
{:error, reason} ->
IO.puts("β Failed to compress measurements for #{device_id}: #{reason}")
Broadway.Message.failed(message, reason)
end
end
defp compress_device_measurements(measurements) do
# Group measurements by sensor type for better compression
grouped_measurements = Enum.group_by(measurements, & &1["sensor_type"])
compression_results =
Enum.map(grouped_measurements, fn {sensor_type, sensor_measurements} ->
# Convert to GorillaStream format
data_points = Enum.map(sensor_measurements, fn m ->
{m["timestamp"], m["value"]}
end)
case GorillaStream.compress(data_points, true) do # Use zlib for better compression
{:ok, compressed} ->
original_size = length(data_points) * 16
%{
"sensor_type" => sensor_type,
"points_count" => length(data_points),
"compressed_data" => Base.encode64(compressed),
"compressed_size" => byte_size(compressed),
"original_size" => original_size,
"compression_ratio" => original_size / byte_size(compressed)
}
{:error, reason} ->
%{"sensor_type" => sensor_type, "error" => reason}
end
end)
# Check if all compressions succeeded
failed_compressions = Enum.filter(compression_results, &Map.has_key?(&1, "error"))
if length(failed_compressions) == 0 do
{:ok, compression_results}
else
{:error, "Some sensor compressions failed: #{inspect(failed_compressions)}"}
end
end
defp has_alert_conditions?(measurements) do
# Check for anomalous values that require immediate attention; the
# per-measurement thresholds live in has_alert_value?/1 below
Enum.any?(measurements, &has_alert_value?/1)
end
# Tenant-specific batch handlers
def handle_batch(:tenant_a_storage, messages, batch_info, _context) do
IO.puts("π’ Tenant A storage: #{length(messages)} devices (#{batch_info.trigger})")
# Tenant A prefers individual device files with high compression
Enum.each(messages, fn message ->
data = message.data
tenant_id = data["tenant_id"]
device_id = data["device_id"]
# Store each device's compressed data separately
filename = "#{tenant_id}/#{device_id}/#{System.system_time(:second)}.gorilla"
total_size = data["total_compressed_size"]
points = data["total_points"]
# Simulate file storage
# File.write!(filename, compressed_data)
IO.puts(" π Stored #{device_id}: #{points} points -> #{total_size}B in #{filename}")
end)
messages
end
def handle_batch(:tenant_b_storage, messages, batch_info, _context) do
IO.puts("π’ Tenant B storage: #{length(messages)} devices (#{batch_info.trigger})")
# Tenant B prefers batch database inserts
total_devices = length(messages)
total_points = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_points"] end))
total_size = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_compressed_size"] end))
# Simulate database batch insert
# Database.batch_insert_compressed_data(messages)
IO.puts(" πΎ Batch DB insert: #{total_devices} devices, #{total_points} points, #{total_size}B total")
messages
end
def handle_batch(:tenant_c_storage, messages, batch_info, _context) do
IO.puts("π’ Tenant C storage: #{length(messages)} devices (#{batch_info.trigger})")
# Tenant C uses time-series database with custom aggregation
by_hour = Enum.group_by(messages, fn msg ->
# Group by hour for time-series optimization
timestamp = div(msg.data["processing_timestamp"], 3600000) # Hour buckets
timestamp
end)
Enum.each(by_hour, fn {hour_bucket, hour_messages} ->
hour_points = Enum.sum(Enum.map(hour_messages, fn msg -> msg.data["total_points"] end))
hour_size = Enum.sum(Enum.map(hour_messages, fn msg -> msg.data["total_compressed_size"] end))
# Simulate time-series database insert
# TimeSeriesDB.insert_hour_bucket(hour_bucket, hour_messages)
IO.puts(" β° Hour #{hour_bucket}: #{length(hour_messages)} devices, #{hour_points} points, #{hour_size}B")
end)
messages
end
def handle_batch(:real_time_alerts, messages, _batch_info, _context) do
IO.puts("π¨ Real-time alerts: #{length(messages)} alert conditions detected!")
Enum.each(messages, fn message ->
data = message.data
alert_measurements = Enum.filter(data["original_measurements"], &has_alert_value?/1)
Enum.each(alert_measurements, fn measurement ->
IO.puts(" β οΈ ALERT: #{data["device_id"]} #{measurement["sensor_type"]} = #{measurement["value"]}")
# Simulate alert notification
# AlertService.send_alert(data["tenant_id"], data["device_id"], measurement)
end)
end)
messages
end
def handle_batch(:analytics_aggregation, messages, batch_info, _context) do
IO.puts("π Analytics aggregation: #{length(messages)} devices (#{batch_info.trigger})")
# Aggregate analytics across all tenants and devices
analytics = calculate_batch_analytics(messages)
IO.puts(" π Batch analytics:")
IO.puts(" β’ Total devices: #{analytics.total_devices}")
IO.puts(" β’ Total data points: #{analytics.total_points}")
IO.puts(" β’ Average compression ratio: #{Float.round(analytics.avg_compression_ratio, 2)}:1")
IO.puts(" β’ Storage efficiency: #{Float.round(analytics.storage_efficiency, 1)}%")
# Group by tenant for tenant-specific analytics
tenant_analytics =
messages
|> Enum.group_by(fn msg -> msg.data["tenant_id"] end)
|> Enum.map(fn {tenant_id, tenant_messages} ->
tenant_points = Enum.sum(Enum.map(tenant_messages, fn msg -> msg.data["total_points"] end))
tenant_size = Enum.sum(Enum.map(tenant_messages, fn msg -> msg.data["total_compressed_size"] end))
{tenant_id, %{devices: length(tenant_messages), points: tenant_points, size: tenant_size}}
end)
Enum.each(tenant_analytics, fn {tenant_id, stats} ->
IO.puts(" β’ #{tenant_id}: #{stats.devices} devices, #{stats.points} pts, #{stats.size}B")
end)
messages
end
defp has_alert_value?(measurement) do
case measurement["sensor_type"] do
"temperature" -> measurement["value"] > 80.0 or measurement["value"] < -20.0
"humidity" -> measurement["value"] > 95.0 or measurement["value"] < 5.0
"pressure" -> measurement["value"] > 1050.0 or measurement["value"] < 950.0
_ -> false
end
end
defp calculate_batch_analytics(messages) do
total_devices = length(messages)
total_points = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_points"] end))
total_compressed = Enum.sum(Enum.map(messages, fn msg -> msg.data["total_compressed_size"] end))
total_original = total_points * 16 # Approximate original size
%{
total_devices: total_devices,
total_points: total_points,
total_compressed_size: total_compressed,
avg_compression_ratio: if(total_compressed > 0, do: total_original / total_compressed, else: 0),
storage_efficiency: if(total_original > 0, do: (total_original - total_compressed) / total_original * 100, else: 0)
}
end
# IoT Message Producer
defmodule IoTMessageProducer do
use GenStage
def init(opts) do
tenants = Keyword.get(opts, :tenants, [])
message_rate = Keyword.get(opts, :message_rate, 30)
# Schedule IoT message generation
interval = max(50, div(1000, message_rate))
:timer.send_interval(interval, self(), :generate_iot_message)
state = %{
tenants: tenants,
message_counter: 0
}
IO.puts("π‘ IoTMessageProducer started with #{length(tenants)} tenants @ #{message_rate} msgs/sec")
Process.register(self(), :iot_producer)
{:producer, state}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
def handle_info(:generate_iot_message, state) do
# Generate IoT message for random tenant
tenant = Enum.random(state.tenants)
device = Enum.random(tenant.devices)
# Generate multiple sensor measurements per device message
measurements = generate_device_measurements(device, state.message_counter)
message_data = %{
tenant_id: tenant.id,
device_id: device.id,
device_type: device.type,
measurements: measurements,
message_id: state.message_counter,
generated_at: System.system_time(:millisecond)
}
broadway_message = %Broadway.Message{
data: Jason.encode!(message_data),
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
# Emit the event directly from this producer stage; GenStage buffers
# it until the pipeline demands more messages
new_state = %{state | message_counter: state.message_counter + 1}
{:noreply, [broadway_message], new_state}
end
defp generate_device_measurements(device, counter) do
# Generate 3-5 measurements per device message
num_measurements = 3 + rem(counter, 3)
base_timestamp = System.system_time(:second)
for i <- 0..(num_measurements - 1) do
sensor_type = Enum.random(device.sensors)
timestamp = base_timestamp - i * 60 # Past measurements
value = generate_sensor_value(sensor_type, counter + i)
%{
"sensor_type" => Atom.to_string(sensor_type),
"timestamp" => timestamp,
"value" => value,
"measurement_id" => "#{device.id}_#{sensor_type}_#{timestamp}"
}
end
end
defp generate_sensor_value(:temperature, counter) do
base = 25.0 + 10 * :math.sin(counter / 100)
noise = (:rand.uniform() - 0.5) * 5.0
# Occasionally generate alert conditions
if rem(counter, 50) == 0, do: 85.0, else: Float.round(base + noise, 2)
end
defp generate_sensor_value(:humidity, counter) do
base = 60.0 + 20 * :math.cos(counter / 80)
noise = (:rand.uniform() - 0.5) * 10.0
value = base + noise
# Occasionally generate alert conditions
if rem(counter, 75) == 0, do: 98.0, else: Float.round(max(0, min(100, value)), 2)
end
defp generate_sensor_value(:pressure, counter) do
base = 1013.25 + 15 * :math.sin(counter / 200)
noise = (:rand.uniform() - 0.5) * 5.0
# Occasionally generate alert conditions
if rem(counter, 100) == 0, do: 1055.0, else: Float.round(base + noise, 2)
end
defp generate_sensor_value(:vibration, counter) do
base = 0.5 + 0.3 * :math.sin(counter / 50)
noise = (:rand.uniform() - 0.5) * 0.2
Float.round(max(0, base + noise), 3)
end
# Broadway acknowledger callbacks
def ack(:ack_id, _successful, _failed), do: :ok
end
end
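The introduction mentioned telemetry hooks; Broadway instruments processors and batchers with :telemetry events. A minimal sketch of a batch-latency logger, assuming the [:broadway, :batcher, :stop] event name (verify against the Broadway version you run, as event names and metadata have varied across releases):

# Log every batcher flush with its duration; :telemetry ships as a
# dependency of Broadway, so it is already available here
:telemetry.attach(
  "multitenant-batcher-logger",
  [:broadway, :batcher, :stop],
  fn _event, measurements, metadata, _config ->
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    IO.puts("batcher flush: #{inspect(Map.get(metadata, :batcher))} took #{ms}ms")
  end,
  nil
)

Attach the handler before starting the pipeline if you want timings from the first batch onward.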
Let's configure and start the multi-tenant pipeline:
# Define multi-tenant IoT configuration
tenants = [
%{
id: "tenant_a",
name: "Manufacturing Corp",
devices: [
%{id: "factory_01", type: :industrial, sensors: [:temperature, :pressure, :vibration]},
%{id: "factory_02", type: :industrial, sensors: [:temperature, :pressure, :vibration]},
%{id: "warehouse_01", type: :environmental, sensors: [:temperature, :humidity]}
]
},
%{
id: "tenant_b",
name: "Smart Building Solutions",
devices: [
%{id: "building_a_floor_1", type: :hvac, sensors: [:temperature, :humidity, :pressure]},
%{id: "building_a_floor_2", type: :hvac, sensors: [:temperature, :humidity, :pressure]},
%{id: "building_b_lobby", type: :environmental, sensors: [:temperature, :humidity]}
]
},
%{
id: "tenant_c",
name: "Agricultural Monitoring",
devices: [
%{id: "greenhouse_01", type: :agricultural, sensors: [:temperature, :humidity]},
%{id: "greenhouse_02", type: :agricultural, sensors: [:temperature, :humidity]},
%{id: "field_station_01", type: :weather, sensors: [:temperature, :humidity, :pressure]}
]
}
]
# Start the multi-tenant Broadway pipeline
{:ok, _multitenant_pid} = MultiTenantBroadway.start_link(tenants: tenants, message_rate: 20)
IO.puts("π Multi-tenant Broadway pipeline started!")
IO.puts(" Tenants: #{length(tenants)} with #{Enum.sum(Enum.map(tenants, fn t -> length(t.devices) end))} total devices")
IO.puts(" Producer: 20 IoT messages/sec with multi-sensor measurements")
IO.puts(" Processors: 8 stages with adaptive demand (3-15 messages)")
IO.puts(" Batchers: Tenant-specific storage + real-time alerts + analytics")
# Let the multi-tenant pipeline process messages
Process.sleep(15000)
IO.puts("\nβ
Multi-tenant Broadway pipeline demonstration completed")
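One Broadway feature the demo producers sidestep is rate limiting: they self-throttle on a timer, but a producer pulling from a real queue can be capped declaratively with the producer's rate_limiting option. A hedged configuration sketch (the producer module is a placeholder):

# Inside Broadway.start_link/2: cap intake at ~100 messages/second
# across all producer stages
producer: [
  module: {SomeQueueProducer, []},  # placeholder for a real queue producer
  concurrency: 2,
  rate_limiting: [allowed_messages: 100, interval: 1_000]
]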
3. Fault-Tolerant Pipeline with Retry Logic
Let's build a pipeline that demonstrates comprehensive fault tolerance and retry mechanisms:
defmodule FaultTolerantBroadway do
use Broadway
def start_link(opts) do
Broadway.start_link(__MODULE__,
name: :fault_tolerant_broadway,
producer: [
module: {__MODULE__.FaultyMessageProducer, opts},
concurrency: 1
],
processors: [
default: [concurrency: 6, min_demand: 2, max_demand: 8]
],
batchers: [
successful_storage: [concurrency: 2, batch_size: 15, batch_timeout: 2000],
failed_retry_queue: [concurrency: 1, batch_size: 5, batch_timeout: 1000],
permanent_failures: [concurrency: 1, batch_size: 10, batch_timeout: 5000]
],
# Configure fault tolerance
context: %{
retry_attempts: 3,
compression_failures: 0,
recovery_successes: 0
}
)
end
def handle_message(_processor, message, context) do
case Jason.decode(message.data) do
{:ok, sensor_data} ->
process_with_fault_tolerance(message, sensor_data, context)
{:error, reason} ->
IO.puts("β JSON decode failed: #{reason}")
# Route to permanent failures - can't recover from invalid JSON
message
|> Broadway.Message.put_data(%{"error" => "json_decode_failed", "reason" => reason})
|> Broadway.Message.put_batcher(:permanent_failures)
end
end
defp process_with_fault_tolerance(message, sensor_data, context) do
retry_count = get_retry_count(sensor_data)
max_retries = context.retry_attempts
# Simulate some messages being inherently faulty
is_faulty_data = Map.get(sensor_data, "faulty", false)
if is_faulty_data and retry_count == 0 do
IO.puts("β οΈ Processing faulty sensor data (will attempt recovery)")
end
case attempt_compression(sensor_data, retry_count) do
{:ok, compression_result} ->
# Success - route to storage
enriched_data = Map.merge(sensor_data, %{
"compressed_data" => Base.encode64(compression_result.compressed),
"compressed_size" => compression_result.size,
"compression_ratio" => compression_result.ratio,
"retry_count" => retry_count,
"processing_timestamp" => System.system_time(:millisecond)
})
if retry_count > 0 do
IO.puts("β
Recovery success after #{retry_count} retries for sensor #{sensor_data["sensor_id"]}")
end
message
|> Broadway.Message.put_data(enriched_data)
|> Broadway.Message.put_batcher(:successful_storage)
{:error, :temporary_failure, reason} when retry_count < max_retries ->
# Temporary failure - retry
IO.puts("π Temporary failure (attempt #{retry_count + 1}/#{max_retries}): #{reason}")
retry_data = Map.merge(sensor_data, %{
"retry_count" => retry_count + 1,
"last_error" => reason,
"retry_timestamp" => System.system_time(:millisecond)
})
message
|> Broadway.Message.put_data(retry_data)
|> Broadway.Message.put_batcher(:failed_retry_queue)
{:error, :permanent_failure, reason} ->
# Permanent failure - no point in retrying
IO.puts("π Permanent failure for sensor #{sensor_data["sensor_id"]}: #{reason}")
failure_data = Map.merge(sensor_data, %{
"error_type" => "permanent_failure",
"error_reason" => reason,
"retry_count" => retry_count,
"failed_timestamp" => System.system_time(:millisecond)
})
message
|> Broadway.Message.put_data(failure_data)
|> Broadway.Message.put_batcher(:permanent_failures)
{:error, :temporary_failure, reason} ->
# Exceeded max retries - treat as permanent failure
IO.puts("π Max retries exceeded for sensor #{sensor_data["sensor_id"]}: #{reason}")
failure_data = Map.merge(sensor_data, %{
"error_type" => "max_retries_exceeded",
"error_reason" => reason,
"retry_count" => retry_count,
"failed_timestamp" => System.system_time(:millisecond)
})
message
|> Broadway.Message.put_data(failure_data)
|> Broadway.Message.put_batcher(:permanent_failures)
end
end
defp attempt_compression(sensor_data, retry_count) do
# Simulate different failure modes
sensor_id = sensor_data["sensor_id"]
is_faulty = Map.get(sensor_data, "faulty", false)
cond do
# Permanent failure cases
is_nil(sensor_data["timestamp"]) ->
{:error, :permanent_failure, "missing_timestamp"}
is_nil(sensor_data["value"]) ->
{:error, :permanent_failure, "missing_value"}
not is_number(sensor_data["value"]) ->
{:error, :permanent_failure, "invalid_value_type"}
# Temporary failure that can be recovered with retry
is_faulty and retry_count < 2 ->
{:error, :temporary_failure, "sensor_interference"}
# Network simulation failure (recoverable)
String.contains?(sensor_id, "unstable") and retry_count == 0 ->
{:error, :temporary_failure, "network_timeout"}
# Success case (including recovery after retries)
true ->
data_point = {sensor_data["timestamp"], sensor_data["value"]}
case GorillaStream.compress([data_point], true) do
{:ok, compressed} ->
{:ok, %{
compressed: compressed,
size: byte_size(compressed),
ratio: 16 / byte_size(compressed)
}}
{:error, reason} ->
# GorillaStream errors are typically permanent
{:error, :permanent_failure, "compression_failed: #{inspect(reason)}"}
end
end
end
defp get_retry_count(sensor_data) do
# The retry count travels inside the decoded JSON payload, so read it
# from the decoded map rather than from the raw (string) message data
case sensor_data do
%{"retry_count" => count} when is_integer(count) -> count
_ -> 0
end
end
def handle_batch(:successful_storage, messages, batch_info, _context) do
successful_count = length(messages)
retry_recoveries = Enum.count(messages, fn msg -> msg.data["retry_count"] > 0 end)
IO.puts("πΎ Successful storage: #{successful_count} messages (#{retry_recoveries} recovered) [#{batch_info.trigger}]")
# Calculate storage statistics
total_points = successful_count
total_size = Enum.sum(Enum.map(messages, fn msg -> msg.data["compressed_size"] end))
avg_size = if successful_count > 0, do: total_size / successful_count, else: 0.0
IO.puts(" 📊 Storage stats: #{total_points} points, #{total_size}B total, #{Float.round(avg_size, 1)}B avg")
if retry_recoveries > 0 do
IO.puts(" π― Recovery rate: #{retry_recoveries}/#{successful_count} (#{Float.round(retry_recoveries/successful_count*100, 1)}%)")
end
messages
end
def handle_batch(:failed_retry_queue, messages, batch_info, _context) do
IO.puts("🔄 Retry queue: #{length(messages)} messages for retry processing [#{batch_info.trigger}]")
# Re-inject messages into the pipeline through a producer stage; the
# producer's handle_info({:retry_message, ...}) clause emits them again
producer = :fault_tolerant_broadway |> Broadway.producer_names() |> Enum.random()
Enum.each(messages, fn message ->
# Create new message for retry, acknowledged by the producer module
# (this outer module does not implement ack/3)
retry_message = %Broadway.Message{
data: Jason.encode!(message.data),
acknowledger: {__MODULE__.FaultyMessageProducer, :ack_id, :ack_data}
}
# Send back to the pipeline after a brief delay
Process.send_after(producer, {:retry_message, retry_message}, 500)
end)
IO.puts(" ⏰ Scheduled #{length(messages)} messages for retry in 500ms")
messages
end
def handle_batch(:permanent_failures, messages, batch_info, _context) do
IO.puts("π Permanent failures: #{length(messages)} messages [#{batch_info.trigger}]")
# Categorize failure types
failure_types =
messages
|> Enum.group_by(fn msg -> msg.data["error_type"] || "unknown" end)
|> Enum.map(fn {type, type_messages} -> {type, length(type_messages)} end)
IO.puts(" π Failure breakdown:")
Enum.each(failure_types, fn {type, count} ->
IO.puts(" β’ #{type}: #{count} messages")
end)
# In a real system, you might:
# - Store failures in a dead letter queue
# - Send alerts to operations team
# - Generate failure reports
messages
end
# Faulty Message Producer for testing fault tolerance
defmodule FaultyMessageProducer do
use GenStage
def init(opts) do
failure_rate = Keyword.get(opts, :failure_rate, 0.3) # 30% failure rate
message_rate = Keyword.get(opts, :message_rate, 15)
# Schedule message generation
interval = max(50, div(1000, message_rate))
:timer.send_interval(interval, self(), :generate_message)
state = %{
message_counter: 0,
failure_rate: failure_rate
}
IO.puts("π‘ FaultyMessageProducer started (failure rate: #{Float.round(failure_rate * 100, 1)}%)")
{:producer, state}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
# Re-emit messages scheduled for retry by the :failed_retry_queue batcher
def handle_info({:retry_message, message}, state) do
{:noreply, [message], state}
end
def handle_info(:generate_message, state) do
# Generate message with potential faults
is_faulty = :rand.uniform() < state.failure_rate
sensor_id = if is_faulty do
# Create different types of faulty sensors
case rem(state.message_counter, 4) do
0 -> "unstable_sensor_#{rem(state.message_counter, 3)}"
1 -> "faulty_sensor_#{rem(state.message_counter, 2)}"
2 -> "interference_sensor_#{rem(state.message_counter, 3)}"
_ -> "temp_failure_sensor_#{rem(state.message_counter, 2)}"
end
else
"stable_sensor_#{rem(state.message_counter, 5)}"
end
message_data = case is_faulty do
true ->
case rem(state.message_counter, 5) do
0 -> %{ # Missing timestamp
"sensor_id" => sensor_id,
"value" => 25.0 + (:rand.uniform() - 0.5) * 10,
"faulty" => true,
"fault_type" => "missing_timestamp"
}
1 -> %{ # Missing value
"sensor_id" => sensor_id,
"timestamp" => System.system_time(:second),
"faulty" => true,
"fault_type" => "missing_value"
}
2 -> %{ # Invalid value type
"sensor_id" => sensor_id,
"timestamp" => System.system_time(:second),
"value" => "not_a_number",
"faulty" => true,
"fault_type" => "invalid_value"
}
_ -> %{ # Temporary network/sensor interference
"sensor_id" => sensor_id,
"timestamp" => System.system_time(:second),
"value" => 25.0 + (:rand.uniform() - 0.5) * 10,
"faulty" => true,
"fault_type" => "temporary_interference"
}
end
false -> %{
"sensor_id" => sensor_id,
"timestamp" => System.system_time(:second),
"value" => Float.round(25.0 + (:rand.uniform() - 0.5) * 10, 2),
"faulty" => false
}
end
broadway_message = %Broadway.Message{
data: Jason.encode!(message_data),
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
# Emit directly from this producer stage rather than sending to the
# Broadway supervisor, which would never deliver the message
new_state = %{state | message_counter: state.message_counter + 1}
{:noreply, [broadway_message], new_state}
end
# Broadway acknowledger callbacks
def ack(:ack_id, _successful, _failed), do: :ok
end
end
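The retry queue above re-injects messages after a fixed 500 ms. In production you would usually grow the delay per attempt and add jitter, so retries from many messages do not synchronize against a struggling downstream. A small pure helper, as a sketch (the constants are arbitrary):

defmodule RetryBackoff do
  @base_ms 500
  @max_ms 30_000

  # Exponential backoff with "full jitter": the cap doubles per attempt,
  # and a uniform random delay up to that cap is used
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 0 do
    cap = min(@max_ms, @base_ms * Integer.pow(2, attempt))
    :rand.uniform(cap)
  end
end

# e.g. Process.send_after(producer, {:retry_message, msg}, RetryBackoff.delay_ms(retry_count))
IO.inspect(Enum.map(0..5, &RetryBackoff.delay_ms/1), label: "sampled delays (ms)")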
Let's start the fault-tolerant pipeline:
# Start the fault-tolerant Broadway pipeline
{:ok, _fault_tolerant_pid} = FaultTolerantBroadway.start_link(failure_rate: 0.4, message_rate: 12)
IO.puts("π Fault-tolerant Broadway pipeline started!")
IO.puts(" Producer: 40% failure rate, 12 messages/sec")
IO.puts(" Processors: 6 stages with retry logic (max 3 attempts)")
IO.puts(" Batchers: successful_storage, failed_retry_queue, permanent_failures")
IO.puts(" Fault tolerance: Automatic retry for temporary failures")
# Let the fault-tolerant pipeline demonstrate recovery
Process.sleep(20000)
IO.puts("\nβ
Fault-tolerant Broadway pipeline demonstration completed")
Summary
This notebook demonstrated comprehensive Broadway integration patterns with GorillaStream:
- Basic Pipeline: Simple sensor data processing with compression and analytics batching
- Multi-Tenant Pipeline: Complex routing and processing for different tenant requirements
- Fault-Tolerant Pipeline: Comprehensive error handling with retry mechanisms and recovery strategies
Key Broadway Benefits with GorillaStream:
- Scalable Processing: Automatic concurrency management across multiple processor stages
- Intelligent Batching: Route messages to different batchers based on content, tenant, or urgency
- Fault Tolerance: Built-in retry mechanisms with configurable backoff strategies
- Backpressure Control: Automatic demand management prevents system overload
- Observability: Rich telemetry and monitoring hooks for production visibility
Production Patterns Demonstrated:
- Tenant Isolation: Route data to tenant-specific storage and processing pipelines
- Alert Processing: Immediate routing of urgent data through real-time alert batchers
- Compression Optimization: Bulk compression of grouped sensor data for efficiency
- Error Recovery: Multi-stage retry logic with temporary vs permanent failure classification
- Analytics Aggregation: Batch processing for metrics and monitoring data
Performance Characteristics:
- Throughput: Handles hundreds of messages per second with compression
- Latency: Sub-second processing for individual messages, configurable batching delays
- Memory Usage: Efficient buffering with controlled batch sizes
- Error Rate: Graceful degradation with comprehensive error categorization
Next Steps for Production:
- Integrate with real message queues (RabbitMQ, Apache Kafka, AWS SQS); see the producer sketch below
- Add comprehensive telemetry and metrics collection
- Implement database storage and time-series optimizations
- Add authentication and authorization for multi-tenant security
- Scale horizontally with Broadway cluster coordination
- Monitor compression ratios and storage efficiency over time
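For the first item, swapping the demo producers for a real queue is mostly configuration. A hedged sketch using the broadway_rabbitmq package (queue name, connection details, and version constraint are placeholders to adapt):

# Add {:broadway_rabbitmq, "~> 0.8"} to Mix.install, then:
producer: [
  module: {BroadwayRabbitMQ.Producer,
    queue: "sensor_events",            # placeholder queue name
    connection: [host: "localhost"],   # placeholder connection settings
    qos: [prefetch_count: 50],
    on_failure: :reject_and_requeue},
  concurrency: 2
]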
Broadway + GorillaStream provides a powerful foundation for building production-grade streaming data pipelines that can handle high-volume IoT data with excellent compression efficiency and fault tolerance.