Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

GorillaStream: Basic Usage & Examples

notebooks/01_basic_usage_examples.livemd

GorillaStream: Basic Usage & Examples

Mix.install([
  {:gorilla_stream, "~> 1.1"}
])

Introduction

Welcome to GorillaStream! This notebook demonstrates the core functionality of the GorillaStream library, which implements Facebook’s Gorilla compression algorithm for time series data in Elixir.

Key Features:

  • Lossless Compression: Perfect reconstruction of original time series data
  • High Performance: 1.7M+ points/sec encoding, up to 2M points/sec decoding
  • Excellent Compression Ratios: 2-42x compression depending on data patterns
  • Memory Efficient: ~117 bytes/point memory usage for large datasets

1. Basic Compression & Decompression

Let’s start with the most basic example - compressing and decompressing time series data:

# Five samples taken one minute apart, expressed as {unix_timestamp, value} tuples.
data =
  [23.5, 23.7, 23.4, 23.6, 23.8]
  |> Enum.with_index()
  |> Enum.map(fn {value, offset} -> {1_609_459_200 + offset * 60, value} end)

# Round-trip the series: compress, then decompress what we just produced.
{:ok, compressed} = GorillaStream.compress(data)
{:ok, decompressed} = GorillaStream.decompress(compressed)

# Report what went in, what came out, and whether the round trip was exact.
[
  "Original data: #{inspect(data)}",
  "Compressed size: #{byte_size(compressed)} bytes",
  "Decompressed data: #{inspect(decompressed)}",
  "Lossless compression verified: #{decompressed == data}"
]
|> Enum.each(&IO.puts/1)

# Approximate raw footprint: 16 bytes per point (8-byte timestamp + 8-byte float).
original_size = 16 * length(data)
compression_ratio = original_size / byte_size(compressed)
IO.puts("Compression ratio: #{Float.round(compression_ratio, 2)}:1")

2. Enhanced Compression with Zlib

GorillaStream supports an additional zlib compression layer for even better compression ratios:

# Synthetic sensor readings: 100 points, one per minute.
# Each value is a slow sinusoidal drift around 20.0 (amplitude 5; with i / 24.0
# radians per sample the period is roughly 151 samples) plus uniform noise in
# [-0.25, 0.25], rounded to two decimals.
sensor_data =
  Enum.map(1..100, fn i ->
    timestamp = 1609459200 + i * 60
    base_temp = 20.0 + 5 * :math.sin(i / 24.0)
    noise = (:rand.uniform() - 0.5) * 0.5
    value = base_temp + noise
    {timestamp, Float.round(value, 2)}
  end)

# Same input, two encodings: plain Gorilla, and Gorilla followed by zlib.
{:ok, compressed_normal} = GorillaStream.compress(sensor_data)
{:ok, compressed_zlib} = GorillaStream.compress(sensor_data, true)

# Ratios are relative to an approximate raw size of 16 bytes per point.
original_size = length(sensor_data) * 16
normal_ratio = original_size / byte_size(compressed_normal)
zlib_ratio = original_size / byte_size(compressed_zlib)

[
  "Dataset: #{length(sensor_data)} points",
  "Original size (approx): #{original_size} bytes",
  "Gorilla only: #{byte_size(compressed_normal)} bytes (#{Float.round(normal_ratio, 1)}:1 ratio)",
  "Gorilla + zlib: #{byte_size(compressed_zlib)} bytes (#{Float.round(zlib_ratio, 1)}:1 ratio)",
  "Additional zlib benefit: #{Float.round(zlib_ratio / normal_ratio, 2)}x"
]
|> Enum.each(&IO.puts/1)

# Both encodings must decode back to exactly the input series.
# The zlib flag has to be passed again when decompressing zlib-wrapped output.
{:ok, decompressed_normal} = GorillaStream.decompress(compressed_normal)
{:ok, decompressed_zlib} = GorillaStream.decompress(compressed_zlib, true)

IO.puts("Lossless verified (normal): #{decompressed_normal == sensor_data}")
IO.puts("Lossless verified (zlib): #{decompressed_zlib == sensor_data}")

3. Different Data Patterns & Compression Performance

Let’s explore how different data patterns affect compression performance:

