
Production Monitoring and Observability for SOAP Services

Mix.install([
  {:lather, "~> 1.0"},
  {:finch, "~> 0.18"},
  {:kino, "~> 0.12"},
  {:kino_vega_lite, "~> 0.1"},
  {:jason, "~> 1.4"},
  {:telemetry, "~> 1.2"},
  {:telemetry_metrics, "~> 0.6"},
  {:telemetry_poller, "~> 1.0"}
])

Introduction

Production monitoring is essential for maintaining reliable SOAP service integrations. This Livebook covers:

  • Setting up comprehensive telemetry for SOAP operations
  • Tracking key metrics (latency, error rates, throughput)
  • Implementing health checks and alerting
  • Building real-time monitoring dashboards
  • Establishing performance baselines

Why Monitoring Matters for SOAP Services

SOAP services present unique monitoring challenges:

  1. XML Parsing Overhead: SOAP’s XML-based protocol adds parsing time that must be tracked
  2. Complex Error States: SOAP faults, HTTP errors, and transport failures need separate tracking
  3. Enterprise Dependencies: SOAP services often connect to critical business systems
  4. Legacy Integration: Many SOAP services run on legacy infrastructure with varying reliability
  5. Payload Size Impact: Large XML payloads can affect latency and memory usage

Environment Setup

# Start applications
{:ok, _} = Application.ensure_all_started(:lather)
{:ok, _} = Application.ensure_all_started(:telemetry)

# Start Finch if not already running (safe to re-run this cell)
if Process.whereis(Lather.Finch) == nil do
  children = [
    {Finch,
     name: Lather.Finch,
     pools: %{
       :default => [
         size: 10,
         count: 2,
         conn_opts: [
           transport_opts: [timeout: 30_000]
         ]
       ]
     }}
  ]

  {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)
end

IO.puts("Monitoring environment initialized!")

Telemetry Integration

Lather emits telemetry events that you can use for monitoring. Let’s set up comprehensive telemetry handling:

defmodule SOAPTelemetryHandler do
  @moduledoc """
  Handles telemetry events from Lather SOAP operations.
  """

  require Logger

  # Lather emits these telemetry events:
  # - [:lather, :request, :start] - SOAP request started
  # - [:lather, :request, :stop] - SOAP request completed
  # - [:lather, :request, :error] - SOAP request failed
  # - [:lather, :wsdl, :parse, :start] - WSDL parsing started
  # - [:lather, :wsdl, :parse, :stop] - WSDL parsing completed

  def attach_handlers do
    :telemetry.attach_many(
      "soap-monitoring-handler",
      [
        [:lather, :request, :start],
        [:lather, :request, :stop],
        [:lather, :request, :error],
        [:lather, :wsdl, :parse, :start],
        [:lather, :wsdl, :parse, :stop]
      ],
      &handle_event/4,
      nil
    )

    IO.puts("Telemetry handlers attached successfully")
  end

  def handle_event([:lather, :request, :start], measurements, metadata, _config) do
    Logger.debug("SOAP Request Started",
      operation: metadata[:operation],
      endpoint: metadata[:endpoint],
      system_time: measurements[:system_time]
    )

    # Record a start timestamp keyed by request id. The duration reported in the
    # :stop handler comes from that event's own measurements; this entry is only
    # needed if you want to correlate or time requests yourself.
    Process.put({:soap_request_start, metadata[:request_id]}, System.monotonic_time())
  end

  def handle_event([:lather, :request, :stop], measurements, metadata, _config) do
    duration_ms = System.convert_time_unit(
      measurements[:duration] || 0,
      :native,
      :millisecond
    )

    Logger.info("SOAP Request Completed",
      operation: metadata[:operation],
      endpoint: metadata[:endpoint],
      duration_ms: duration_ms,
      status: :success
    )

    # Emit to metrics collector
    MetricsCollector.record_request(metadata[:operation], duration_ms, :success)
  end

  def handle_event([:lather, :request, :error], measurements, metadata, _config) do
    duration_ms = System.convert_time_unit(
      measurements[:duration] || 0,
      :native,
      :millisecond
    )

    error_type = classify_error(metadata[:error])

    Logger.error("SOAP Request Failed",
      operation: metadata[:operation],
      endpoint: metadata[:endpoint],
      duration_ms: duration_ms,
      error_type: error_type,
      error: inspect(metadata[:error])
    )

    MetricsCollector.record_request(metadata[:operation], duration_ms, {:error, error_type})
  end

  def handle_event([:lather, :wsdl, :parse, :start], _measurements, metadata, _config) do
    Logger.debug("WSDL Parse Started", url: metadata[:url])
  end

  def handle_event([:lather, :wsdl, :parse, :stop], measurements, metadata, _config) do
    duration_ms = System.convert_time_unit(
      measurements[:duration] || 0,
      :native,
      :millisecond
    )

    Logger.info("WSDL Parse Completed",
      url: metadata[:url],
      duration_ms: duration_ms,
      operations_count: metadata[:operations_count]
    )
  end

  defp classify_error(error) do
    case error do
      %{type: :soap_fault} -> :soap_fault
      %{type: :http_error, status: status} when status >= 500 -> :server_error
      %{type: :http_error, status: status} when status >= 400 -> :client_error
      %{type: :transport_error, reason: :timeout} -> :timeout
      %{type: :transport_error} -> :transport_error
      _ -> :unknown
    end
  end
end

# Attach the handlers
SOAPTelemetryHandler.attach_handlers()
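
To verify the handlers are attached, you can fire a synthetic event with `:telemetry.execute/3`. This sketch only emits the `:start` event, since the `:stop` and `:error` handlers also call `MetricsCollector`, which is defined in the next section; the metadata values are placeholders.

# Smoke test: emit a synthetic :start event so the attached handler runs.
# The handler logs at :debug level; uncomment the next line to see its output.
# Logger.configure(level: :debug)

:telemetry.execute(
  [:lather, :request, :start],
  %{system_time: System.system_time()},
  %{operation: "GetUser", endpoint: "https://example.com/users", request_id: "smoke-test"}
)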

Metrics Collection

Let’s create a metrics collector that tracks key performance indicators:

defmodule MetricsCollector do
  use Agent

  @doc """
  Starts the metrics collector agent.
  """
  def start_link do
    Agent.start_link(fn -> initial_state() end, name: __MODULE__)
  end

  defp initial_state do
    %{
      requests: [],
      errors_by_type: %{},
      latencies: [],
      start_time: System.monotonic_time(:second),
      connection_pool: %{active: 0, idle: 10, total: 10}
    }
  end

  def record_request(operation, duration_ms, status) do
    timestamp = DateTime.utc_now()

    Agent.update(__MODULE__, fn state ->
      request = %{
        operation: operation,
        duration_ms: duration_ms,
        status: status,
        timestamp: timestamp
      }

      state
      |> Map.update!(:requests, fn requests ->
        # Keep last 1000 requests for analysis
        [request | requests] |> Enum.take(1000)
      end)
      |> Map.update!(:latencies, fn latencies ->
        [duration_ms | latencies] |> Enum.take(1000)
      end)
      |> maybe_record_error(status)
    end)
  end

  defp maybe_record_error(state, {:error, error_type}) do
    Map.update!(state, :errors_by_type, fn errors ->
      Map.update(errors, error_type, 1, &(&1 + 1))
    end)
  end
  defp maybe_record_error(state, _), do: state

  def get_metrics do
    Agent.get(__MODULE__, fn state ->
      latencies = state.latencies
      requests = state.requests
      uptime_seconds = System.monotonic_time(:second) - state.start_time

      successful_requests = Enum.count(requests, fn r -> r.status == :success end)
      failed_requests = length(requests) - successful_requests

      %{
        total_requests: length(requests),
        successful_requests: successful_requests,
        failed_requests: failed_requests,
        error_rate: if(length(requests) > 0, do: failed_requests / length(requests) * 100, else: 0),
        latency_p50: percentile(latencies, 50),
        latency_p95: percentile(latencies, 95),
        latency_p99: percentile(latencies, 99),
        latency_avg: if(length(latencies) > 0, do: Enum.sum(latencies) / length(latencies), else: 0),
        latency_min: if(length(latencies) > 0, do: Enum.min(latencies), else: 0),
        latency_max: if(length(latencies) > 0, do: Enum.max(latencies), else: 0),
        throughput_per_second: if(uptime_seconds > 0, do: length(requests) / uptime_seconds, else: 0),
        errors_by_type: state.errors_by_type,
        connection_pool: state.connection_pool,
        uptime_seconds: uptime_seconds
      }
    end)
  end

  def get_recent_requests(count \\ 10) do
    Agent.get(__MODULE__, fn state ->
      Enum.take(state.requests, count)
    end)
  end

  def reset do
    Agent.update(__MODULE__, fn _state -> initial_state() end)
  end

  defp percentile([], _p), do: 0
  defp percentile(values, p) do
    sorted = Enum.sort(values)
    k = (length(sorted) - 1) * p / 100
    f = floor(k)
    c = ceil(k)

    if f == c do
      Enum.at(sorted, round(k))
    else
      d0 = Enum.at(sorted, f) * (c - k)
      d1 = Enum.at(sorted, c) * (k - f)
      round(d0 + d1)
    end
  end
end

# Start the metrics collector
{:ok, _pid} = MetricsCollector.start_link()
IO.puts("Metrics collector started!")

Key Metrics to Monitor

Simulating SOAP Operations for Metrics

defmodule MetricsSimulator do
  @operations ["GetUser", "CreateUser", "UpdateUser", "SearchUsers", "DeleteUser"]
  @error_types [:soap_fault, :timeout, :server_error, :client_error]

  def simulate_traffic(count \\ 50) do
    Enum.each(1..count, fn _ ->
      operation = Enum.random(@operations)

      # Simulate varying latencies
      base_latency = Enum.random(50..500)
      jitter = Enum.random(-20..20)
      duration_ms = max(10, base_latency + jitter)

      # 10% error rate
      status = if :rand.uniform() > 0.1 do
        :success
      else
        {:error, Enum.random(@error_types)}
      end

      MetricsCollector.record_request(operation, duration_ms, status)

      # Small delay between requests
      Process.sleep(10)
    end)

    IO.puts("Simulated #{count} SOAP requests")
  end
end

# Generate sample traffic
MetricsSimulator.simulate_traffic(100)

Metrics Dashboard

# Get current metrics
metrics = MetricsCollector.get_metrics()

# Create metrics display
metrics_md = """
## Current Metrics Summary

| Metric | Value |
|--------|-------|
| Total Requests | #{metrics.total_requests} |
| Successful | #{metrics.successful_requests} |
| Failed | #{metrics.failed_requests} |
| Error Rate | #{Float.round(metrics.error_rate, 2)}% |
| Throughput | #{Float.round(metrics.throughput_per_second, 2)} req/s |

### Latency Percentiles (ms)

| Percentile | Value |
|------------|-------|
| p50 (Median) | #{metrics.latency_p50} ms |
| p95 | #{metrics.latency_p95} ms |
| p99 | #{metrics.latency_p99} ms |
| Average | #{Float.round(metrics.latency_avg, 1)} ms |
| Min | #{metrics.latency_min} ms |
| Max | #{metrics.latency_max} ms |

### Errors by Type

#{Enum.map_join(metrics.errors_by_type, "\n", fn {type, count} -> "- #{type}: #{count}" end)}
"""

Kino.Markdown.new(metrics_md)

Latency Distribution Chart

# Get recent requests for charting
recent_requests = MetricsCollector.get_recent_requests(50)

# Prepare data for VegaLite chart
chart_data = Enum.map(recent_requests, fn req ->
  %{
    "operation" => to_string(req.operation),
    "latency" => req.duration_ms,
    "status" => if(req.status == :success, do: "success", else: "error"),
    "timestamp" => DateTime.to_iso8601(req.timestamp)
  }
end)

VegaLite.new(width: 600, height: 300, title: "Request Latency by Operation")
|> VegaLite.data_from_values(chart_data)
|> VegaLite.mark(:boxplot)
|> VegaLite.encode_field(:x, "operation", type: :nominal, title: "Operation")
|> VegaLite.encode_field(:y, "latency", type: :quantitative, title: "Latency (ms)")
|> VegaLite.encode_field(:color, "status", type: :nominal)

Error Rate Over Time

defmodule ErrorRateAnalyzer do
  def analyze_error_distribution(requests) do
    requests
    |> Enum.group_by(fn req ->
      case req.status do
        :success -> :success
        {:error, type} -> type
      end
    end)
    |> Enum.map(fn {status, reqs} ->
      %{"status" => to_string(status), "count" => length(reqs)}
    end)
  end
end

error_data = ErrorRateAnalyzer.analyze_error_distribution(recent_requests)

VegaLite.new(width: 400, height: 300, title: "Request Status Distribution")
|> VegaLite.data_from_values(error_data)
|> VegaLite.mark(:arc, inner_radius: 50)
|> VegaLite.encode_field(:theta, "count", type: :quantitative)
|> VegaLite.encode_field(:color, "status", type: :nominal)

Health Checks

Implementing comprehensive health checks for SOAP service dependencies:

defmodule HealthChecker do
  @doc """
  Performs comprehensive health checks on SOAP service dependencies.
  """

  def check_all(services) do
    checks = Enum.map(services, fn service ->
      Task.async(fn -> check_service(service) end)
    end)

    results = Task.await_many(checks, 10_000)

    overall_status = if Enum.all?(results, fn r -> r.status == :healthy end) do
      :healthy
    else
      if Enum.any?(results, fn r -> r.status == :unhealthy end) do
        :unhealthy
      else
        :degraded
      end
    end

    %{
      status: overall_status,
      timestamp: DateTime.utc_now(),
      checks: results
    }
  end

  def check_service(service) do
    start_time = System.monotonic_time(:millisecond)

    result = case service.type do
      :wsdl -> check_wsdl_availability(service)
      :endpoint -> check_endpoint_health(service)
      :dependency -> check_dependency(service)
    end

    duration = System.monotonic_time(:millisecond) - start_time

    Map.merge(result, %{
      name: service.name,
      type: service.type,
      duration_ms: duration,
      checked_at: DateTime.utc_now()
    })
  end

  defp check_wsdl_availability(service) do
    # Simulate WSDL availability check
    case simulate_http_check(service.url) do
      {:ok, _} ->
        %{status: :healthy, message: "WSDL accessible"}

      {:error, reason} ->
        %{status: :unhealthy, message: "WSDL unavailable: #{reason}"}
    end
  end

  defp check_endpoint_health(service) do
    case simulate_http_check(service.url) do
      {:ok, response_time} when response_time < 1000 ->
        %{status: :healthy, message: "Endpoint responding normally"}

      {:ok, response_time} ->
        %{status: :degraded, message: "Endpoint slow: #{response_time}ms"}

      {:error, reason} ->
        %{status: :unhealthy, message: "Endpoint error: #{reason}"}
    end
  end

  defp check_dependency(_service) do
    # Simulate dependency check
    if :rand.uniform() > 0.1 do
      %{status: :healthy, message: "Dependency available"}
    else
      %{status: :degraded, message: "Dependency experiencing issues"}
    end
  end

  defp simulate_http_check(_url) do
    # Simulate HTTP check with random outcomes
    Process.sleep(Enum.random(50..200))

    case :rand.uniform(10) do
      n when n <= 8 -> {:ok, Enum.random(100..500)}
      9 -> {:ok, Enum.random(1000..2000)}
      10 -> {:error, :timeout}
    end
  end

  @doc """
  Liveness probe - checks if the service is running.
  """
  def liveness_probe do
    # Basic liveness: is the BEAM running and responsive?
    %{
      status: :alive,
      timestamp: DateTime.utc_now(),
      memory_mb: :erlang.memory(:total) / 1_000_000,
      process_count: length(Process.list())
    }
  end

  @doc """
  Readiness probe - checks if the service is ready to handle requests.
  """
  def readiness_probe(dependencies) do
    health = check_all(dependencies)

    ready = health.status in [:healthy, :degraded]

    %{
      ready: ready,
      status: health.status,
      timestamp: DateTime.utc_now(),
      details: health.checks
    }
  end
end

# Define sample services to check
sample_services = [
  %{name: "UserService WSDL", type: :wsdl, url: "https://example.com/users?wsdl"},
  %{name: "UserService Endpoint", type: :endpoint, url: "https://example.com/users"},
  %{name: "OrderService WSDL", type: :wsdl, url: "https://example.com/orders?wsdl"},
  %{name: "Database", type: :dependency, url: "postgres://localhost/db"},
  %{name: "Redis Cache", type: :dependency, url: "redis://localhost:6379"}
]

# Run health checks
health_result = HealthChecker.check_all(sample_services)

# Display results
health_md = """
## Health Check Results

**Overall Status**: #{health_result.status}
**Checked At**: #{DateTime.to_iso8601(health_result.timestamp)}

### Individual Checks

| Service | Type | Status | Duration | Message |
|---------|------|--------|----------|---------|
#{Enum.map_join(health_result.checks, "\n", fn check ->
  status_icon = case check.status do
    :healthy -> "[OK]"
    :degraded -> "[WARN]"
    :unhealthy -> "[FAIL]"
  end
  "| #{check.name} | #{check.type} | #{status_icon} | #{check.duration_ms}ms | #{check.message} |"
end)}
"""

Kino.Markdown.new(health_md)

Liveness and Readiness Probes

# Liveness probe
liveness = HealthChecker.liveness_probe()

# Readiness probe
readiness = HealthChecker.readiness_probe(sample_services)

probes_md = """
## Kubernetes-Style Probes

### Liveness Probe

#{Jason.encode!(liveness, pretty: true)}


### Readiness Probe
**Ready**: #{readiness.ready}
**Status**: #{readiness.status}

Use these endpoints in your Kubernetes deployment:
- `/health/live` - Returns 200 if alive
- `/health/ready` - Returns 200 if ready to serve traffic
"""

Kino.Markdown.new(probes_md)
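
In a deployment these probes would be served over HTTP. Below is a minimal sketch using `Plug.Router`; note that Plug is not part of this notebook's `Mix.install`, so the cell will not compile here as-is, and the route paths and the `dependencies/0` helper are assumptions for illustration.

defmodule HealthRouter do
  # Sketch only: assumes {:plug, "~> 1.15"} plus an HTTP server such as Bandit
  # or Plug.Cowboy in your deps. The route paths mirror the ones suggested above.
  use Plug.Router

  plug :match
  plug :dispatch

  get "/health/live" do
    # Liveness never depends on downstream services
    send_resp(conn, 200, Jason.encode!(HealthChecker.liveness_probe()))
  end

  get "/health/ready" do
    # `dependencies/0` is a hypothetical helper returning the services to check
    readiness = HealthChecker.readiness_probe(dependencies())
    code = if readiness.ready, do: 200, else: 503
    send_resp(conn, code, Jason.encode!(%{ready: readiness.ready, status: readiness.status}))
  end

  match _ do
    send_resp(conn, 404, "not found")
  end

  defp dependencies do
    [%{name: "UserService Endpoint", type: :endpoint, url: "https://example.com/users"}]
  end
end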

Alerting Patterns

Setting up intelligent alerting to avoid alert fatigue:

defmodule AlertManager do
  @moduledoc """
  Manages alerts with threshold-based rules and anomaly detection.
  """

  defstruct [:rules, :active_alerts, :alert_history, :cooldowns]

  def new do
    %__MODULE__{
      rules: default_rules(),
      active_alerts: %{},
      alert_history: [],
      cooldowns: %{}
    }
  end

  def default_rules do
    [
      %{
        name: "High Error Rate",
        metric: :error_rate,
        condition: :greater_than,
        threshold: 5.0,
        severity: :warning,
        cooldown_minutes: 5
      },
      %{
        name: "Critical Error Rate",
        metric: :error_rate,
        condition: :greater_than,
        threshold: 20.0,
        severity: :critical,
        cooldown_minutes: 2
      },
      %{
        name: "High Latency P95",
        metric: :latency_p95,
        condition: :greater_than,
        threshold: 2000,
        severity: :warning,
        cooldown_minutes: 5
      },
      %{
        name: "Very High Latency P99",
        metric: :latency_p99,
        condition: :greater_than,
        threshold: 5000,
        severity: :critical,
        cooldown_minutes: 2
      },
      %{
        name: "Low Throughput",
        metric: :throughput_per_second,
        condition: :less_than,
        threshold: 0.1,
        severity: :warning,
        cooldown_minutes: 10
      },
      %{
        name: "Service Degraded",
        metric: :health_status,
        condition: :equals,
        threshold: :degraded,
        severity: :warning,
        cooldown_minutes: 5
      },
      %{
        name: "Service Unhealthy",
        metric: :health_status,
        condition: :equals,
        threshold: :unhealthy,
        severity: :critical,
        cooldown_minutes: 1
      }
    ]
  end

  def evaluate_rules(manager, metrics) do
    now = DateTime.utc_now()

    triggered = Enum.reduce(manager.rules, [], fn rule, acc ->
      if should_alert?(manager, rule, metrics, now) do
        alert = create_alert(rule, metrics, now)
        [alert | acc]
      else
        acc
      end
    end)

    # Update manager state
    updated_manager = Enum.reduce(triggered, manager, fn alert, m ->
      %{m |
        active_alerts: Map.put(m.active_alerts, alert.rule_name, alert),
        alert_history: [alert | m.alert_history] |> Enum.take(100),
        cooldowns: Map.put(m.cooldowns, alert.rule_name, now)
      }
    end)

    {updated_manager, triggered}
  end

  defp should_alert?(manager, rule, metrics, now) do
    # Check cooldown
    last_alert = Map.get(manager.cooldowns, rule.name)
    cooldown_expired = is_nil(last_alert) or
      DateTime.diff(now, last_alert, :minute) >= rule.cooldown_minutes

    # Check condition
    metric_value = Map.get(metrics, rule.metric)
    condition_met = evaluate_condition(rule.condition, metric_value, rule.threshold)

    cooldown_expired and condition_met
  end

  defp evaluate_condition(:greater_than, value, threshold) when is_number(value) do
    value > threshold
  end
  defp evaluate_condition(:less_than, value, threshold) when is_number(value) do
    value < threshold
  end
  defp evaluate_condition(:equals, value, threshold), do: value == threshold
  defp evaluate_condition(_, _, _), do: false

  defp create_alert(rule, metrics, timestamp) do
    %{
      rule_name: rule.name,
      severity: rule.severity,
      metric: rule.metric,
      current_value: Map.get(metrics, rule.metric),
      threshold: rule.threshold,
      triggered_at: timestamp,
      message: "#{rule.name}: #{rule.metric} is #{Map.get(metrics, rule.metric)} (threshold: #{rule.threshold})"
    }
  end

  def format_alerts(alerts) do
    if Enum.empty?(alerts) do
      "No alerts triggered"
    else
      Enum.map_join(alerts, "\n", fn alert ->
        severity_icon = case alert.severity do
          :critical -> "[CRITICAL]"
          :warning -> "[WARNING]"
          :info -> "[INFO]"
        end
        "#{severity_icon} #{alert.message}"
      end)
    end
  end
end

# Create alert manager and evaluate
alert_manager = AlertManager.new()
current_metrics = MetricsCollector.get_metrics()
  |> Map.put(:health_status, health_result.status)

{updated_manager, triggered_alerts} = AlertManager.evaluate_rules(alert_manager, current_metrics)

alerts_md = """
## Alert Evaluation

### Current Metrics for Alerting
- Error Rate: #{Float.round(current_metrics.error_rate, 2)}%
- Latency P95: #{current_metrics.latency_p95}ms
- Latency P99: #{current_metrics.latency_p99}ms
- Throughput: #{Float.round(current_metrics.throughput_per_second, 3)} req/s
- Health Status: #{current_metrics.health_status}

### Triggered Alerts
#{AlertManager.format_alerts(triggered_alerts)}

### Alert Rules Configured
#{Enum.map_join(AlertManager.default_rules(), "\n", fn rule ->
  "- **#{rule.name}** (#{rule.severity}): #{rule.metric} #{rule.condition} #{rule.threshold}"
end)}
"""

Kino.Markdown.new(alerts_md)
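
Triggered alerts need somewhere to go. Here is a minimal dispatch sketch that POSTs the formatted alerts as JSON through the Finch pool started earlier; the webhook URL is a placeholder, and real integrations (Slack, PagerDuty, Opsgenie) each expect their own payload shape.

defmodule AlertNotifier do
  @moduledoc "Sends triggered alerts to a webhook. The URL below is a placeholder."

  # Hypothetical endpoint -- replace with your incident channel's webhook.
  @webhook_url "https://hooks.example.com/soap-alerts"

  def dispatch([]), do: :ok

  def dispatch(alerts) do
    body =
      Jason.encode!(%{
        text: AlertManager.format_alerts(alerts),
        triggered_at: DateTime.utc_now()
      })

    # Reuse the Finch pool started at the top of this notebook.
    :post
    |> Finch.build(@webhook_url, [{"content-type", "application/json"}], body)
    |> Finch.request(Lather.Finch)
    |> case do
      {:ok, %Finch.Response{status: status}} when status in 200..299 -> :ok
      other -> {:error, other}
    end
  end
end

# Example call (will fail against the placeholder URL, which is expected here):
# AlertNotifier.dispatch(triggered_alerts)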

Anomaly Detection Concepts

defmodule AnomalyDetector do
  @moduledoc """
  Simple anomaly detection using statistical methods.
  """

  @doc """
  Detects anomalies using z-score method.
  Values more than `threshold` standard deviations from mean are anomalies.
  """
  def detect_zscore_anomalies(values, threshold \\ 2.0) do
    if length(values) < 10 do
      {:error, :insufficient_data}
    else
      mean = Enum.sum(values) / length(values)
      variance = Enum.reduce(values, 0, fn v, acc ->
        acc + :math.pow(v - mean, 2)
      end) / length(values)
      std_dev = :math.sqrt(variance)

      if std_dev == 0 do
        {:ok, []}
      else
        anomalies = values
          |> Enum.with_index()
          |> Enum.filter(fn {v, _idx} ->
            z_score = abs(v - mean) / std_dev
            z_score > threshold
          end)
          |> Enum.map(fn {v, idx} ->
            %{value: v, index: idx, z_score: abs(v - mean) / std_dev}
          end)

        {:ok, anomalies}
      end
    end
  end

  @doc """
  Detects sudden changes using percentage change from moving average.
  """
  def detect_spike(values, window_size \\ 5, spike_threshold \\ 50) do
    if length(values) < window_size do
      {:error, :insufficient_data}
    else
      values
      |> Enum.chunk_every(window_size, 1, :discard)
      |> Enum.with_index()
      |> Enum.reduce([], fn {window, idx}, acc ->
        [current | history] = Enum.reverse(window)
        avg = Enum.sum(history) / length(history)

        if avg > 0 do
          pct_change = (current - avg) / avg * 100
          if abs(pct_change) > spike_threshold do
            [%{index: idx + window_size - 1, value: current, pct_change: pct_change} | acc]
          else
            acc
          end
        else
          acc
        end
      end)
      |> Enum.reverse()
    end
  end
end

# Get latency data for analysis
latencies = MetricsCollector.get_recent_requests(100) |> Enum.map(& &1.duration_ms)

# Run anomaly detection
anomaly_md = case AnomalyDetector.detect_zscore_anomalies(latencies) do
  {:ok, anomalies} ->
    """
    ## Anomaly Detection Results

    ### Z-Score Analysis (threshold: 2.0 std devs)
    **Anomalies Found**: #{length(anomalies)}

    #{if length(anomalies) > 0 do
      anomalies
      |> Enum.take(5)
      |> Enum.map_join("\n", fn a ->
        "- Value: #{a.value}ms (z-score: #{Float.round(a.z_score, 2)})"
      end)
    else
      "No anomalies detected in current data"
    end}

    ### Spike Detection
    #{case AnomalyDetector.detect_spike(latencies) do
      {:error, _} -> "Insufficient data for spike detection"
      spikes when length(spikes) == 0 -> "No spikes detected"
      spikes ->
        spikes
        |> Enum.take(3)
        |> Enum.map_join("\n", fn s ->
          "- Spike at index #{s.index}: #{s.value}ms (#{Float.round(s.pct_change, 1)}% change)"
        end)
    end}

    ### Tips for Anomaly Detection
    - Establish baselines during normal operation periods
    - Adjust thresholds based on your service's typical variance
    - Consider time-of-day patterns (business hours vs off-hours)
    - Use multiple detection methods for better accuracy
    """

  {:error, :insufficient_data} ->
    "Insufficient data for anomaly detection. Need at least 10 data points."
end

Kino.Markdown.new(anomaly_md)
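
To run this check continuously instead of on demand, you can schedule it with `:telemetry_poller` (already listed in `Mix.install`). The sketch below periodically emits a custom `[:soap_monitor, :anomalies]` event carrying the current anomaly count; the event name and the reporter module are illustrative choices.

defmodule AnomalyReporter do
  @doc """
  Invoked by :telemetry_poller. Emits the number of latency anomalies seen in
  the most recent requests as a custom telemetry event.
  """
  def report_latency_anomalies do
    latencies =
      MetricsCollector.get_recent_requests(100)
      |> Enum.map(& &1.duration_ms)

    count =
      case AnomalyDetector.detect_zscore_anomalies(latencies) do
        {:ok, anomalies} -> length(anomalies)
        {:error, :insufficient_data} -> 0
      end

    :telemetry.execute([:soap_monitor, :anomalies], %{count: count}, %{})
  end
end

# Poll every 30 seconds; attach a handler (or a metrics reporter) to
# [:soap_monitor, :anomalies] to act on the measurement.
{:ok, _poller} =
  :telemetry_poller.start_link(
    measurements: [{AnomalyReporter, :report_latency_anomalies, []}],
    period: :timer.seconds(30),
    name: :soap_anomaly_poller
  )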

Logging Best Practices

Implementing structured logging for SOAP services:

defmodule StructuredLogger do
  @moduledoc """
  Structured logging utilities for SOAP services.
  """

  require Logger

  @sensitive_fields ~w(password token secret key credential auth apikey)

  def log_request(operation, params, opts \\ []) do
    correlation_id = Keyword.get(opts, :correlation_id, generate_correlation_id())
    level = Keyword.get(opts, :level, :info)

    sanitized_params = sanitize_sensitive_data(params)

    log_entry = %{
      event: "soap_request",
      correlation_id: correlation_id,
      operation: operation,
      params: sanitized_params,
      timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
    }

    log_with_level(level, log_entry)

    correlation_id
  end

  def log_response(correlation_id, response, duration_ms, opts \\ []) do
    level = Keyword.get(opts, :level, :info)
    status = Keyword.get(opts, :status, :success)

    # Truncate large responses
    truncated_response = truncate_response(response)

    log_entry = %{
      event: "soap_response",
      correlation_id: correlation_id,
      status: status,
      duration_ms: duration_ms,
      response_preview: truncated_response,
      timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
    }

    log_with_level(level, log_entry)
  end

  def log_error(correlation_id, error, opts \\ []) do
    log_entry = %{
      event: "soap_error",
      correlation_id: correlation_id,
      error_type: classify_error(error),
      error_message: format_error_message(error),
      stack_trace: opts[:stack_trace],
      timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
    }

    Logger.error(fn -> Jason.encode!(log_entry) end)
  end

  defp sanitize_sensitive_data(data) when is_map(data) do
    Enum.reduce(data, %{}, fn {key, value}, acc ->
      sanitized_value = if sensitive_field?(key) do
        "[REDACTED]"
      else
        sanitize_sensitive_data(value)
      end
      Map.put(acc, key, sanitized_value)
    end)
  end
  defp sanitize_sensitive_data(data) when is_list(data) do
    Enum.map(data, &sanitize_sensitive_data/1)
  end
  defp sanitize_sensitive_data(data), do: data

  defp sensitive_field?(field) when is_binary(field) do
    field_lower = String.downcase(field)
    Enum.any?(@sensitive_fields, fn sensitive ->
      String.contains?(field_lower, sensitive)
    end)
  end
  defp sensitive_field?(field) when is_atom(field) do
    sensitive_field?(Atom.to_string(field))
  end
  defp sensitive_field?(_), do: false

  defp truncate_response(response) when is_binary(response) do
    if byte_size(response) > 500 do
      String.slice(response, 0, 500) <> "... [truncated]"
    else
      response
    end
  end
  defp truncate_response(response) when is_map(response) do
    case Jason.encode(response) do
      {:ok, json} -> truncate_response(json)
      _ -> "[Unable to serialize response]"
    end
  end
  defp truncate_response(response), do: inspect(response, limit: 100)

  defp classify_error(%{type: type}), do: type
  defp classify_error(_), do: :unknown

  defp format_error_message(%{type: :soap_fault, fault_string: msg}), do: msg
  defp format_error_message(%{type: :http_error, status: status}), do: "HTTP #{status}"
  defp format_error_message(%{reason: reason}), do: inspect(reason)
  defp format_error_message(error), do: inspect(error)

  defp generate_correlation_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
  end

  defp log_with_level(:debug, entry), do: Logger.debug(fn -> Jason.encode!(entry) end)
  defp log_with_level(:info, entry), do: Logger.info(fn -> Jason.encode!(entry) end)
  defp log_with_level(:warning, entry), do: Logger.warning(fn -> Jason.encode!(entry) end)
  defp log_with_level(:error, entry), do: Logger.error(fn -> Jason.encode!(entry) end)
end

# Demo structured logging
demo_params = %{
  "userId" => "12345",
  "password" => "secret123",
  "apiKey" => "sk-1234567890",
  "data" => %{
    "name" => "John Doe",
    "token" => "bearer-xyz"
  }
}

IO.puts("Example: Logging a request with sensitive data sanitization")
IO.puts("Original params: #{inspect(demo_params)}")

correlation_id = StructuredLogger.log_request("GetUser", demo_params)

IO.puts("\nCorrelation ID: #{correlation_id}")
IO.puts("\nNote: Sensitive fields (password, apiKey, token) are redacted in logs")

Log Levels Strategy

log_levels_md = """
## Log Levels Strategy for SOAP Services

### DEBUG Level
Use for detailed troubleshooting:
- Full request/response XML (sanitized)
- WSDL parsing details
- Parameter validation steps
- Connection pool status

### INFO Level
Use for normal operations:
- Request initiated (with correlation ID)
- Request completed (with duration)
- Service health checks passing

### WARNING Level
Use for concerning but recoverable situations:
- High latency (> p95 threshold)
- Retry attempts
- Service degradation detected
- Near rate limit

### ERROR Level
Use for failures requiring attention:
- SOAP faults
- HTTP errors (4xx, 5xx)
- Connection failures
- Authentication failures

### Correlation ID Best Practices

1. **Generate at entry point**: Create correlation ID when request enters your system
2. **Propagate through calls**: Pass correlation ID to all downstream services
3. **Include in SOAP headers**: Add correlation ID as custom SOAP header
4. **Log consistently**: Include correlation ID in every log entry

Example: Adding correlation ID to SOAP request

    def call_with_tracing(client, operation, params) do
      correlation_id = generate_correlation_id()

      headers = [
        Lather.Soap.Header.custom("X-Correlation-ID", correlation_id, %{})
      ]

      result =
        Lather.DynamicClient.call(
          client,
          operation,
          params,
          soap_headers: headers
        )

      {correlation_id, result}
    end

"""

Kino.Markdown.new(log_levels_md)

Real-Time Dashboard

Creating an interactive monitoring dashboard:

defmodule DashboardState do
  use Agent

  def start_link do
    Agent.start_link(fn -> %{last_update: nil} end, name: __MODULE__)
  end

  def update_timestamp do
    Agent.update(__MODULE__, fn state ->
      %{state | last_update: DateTime.utc_now()}
    end)
  end

  def get_last_update do
    Agent.get(__MODULE__, fn state -> state.last_update end)
  end
end

{:ok, _} = DashboardState.start_link()

# Create refresh button
refresh_button = Kino.Control.button("Refresh Dashboard")

# Create the dashboard frame
dashboard_frame = Kino.Frame.new()

# Dashboard rendering function
defmodule Dashboard do
  def render do
    metrics = MetricsCollector.get_metrics()
    recent = MetricsCollector.get_recent_requests(20)

    # Status indicators
    status = cond do
      metrics.error_rate > 20 -> {"CRITICAL", "background-color: #dc3545"}
      metrics.error_rate > 5 -> {"WARNING", "background-color: #ffc107"}
      true -> {"HEALTHY", "background-color: #28a745"}
    end

    # Build dashboard HTML
    html = """
    
      
        

SOAP Service Dashboard

#{elem(status, 0)}
#{metrics.total_requests} Total Requests #{Float.round(100 - metrics.error_rate, 1)}% Success Rate #{metrics.latency_p50}ms Median Latency #{Float.round(metrics.throughput_per_second, 2)} Requests/sec

Latency Percentiles

P50 (Median) #{metrics.latency_p50}ms
P95 #{metrics.latency_p95}ms
P99 #{metrics.latency_p99}ms
Max #{metrics.latency_max}ms

Errors by Type

#{render_error_breakdown(metrics.errors_by_type)}

Recent Requests

#{render_recent_requests(recent)}
Time Operation Duration Status
Last updated:
#{DateTime.utc_now() |> DateTime.to_iso8601()} """ Kino.HTML.new(html) end defp render_error_breakdown(errors) when map_size(errors) == 0 do "

No errors recorded

"
end defp render_error_breakdown(errors) do errors |> Enum.map(fn {type, count} -> " #{type} #{count} " end) |> Enum.join("") end defp render_recent_requests(requests) do requests |> Enum.take(10) |> Enum.map(fn req -> status_badge = case req.status do :success -> "OK" {:error, type} -> "#{type}" end time = Calendar.strftime(req.timestamp, "%H:%M:%S") " #{time} #{req.operation} #{req.duration_ms}ms #{status_badge} " end) |> Enum.join("") end end # Initial render Kino.Frame.render(dashboard_frame, Dashboard.render()) # Handle refresh button clicks Kino.listen(refresh_button, fn _event -> # Simulate new traffic MetricsSimulator.simulate_traffic(10) DashboardState.update_timestamp() Kino.Frame.render(dashboard_frame, Dashboard.render()) end) Kino.Layout.grid([refresh_button, dashboard_frame], columns: 1)
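
If you prefer a hands-off dashboard, you can refresh on a timer instead of a button click. A small sketch using `Stream.interval/1` with `Kino.listen/2`; the 5-second period is an arbitrary choice.

# Re-render the dashboard every 5 seconds without clicking the button.
# The stream keeps running for as long as the notebook session is alive.
Stream.interval(5_000)
|> Kino.listen(fn _tick ->
  DashboardState.update_timestamp()
  Kino.Frame.render(dashboard_frame, Dashboard.render())
end)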

Performance Baselines

Establishing and tracking performance baselines:

defmodule PerformanceBaseline do
  @moduledoc """
  Tools for establishing and comparing performance baselines.
  """

  defstruct [
    :name,
    :created_at,
    :latency_p50,
    :latency_p95,
    :latency_p99,
    :error_rate,
    :throughput,
    :sample_size
  ]

  def capture(name) do
    metrics = MetricsCollector.get_metrics()

    %__MODULE__{
      name: name,
      created_at: DateTime.utc_now(),
      latency_p50: metrics.latency_p50,
      latency_p95: metrics.latency_p95,
      latency_p99: metrics.latency_p99,
      error_rate: metrics.error_rate,
      throughput: metrics.throughput_per_second,
      sample_size: metrics.total_requests
    }
  end

  def compare(current, baseline) do
    comparisons = [
      compare_metric("Latency P50", current.latency_p50, baseline.latency_p50, :lower_better),
      compare_metric("Latency P95", current.latency_p95, baseline.latency_p95, :lower_better),
      compare_metric("Latency P99", current.latency_p99, baseline.latency_p99, :lower_better),
      compare_metric("Error Rate", current.error_rate, baseline.error_rate, :lower_better),
      compare_metric("Throughput", current.throughput, baseline.throughput, :higher_better)
    ]

    overall = calculate_overall_status(comparisons)

    %{
      comparisons: comparisons,
      overall_status: overall,
      baseline_name: baseline.name,
      baseline_date: baseline.created_at
    }
  end

  defp compare_metric(name, current, baseline, direction) when baseline > 0 do
    pct_change = (current - baseline) / baseline * 100

    status = case {direction, pct_change} do
      {:lower_better, pct} when pct < -10 -> :improved
      {:lower_better, pct} when pct > 20 -> :degraded
      {:higher_better, pct} when pct > 10 -> :improved
      {:higher_better, pct} when pct < -20 -> :degraded
      _ -> :stable
    end

    %{
      name: name,
      current: current,
      baseline: baseline,
      pct_change: pct_change,
      status: status
    }
  end
  defp compare_metric(name, current, baseline, _direction) do
    %{
      name: name,
      current: current,
      baseline: baseline,
      pct_change: 0.0,
      status: :stable
    }
  end

  defp calculate_overall_status(comparisons) do
    degraded_count = Enum.count(comparisons, fn c -> c.status == :degraded end)
    improved_count = Enum.count(comparisons, fn c -> c.status == :improved end)

    cond do
      degraded_count >= 2 -> :degraded
      degraded_count == 1 and improved_count == 0 -> :warning
      improved_count >= 2 -> :improved
      true -> :stable
    end
  end

  def capacity_estimate(baseline, target_throughput) do
    if baseline.throughput > 0 do
      scale_factor = target_throughput / baseline.throughput

      %{
        current_throughput: baseline.throughput,
        target_throughput: target_throughput,
        scale_factor: scale_factor,
        estimated_instances: ceil(scale_factor),
        recommendations: generate_capacity_recommendations(scale_factor, baseline)
      }
    else
      %{error: "Insufficient baseline data for capacity estimation"}
    end
  end

  defp generate_capacity_recommendations(scale_factor, baseline) do
    recommendations = []

    recommendations = if scale_factor > 1.5 do
      ["Consider horizontal scaling with #{ceil(scale_factor)} instances" | recommendations]
    else
      recommendations
    end

    recommendations = if baseline.latency_p99 > 2000 do
      ["Optimize high p99 latency before scaling" | recommendations]
    else
      recommendations
    end

    recommendations = if baseline.error_rate > 1 do
      ["Address error rate (#{Float.round(baseline.error_rate, 1)}%) before scaling" | recommendations]
    else
      recommendations
    end

    if Enum.empty?(recommendations) do
      ["Current capacity appears sufficient for target load"]
    else
      recommendations
    end
  end
end

# Capture baseline
baseline = PerformanceBaseline.capture("Production Baseline v1")

# Simulate some performance changes
MetricsSimulator.simulate_traffic(50)

# Capture current state and compare
current = PerformanceBaseline.capture("Current")
comparison = PerformanceBaseline.compare(current, baseline)

# Estimate capacity for 10x throughput
capacity = PerformanceBaseline.capacity_estimate(baseline, baseline.throughput * 10)

baseline_md = """
## Performance Baseline Analysis

### Captured Baseline: #{baseline.name}
**Captured At**: #{DateTime.to_iso8601(baseline.created_at)}
**Sample Size**: #{baseline.sample_size} requests

| Metric | Baseline Value |
|--------|----------------|
| Latency P50 | #{baseline.latency_p50}ms |
| Latency P95 | #{baseline.latency_p95}ms |
| Latency P99 | #{baseline.latency_p99}ms |
| Error Rate | #{Float.round(baseline.error_rate, 2)}% |
| Throughput | #{Float.round(baseline.throughput, 3)} req/s |

### Comparison with Current Performance

**Overall Status**: #{comparison.overall_status}

| Metric | Baseline | Current | Change | Status |
|--------|----------|---------|--------|--------|
#{Enum.map_join(comparison.comparisons, "\n", fn c ->
  change_str = if c.pct_change >= 0 do
    "+#{Float.round(c.pct_change, 1)}%"
  else
    "#{Float.round(c.pct_change, 1)}%"
  end
  status_icon = case c.status do
    :improved -> "[+]"
    :degraded -> "[-]"
    :stable -> "[=]"
  end
  "| #{c.name} | #{c.baseline} | #{c.current} | #{change_str} | #{status_icon} |"
end)}

### Capacity Planning

**Current Throughput**: #{Float.round(capacity.current_throughput, 3)} req/s
**Target Throughput**: #{Float.round(capacity.target_throughput, 3)} req/s (10x)
**Scale Factor**: #{Float.round(capacity.scale_factor, 2)}x
**Estimated Instances**: #{capacity.estimated_instances}

**Recommendations**:
#{Enum.map_join(capacity.recommendations, "\n", fn r -> "- #{r}" end)}
"""

Kino.Markdown.new(baseline_md)
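
Baselines are most useful when they outlive the current session, so a new release can be compared against last week's numbers. A minimal persistence sketch follows; the file path is arbitrary, and `Map.from_struct/1` is used because the struct does not derive `Jason.Encoder`.

defmodule BaselineStore do
  # Arbitrary location for this notebook; point this at durable storage in production.
  @path "/tmp/soap_baseline.json"

  def save(%PerformanceBaseline{} = baseline) do
    json = baseline |> Map.from_struct() |> Jason.encode!(pretty: true)
    File.write!(@path, json)
  end

  def load do
    with {:ok, json} <- File.read(@path),
         {:ok, data} <- Jason.decode(json, keys: :atoms),
         {:ok, created_at, _offset} <- DateTime.from_iso8601(data.created_at) do
      {:ok, struct(PerformanceBaseline, %{data | created_at: created_at})}
    end
  end
end

# Persist the captured baseline and read it back
BaselineStore.save(baseline)
{:ok, stored_baseline} = BaselineStore.load()
stored_baseline.name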

Load Testing Integration

defmodule LoadTestSimulator do
  @moduledoc """
  Simulates load testing scenarios for capacity planning.
  """

  def run_scenario(name, config) do
    IO.puts("Starting load test: #{name}")
    IO.puts("Duration: #{config.duration_seconds}s, RPS: #{config.target_rps}")

    start_time = System.monotonic_time(:second)
    results = run_load(config, start_time)

    analyze_results(name, results)
  end

  defp run_load(config, start_time) do
    interval_ms = round(1000 / config.target_rps)
    end_time = start_time + config.duration_seconds

    Stream.repeatedly(fn -> :ok end)
    |> Stream.take_while(fn _ ->
      System.monotonic_time(:second) < end_time
    end)
    |> Enum.reduce([], fn _, acc ->
      # Simulate request
      latency = simulate_request(config)
      status = if :rand.uniform() > config.error_rate, do: :success, else: :error

      MetricsCollector.record_request("LoadTest", latency, status)
      Process.sleep(interval_ms)

      [{latency, status} | acc]
    end)
  end

  defp simulate_request(config) do
    base = config.base_latency_ms
    variance = config.latency_variance_ms

    # Add some realistic variance
    jitter = :rand.uniform(variance * 2) - variance

    # Occasionally add slow requests (tail latency)
    if :rand.uniform() > 0.95 do
      base + jitter + :rand.uniform(base * 3)
    else
      max(10, base + jitter)
    end
  end

  defp analyze_results(name, results) do
    latencies = Enum.map(results, fn {l, _} -> l end)
    errors = Enum.count(results, fn {_, s} -> s == :error end)

    %{
      test_name: name,
      total_requests: length(results),
      errors: errors,
      error_rate: errors / length(results) * 100,
      latency_avg: Enum.sum(latencies) / length(latencies),
      latency_max: Enum.max(latencies),
      latency_min: Enum.min(latencies)
    }
  end
end

# Run a quick load test
load_config = %{
  duration_seconds: 3,
  target_rps: 20,
  base_latency_ms: 150,
  latency_variance_ms: 50,
  error_rate: 0.05
}

load_result = LoadTestSimulator.run_scenario("Quick Load Test", load_config)

load_md = """
## Load Test Results: #{load_result.test_name}

| Metric | Value |
|--------|-------|
| Total Requests | #{load_result.total_requests} |
| Errors | #{load_result.errors} |
| Error Rate | #{Float.round(load_result.error_rate, 2)}% |
| Avg Latency | #{Float.round(load_result.latency_avg, 1)}ms |
| Min Latency | #{load_result.latency_min}ms |
| Max Latency | #{load_result.latency_max}ms |

### Load Testing Best Practices

1. **Warm-up Period**: Run warm-up requests before measuring
2. **Realistic Patterns**: Model actual traffic patterns (not just constant load)
3. **Gradual Ramp-up**: Increase load gradually to find breaking points
4. **Monitor Dependencies**: Watch downstream services during tests
5. **Test in Isolation**: Use dedicated test environments when possible
"""

Kino.Markdown.new(load_md)
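
Building on the simulator above, a gradual ramp-up is just a series of scenarios with increasing target RPS; the step values below are arbitrary.

# Step through increasing load levels and collect the results per step.
ramp_results =
  for rps <- [10, 20, 40] do
    LoadTestSimulator.run_scenario("Ramp #{rps} rps", %{
      duration_seconds: 2,
      target_rps: rps,
      base_latency_ms: 150,
      latency_variance_ms: 50,
      error_rate: 0.05
    })
  end

# A quick look at how error rate and average latency evolve with load
Enum.map(ramp_results, fn r ->
  %{
    test: r.test_name,
    error_rate: Float.round(r.error_rate, 2),
    avg_latency_ms: Float.round(r.latency_avg, 1)
  }
end)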

Summary and Best Practices

summary_md = """
## Production Monitoring Summary

### Key Metrics to Track

| Metric | Why It Matters | Alert Threshold |
|--------|----------------|-----------------|
| Error Rate | Service reliability | > 5% warning, > 20% critical |
| Latency P95 | User experience | > 2s warning, > 5s critical |
| Latency P99 | Tail latency issues | > 5s warning, > 10s critical |
| Throughput | Capacity utilization | < 10% of baseline |
| SOAP Faults | Application errors | > 1% of requests |

### Monitoring Checklist

- [ ] Telemetry handlers attached for all Lather events
- [ ] Structured logging with correlation IDs
- [ ] Health check endpoints for liveness and readiness
- [ ] Alerting rules configured with appropriate thresholds
- [ ] Performance baselines established
- [ ] Dashboard available for real-time monitoring
- [ ] Sensitive data redaction in logs
- [ ] Error categorization by type

### Implementation Tips

1. **Start Simple**: Begin with basic metrics and add complexity as needed
2. **Use Correlation IDs**: Trace requests across all system components
3. **Redact Sensitive Data**: Never log passwords, tokens, or PII
4. **Set Meaningful Thresholds**: Base alerts on actual service behavior
5. **Monitor Trends**: Look for gradual degradation, not just spikes
6. **Test Your Monitoring**: Verify alerts fire when expected
7. **Document Runbooks**: Prepare response procedures for common alerts

### Integration with Observability Platforms

Lather's telemetry events can be exported to:
- **Prometheus**: Use `telemetry_metrics_prometheus`
- **StatsD**: Use `telemetry_metrics_statsd`
- **DataDog**: Use `telemetry_metrics_datadog`
- **Grafana Cloud**: Export via any of the above

Example Prometheus setup:

    defmodule MyApp.Telemetry do
      use Supervisor
      import Telemetry.Metrics

      def start_link(arg) do
        Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
      end

      def init(_arg) do
        children = [
          {TelemetryMetricsPrometheus, metrics: metrics()}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      defp metrics do
        [
          counter("lather.request.count", tags: [:operation, :status]),
          distribution("lather.request.duration", unit: {:native, :millisecond}),
          counter("lather.request.error.count", tags: [:error_type])
        ]
      end
    end


Happy monitoring!
"""

Kino.Markdown.new(summary_md)

Next Steps

You now have a comprehensive understanding of production monitoring for SOAP services with Lather. Consider exploring:

  1. Enterprise Integration: See enterprise_integration.livemd for advanced authentication
  2. Debugging: See debugging_troubleshooting.livemd for troubleshooting techniques
  3. Getting Started: See getting_started.livemd for basic usage patterns
  4. Advanced Types: See advanced_types.livemd for complex data handling