GorillaStream: Streaming & Processing Patterns
Mix.install([
{:gorilla_stream, "~> 1.1"},
{:jason, "~> 1.4"}
])
Introduction
This notebook explores advanced streaming and processing patterns with GorillaStream. You’ll learn how to handle:
- Real-time streaming (individual points)
- Chunked processing (large datasets)
- Memory-efficient file processing
- Adaptive processing (switching between streaming and chunking)
- IoT data pipeline patterns
1. Real-time Streaming (Individual Points)
Perfect for processing sensor data as it arrives, one point at a time. Keep in mind that compressing a single point also carries the encoder's fixed per-stream overhead, so the per-point ratios reported below will be much lower than the batch ratios in later sections:
defmodule SensorStreaming do
def start_sensor_stream(sensor_id, duration_seconds \\ 30) do
IO.puts("Starting real-time sensor stream for #{sensor_id}...")
# Simulate real-time sensor data stream
Stream.unfold({DateTime.utc_now(), 0}, fn {start_time, count} ->
if count < duration_seconds do
current_time = DateTime.add(start_time, count, :second)
timestamp = DateTime.to_unix(current_time)
# Simulate sensor reading with some realistic variation
base_value = 22.5
variation = :math.sin(count / 10) * 2 + (:rand.uniform() - 0.5) * 0.5
value = base_value + variation
data_point = {timestamp, Float.round(value, 2)}
{data_point, {start_time, count + 1}}
else
nil
end
end)
|> Stream.map(&compress_point/1)
|> Stream.each(&store_compressed_point/1)
|> Enum.to_list() # Force evaluation for demo
end
defp compress_point(data_point) do
# Compress single point as it streams in
case GorillaStream.compress([data_point]) do
{:ok, compressed} ->
{timestamp, value} = data_point
compression_ratio = 16 / byte_size(compressed) # 16 bytes ≈ uncompressed size of one {timestamp, float} point
%{
timestamp: timestamp,
value: value,
compressed: compressed,
compressed_size: byte_size(compressed),
compression_ratio: Float.round(compression_ratio, 2),
status: :success
}
{:error, reason} ->
%{
data_point: data_point,
error: reason,
status: :error
}
end
end
defp store_compressed_point(result) do
case result.status do
:success ->
IO.puts("✓ Streamed point #{result.timestamp}:#{result.value} -> #{result.compressed_size}B (#{result.compression_ratio}:1)")
:error ->
IO.puts("✗ Failed to compress point #{inspect(result.data_point)}: #{result.error}")
end
result
end
end
# Run the real-time streaming demo
streaming_results = SensorStreaming.start_sensor_stream("sensor_001", 10)
# Analyze streaming results
successful_streams = Enum.filter(streaming_results, &(&1.status == :success))
failed_streams = Enum.filter(streaming_results, &(&1.status == :error))
IO.puts("\n--- Streaming Summary ---")
IO.puts("Total points processed: #{length(streaming_results)}")
IO.puts("Successful compressions: #{length(successful_streams)}")
IO.puts("Failed compressions: #{length(failed_streams)}")
if length(successful_streams) > 0 do
avg_compression_ratio =
successful_streams
|> Enum.map(& &1.compression_ratio)
|> Enum.sum()
|> Kernel./(length(successful_streams))
avg_size =
successful_streams
|> Enum.map(& &1.compressed_size)
|> Enum.sum()
|> Kernel./(length(successful_streams))
IO.puts("Average compression ratio: #{Float.round(avg_compression_ratio, 2)}:1")
IO.puts("Average compressed size: #{Float.round(avg_size, 1)} bytes")
end
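Before moving on, it is worth confirming that each streamed blob decompresses back to the point we fed in. The following is a minimal round-trip check, assuming GorillaStream.decompress/1 returns {:ok, points} with the original {timestamp, value} tuples (the same call shape used for illustration in section 3 below):
# Round-trip check: decompress a few streamed blobs and compare with what we sent.
# Assumes GorillaStream.decompress/1 returns {:ok, [{timestamp, value}, ...]}.
round_trip_ok? =
  successful_streams
  |> Enum.take(3)
  |> Enum.all?(fn result ->
    case GorillaStream.decompress(result.compressed) do
      {:ok, [{timestamp, value}]} -> timestamp == result.timestamp and value == result.value
      _ -> false
    end
  end)
IO.puts("Round-trip check on first 3 streamed points: #{round_trip_ok?}")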
2. Chunked Processing (Large Datasets)
Efficient for processing large datasets by grouping points into chunks:
defmodule ChunkedProcessor do
def process_large_dataset(num_points, chunk_size \\ 10_000) do
IO.puts("Processing #{num_points} points in chunks of #{chunk_size}...")
# Generate large dataset as a stream
large_dataset = Stream.unfold(1609459200, fn timestamp ->
if timestamp < 1609459200 + num_points * 60 do
# Simulate realistic sensor data with patterns
point_index = div(timestamp - 1609459200, 60)
base_temp = 20.0 + 5 * :math.sin(point_index / 144) # Daily cycle
noise = (:rand.uniform() - 0.5) * 1.0
value = base_temp + noise
point = {timestamp, Float.round(value, 2)}
{point, timestamp + 60} # Every minute
else
nil
end
end)
# Process the dataset in chunks: Stream.chunk_every batches the points, GorillaStream.compress handles each batch
start_time = :os.system_time(:microsecond)
compressed_chunks =
large_dataset
|> Stream.chunk_every(chunk_size)
|> Stream.with_index()
|> Stream.map(fn {chunk, index} ->
chunk_start = :os.system_time(:microsecond)
case GorillaStream.compress(chunk) do
{:ok, compressed} ->
chunk_time = :os.system_time(:microsecond) - chunk_start
# Calculate chunk statistics
original_size = length(chunk) * 16
compression_ratio = original_size / byte_size(compressed)
points_per_sec = length(chunk) / (chunk_time / 1_000_000)
{first_timestamp, _} = List.first(chunk)
{last_timestamp, _} = List.last(chunk)
{
:ok,
compressed,
%{
chunk_index: index,
original_points: length(chunk),
compressed_size: byte_size(compressed),
compression_ratio: Float.round(compression_ratio, 2),
processing_time_us: chunk_time,
points_per_second: Float.round(points_per_sec, 0),
timestamp_range: {first_timestamp, last_timestamp}
}
}
{:error, reason} ->
{:error, reason, %{chunk_index: index, points: length(chunk)}}
end
end)
|> Enum.to_list()
total_time = :os.system_time(:microsecond) - start_time
# Analyze results
successful_chunks = Enum.filter(compressed_chunks, fn
{:ok, _, _} -> true
_ -> false
end)
total_points_processed =
successful_chunks
|> Enum.map(fn {:ok, _, metadata} -> metadata.original_points end)
|> Enum.sum()
total_compressed_size =
successful_chunks
|> Enum.map(fn {:ok, compressed, _} -> byte_size(compressed) end)
|> Enum.sum()
overall_compression_ratio = (total_points_processed * 16) / total_compressed_size
overall_throughput = total_points_processed / (total_time / 1_000_000)
IO.puts("\n--- Chunked Processing Summary ---")
IO.puts("Total chunks processed: #{length(compressed_chunks)}")
IO.puts("Successful chunks: #{length(successful_chunks)}")
IO.puts("Total points processed: #{total_points_processed}")
IO.puts("Total compressed size: #{total_compressed_size} bytes")
IO.puts("Overall compression ratio: #{Float.round(overall_compression_ratio, 2)}:1")
IO.puts("Overall throughput: #{Float.round(overall_throughput, 0)} points/sec")
IO.puts("Total processing time: #{Float.round(total_time / 1_000_000, 2)} seconds")
# Display per-chunk details
IO.puts("\n--- Per-Chunk Details ---")
Enum.each(successful_chunks, fn {:ok, _, metadata} ->
IO.puts("Chunk #{metadata.chunk_index}: #{metadata.original_points} pts -> #{metadata.compressed_size}B (#{metadata.compression_ratio}:1) @ #{metadata.points_per_second} pts/sec")
end)
compressed_chunks
end
end
# Run chunked processing demo
chunked_results = ChunkedProcessor.process_large_dataset(50_000, 5_000)
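Because each chunk compresses independently, you can also fan the work out across CPU cores. Here is an optional sketch using Task.async_stream from the Elixir standard library (not a GorillaStream API); the dataset and chunk size are illustrative:
# Illustrative parallel variant: compress independent chunks on multiple cores.
parallel_compress = fn dataset, chunk_size ->
  dataset
  |> Stream.chunk_every(chunk_size)
  |> Task.async_stream(&GorillaStream.compress/1,
    max_concurrency: System.schedulers_online(),
    ordered: true,
    timeout: 60_000
  )
  |> Enum.map(fn {:ok, result} -> result end)
end
sample_stream =
  Stream.map(1..20_000, fn i -> {1609459200 + i * 60, 20.0 + :math.sin(i / 144)} end)
parallel_chunks = parallel_compress.(sample_stream, 5_000)
IO.puts("Parallel compression returned #{length(parallel_chunks)} chunk results")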
3. Memory-Efficient Large File Processing
Process huge files without loading everything into memory:
defmodule LargeFileProcessor do
def simulate_csv_processing(num_points, chunk_size \\ 50_000) do
IO.puts("Simulating CSV file processing with #{num_points} points...")
# Simulate reading from a large CSV file
csv_stream = Stream.unfold(1, fn
line_num when line_num <= num_points ->
timestamp = 1609459200 + line_num * 60
# Simulate different sensor types with different patterns
sensor_type = rem(line_num, 3)
value = case sensor_type do
0 -> 20.0 + :math.sin(line_num / 100) * 5 # Temperature
1 -> 50.0 + (:rand.uniform() - 0.5) * 10 # Humidity
2 -> 1013.25 + :math.sin(line_num / 200) * 20 # Pressure
end
csv_line = "#{line_num},#{timestamp},sensor_#{sensor_type},#{Float.round(value, 2)}\n"
{csv_line, line_num + 1}
_ -> nil
end)
# Process CSV stream without loading entire file into memory
start_time = :os.system_time(:microsecond)
processed_files =
csv_stream
|> Stream.map(&parse_csv_line/1)
|> Stream.reject(&is_nil/1) # Filter invalid lines
|> Stream.chunk_every(chunk_size)
|> Stream.with_index()
|> Stream.map(fn {chunk, index} ->
# Compress each chunk and simulate saving to separate files
case GorillaStream.compress(chunk) do
{:ok, compressed} ->
filename = "compressed_chunk_#{index}.gorilla"
# In real scenario: File.write!(filename, compressed)
original_size = length(chunk) * 16
compression_ratio = original_size / byte_size(compressed)
{
:ok,
%{
filename: filename,
chunk_index: index,
points: length(chunk),
compressed_size: byte_size(compressed),
compression_ratio: Float.round(compression_ratio, 2)
}
}
{:error, reason} ->
{:error, reason, index}
end
end)
|> Enum.to_list()
processing_time = :os.system_time(:microsecond) - start_time
# Collect the successfully compressed chunk files for the summary
successful_files = Enum.filter(processed_files, fn
{:ok, _} -> true
_ -> false
end)
IO.puts("\n--- File Processing Summary ---")
IO.puts("Processing time: #{Float.round(processing_time / 1_000_000, 2)} seconds")
IO.puts("Files created: #{length(successful_files)}")
total_points =
successful_files
|> Enum.map(fn {:ok, info} -> info.points end)
|> Enum.sum()
total_compressed =
successful_files
|> Enum.map(fn {:ok, info} -> info.compressed_size end)
|> Enum.sum()
IO.puts("Total points processed: #{total_points}")
IO.puts("Total compressed size: #{total_compressed} bytes")
IO.puts("Processing rate: #{Float.round(total_points / (processing_time / 1_000_000), 0)} points/sec")
# Show file details
IO.puts("\n--- Generated Files ---")
Enum.each(successful_files, fn {:ok, info} ->
IO.puts("#{info.filename}: #{info.points} points -> #{info.compressed_size}B (#{info.compression_ratio}:1)")
end)
processed_files
end
# Simulate decompressing and loading chunks back
def simulate_load_compressed_chunks(processed_files) do
IO.puts("\nSimulating loading compressed chunks back...")
successful_files = Enum.filter(processed_files, fn
{:ok, _} -> true
_ -> false
end)
# In real scenario, we would read the files and decompress
# For demo, we'll simulate the process
loaded_points =
successful_files
|> Stream.map(fn {:ok, info} ->
# Simulate: compressed = File.read!(info.filename)
# {:ok, data} = GorillaStream.decompress(compressed)
# For demo, just return the point count
info.points
end)
|> Enum.sum()
IO.puts("Successfully loaded #{loaded_points} points from compressed files")
loaded_points
end
defp parse_csv_line(csv_line) do
try do
[_line_num, timestamp_str, _sensor_type, value_str] =
csv_line
|> String.trim()
|> String.split(",")
timestamp = String.to_integer(timestamp_str)
value = String.to_float(value_str)
{timestamp, value}
rescue
_ -> nil # Skip invalid lines
end
end
end
# Run file processing simulation
file_results = LargeFileProcessor.simulate_csv_processing(100_000, 25_000)
_loaded_points = LargeFileProcessor.simulate_load_compressed_chunks(file_results)
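The module above only pretends to touch the filesystem. For reference, an actual write/read/decompress round trip could look like the sketch below; the temp-file path is illustrative, and it assumes GorillaStream.decompress/1 returns the original point list:
# Sketch of the real file round trip that simulate_csv_processing only simulates.
file_round_trip = fn points ->
  path = Path.join(System.tmp_dir!(), "gorilla_demo_chunk.gorilla")
  with {:ok, compressed} <- GorillaStream.compress(points),
       :ok <- File.write(path, compressed),
       {:ok, binary} <- File.read(path),
       {:ok, decompressed} <- GorillaStream.decompress(binary) do
    IO.puts("Wrote #{byte_size(compressed)}B to #{path}, read back #{length(decompressed)} points")
    decompressed == points
  end
end
demo_points = Enum.map(1..1_000, fn i -> {1609459200 + i * 60, 20.0 + i / 100} end)
IO.puts("File round trip lossless? #{inspect(file_round_trip.(demo_points))}")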
4. Adaptive Processing (Streaming vs Chunking)
Automatically choose between streaming and chunking based on data rate:
defmodule AdaptiveProcessor do
def start_adaptive_processing(data_source, max_points \\ 100) do
IO.puts("Starting adaptive processing (streaming vs chunking)...")
data_source
|> Stream.take(max_points) # Limit for demo
|> Stream.chunk_while(
%{buffer: [], last_time: System.monotonic_time(:millisecond), chunk_count: 0},
&chunk_or_stream/2,
&finalize_buffer/1
)
|> Stream.map(&process_adaptively/1)
|> Enum.to_list() # Force evaluation
end
defp chunk_or_stream(point, %{buffer: buffer, last_time: last_time, chunk_count: count}) do
current_time = System.monotonic_time(:millisecond)
time_diff = current_time - last_time
new_buffer = [point | buffer]
cond do
# High data rate: use chunking for efficiency (buffer filled quickly)
length(new_buffer) >= 10 ->
IO.puts("🔄 High data rate detected, using chunking (#{length(new_buffer)} points)")
{:cont, {:chunk, Enum.reverse(new_buffer)}, %{buffer: [], last_time: current_time, chunk_count: count + 1}}
# Low data rate: stream individual points for low latency (flush after a 500 ms lull)
time_diff > 500 and length(new_buffer) > 0 ->
IO.puts("⚡ Low data rate detected, streaming individual points (#{length(new_buffer)} points)")
{:cont, {:stream, Enum.reverse(new_buffer)}, %{buffer: [], last_time: current_time, chunk_count: count + 1}}
# Keep buffering
true ->
{:cont, %{buffer: new_buffer, last_time: last_time, chunk_count: count}}
end
end
defp finalize_buffer(%{buffer: []}), do: {:cont, []}
defp finalize_buffer(%{buffer: buffer}), do: {:cont, {:final, Enum.reverse(buffer)}, %{buffer: []}}
defp process_adaptively({:stream, data_points}) do
# Process individual points for low latency
results = Enum.map(data_points, fn point ->
case GorillaStream.compress([point]) do
{:ok, compressed} ->
{timestamp, value} = point
%{
mode: :streaming,
timestamp: timestamp,
value: value,
compressed_size: byte_size(compressed),
compression_ratio: Float.round(16 / byte_size(compressed), 2),
status: :success
}
{:error, reason} ->
%{mode: :streaming, point: point, error: reason, status: :error}
end
end)
successful = Enum.count(results, &(&1.status == :success))
IO.puts(" ✓ Streamed #{successful}/#{length(data_points)} points individually")
{:stream, results}
end
defp process_adaptively({:chunk, data_points}) do
# Process multiple points as a chunk for efficiency
case GorillaStream.compress(data_points) do
{:ok, compressed} ->
original_size = length(data_points) * 16
compression_ratio = original_size / byte_size(compressed)
result = %{
mode: :chunking,
points: length(data_points),
compressed_size: byte_size(compressed),
compression_ratio: Float.round(compression_ratio, 2),
status: :success
}
IO.puts(" ✓ Chunked #{length(data_points)} points -> #{byte_size(compressed)}B (#{result.compression_ratio}:1)")
{:chunk, result}
{:error, reason} ->
IO.puts(" ✗ Chunk compression failed: #{reason}")
{:chunk, %{mode: :chunking, points: length(data_points), error: reason, status: :error}}
end
end
defp process_adaptively({:final, data_points}) do
IO.puts("🏁 Processing final buffer with #{length(data_points)} points")
process_adaptively({:stream, data_points})
end
end
# Create a data source that varies in rate
variable_rate_data = Stream.unfold({1609459200, :slow}, fn
{timestamp, :slow} when timestamp < 1609459200 + 20 * 60 ->
# Slow rate: 1 point per call, simulating low-frequency sensor
point = {timestamp, 20.0 + :rand.uniform() * 2}
Process.sleep(100) # Simulate slow data arrival
{point, {timestamp + 60, if(timestamp > 1609459200 + 10 * 60, do: :fast, else: :slow)}}
{timestamp, :fast} when timestamp < 1609459200 + 50 * 60 ->
# Fast rate: simulate burst of data
point = {timestamp, 20.0 + :rand.uniform() * 2}
Process.sleep(10) # Simulate fast data arrival
{point, {timestamp + 60, if(timestamp > 1609459200 + 40 * 60, do: :slow, else: :fast)}}
{timestamp, :slow} when timestamp < 1609459200 + 70 * 60 ->
# Back to slow rate
point = {timestamp, 20.0 + :rand.uniform() * 2}
Process.sleep(100)
{point, {timestamp + 60, :slow}}
_ -> nil
end)
# Run adaptive processing
adaptive_results = AdaptiveProcessor.start_adaptive_processing(variable_rate_data, 50)
# Analyze adaptive processing results
stream_results = Enum.filter(adaptive_results, fn
{:stream, _} -> true
_ -> false
end)
chunk_results = Enum.filter(adaptive_results, fn
{:chunk, _} -> true
_ -> false
end)
IO.puts("\n--- Adaptive Processing Summary ---")
IO.puts("Streaming operations: #{length(stream_results)}")
IO.puts("Chunking operations: #{length(chunk_results)}")
stream_points =
stream_results
|> Enum.flat_map(fn {:stream, results} -> results end)
|> Enum.count(&(&1.status == :success))
chunk_points =
chunk_results
|> Enum.map(fn {:chunk, result} -> if result.status == :success, do: result.points, else: 0 end)
|> Enum.sum()
IO.puts("Points processed via streaming: #{stream_points}")
IO.puts("Points processed via chunking: #{chunk_points}")
IO.puts("Total points processed: #{stream_points + chunk_points}")
5. IoT Data Pipeline Simulation
Simulate a realistic IoT data pipeline with multiple sensors:
defmodule IoTDataPipeline do
use GenServer
def start_link(device_configs) do
GenServer.start_link(__MODULE__, device_configs, name: __MODULE__)
end
def init(device_configs) do
IO.puts("Starting IoT Data Pipeline with #{length(device_configs)} devices...")
# Start simulated devices
devices = Enum.map(device_configs, &start_device/1)
# Schedule periodic stats reporting
:timer.send_interval(5000, self(), :report_stats)
{:ok, %{
devices: devices,
compression_stats: %{},
total_points: 0,
start_time: System.monotonic_time(:millisecond)
}}
end
def handle_info({:device_data, device_id, {timestamp, value}}, state) do
# Compress individual data points as they arrive from devices
point = {timestamp, value}
case GorillaStream.compress([point]) do
{:ok, compressed} ->
# Simulate sending compressed data to cloud/storage
send_to_cloud(device_id, point, compressed)
# Update compression stats
stats = Map.get(state.compression_stats, device_id, %{points: 0, bytes: 0, errors: 0})
new_stats = %{
points: stats.points + 1,
bytes: stats.bytes + byte_size(compressed),
errors: stats.errors
}
new_state = %{
state |
compression_stats: Map.put(state.compression_stats, device_id, new_stats),
total_points: state.total_points + 1
}
{:noreply, new_state}
{:error, reason} ->
IO.puts("⚠️ IoT compression failed for #{device_id}: #{reason}")
# Update error stats
stats = Map.get(state.compression_stats, device_id, %{points: 0, bytes: 0, errors: 0})
new_stats = %{stats | errors: stats.errors + 1}
new_state = %{
state |
compression_stats: Map.put(state.compression_stats, device_id, new_stats)
}
{:noreply, new_state}
end
end
def handle_info(:report_stats, state) do
current_time = System.monotonic_time(:millisecond)
elapsed_seconds = (current_time - state.start_time) / 1000
IO.puts("\n--- IoT Pipeline Stats (#{Float.round(elapsed_seconds, 1)}s elapsed) ---")
IO.puts("Total points processed: #{state.total_points}")
IO.puts("Processing rate: #{Float.round(state.total_points / elapsed_seconds, 1)} points/sec")
Enum.each(state.compression_stats, fn {device_id, stats} ->
avg_compression = if stats.points > 0, do: stats.bytes / stats.points, else: 0
success_rate = if stats.points + stats.errors > 0 do
stats.points / (stats.points + stats.errors) * 100
else
0
end
IO.puts("#{device_id}: #{stats.points} pts, #{Float.round(avg_compression, 1)}B avg, #{Float.round(success_rate, 1)}% success")
end)
{:noreply, state}
end
def handle_info(:stop_demo, state) do
IO.puts("\n🛑 Stopping IoT Pipeline Demo")
{:stop, :normal, state}
end
defp start_device({device_id, config}) do
# Simulate device connection and start sending data
spawn(fn -> simulate_device_data(device_id, config) end)
{device_id, :connected}
end
defp simulate_device_data(device_id, config) do
base_value = config[:base_value] || 20.0
variation = config[:variation] || 2.0
interval_ms = config[:interval_ms] || 1000
Stream.iterate(0, &(&1 + 1))
|> Stream.each(fn i ->
timestamp = System.system_time(:second)
# Generate realistic sensor data based on device type
value = case config[:type] do
:temperature ->
base_value + variation * :math.sin(i / 10) + (:rand.uniform() - 0.5) * 0.5
:humidity ->
base_value + variation * :math.cos(i / 15) + (:rand.uniform() - 0.5) * 2
:pressure ->
base_value + variation * :math.sin(i / 20) + (:rand.uniform() - 0.5) * 1
_ ->
base_value + (:rand.uniform() - 0.5) * variation
end
# Send data to the pipeline only while it is still registered (the demo stops it after 15s)
pipeline = Process.whereis(IoTDataPipeline)
if pipeline, do: send(pipeline, {:device_data, device_id, {timestamp, Float.round(value, 2)}})
Process.sleep(interval_ms)
end)
|> Stream.take(30) # Limit for demo
|> Enum.to_list()
end
defp send_to_cloud(device_id, {timestamp, value}, compressed_data) do
# Simulate sending to cloud storage/API
# In reality: CloudAPI.send_compressed_point(...)
compression_ratio = 16 / byte_size(compressed_data)
if rem(System.system_time(:millisecond), 2000) < 100 do # Occasional logging
IO.puts("☁️ #{device_id}: #{timestamp}:#{value} -> #{byte_size(compressed_data)}B (#{Float.round(compression_ratio, 1)}:1)")
end
end
def stop_demo() do
send(IoTDataPipeline, :stop_demo)
end
end
# Configure simulated IoT devices
device_configs = [
{"temp_sensor_01", [type: :temperature, base_value: 22.5, variation: 3.0, interval_ms: 1000]},
{"humidity_sensor_01", [type: :humidity, base_value: 65.0, variation: 15.0, interval_ms: 1500]},
{"pressure_sensor_01", [type: :pressure, base_value: 1013.25, variation: 20.0, interval_ms: 2000]},
{"temp_sensor_02", [type: :temperature, base_value: 18.0, variation: 4.0, interval_ms: 800]}
]
# Start the IoT pipeline
{:ok, _pid} = IoTDataPipeline.start_link(device_configs)
# Let it run for a while, then stop
Process.sleep(15_000)
IoTDataPipeline.stop_demo()
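The pipeline above compresses every reading individually, which favors latency. If compression ratio matters more, one option is to buffer a few dozen points per device and compress them as a block. The sketch below keeps that buffering logic in a plain module so it is easy to test in isolation; the module name, flush size, and usage are illustrative, not part of GorillaStream:
defmodule BatchingBuffer do
  # Illustrative per-device buffer: accumulate points and compress them in blocks.
  # Trades a little latency for the better ratios seen in section 2.
  @flush_size 50
  def new, do: %{}
  def add(buffers, device_id, point) do
    buffer = [point | Map.get(buffers, device_id, [])]
    if length(buffer) >= @flush_size do
      case GorillaStream.compress(Enum.reverse(buffer)) do
        {:ok, compressed} ->
          IO.puts("Flushed #{length(buffer)} points for #{device_id} -> #{byte_size(compressed)}B")
        {:error, reason} ->
          IO.puts("Flush failed for #{device_id}: #{inspect(reason)}")
      end
      Map.put(buffers, device_id, [])
    else
      Map.put(buffers, device_id, buffer)
    end
  end
end
# Usage sketch: thread the buffer map through handle_info instead of compressing each point.
# (Any points still buffered at shutdown would need a final flush.)
points = Enum.map(1..120, fn i -> {1609459200 + i, 22.0 + :rand.uniform()} end)
Enum.reduce(points, BatchingBuffer.new(), fn point, buffers ->
  BatchingBuffer.add(buffers, "temp_sensor_01", point)
end)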
Summary
This notebook demonstrated advanced streaming and processing patterns with GorillaStream:
- Real-time Streaming: Processing individual sensor readings as they arrive
- Chunked Processing: Efficiently handling large datasets in manageable chunks
- Memory-Efficient File Processing: Processing huge files without memory overflow
- Adaptive Processing: Automatically switching between streaming and chunking based on data patterns
- IoT Data Pipeline: Realistic simulation of multiple sensors with real-time compression
Key Patterns:
- Stream.unfold for generating infinite data streams
- Stream.chunk_every for batching data into manageable sizes
- Stream.chunk_while for adaptive buffering based on conditions
- GenServer for stateful data pipeline management
- Backpressure handling through controlled stream processing
Performance Insights:
- Streaming excels for low-latency requirements with individual points
- Chunking provides better throughput for bulk processing
- Adaptive processing gives you the best of both approaches
- Memory usage remains predictable across all patterns
Next Steps:
- Explore GenStage integration for advanced backpressure control
- Learn Broadway patterns for production-scale data processing
- Consider Flow for CPU-intensive parallel processing scenarios
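As a starting point for that last suggestion, here is an illustrative Flow variant of the chunked pipeline from section 2. It assumes you add {:flow, "~> 1.2"} to the Mix.install list at the top of the notebook; the dataset and chunk size are placeholders:
# Illustrative only: requires {:flow, "~> 1.2"} in Mix.install.
# Flow spreads the CPU-bound compression across schedulers; result order is not preserved.
dataset = Stream.map(1..50_000, fn i -> {1609459200 + i * 60, 20.0 + :math.sin(i / 144)} end)
flow_chunks =
  dataset
  |> Stream.chunk_every(5_000)
  |> Flow.from_enumerable(max_demand: 1)
  |> Flow.map(&GorillaStream.compress/1)
  |> Enum.to_list()
IO.puts("Flow compressed #{length(flow_chunks)} chunks in parallel")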