defmodule CompressionTester do
  @moduledoc """
  Measures compression ratio and throughput of GorillaStream for one synthetic
  data pattern.
  """

  # Number of points generated for every pattern.
  @point_count 1000

  # Approximate raw footprint of one point: 8-byte timestamp + 8-byte float.
  @bytes_per_point 16

  @doc """
  Generates 1000 points by calling `data_generator` with the indices `1..1000`,
  compresses and decompresses them, and returns a map of metrics.

  ## Parameters

    * `name` - label stored under `:pattern` in the result.
    * `data_generator` - one-argument function returning a `{timestamp, value}`
      tuple for the given index.

  ## Returns

  A map with the keys `:pattern`, `:points`, `:original_size`,
  `:compressed_size`, `:compression_ratio`, `:compress_time_us`,
  `:decompress_time_us`, `:compress_rate`, `:decompress_rate` (points per
  second) and `:lossless`.

  Raises `MatchError` if compression or decompression does not return
  `{:ok, _}`.
  """
  def test_pattern(name, data_generator) do
    data = Enum.map(1..@point_count, data_generator)
    point_count = length(data)

    {compress_time, {:ok, compressed}} = timed(fn -> GorillaStream.compress(data) end)
    {decompress_time, {:ok, decompressed}} = timed(fn -> GorillaStream.decompress(compressed) end)

    original_size = point_count * @bytes_per_point
    compressed_size = byte_size(compressed)

    %{
      pattern: name,
      points: point_count,
      original_size: original_size,
      compressed_size: compressed_size,
      compression_ratio: Float.round(original_size / compressed_size, 2),
      compress_time_us: compress_time,
      decompress_time_us: decompress_time,
      compress_rate: Float.round(points_per_second(point_count, compress_time), 0),
      decompress_rate: Float.round(points_per_second(point_count, decompress_time), 0),
      lossless: decompressed == data
    }
  end

  # Runs `fun` and returns `{elapsed_microseconds, result}`.
  # The monotonic clock is used because wall-clock time can jump (NTP
  # adjustments), which would yield negative or wildly wrong durations.
  defp timed(fun) do
    started = System.monotonic_time(:microsecond)
    result = fun.()
    {System.monotonic_time(:microsecond) - started, result}
  end

  # Converts a duration into a points-per-second rate.
  # The duration is clamped to 1 µs (the clock resolution) so that a
  # measurement of 0 µs cannot cause a division by zero.
  defp points_per_second(point_count, elapsed_us) do
    point_count / (max(elapsed_us, 1) / 1_000_000)
  end
end

# Seed the generator ONCE so the run is reproducible. Seeding inside a
# generator function would reset the state on every call, making each call
# return the same "random" number and turning the series into a constant.
:rand.seed(:exsplus, {1, 2, 3})

# A random walk needs state (each value is the previous value plus a random
# step), but generators only receive the index. The walk is therefore
# pre-computed and looked up by index. It must cover every index that
# CompressionTester.test_pattern/2 requests (1..1000).
random_walk =
  1..1000
  |> Enum.scan(20.0, fn _index, level -> level + :rand.normal() * 5 end)
  |> List.to_tuple()

# Test different data patterns; each generator maps an index to {timestamp, value}
patterns = [
  {"Stable values", fn i -> {1609459200 + i * 60, 23.5} end},
  {"Gradual increase", fn i -> {1609459200 + i * 60, 20.0 + i * 0.01} end},
  {"Sine wave", fn i -> {1609459200 + i * 60, 50.0 + 10 * :math.sin(i / 100)} end},
  {"Random walk", fn i -> {1609459200 + i * 60, elem(random_walk, i - 1)} end},
  {"High frequency noise", fn i -> {1609459200 + i * 60, 20.0 + (:rand.uniform() - 0.5) * 0.1} end}
]

results = Enum.map(patterns, fn {name, generator} ->
  CompressionTester.test_pattern(name, generator)
end)

# Print the pattern results as a fixed-width table.
# Every column except the last is padded to a fixed width; the last is free-form.
pattern_header =
  [{"Pattern", 20}, {"Ratio", 8}, {"Enc/sec", 10}, {"Dec/sec", 10}]
  |> Enum.map_join(fn {title, width} -> String.pad_trailing(title, width) end)

IO.puts("\n" <> pattern_header <> "Size")
IO.puts(String.duplicate("-", 60))

Enum.each(results, fn result ->
  padded_cells =
    [
      {result.pattern, 20},
      {"#{result.compression_ratio}:1", 8},
      {"#{result.compress_rate}", 10},
      {"#{result.decompress_rate}", 10}
    ]
    |> Enum.map_join(fn {cell, width} -> String.pad_trailing(cell, width) end)

  IO.puts(padded_cells <> "#{result.compressed_size}B")
end)

4. Error Handling & Edge Cases

GorillaStream includes comprehensive error handling. Let’s explore various edge cases:

# Error reasons are printed as-is when they are strings. Any other term
# (atom, tuple, map, ...) goes through inspect/1, because interpolating a term
# that does not implement String.Chars raises Protocol.UndefinedError and would
# abort the whole cell.
describe_reason = fn
  reason when is_binary(reason) -> reason
  reason -> inspect(reason)
end

# Test empty data
case GorillaStream.compress([]) do
  {:error, reason} -> IO.puts("Empty data error: #{describe_reason.(reason)}")
  {:ok, _} -> IO.puts("Empty data surprisingly worked!")
