Production Monitoring and Observability for SOAP Services
Mix.install([
{:lather, "~> 1.0"},
{:finch, "~> 0.18"},
{:kino, "~> 0.12"},
{:kino_vega_lite, "~> 0.1"},
{:jason, "~> 1.4"},
{:telemetry, "~> 1.2"},
{:telemetry_metrics, "~> 0.6"},
{:telemetry_poller, "~> 1.0"}
])
Introduction
Production monitoring is essential for maintaining reliable SOAP service integrations. This Livebook covers:
- Setting up comprehensive telemetry for SOAP operations
- Tracking key metrics (latency, error rates, throughput)
- Implementing health checks and alerting
- Building real-time monitoring dashboards
- Establishing performance baselines
Why Monitoring Matters for SOAP Services
SOAP services present unique monitoring challenges:
- XML Parsing Overhead: SOAP’s XML-based protocol adds parsing time that must be tracked
- Complex Error States: SOAP faults, HTTP errors, and transport failures need separate tracking
- Enterprise Dependencies: SOAP services often connect to critical business systems
- Legacy Integration: Many SOAP services run on legacy infrastructure with varying reliability
- Payload Size Impact: Large XML payloads can affect latency and memory usage
Environment Setup
# Start applications
{:ok, _} = Application.ensure_all_started(:lather)
{:ok, _} = Application.ensure_all_started(:telemetry)
# Start Finch if not already running (safe to re-run this cell)
if Process.whereis(Lather.Finch) == nil do
children = [
{Finch,
name: Lather.Finch,
pools: %{
:default => [
size: 10,
count: 2,
conn_opts: [
transport_opts: [timeout: 30_000]
]
]
}}
]
{:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)
end
IO.puts("Monitoring environment initialized!")
Telemetry Integration
Lather emits telemetry events that you can use for monitoring. Let’s set up comprehensive telemetry handling:
defmodule SOAPTelemetryHandler do
@moduledoc """
Handles telemetry events from Lather SOAP operations.
"""
require Logger
# Lather emits these telemetry events:
# - [:lather, :request, :start] - SOAP request started
# - [:lather, :request, :stop] - SOAP request completed
# - [:lather, :request, :error] - SOAP request failed
# - [:lather, :wsdl, :parse, :start] - WSDL parsing started
# - [:lather, :wsdl, :parse, :stop] - WSDL parsing completed
def attach_handlers do
:telemetry.attach_many(
"soap-monitoring-handler",
[
[:lather, :request, :start],
[:lather, :request, :stop],
[:lather, :request, :error],
[:lather, :wsdl, :parse, :start],
[:lather, :wsdl, :parse, :stop]
],
&handle_event/4,
nil
)
IO.puts("Telemetry handlers attached successfully")
end
def handle_event([:lather, :request, :start], measurements, metadata, _config) do
Logger.debug("SOAP Request Started",
operation: metadata[:operation],
endpoint: metadata[:endpoint],
system_time: measurements[:system_time]
)
# Store start time for duration calculation
Process.put({:soap_request_start, metadata[:request_id]}, System.monotonic_time())
end
def handle_event([:lather, :request, :stop], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(
measurements[:duration] || 0,
:native,
:millisecond
)
Logger.info("SOAP Request Completed",
operation: metadata[:operation],
endpoint: metadata[:endpoint],
duration_ms: duration_ms,
status: :success
)
# Emit to metrics collector
MetricsCollector.record_request(metadata[:operation], duration_ms, :success)
end
def handle_event([:lather, :request, :error], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(
measurements[:duration] || 0,
:native,
:millisecond
)
error_type = classify_error(metadata[:error])
Logger.error("SOAP Request Failed",
operation: metadata[:operation],
endpoint: metadata[:endpoint],
duration_ms: duration_ms,
error_type: error_type,
error: inspect(metadata[:error])
)
MetricsCollector.record_request(metadata[:operation], duration_ms, {:error, error_type})
end
def handle_event([:lather, :wsdl, :parse, :start], _measurements, metadata, _config) do
Logger.debug("WSDL Parse Started", url: metadata[:url])
end
def handle_event([:lather, :wsdl, :parse, :stop], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(
measurements[:duration] || 0,
:native,
:millisecond
)
Logger.info("WSDL Parse Completed",
url: metadata[:url],
duration_ms: duration_ms,
operations_count: metadata[:operations_count]
)
end
defp classify_error(error) do
case error do
%{type: :soap_fault} -> :soap_fault
%{type: :http_error, status: status} when status >= 500 -> :server_error
%{type: :http_error, status: status} when status >= 400 -> :client_error
%{type: :transport_error, reason: :timeout} -> :timeout
%{type: :transport_error} -> :transport_error
_ -> :unknown
end
end
end
# Attach the handlers
SOAPTelemetryHandler.attach_handlers()
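To confirm the attachment took effect, :telemetry.list_handlers/1 returns every handler registered under a given event prefix (the event names themselves are the ones assumed in the module above):
# One entry per attached event; re-running attach_handlers/0 returns
# {:error, :already_exists} because the handler id "soap-monitoring-handler" is reused.
:telemetry.list_handlers([:lather])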
Metrics Collection
Let’s create a metrics collector that tracks key performance indicators:
defmodule MetricsCollector do
use Agent
@doc """
Starts the metrics collector agent.
"""
def start_link do
Agent.start_link(fn -> initial_state() end, name: __MODULE__)
end
defp initial_state do
%{
requests: [],
errors_by_type: %{},
latencies: [],
start_time: System.monotonic_time(:second),
connection_pool: %{active: 0, idle: 10, total: 10}
}
end
def record_request(operation, duration_ms, status) do
timestamp = DateTime.utc_now()
Agent.update(__MODULE__, fn state ->
request = %{
operation: operation,
duration_ms: duration_ms,
status: status,
timestamp: timestamp
}
state
|> Map.update!(:requests, fn requests ->
# Keep last 1000 requests for analysis
[request | requests] |> Enum.take(1000)
end)
|> Map.update!(:latencies, fn latencies ->
[duration_ms | latencies] |> Enum.take(1000)
end)
|> maybe_record_error(status)
end)
end
defp maybe_record_error(state, {:error, error_type}) do
Map.update!(state, :errors_by_type, fn errors ->
Map.update(errors, error_type, 1, &(&1 + 1))
end)
end
defp maybe_record_error(state, _), do: state
def get_metrics do
Agent.get(__MODULE__, fn state ->
latencies = state.latencies
requests = state.requests
uptime_seconds = System.monotonic_time(:second) - state.start_time
successful_requests = Enum.count(requests, fn r -> r.status == :success end)
failed_requests = length(requests) - successful_requests
%{
total_requests: length(requests),
successful_requests: successful_requests,
failed_requests: failed_requests,
error_rate: if(length(requests) > 0, do: failed_requests / length(requests) * 100, else: 0),
latency_p50: percentile(latencies, 50),
latency_p95: percentile(latencies, 95),
latency_p99: percentile(latencies, 99),
latency_avg: if(length(latencies) > 0, do: Enum.sum(latencies) / length(latencies), else: 0),
latency_min: if(length(latencies) > 0, do: Enum.min(latencies), else: 0),
latency_max: if(length(latencies) > 0, do: Enum.max(latencies), else: 0),
throughput_per_second: if(uptime_seconds > 0, do: length(requests) / uptime_seconds, else: 0),
errors_by_type: state.errors_by_type,
connection_pool: state.connection_pool,
uptime_seconds: uptime_seconds
}
end)
end
def get_recent_requests(count \\ 10) do
Agent.get(__MODULE__, fn state ->
Enum.take(state.requests, count)
end)
end
def reset do
Agent.update(__MODULE__, fn _state -> initial_state() end)
end
defp percentile([], _p), do: 0
defp percentile(values, p) do
sorted = Enum.sort(values)
k = (length(sorted) - 1) * p / 100
f = floor(k)
c = ceil(k)
if f == c do
Enum.at(sorted, round(k))
else
d0 = Enum.at(sorted, f) * (c - k)
d1 = Enum.at(sorted, c) * (k - f)
round(d0 + d1)
end
end
end
# Start the metrics collector
{:ok, _pid} = MetricsCollector.start_link()
IO.puts("Metrics collector started!")
Key Metrics to Monitor
Simulating SOAP Operations for Metrics
defmodule MetricsSimulator do
@operations ["GetUser", "CreateUser", "UpdateUser", "SearchUsers", "DeleteUser"]
@error_types [:soap_fault, :timeout, :server_error, :client_error]
def simulate_traffic(count \\ 50) do
Enum.each(1..count, fn _ ->
operation = Enum.random(@operations)
# Simulate varying latencies
base_latency = Enum.random(50..500)
jitter = Enum.random(-20..20)
duration_ms = max(10, base_latency + jitter)
# 10% error rate
status = if :rand.uniform() > 0.1 do
:success
else
{:error, Enum.random(@error_types)}
end
MetricsCollector.record_request(operation, duration_ms, status)
# Small delay between requests
Process.sleep(10)
end)
IO.puts("Simulated #{count} SOAP requests")
end
end
# Generate sample traffic
MetricsSimulator.simulate_traffic(100)
Metrics Dashboard
# Get current metrics
metrics = MetricsCollector.get_metrics()
# Create metrics display
metrics_md = """
## Current Metrics Summary
| Metric | Value |
|--------|-------|
| Total Requests | #{metrics.total_requests} |
| Successful | #{metrics.successful_requests} |
| Failed | #{metrics.failed_requests} |
| Error Rate | #{Float.round(metrics.error_rate, 2)}% |
| Throughput | #{Float.round(metrics.throughput_per_second, 2)} req/s |
### Latency Percentiles (ms)
| Percentile | Value |
|------------|-------|
| p50 (Median) | #{metrics.latency_p50} ms |
| p95 | #{metrics.latency_p95} ms |
| p99 | #{metrics.latency_p99} ms |
| Average | #{Float.round(metrics.latency_avg, 1)} ms |
| Min | #{metrics.latency_min} ms |
| Max | #{metrics.latency_max} ms |
### Errors by Type
#{Enum.map_join(metrics.errors_by_type, "\n", fn {type, count} -> "- #{type}: #{count}" end)}
"""
Kino.Markdown.new(metrics_md)
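For ad-hoc inspection, the raw request history can also be rendered as an interactive table with Kino.DataTable (statuses and timestamps are converted to strings here so every column displays cleanly):
MetricsCollector.get_recent_requests(10)
|> Enum.map(fn req ->
  %{req | status: inspect(req.status), timestamp: DateTime.to_iso8601(req.timestamp)}
end)
|> Kino.DataTable.new()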
Latency Distribution Chart
# Get recent requests for charting
recent_requests = MetricsCollector.get_recent_requests(50)
# Prepare data for VegaLite chart
chart_data = Enum.map(recent_requests, fn req ->
%{
"operation" => to_string(req.operation),
"latency" => req.duration_ms,
"status" => if(req.status == :success, do: "success", else: "error"),
"timestamp" => DateTime.to_iso8601(req.timestamp)
}
end)
VegaLite.new(width: 600, height: 300, title: "Request Latency by Operation")
|> VegaLite.data_from_values(chart_data)
|> VegaLite.mark(:boxplot)
|> VegaLite.encode_field(:x, "operation", type: :nominal, title: "Operation")
|> VegaLite.encode_field(:y, "latency", type: :quantitative, title: "Latency (ms)")
|> VegaLite.encode_field(:color, "status", type: :nominal)
Error Rate Over Time
defmodule ErrorRateAnalyzer do
def analyze_error_distribution(requests) do
requests
|> Enum.group_by(fn req ->
case req.status do
:success -> :success
{:error, type} -> type
end
end)
|> Enum.map(fn {status, reqs} ->
%{"status" => to_string(status), "count" => length(reqs)}
end)
end
end
error_data = ErrorRateAnalyzer.analyze_error_distribution(recent_requests)
VegaLite.new(width: 400, height: 300, title: "Request Status Distribution")
|> VegaLite.data_from_values(error_data)
|> VegaLite.mark(:arc, inner_radius: 50)
|> VegaLite.encode_field(:theta, "count", type: :quantitative)
|> VegaLite.encode_field(:color, "status", type: :nominal)
Health Checks
Implementing comprehensive health checks for SOAP service dependencies:
defmodule HealthChecker do
@doc """
Performs comprehensive health checks on SOAP service dependencies.
"""
def check_all(services) do
checks = Enum.map(services, fn service ->
Task.async(fn -> check_service(service) end)
end)
results = Task.await_many(checks, 10_000)
overall_status = if Enum.all?(results, fn r -> r.status == :healthy end) do
:healthy
else
if Enum.any?(results, fn r -> r.status == :unhealthy end) do
:unhealthy
else
:degraded
end
end
%{
status: overall_status,
timestamp: DateTime.utc_now(),
checks: results
}
end
def check_service(service) do
start_time = System.monotonic_time(:millisecond)
result = case service.type do
:wsdl -> check_wsdl_availability(service)
:endpoint -> check_endpoint_health(service)
:dependency -> check_dependency(service)
end
duration = System.monotonic_time(:millisecond) - start_time
Map.merge(result, %{
name: service.name,
type: service.type,
duration_ms: duration,
checked_at: DateTime.utc_now()
})
end
defp check_wsdl_availability(service) do
# Simulate WSDL availability check
case simulate_http_check(service.url) do
{:ok, _} ->
%{status: :healthy, message: "WSDL accessible"}
{:error, reason} ->
%{status: :unhealthy, message: "WSDL unavailable: #{reason}"}
end
end
defp check_endpoint_health(service) do
case simulate_http_check(service.url) do
{:ok, response_time} when response_time < 1000 ->
%{status: :healthy, message: "Endpoint responding normally"}
{:ok, response_time} ->
%{status: :degraded, message: "Endpoint slow: #{response_time}ms"}
{:error, reason} ->
%{status: :unhealthy, message: "Endpoint error: #{reason}"}
end
end
defp check_dependency(_service) do
# Simulate dependency check
if :rand.uniform() > 0.1 do
%{status: :healthy, message: "Dependency available"}
else
%{status: :degraded, message: "Dependency experiencing issues"}
end
end
defp simulate_http_check(_url) do
# Simulate HTTP check with random outcomes
Process.sleep(Enum.random(50..200))
case :rand.uniform(10) do
n when n <= 8 -> {:ok, Enum.random(100..500)}
9 -> {:ok, Enum.random(1000..2000)}
10 -> {:error, :timeout}
end
end
@doc """
Liveness probe - checks if the service is running.
"""
def liveness_probe do
# Basic liveness: is the BEAM running and responsive?
%{
status: :alive,
timestamp: DateTime.utc_now(),
memory_mb: :erlang.memory(:total) / 1_000_000,
process_count: length(Process.list())
}
end
@doc """
Readiness probe - checks if the service is ready to handle requests.
"""
def readiness_probe(dependencies) do
health = check_all(dependencies)
ready = health.status in [:healthy, :degraded]
%{
ready: ready,
status: health.status,
timestamp: DateTime.utc_now(),
details: health.checks
}
end
end
# Define sample services to check
sample_services = [
%{name: "UserService WSDL", type: :wsdl, url: "https://example.com/users?wsdl"},
%{name: "UserService Endpoint", type: :endpoint, url: "https://example.com/users"},
%{name: "OrderService WSDL", type: :wsdl, url: "https://example.com/orders?wsdl"},
%{name: "Database", type: :dependency, url: "postgres://localhost/db"},
%{name: "Redis Cache", type: :dependency, url: "redis://localhost:6379"}
]
# Run health checks
health_result = HealthChecker.check_all(sample_services)
# Display results
health_md = """
## Health Check Results
**Overall Status**: #{health_result.status}
**Checked At**: #{DateTime.to_iso8601(health_result.timestamp)}
### Individual Checks
| Service | Type | Status | Duration | Message |
|---------|------|--------|----------|---------|
#{Enum.map_join(health_result.checks, "\n", fn check ->
status_icon = case check.status do
:healthy -> "[OK]"
:degraded -> "[WARN]"
:unhealthy -> "[FAIL]"
end
"| #{check.name} | #{check.type} | #{status_icon} | #{check.duration_ms}ms | #{check.message} |"
end)}
"""
Kino.Markdown.new(health_md)
Liveness and Readiness Probes
# Liveness probe
liveness = HealthChecker.liveness_probe()
# Readiness probe
readiness = HealthChecker.readiness_probe(sample_services)
probes_md = """
## Kubernetes-Style Probes
### Liveness Probe
#{Jason.encode!(liveness, pretty: true)}
### Readiness Probe
**Ready**: #{readiness.ready}
**Status**: #{readiness.status}
Use these endpoints in your Kubernetes deployment:
- `/health/live` - Returns 200 if alive
- `/health/ready` - Returns 200 if ready to serve traffic
"""
Kino.Markdown.new(probes_md)
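The probes above are plain functions; in a real deployment you would expose them over HTTP. Below is a minimal sketch of what that could look like with Plug.Router. Plug is not part of this notebook's Mix.install, so the module is wrapped in a Code.ensure_loaded?/1 check and only compiles if Plug happens to be available; the HealthRouter name and routes are illustrative, not part of Lather:
# Sketch only: requires {:plug, "~> 1.15"} (and a server such as Bandit) in a real app
if Code.ensure_loaded?(Plug.Router) do
  defmodule HealthRouter do
    use Plug.Router

    plug :match
    plug :dispatch

    get "/health/live" do
      conn
      |> put_resp_content_type("application/json")
      |> send_resp(200, Jason.encode!(HealthChecker.liveness_probe()))
    end

    get "/health/ready" do
      # Replace [] with your real dependency list (e.g. loaded from config)
      probe = HealthChecker.readiness_probe([])
      status = if probe.ready, do: 200, else: 503

      conn
      |> put_resp_content_type("application/json")
      |> send_resp(status, Jason.encode!(%{ready: probe.ready, status: probe.status}))
    end
  end
end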
Alerting Patterns
Setting up intelligent alerting to avoid alert fatigue:
defmodule AlertManager do
@moduledoc """
Manages alerts with threshold-based rules and anomaly detection.
"""
defstruct [:rules, :active_alerts, :alert_history, :cooldowns]
def new do
%__MODULE__{
rules: default_rules(),
active_alerts: %{},
alert_history: [],
cooldowns: %{}
}
end
def default_rules do
[
%{
name: "High Error Rate",
metric: :error_rate,
condition: :greater_than,
threshold: 5.0,
severity: :warning,
cooldown_minutes: 5
},
%{
name: "Critical Error Rate",
metric: :error_rate,
condition: :greater_than,
threshold: 20.0,
severity: :critical,
cooldown_minutes: 2
},
%{
name: "High Latency P95",
metric: :latency_p95,
condition: :greater_than,
threshold: 2000,
severity: :warning,
cooldown_minutes: 5
},
%{
name: "Very High Latency P99",
metric: :latency_p99,
condition: :greater_than,
threshold: 5000,
severity: :critical,
cooldown_minutes: 2
},
%{
name: "Low Throughput",
metric: :throughput_per_second,
condition: :less_than,
threshold: 0.1,
severity: :warning,
cooldown_minutes: 10
},
%{
name: "Service Degraded",
metric: :health_status,
condition: :equals,
threshold: :degraded,
severity: :warning,
cooldown_minutes: 5
},
%{
name: "Service Unhealthy",
metric: :health_status,
condition: :equals,
threshold: :unhealthy,
severity: :critical,
cooldown_minutes: 1
}
]
end
def evaluate_rules(manager, metrics) do
now = DateTime.utc_now()
triggered = Enum.reduce(manager.rules, [], fn rule, acc ->
if should_alert?(manager, rule, metrics, now) do
alert = create_alert(rule, metrics, now)
[alert | acc]
else
acc
end
end)
# Update manager state
updated_manager = Enum.reduce(triggered, manager, fn alert, m ->
%{m |
active_alerts: Map.put(m.active_alerts, alert.rule_name, alert),
alert_history: [alert | m.alert_history] |> Enum.take(100),
cooldowns: Map.put(m.cooldowns, alert.rule_name, now)
}
end)
{updated_manager, triggered}
end
defp should_alert?(manager, rule, metrics, now) do
# Check cooldown
last_alert = Map.get(manager.cooldowns, rule.name)
cooldown_expired = is_nil(last_alert) or
  DateTime.diff(now, last_alert, :second) >= rule.cooldown_minutes * 60
# Check condition
metric_value = Map.get(metrics, rule.metric)
condition_met = evaluate_condition(rule.condition, metric_value, rule.threshold)
cooldown_expired and condition_met
end
defp evaluate_condition(:greater_than, value, threshold) when is_number(value) do
value > threshold
end
defp evaluate_condition(:less_than, value, threshold) when is_number(value) do
value < threshold
end
defp evaluate_condition(:equals, value, threshold), do: value == threshold
defp evaluate_condition(_, _, _), do: false
defp create_alert(rule, metrics, timestamp) do
%{
rule_name: rule.name,
severity: rule.severity,
metric: rule.metric,
current_value: Map.get(metrics, rule.metric),
threshold: rule.threshold,
triggered_at: timestamp,
message: "#{rule.name}: #{rule.metric} is #{Map.get(metrics, rule.metric)} (threshold: #{rule.threshold})"
}
end
def format_alerts(alerts) do
if Enum.empty?(alerts) do
"No alerts triggered"
else
Enum.map_join(alerts, "\n", fn alert ->
severity_icon = case alert.severity do
:critical -> "[CRITICAL]"
:warning -> "[WARNING]"
:info -> "[INFO]"
end
"#{severity_icon} #{alert.message}"
end)
end
end
end
# Create alert manager and evaluate
alert_manager = AlertManager.new()
current_metrics = MetricsCollector.get_metrics()
|> Map.put(:health_status, health_result.status)
{updated_manager, triggered_alerts} = AlertManager.evaluate_rules(alert_manager, current_metrics)
alerts_md = """
## Alert Evaluation
### Current Metrics for Alerting
- Error Rate: #{Float.round(current_metrics.error_rate, 2)}%
- Latency P95: #{current_metrics.latency_p95}ms
- Latency P99: #{current_metrics.latency_p99}ms
- Throughput: #{Float.round(current_metrics.throughput_per_second, 3)} req/s
- Health Status: #{current_metrics.health_status}
### Triggered Alerts
#{AlertManager.format_alerts(triggered_alerts)}
### Alert Rules Configured
#{Enum.map_join(AlertManager.default_rules(), "\n", fn rule ->
"- **#{rule.name}** (#{rule.severity}): #{rule.metric} #{rule.condition} #{rule.threshold}"
end)}
"""
Kino.Markdown.new(alerts_md)
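Alert evaluation is only useful if it runs continuously. The sketch below uses :telemetry_poller (already in the Mix.install list) to re-evaluate the rules every 30 seconds, keeping the AlertManager struct in a named Agent so cooldowns persist between runs. The AlertScheduler module and process names are our own, and triggered alerts are simply printed rather than paged out:
defmodule AlertScheduler do
  def start do
    {:ok, _} = Agent.start_link(fn -> AlertManager.new() end, name: :alert_manager_state)

    :telemetry_poller.start_link(
      measurements: [{__MODULE__, :evaluate, []}],
      period: :timer.seconds(30),
      name: :soap_alert_poller
    )
  end

  def evaluate do
    # get_metrics/0 has no :health_status key, so the health rules simply never match here
    metrics = MetricsCollector.get_metrics()

    Agent.update(:alert_manager_state, fn manager ->
      {updated, triggered} = AlertManager.evaluate_rules(manager, metrics)
      Enum.each(triggered, &IO.puts("ALERT: #{&1.message}"))
      updated
    end)
  end
end

# Uncomment to start periodic evaluation in this notebook:
# AlertScheduler.start()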
Anomaly Detection Concepts
defmodule AnomalyDetector do
@moduledoc """
Simple anomaly detection using statistical methods.
"""
@doc """
Detects anomalies using z-score method.
Values more than `threshold` standard deviations from mean are anomalies.
"""
def detect_zscore_anomalies(values, threshold \\ 2.0) do
if length(values) < 10 do
{:error, :insufficient_data}
else
mean = Enum.sum(values) / length(values)
variance = Enum.reduce(values, 0, fn v, acc ->
acc + :math.pow(v - mean, 2)
end) / length(values)
std_dev = :math.sqrt(variance)
if std_dev == 0 do
{:ok, []}
else
anomalies = values
|> Enum.with_index()
|> Enum.filter(fn {v, _idx} ->
z_score = abs(v - mean) / std_dev
z_score > threshold
end)
|> Enum.map(fn {v, idx} ->
%{value: v, index: idx, z_score: abs(v - mean) / std_dev}
end)
{:ok, anomalies}
end
end
end
@doc """
Detects sudden changes using percentage change from moving average.
"""
def detect_spike(values, window_size \\ 5, spike_threshold \\ 50) do
if length(values) < window_size do
{:error, :insufficient_data}
else
values
|> Enum.chunk_every(window_size, 1, :discard)
|> Enum.with_index()
|> Enum.reduce([], fn {window, idx}, acc ->
[current | history] = Enum.reverse(window)
avg = Enum.sum(history) / length(history)
if avg > 0 do
pct_change = (current - avg) / avg * 100
if abs(pct_change) > spike_threshold do
[%{index: idx + window_size - 1, value: current, pct_change: pct_change} | acc]
else
acc
end
else
acc
end
end)
|> Enum.reverse()
end
end
end
# Get latency data for analysis (durations of the most recent requests;
# get_metrics/0 returns aggregated values only, not the raw latency list)
latencies =
  MetricsCollector.get_recent_requests(100)
  |> Enum.map(& &1.duration_ms)
# Run anomaly detection
anomaly_md = case AnomalyDetector.detect_zscore_anomalies(latencies) do
{:ok, anomalies} ->
"""
## Anomaly Detection Results
### Z-Score Analysis (threshold: 2.0 std devs)
**Anomalies Found**: #{length(anomalies)}
#{if length(anomalies) > 0 do
anomalies
|> Enum.take(5)
|> Enum.map_join("\n", fn a ->
"- Value: #{a.value}ms (z-score: #{Float.round(a.z_score, 2)})"
end)
else
"No anomalies detected in current data"
end}
### Spike Detection
#{case AnomalyDetector.detect_spike(latencies) do
{:error, _} -> "Insufficient data for spike detection"
spikes when length(spikes) == 0 -> "No spikes detected"
spikes ->
spikes
|> Enum.take(3)
|> Enum.map_join("\n", fn s ->
"- Spike at index #{s.index}: #{s.value}ms (#{Float.round(s.pct_change, 1)}% change)"
end)
end}
### Tips for Anomaly Detection
- Establish baselines during normal operation periods
- Adjust thresholds based on your service's typical variance
- Consider time-of-day patterns (business hours vs off-hours)
- Use multiple detection methods for better accuracy
"""
{:error, :insufficient_data} ->
"Insufficient data for anomaly detection. Need at least 10 data points."
end
Kino.Markdown.new(anomaly_md)
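The tips above suggest combining methods. As one more option, here is a small sketch of detection against an exponentially weighted moving average (EWMA), which adapts to gradual drift better than a global z-score; the alpha and threshold defaults are illustrative, not tuned recommendations:
defmodule EwmaDetector do
  # Flags values that deviate from the running EWMA by more than threshold_pct percent
  def detect(values, alpha \\ 0.2, threshold_pct \\ 50) do
    values
    |> Enum.reduce({nil, []}, fn value, {ewma, anomalies} ->
      case ewma do
        nil ->
          # Seed the average with the first observation
          {value * 1.0, anomalies}

        avg ->
          deviation_pct = abs(value - avg) / avg * 100

          anomalies =
            if deviation_pct > threshold_pct do
              [%{value: value, ewma: Float.round(avg, 1), deviation_pct: Float.round(deviation_pct, 1)} | anomalies]
            else
              anomalies
            end

          {alpha * value + (1 - alpha) * avg, anomalies}
      end
    end)
    |> elem(1)
    |> Enum.reverse()
  end
end

EwmaDetector.detect(latencies) |> Enum.take(5)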
Logging Best Practices
Implementing structured logging for SOAP services:
defmodule StructuredLogger do
@moduledoc """
Structured logging utilities for SOAP services.
"""
require Logger
# Matched case-insensitively against downcased field names, so keep entries lowercase
@sensitive_fields ~w(password token secret key credential auth apikey)
def log_request(operation, params, opts \\ []) do
correlation_id = Keyword.get(opts, :correlation_id, generate_correlation_id())
level = Keyword.get(opts, :level, :info)
sanitized_params = sanitize_sensitive_data(params)
log_entry = %{
event: "soap_request",
correlation_id: correlation_id,
operation: operation,
params: sanitized_params,
timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
}
log_with_level(level, log_entry)
correlation_id
end
def log_response(correlation_id, response, duration_ms, opts \\ []) do
level = Keyword.get(opts, :level, :info)
status = Keyword.get(opts, :status, :success)
# Truncate large responses
truncated_response = truncate_response(response)
log_entry = %{
event: "soap_response",
correlation_id: correlation_id,
status: status,
duration_ms: duration_ms,
response_preview: truncated_response,
timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
}
log_with_level(level, log_entry)
end
def log_error(correlation_id, error, opts \\ []) do
log_entry = %{
event: "soap_error",
correlation_id: correlation_id,
error_type: classify_error(error),
error_message: format_error_message(error),
stack_trace: opts[:stack_trace],
timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
}
Logger.error(fn -> Jason.encode!(log_entry) end)
end
defp sanitize_sensitive_data(data) when is_map(data) do
Enum.reduce(data, %{}, fn {key, value}, acc ->
sanitized_value = if sensitive_field?(key) do
"[REDACTED]"
else
sanitize_sensitive_data(value)
end
Map.put(acc, key, sanitized_value)
end)
end
defp sanitize_sensitive_data(data) when is_list(data) do
Enum.map(data, &sanitize_sensitive_data/1)
end
defp sanitize_sensitive_data(data), do: data
defp sensitive_field?(field) when is_binary(field) do
field_lower = String.downcase(field)
Enum.any?(@sensitive_fields, fn sensitive ->
String.contains?(field_lower, sensitive)
end)
end
defp sensitive_field?(field) when is_atom(field) do
sensitive_field?(Atom.to_string(field))
end
defp sensitive_field?(_), do: false
defp truncate_response(response) when is_binary(response) do
if byte_size(response) > 500 do
String.slice(response, 0, 500) <> "... [truncated]"
else
response
end
end
defp truncate_response(response) when is_map(response) do
case Jason.encode(response) do
{:ok, json} -> truncate_response(json)
_ -> "[Unable to serialize response]"
end
end
defp truncate_response(response), do: inspect(response, limit: 100)
defp classify_error(%{type: type}), do: type
defp classify_error(_), do: :unknown
defp format_error_message(%{type: :soap_fault, fault_string: msg}), do: msg
defp format_error_message(%{type: :http_error, status: status}), do: "HTTP #{status}"
defp format_error_message(%{reason: reason}), do: inspect(reason)
defp format_error_message(error), do: inspect(error)
defp generate_correlation_id do
:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
end
defp log_with_level(:debug, entry), do: Logger.debug(fn -> Jason.encode!(entry) end)
defp log_with_level(:info, entry), do: Logger.info(fn -> Jason.encode!(entry) end)
defp log_with_level(:warning, entry), do: Logger.warning(fn -> Jason.encode!(entry) end)
defp log_with_level(:error, entry), do: Logger.error(fn -> Jason.encode!(entry) end)
end
# Demo structured logging
demo_params = %{
"userId" => "12345",
"password" => "secret123",
"apiKey" => "sk-1234567890",
"data" => %{
"name" => "John Doe",
"token" => "bearer-xyz"
}
}
IO.puts("Example: Logging a request with sensitive data sanitization")
IO.puts("Original params: #{inspect(demo_params)}")
correlation_id = StructuredLogger.log_request("GetUser", demo_params)
IO.puts("\nCorrelation ID: #{correlation_id}")
IO.puts("\nNote: Sensitive fields (password, apiKey, token) are redacted in logs")
Log Levels Strategy
log_levels_md = """
## Log Levels Strategy for SOAP Services
### DEBUG Level
Use for detailed troubleshooting:
- Full request/response XML (sanitized)
- WSDL parsing details
- Parameter validation steps
- Connection pool status
### INFO Level
Use for normal operations:
- Request initiated (with correlation ID)
- Request completed (with duration)
- Service health checks passing
### WARNING Level
Use for concerning but recoverable situations:
- High latency (> p95 threshold)
- Retry attempts
- Service degradation detected
- Near rate limit
### ERROR Level
Use for failures requiring attention:
- SOAP faults
- HTTP errors (4xx, 5xx)
- Connection failures
- Authentication failures
### Correlation ID Best Practices
1. **Generate at entry point**: Create correlation ID when request enters your system
2. **Propagate through calls**: Pass correlation ID to all downstream services
3. **Include in SOAP headers**: Add correlation ID as custom SOAP header
4. **Log consistently**: Include correlation ID in every log entry
Example: Adding correlation ID to SOAP request
    def call_with_tracing(client, operation, params) do
      correlation_id = generate_correlation_id()

      headers = [
        Lather.Soap.Header.custom("X-Correlation-ID", correlation_id, %{})
      ]

      result = Lather.DynamicClient.call(
        client,
        operation,
        params,
        soap_headers: headers
      )

      {correlation_id, result}
    end
"""
Kino.Markdown.new(log_levels_md)
Real-Time Dashboard
Creating an interactive monitoring dashboard:
defmodule DashboardState do
use Agent
def start_link do
Agent.start_link(fn -> %{last_update: nil} end, name: __MODULE__)
end
def update_timestamp do
Agent.update(__MODULE__, fn state ->
%{state | last_update: DateTime.utc_now()}
end)
end
def get_last_update do
Agent.get(__MODULE__, fn state -> state.last_update end)
end
end
{:ok, _} = DashboardState.start_link()
# Create refresh button
refresh_button = Kino.Control.button("Refresh Dashboard")
# Create the dashboard frame
dashboard_frame = Kino.Frame.new()
# Dashboard rendering function
defmodule Dashboard do
def render do
metrics = MetricsCollector.get_metrics()
recent = MetricsCollector.get_recent_requests(20)
# Status indicators
status = cond do
metrics.error_rate > 20 -> {"CRITICAL", "background-color: #dc3545"}
metrics.error_rate > 5 -> {"WARNING", "background-color: #ffc107"}
true -> {"HEALTHY", "background-color: #28a745"}
end
# Build dashboard HTML
html = """
SOAP Service Dashboard
#{elem(status, 0)}
#{metrics.total_requests}
Total Requests
#{Float.round(100 - metrics.error_rate, 1)}%
Success Rate
#{metrics.latency_p50}ms
Median Latency
#{Float.round(metrics.throughput_per_second, 2)}
Requests/sec
Latency Percentiles
P50 (Median)
#{metrics.latency_p50}ms
P95
#{metrics.latency_p95}ms
P99
#{metrics.latency_p99}ms
Max
#{metrics.latency_max}ms
Errors by Type
#{render_error_breakdown(metrics.errors_by_type)}
Recent Requests
Time
Operation
Duration
Status
#{render_recent_requests(recent)}
Last updated: #{DateTime.utc_now() |> DateTime.to_iso8601()}
"""
Kino.HTML.new(html)
end
defp render_error_breakdown(errors) when map_size(errors) == 0 do
  "<tr><td>No errors recorded</td></tr>"
end

defp render_error_breakdown(errors) do
  errors
  |> Enum.map(fn {type, count} ->
    "<tr><td>#{type}</td><td>#{count}</td></tr>"
  end)
  |> Enum.join("")
end

defp render_recent_requests(requests) do
  requests
  |> Enum.take(10)
  |> Enum.map(fn req ->
    status_badge = case req.status do
      :success -> "OK"
      {:error, type} -> "#{type}"
    end

    time = Calendar.strftime(req.timestamp, "%H:%M:%S")

    "<tr><td>#{time}</td><td>#{req.operation}</td><td>#{req.duration_ms}ms</td><td>#{status_badge}</td></tr>"
  end)
  |> Enum.join("")
end
end
# Initial render
Kino.Frame.render(dashboard_frame, Dashboard.render())
# Handle refresh button clicks
Kino.listen(refresh_button, fn _event ->
# Simulate new traffic
MetricsSimulator.simulate_traffic(10)
DashboardState.update_timestamp()
Kino.Frame.render(dashboard_frame, Dashboard.render())
end)
Kino.Layout.grid([refresh_button, dashboard_frame], columns: 1)
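If you prefer a hands-off view, Kino.animate/2 can re-render the dashboard on a timer; this renders a separate, auto-updating copy of the dashboard (the 5-second interval is arbitrary):
Kino.animate(5_000, fn _frame_index ->
  Dashboard.render()
end)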
Performance Baselines
Establishing and tracking performance baselines:
defmodule PerformanceBaseline do
@moduledoc """
Tools for establishing and comparing performance baselines.
"""
defstruct [
:name,
:created_at,
:latency_p50,
:latency_p95,
:latency_p99,
:error_rate,
:throughput,
:sample_size
]
def capture(name) do
metrics = MetricsCollector.get_metrics()
%__MODULE__{
name: name,
created_at: DateTime.utc_now(),
latency_p50: metrics.latency_p50,
latency_p95: metrics.latency_p95,
latency_p99: metrics.latency_p99,
error_rate: metrics.error_rate,
throughput: metrics.throughput_per_second,
sample_size: metrics.total_requests
}
end
def compare(current, baseline) do
comparisons = [
compare_metric("Latency P50", current.latency_p50, baseline.latency_p50, :lower_better),
compare_metric("Latency P95", current.latency_p95, baseline.latency_p95, :lower_better),
compare_metric("Latency P99", current.latency_p99, baseline.latency_p99, :lower_better),
compare_metric("Error Rate", current.error_rate, baseline.error_rate, :lower_better),
compare_metric("Throughput", current.throughput, baseline.throughput, :higher_better)
]
overall = calculate_overall_status(comparisons)
%{
comparisons: comparisons,
overall_status: overall,
baseline_name: baseline.name,
baseline_date: baseline.created_at
}
end
defp compare_metric(name, current, baseline, direction) when baseline > 0 do
pct_change = (current - baseline) / baseline * 100
status = case {direction, pct_change} do
{:lower_better, pct} when pct < -10 -> :improved
{:lower_better, pct} when pct > 20 -> :degraded
{:higher_better, pct} when pct > 10 -> :improved
{:higher_better, pct} when pct < -20 -> :degraded
_ -> :stable
end
%{
name: name,
current: current,
baseline: baseline,
pct_change: pct_change,
status: status
}
end
defp compare_metric(name, current, baseline, _direction) do
%{
name: name,
current: current,
baseline: baseline,
pct_change: 0.0,
status: :stable
}
end
defp calculate_overall_status(comparisons) do
degraded_count = Enum.count(comparisons, fn c -> c.status == :degraded end)
improved_count = Enum.count(comparisons, fn c -> c.status == :improved end)
cond do
degraded_count >= 2 -> :degraded
degraded_count == 1 and improved_count == 0 -> :warning
improved_count >= 2 -> :improved
true -> :stable
end
end
def capacity_estimate(baseline, target_throughput) do
if baseline.throughput > 0 do
scale_factor = target_throughput / baseline.throughput
%{
current_throughput: baseline.throughput,
target_throughput: target_throughput,
scale_factor: scale_factor,
estimated_instances: ceil(scale_factor),
recommendations: generate_capacity_recommendations(scale_factor, baseline)
}
else
%{error: "Insufficient baseline data for capacity estimation"}
end
end
defp generate_capacity_recommendations(scale_factor, baseline) do
recommendations = []
recommendations = if scale_factor > 1.5 do
["Consider horizontal scaling with #{ceil(scale_factor)} instances" | recommendations]
else
recommendations
end
recommendations = if baseline.latency_p99 > 2000 do
["Optimize high p99 latency before scaling" | recommendations]
else
recommendations
end
recommendations = if baseline.error_rate > 1 do
["Address error rate (#{Float.round(baseline.error_rate, 1)}%) before scaling" | recommendations]
else
recommendations
end
if Enum.empty?(recommendations) do
["Current capacity appears sufficient for target load"]
else
recommendations
end
end
end
# Capture baseline
baseline = PerformanceBaseline.capture("Production Baseline v1")
# Simulate some performance changes
MetricsSimulator.simulate_traffic(50)
# Capture current state and compare
current = PerformanceBaseline.capture("Current")
comparison = PerformanceBaseline.compare(current, baseline)
# Estimate capacity for 10x throughput
capacity = PerformanceBaseline.capacity_estimate(baseline, baseline.throughput * 10)
baseline_md = """
## Performance Baseline Analysis
### Captured Baseline: #{baseline.name}
**Captured At**: #{DateTime.to_iso8601(baseline.created_at)}
**Sample Size**: #{baseline.sample_size} requests
| Metric | Baseline Value |
|--------|----------------|
| Latency P50 | #{baseline.latency_p50}ms |
| Latency P95 | #{baseline.latency_p95}ms |
| Latency P99 | #{baseline.latency_p99}ms |
| Error Rate | #{Float.round(baseline.error_rate, 2)}% |
| Throughput | #{Float.round(baseline.throughput, 3)} req/s |
### Comparison with Current Performance
**Overall Status**: #{comparison.overall_status}
| Metric | Baseline | Current | Change | Status |
|--------|----------|---------|--------|--------|
#{Enum.map_join(comparison.comparisons, "\n", fn c ->
change_str = if c.pct_change >= 0 do
"+#{Float.round(c.pct_change, 1)}%"
else
"#{Float.round(c.pct_change, 1)}%"
end
status_icon = case c.status do
:improved -> "[+]"
:degraded -> "[-]"
:stable -> "[=]"
end
"| #{c.name} | #{c.baseline} | #{c.current} | #{change_str} | #{status_icon} |"
end)}
### Capacity Planning
**Current Throughput**: #{Float.round(capacity.current_throughput, 3)} req/s
**Target Throughput**: #{Float.round(capacity.target_throughput, 3)} req/s (10x)
**Scale Factor**: #{Float.round(capacity.scale_factor, 2)}x
**Estimated Instances**: #{capacity.estimated_instances}
**Recommendations**:
#{Enum.map_join(capacity.recommendations, "\n", fn r -> "- #{r}" end)}
"""
Kino.Markdown.new(baseline_md)
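To track baselines across runs or deployments, persist the captured struct somewhere durable. A minimal sketch writing it as JSON to a temporary file (the path is arbitrary; note that created_at comes back as an ISO 8601 string when reloaded):
baseline_path = Path.join(System.tmp_dir!(), "soap_baseline_v1.json")

baseline
|> Map.from_struct()
|> Jason.encode!(pretty: true)
|> then(&File.write!(baseline_path, &1))

# Reloading later for comparison against fresh metrics
reloaded = baseline_path |> File.read!() |> Jason.decode!(keys: :atoms)
reloaded.latency_p95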
Load Testing Integration
defmodule LoadTestSimulator do
@moduledoc """
Simulates load testing scenarios for capacity planning.
"""
def run_scenario(name, config) do
IO.puts("Starting load test: #{name}")
IO.puts("Duration: #{config.duration_seconds}s, RPS: #{config.target_rps}")
start_time = System.monotonic_time(:second)
results = run_load(config, start_time)
analyze_results(name, results)
end
defp run_load(config, start_time) do
interval_ms = round(1000 / config.target_rps)
end_time = start_time + config.duration_seconds
Stream.repeatedly(fn -> :ok end)
|> Stream.take_while(fn _ ->
System.monotonic_time(:second) < end_time
end)
|> Enum.reduce([], fn _, acc ->
# Simulate request
latency = simulate_request(config)
status = if :rand.uniform() > config.error_rate, do: :success, else: :error
MetricsCollector.record_request("LoadTest", latency, status)
Process.sleep(interval_ms)
[{latency, status} | acc]
end)
end
defp simulate_request(config) do
base = config.base_latency_ms
variance = config.latency_variance_ms
# Add some realistic variance
jitter = :rand.uniform(variance * 2) - variance
# Occasionally add slow requests (tail latency)
if :rand.uniform() > 0.95 do
base + jitter + :rand.uniform(base * 3)
else
max(10, base + jitter)
end
end
defp analyze_results(name, results) do
latencies = Enum.map(results, fn {l, _} -> l end)
errors = Enum.count(results, fn {_, s} -> s == :error end)
%{
test_name: name,
total_requests: length(results),
errors: errors,
error_rate: errors / length(results) * 100,
latency_avg: Enum.sum(latencies) / length(latencies),
latency_max: Enum.max(latencies),
latency_min: Enum.min(latencies)
}
end
end
# Run a quick load test
load_config = %{
duration_seconds: 3,
target_rps: 20,
base_latency_ms: 150,
latency_variance_ms: 50,
error_rate: 0.05
}
load_result = LoadTestSimulator.run_scenario("Quick Load Test", load_config)
load_md = """
## Load Test Results: #{load_result.test_name}
| Metric | Value |
|--------|-------|
| Total Requests | #{load_result.total_requests} |
| Errors | #{load_result.errors} |
| Error Rate | #{Float.round(load_result.error_rate, 2)}% |
| Avg Latency | #{Float.round(load_result.latency_avg, 1)}ms |
| Min Latency | #{load_result.latency_min}ms |
| Max Latency | #{load_result.latency_max}ms |
### Load Testing Best Practices
1. **Warm-up Period**: Run warm-up requests before measuring
2. **Realistic Patterns**: Model actual traffic patterns (not just constant load)
3. **Gradual Ramp-up**: Increase load gradually to find breaking points
4. **Monitor Dependencies**: Watch downstream services during tests
5. **Test in Isolation**: Use dedicated test environments when possible
"""
Kino.Markdown.new(load_md)
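Best practice 3 above calls for a gradual ramp-up; here is a simple sketch using the simulator, doubling the target rate each step (in a real test you would also watch downstream dependencies between steps):
ramp_results =
  for rps <- [5, 10, 20] do
    LoadTestSimulator.run_scenario("Ramp #{rps} rps", %{
      duration_seconds: 2,
      target_rps: rps,
      base_latency_ms: 150,
      latency_variance_ms: 50,
      error_rate: 0.05
    })
  end

# Summarize how latency and error rate respond as load increases
Enum.map(ramp_results, fn r ->
  {r.test_name, round(r.latency_avg), Float.round(r.error_rate, 1)}
end)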
Summary and Best Practices
summary_md = """
## Production Monitoring Summary
### Key Metrics to Track
| Metric | Why It Matters | Alert Threshold |
|--------|----------------|-----------------|
| Error Rate | Service reliability | > 5% warning, > 20% critical |
| Latency P95 | User experience | > 2s warning, > 5s critical |
| Latency P99 | Tail latency issues | > 5s warning, > 10s critical |
| Throughput | Capacity utilization | < 10% of baseline |
| SOAP Faults | Application errors | > 1% of requests |
### Monitoring Checklist
- [ ] Telemetry handlers attached for all Lather events
- [ ] Structured logging with correlation IDs
- [ ] Health check endpoints for liveness and readiness
- [ ] Alerting rules configured with appropriate thresholds
- [ ] Performance baselines established
- [ ] Dashboard available for real-time monitoring
- [ ] Sensitive data redaction in logs
- [ ] Error categorization by type
### Implementation Tips
1. **Start Simple**: Begin with basic metrics and add complexity as needed
2. **Use Correlation IDs**: Trace requests across all system components
3. **Redact Sensitive Data**: Never log passwords, tokens, or PII
4. **Set Meaningful Thresholds**: Base alerts on actual service behavior
5. **Monitor Trends**: Look for gradual degradation, not just spikes
6. **Test Your Monitoring**: Verify alerts fire when expected
7. **Document Runbooks**: Prepare response procedures for common alerts
### Integration with Observability Platforms
Lather's telemetry events can be exported to:
- **Prometheus**: Use `telemetry_metrics_prometheus`
- **StatsD**: Use `telemetry_metrics_statsd`
- **DataDog**: Use `telemetry_metrics_datadog`
- **Grafana Cloud**: Export via any of the above
Example Prometheus setup:
    defmodule MyApp.Telemetry do
      use Supervisor
      import Telemetry.Metrics

      def start_link(arg) do
        Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
      end

      def init(_arg) do
        children = [
          {TelemetryMetricsPrometheus, metrics: metrics()}
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end

      defp metrics do
        [
          counter("lather.request.count", tags: [:operation, :status]),
          distribution("lather.request.duration", unit: {:native, :millisecond}),
          counter("lather.request.error.count", tags: [:error_type])
        ]
      end
    end
Happy monitoring!
"""
Kino.Markdown.new(summary_md)
Next Steps
You now have a comprehensive understanding of production monitoring for SOAP services with Lather. Consider exploring:
- Enterprise Integration: See enterprise_integration.livemd for advanced authentication
- Debugging: See debugging_troubleshooting.livemd for troubleshooting techniques
- Getting Started: See getting_started.livemd for basic usage patterns
- Advanced Types: See advanced_types.livemd for complex data handling