end

# Test single point (assertive matches: a failure here is a bug worth crashing on)
single_point = [{1609459200, 23.5}]
{:ok, compressed_single} = GorillaStream.compress(single_point)
{:ok, decompressed_single} = GorillaStream.decompress(compressed_single)
IO.puts("Single point works: #{decompressed_single == single_point}")

# Test invalid data formats: each entry is {input, human-readable description}
invalid_cases = [
  {[{1609459200}], "Missing value"},
  {[{"not_a_timestamp", 23.5}], "Invalid timestamp type"},
  {[{1609459200, "not_a_number"}], "Invalid value type"},
  {[{1609459200, 23.5}, {1609459100, 24.0}], "Non-monotonic timestamps"}
]

Enum.each(invalid_cases, fn {invalid_data, description} ->
  case GorillaStream.compress(invalid_data) do
    {:error, reason} -> IO.puts("#{description}: #{describe_reason.(reason)}")
    {:ok, _} -> IO.puts("#{description}: Unexpectedly succeeded!")
  end
end)

5. Memory Usage Analysis

Let’s analyze memory usage patterns for different dataset sizes:

defmodule MemoryAnalyzer do
  @moduledoc """
  Estimates how much process memory is retained by compressing datasets of
  various sizes.
  """

  @doc """
  For each entry in `sizes`, generates that many sine-wave points, compresses
  them, and reports the change in the calling process's memory.

  ## Parameters

    * `sizes` - list of positive integers (number of points per dataset).

  ## Returns

  A list of maps, one per size and in the same order, with the keys `:points`,
  `:compressed_size`, `:memory_used` (bytes, may be zero or negative),
  `:memory_per_point` and `:compression_ratio`.

  Raises `MatchError` if compression does not return `{:ok, _}`.
  """
  def analyze_memory_usage(sizes) do
    Enum.map(sizes, &measure/1)
  end

  # Generates `size` points, compresses them and measures the memory delta.
  defp measure(size) do
    data =
      for i <- 1..size do
        {1609459200 + i * 60, 20.0 + :math.sin(i / 100) * 5}
      end

    memory_before = process_memory_after_gc()
    {:ok, compressed} = GorillaStream.compress(data)
    memory_after = process_memory_after_gc()

    # NOTE(review): this is the memory still held by the process after a GC,
    # not the peak used during compression. Large binaries are likely stored
    # off-heap and so may not be fully reflected here — confirm before relying
    # on these numbers.
    memory_used = memory_after - memory_before
    compressed_size = byte_size(compressed)

    %{
      points: size,
      compressed_size: compressed_size,
      memory_used: memory_used,
      memory_per_point: Float.round(memory_used / size, 2),
      compression_ratio: Float.round(size * 16 / compressed_size, 2)
    }
  end

  # Forces a garbage collection and returns the process memory in bytes.
  # Process.info/2 returns `{:memory, bytes}` — the item name comes first —
  # so the tag is matched explicitly instead of being bound as the value.
  defp process_memory_after_gc do
    :erlang.garbage_collect()
    {:memory, bytes} = Process.info(self(), :memory)
    bytes
  end
end

# Dataset sizes (number of points) to run through the memory analysis.
sizes = [100, 1_000, 10_000, 50_000]
memory_results = MemoryAnalyzer.analyze_memory_usage(sizes)

# Fixed-width table: all columns but the last are padded.
memory_header =
  [{"Points", 10}, {"Compressed", 12}, {"Memory/Point", 14}]
  |> Enum.map_join(fn {title, width} -> String.pad_trailing(title, width) end)

IO.puts("\n" <> memory_header <> "Ratio")
IO.puts(String.duplicate("-", 50))

Enum.each(memory_results, fn result ->
  padded_cells =
    [
      {"#{result.points}", 10},
      {"#{result.compressed_size}B", 12},
      {"#{result.memory_per_point}B", 14}
    ]
    |> Enum.map_join(fn {cell, width} -> String.pad_trailing(cell, width) end)

  IO.puts(padded_cells <> "#{result.compression_ratio}:1")
end)

6. Performance Benchmarking

Let’s benchmark compression and decompression performance across different scenarios:

defmodule PerformanceBenchmark do
  @moduledoc """
  Minimal timing helpers for measuring GorillaStream throughput.
  """

  @doc """
  Runs `fun` once as a warm-up and then `iterations` more times, timing each
  timed run with the monotonic clock.

  ## Parameters

    * `name` - label stored under `:name` in the result.
    * `fun` - zero-argument function to measure.
    * `iterations` - number of timed runs; must be a positive integer
      (default `10`).

  ## Returns

  A map with `:name`, `:avg_time_us` (float, rounded to whole microseconds),
  `:min_time_us`, `:max_time_us` and `:iterations`.

  Raises `FunctionClauseError` when `iterations` is not a positive integer.
  Without this check, `1..0` would be a descending range and run twice.
  """
  def benchmark(name, fun, iterations \\ 10) when is_integer(iterations) and iterations > 0 do
    # Warm up so one-off costs (e.g. code loading) are not part of the timings.
    fun.()

    # Only durations are kept; the results of `fun` are not needed and holding
    # on to them would keep every output alive for the whole run.
    durations =
      for _ <- 1..iterations do
        started = System.monotonic_time(:microsecond)
        fun.()
        System.monotonic_time(:microsecond) - started
      end

    {min_time, max_time} = Enum.min_max(durations)

    %{
      name: name,
      avg_time_us: Float.round(Enum.sum(durations) / iterations, 0),
      min_time_us: min_time,
      max_time_us: max_time,
      iterations: iterations
    }
  end

  @doc """
  Benchmarks compression and decompression of `data` and converts the average
  timings into points-per-second rates.

  ## Parameters

    * `data` - list of `{timestamp, value}` tuples.
    * `label` - label used in the benchmark names and stored under `:label`.

  ## Returns

  A map with `:label`, `:points`, `:compress_rate`, `:decompress_rate` and
  `:compressed_size` (bytes).

  Raises `MatchError` if compression or decompression does not return
  `{:ok, _}`.
  """
  def benchmark_throughput(data, label) do
    compress_bench =
      benchmark("#{label} Compress", fn ->
        {:ok, _} = GorillaStream.compress(data)
      end)

    # The decompression benchmark needs an already-compressed payload.
    {:ok, compressed} = GorillaStream.compress(data)

    decompress_bench =
      benchmark("#{label} Decompress", fn ->
        {:ok, _} = GorillaStream.decompress(compressed)
      end)

    points = length(data)

    %{
      label: label,
      points: points,
      compress_rate: Float.round(points_per_second(points, compress_bench.avg_time_us), 0),
      decompress_rate: Float.round(points_per_second(points, decompress_bench.avg_time_us), 0),
      compressed_size: byte_size(compressed)
    }
  end

  # Converts an average duration into a points-per-second rate.
  # The duration is clamped to 1 µs so an average that rounds down to 0.0
  # cannot cause a division by zero.
  defp points_per_second(points, avg_time_us) do
    points / (max(avg_time_us, 1.0) / 1_000_000)
  end
end

# Dataset sizes to benchmark, each paired with the label shown in the table.
datasets = [
  {1_000, "1K points"},
  {10_000, "10K points"},
  {50_000, "50K points"}
]

# For every size, build a noisy sine-wave series and measure its throughput.
benchmark_results =
  Enum.map(datasets, fn {size, label} ->
    series =
      Enum.map(1..size, fn i ->
        {1609459200 + i * 60, 20.0 + :math.sin(i / 100) * 5 + (:rand.uniform() - 0.5) * 0.5}
      end)

    PerformanceBenchmark.benchmark_throughput(series, label)
  end)

# Fixed-width table: all columns but the last are padded.
benchmark_header =
  [{"Dataset", 12}, {"Enc/sec", 10}, {"Dec/sec", 10}, {"Size", 8}]
  |> Enum.map_join(fn {title, width} -> String.pad_trailing(title, width) end)

IO.puts("\n" <> benchmark_header <> "Points")
IO.puts(String.duplicate("-", 55))

Enum.each(benchmark_results, fn result ->
  padded_cells =
    [
      {result.label, 12},
      {"#{result.compress_rate}", 10},
      {"#{result.decompress_rate}", 10},
      {"#{result.compressed_size}B", 8}
    ]
    |> Enum.map_join(fn {cell, width} -> String.pad_trailing(cell, width) end)

  IO.puts(padded_cells <> "#{result.points}")
end)

Summary

This notebook demonstrated the core functionality of GorillaStream:

  1. Basic Usage: Simple compression/decompression with lossless guarantees
  2. Enhanced Compression: Using zlib for additional compression benefits
  3. Pattern Analysis: How different data patterns affect compression performance
  4. Error Handling: Robust handling of edge cases and invalid data
  5. Memory Efficiency: Memory usage scales predictably with dataset size
  6. Performance: High-throughput compression and decompression capabilities

Key Takeaways:

  • GorillaStream excels with time series data that has gradual changes
  • Compression ratios vary significantly based on data patterns (2-42x)
  • Performance consistently exceeds 1.7M points/sec for encoding
  • Memory usage is predictable and efficient (~117 bytes/point)
  • Comprehensive error handling makes it production-ready

Next Steps:

  • Explore streaming patterns in the “Streaming & Processing” notebook
  • Learn about GenStage integration for backpressure handling
  • See Broadway examples for scalable data processing pipelines