Enterprise SOAP Integration with Lather
Mix.install([
{:lather, "~> 1.0"},
{:finch, "~> 0.18"},
{:kino, "~> 0.12"},
{:jason, "~> 1.4"}
])
Introduction
This Livebook demonstrates how to integrate with enterprise SOAP services using the Lather library. We’ll cover:
- Authentication strategies (Basic Auth, WS-Security)
- SSL/TLS configuration for secure connections
- Complex parameter structures and data mapping
- Error handling and retry strategies
- Performance optimization techniques
Environment Setup
# Boot Lather together with every application it depends on.
{:ok, _started} = Application.ensure_all_started(:lather)

# Finch is only started when nothing is registered under Lather.Finch yet,
# which keeps this cell safe to evaluate more than once.
case Process.whereis(Lather.Finch) do
  nil ->
    pools = %{
      # High-traffic enterprise endpoints get a dedicated, larger pool
      "https://enterprise.example.com" => [
        size: 25,
        count: 4,
        protocols: [:http2, :http1]
      ],
      # Every other endpoint shares the default pool
      :default => [size: 10, count: 2]
    }

    {:ok, _supervisor} =
      Supervisor.start_link(
        [{Finch, name: Lather.Finch, pools: pools}],
        strategy: :one_for_one
      )

  _already_running ->
    nil
end

IO.puts("🏢 Enterprise Lather environment ready!")
Configuration Panel
Let’s create an interactive panel to configure our enterprise SOAP connection:
# Interactive inputs describing the enterprise SOAP endpoint.
# Fill these in first; the configuration cell below reads their current values.
wsdl_input =
  Kino.Input.text("WSDL URL",
    default: "https://enterprise.example.com/services/UserService?wsdl"
  )

username_input = Kino.Input.text("Username", default: "demo_user")
password_input = Kino.Input.password("Password")
timeout_input = Kino.Input.number("Timeout (ms)", default: 60000)
ssl_verify_input = Kino.Input.checkbox("Verify SSL", default: true)

# Render the inputs two per row.
[wsdl_input, username_input, password_input, timeout_input, ssl_verify_input]
|> Kino.Layout.grid(columns: 2)
# Snapshot the current value of every input into a single configuration map.
enterprise_config = %{
  wsdl_url: Kino.Input.read(wsdl_input),
  username: Kino.Input.read(username_input),
  password: Kino.Input.read(password_input),
  timeout: Kino.Input.read(timeout_input),
  ssl_verify: Kino.Input.read(ssl_verify_input)
}

IO.puts("🔧 Enterprise Configuration:")

# The password is removed before inspecting so it never reaches the cell output.
enterprise_config
|> Map.delete(:password)
|> IO.inspect(pretty: true)

# Use a fixed-width mask: printing one "*" per character would reveal the
# length of the password to anyone who can see the notebook output.
password_mask =
  if enterprise_config.password in [nil, ""], do: "(not set)", else: "********"

IO.puts("Password: #{password_mask}")
SSL/TLS Configuration
For enterprise services, proper SSL configuration is crucial:
defmodule EnterpriseSSL do
  @moduledoc """
  Builds the TLS options and the client option list used for the enterprise
  SOAP connection in this notebook.
  """

  # Protocol versions the client is willing to negotiate.
  @tls_versions [:"tlsv1.2", :"tlsv1.3"]

  @doc """
  Returns the `:ssl` option list.

  With `verify` truthy (the default) the peer certificate is verified against
  the CA certificates returned by `:public_key.cacerts_get/0` and the hostname
  is checked with the HTTPS matching rules. With `verify` falsy, certificate
  verification is disabled (`verify: :verify_none`).
  """
  def ssl_options(verify \\ true) do
    base_options = [
      versions: @tls_versions,
      # Ask for the default suites up to and including TLS 1.3. Requesting the
      # TLS 1.2 list here would leave no TLS 1.3 suite to offer even though
      # TLS 1.3 is listed in :versions.
      ciphers: :ssl.cipher_suites(:default, :"tlsv1.3")
    ]

    if verify do
      [
        verify: :verify_peer,
        cacerts: :public_key.cacerts_get(),
        customize_hostname_check: [
          match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
        ]
      ] ++ base_options
    else
      [
        verify: :verify_none
      ] ++ base_options
    end
  end

  @doc """
  Returns the client option keyword list built from `config`.

  `config` must provide `:username`, `:password`, `:timeout` and
  `:ssl_verify`; the SSL options come from `ssl_options/1`.
  """
  def client_options(config) do
    [
      basic_auth: {config.username, config.password},
      timeout: config.timeout,
      pool_timeout: 10_000,
      ssl_options: ssl_options(config.ssl_verify),
      headers: [
        {"User-Agent", "LatherDemo/1.0"},
        {"Accept", "text/xml"},
        {"X-API-Version", "2.0"}
      ]
    ]
  end
end
# Build the SSL options for the chosen verification mode and print them.
ssl_config = EnterpriseSSL.ssl_options(enterprise_config.ssl_verify)

IO.puts("🔒 SSL Configuration:")
ssl_config |> IO.inspect(pretty: true)
Enterprise Client Connection
Now let’s attempt to connect to an enterprise SOAP service:
# Try to build a client from the WSDL using the full enterprise option set.
# The cell evaluates to {:ok, client} or {:error, error}.
enterprise_client_result =
  enterprise_config.wsdl_url
  |> Lather.DynamicClient.new(EnterpriseSSL.client_options(enterprise_config))
  |> case do
    {:ok, client} = connected ->
      IO.puts("✅ Successfully connected to enterprise service!")

      # Keep the client in the process dictionary so other cells can fetch it
      Process.put(:enterprise_client, client)

      # List what the service offers
      operations = Lather.DynamicClient.list_operations(client)
      IO.puts("\n📋 Available Operations (#{length(operations)}):")

      Enum.each(operations, fn operation_name ->
        IO.puts(" • #{operation_name}")
      end)

      connected

    {:error, error} = failed ->
      IO.puts("❌ Failed to connect to enterprise service:")
      IO.puts(" Type: #{error.type}")
      IO.puts(" Details: #{Lather.Error.format_error(error)}")

      # Tell the user when a retry could make sense
      if Lather.Error.recoverable?(error) do
        IO.puts("\n🔄 This error might be recoverable - you could retry the connection")
      end

      failed
  end

enterprise_client_result
Mock Enterprise Service Demo
Since we may not have access to a real enterprise service, let’s create a mock demonstration of enterprise SOAP operations:
defmodule MockEnterpriseDemo do
  @moduledoc """
  Canned, string-keyed request payloads that mimic the nested parameter
  structures of enterprise SOAP operations. Nothing here talks to a service.
  """

  @doc "Returns a sample `searchRequest` payload with filters, sorting, pagination and options."
  def simulate_user_search do
    criteria = %{
      "filters" => [
        filter("department", "equals", "Engineering"),
        filter("active", "equals", true),
        filter("hireDate", "greaterThan", "2023-01-01")
      ],
      "sorting" => Enum.map(["lastName", "firstName"], &ascending/1),
      "pagination" => %{"pageSize" => 25, "pageNumber" => 1}
    }

    options = %{
      "includeMetadata" => true,
      "auditTrail" => true,
      "securityContext" => %{
        "requesterId" => "api_user_123",
        "reason" => "System Integration",
        "ipAddress" => "10.0.1.100"
      }
    }

    %{
      "searchRequest" => %{
        "criteria" => criteria,
        "includeFields" => ["personalInfo", "workInfo", "permissions", "projects"],
        "options" => options
      }
    }
  end

  @doc """
  Returns a sample `createUserRequest` payload.

  The `requestId` inside the user metadata is freshly generated on every call.
  """
  def simulate_user_creation do
    personal_info = %{
      "firstName" => "John",
      "lastName" => "Doe",
      "email" => "john.doe@company.com",
      "phone" => "+1-555-0123",
      "title" => "Mr.",
      "dateOfBirth" => "1985-03-15"
    }

    work_info = %{
      "employeeId" => "EMP2024001",
      "department" => "Engineering",
      "title" => "Senior Software Engineer",
      "manager" => "jane.smith@company.com",
      "startDate" => "2024-02-01",
      "location" => %{
        "office" => "New York HQ",
        "floor" => "15",
        "desk" => "15-A-042"
      },
      "costCenter" => "CC-ENG-001"
    }

    metadata = %{
      "source" => "HR_SYSTEM",
      "requestId" => generate_request_id(),
      "createdBy" => "hr_admin",
      "businessJustification" => "New hire onboarding"
    }

    user = %{
      "personalInfo" => personal_info,
      "workInfo" => work_info,
      "permissions" => [
        "access_development_tools",
        "read_project_data",
        "write_code_repositories",
        "access_staging_environment"
      ],
      "roles" => ["Developer", "CodeReviewer"],
      "metadata" => metadata
    }

    %{
      "createUserRequest" => %{
        "user" => user,
        "options" => %{
          "sendWelcomeEmail" => true,
          "provisionAccounts" => true,
          "assignDefaultPermissions" => true,
          "notifyManager" => true
        }
      }
    }
  end

  @doc "Returns a sample `batchRequest` payload containing two updates and one create."
  def simulate_batch_operation do
    operations = [
      update_operation("EMP001", %{
        "workInfo" => %{
          "title" => "Principal Software Engineer",
          "department" => "Platform Engineering"
        }
      }),
      update_operation("EMP002", %{
        "permissions" => [
          "access_development_tools",
          "read_project_data",
          "write_code_repositories",
          "access_production_logs"
        ]
      }),
      %{
        "type" => "create",
        "user" => %{
          "personalInfo" => %{
            "firstName" => "Alice",
            "lastName" => "Johnson",
            "email" => "alice.johnson@company.com"
          },
          "workInfo" => %{
            "department" => "Marketing",
            "title" => "Marketing Specialist"
          }
        }
      }
    ]

    %{
      "batchRequest" => %{
        "operations" => operations,
        "options" => %{
          "continueOnError" => true,
          "transactional" => false,
          "auditLevel" => "detailed"
        }
      }
    }
  end

  # One search filter entry.
  defp filter(field, operator, value) do
    %{"field" => field, "operator" => operator, "value" => value}
  end

  # One ascending sort rule for `field`.
  defp ascending(field), do: %{"field" => field, "direction" => "asc"}

  # One batch "update" entry for `user_id`.
  defp update_operation(user_id, changes) do
    %{"type" => "update", "userId" => user_id, "changes" => changes}
  end

  # 16 random bytes rendered as 32 lowercase hex characters.
  defp generate_request_id do
    16
    |> :crypto.strong_rand_bytes()
    |> Base.encode16(case: :lower)
  end
end
# Print the sample search payload in full (no truncation of nested data)
IO.puts("👥 Example: User Search Parameters")

MockEnterpriseDemo.simulate_user_search()
|> IO.inspect(pretty: true, limit: :infinity)
Parameter Validation Demo
Let’s demonstrate how Lather validates complex parameters:
# Hand-written operation description used only for this validation demo:
# one required input part and one output part.
sample_operation_info = %{
  name: "SearchUsers",
  input_parts: [%{name: "searchRequest", type: "SearchRequest", required: true}],
  output_parts: [%{name: "searchResponse", type: "SearchResponse"}]
}

# Payload to validate
search_params = MockEnterpriseDemo.simulate_user_search()

IO.puts("🔍 Parameter Validation Demo")
IO.puts(String.duplicate("=", 40))

# Explicit validation call for demonstration purposes
sample_operation_info
|> Lather.Operation.Builder.validate_parameters(search_params)
|> case do
  :ok ->
    IO.puts("✅ Parameters passed validation")

    # Summarise the shape of the payload
    IO.puts("\n📊 Parameter Analysis:")
    IO.puts(" • Root elements: #{map_size(search_params)}")

    request = search_params["searchRequest"]
    criteria = request && request["criteria"]

    # Counts are only printed when both the request and its criteria exist
    if criteria do
      filter_count = length(criteria["filters"] || [])
      IO.puts(" • Search filters: #{filter_count}")
      IO.puts(" • Sorting rules: #{length(criteria["sorting"] || [])}")
      IO.puts(" • Include fields: #{length(request["includeFields"] || [])}")
    end

  {:error, error} ->
    IO.puts("❌ Parameter validation failed:")
    IO.puts(" #{Lather.Error.format_error(error)}")
end
Error Handling Strategies
Enterprise applications need robust error handling. Let’s explore different error scenarios:
defmodule EnterpriseErrorHandling do
  @moduledoc """
  Error classification and a simple exponential-backoff retry loop for SOAP calls.
  """

  @max_retries 3
  # 1 second
  @base_delay_ms 1000

  @doc """
  Maps an error map to `{category, message, suggestion}`.

  Errors that match none of the known shapes are reported as `:unknown_error`.
  """
  def handle_soap_error(%{type: :soap_fault, fault_code: "Client", fault_string: message}) do
    {:client_error, "Client Error: #{message}", "Check your request parameters and try again"}
  end

  def handle_soap_error(%{type: :soap_fault, fault_code: "Server", fault_string: message}) do
    {:server_error, "Server Error: #{message}",
     "The service encountered an error. Retry may help"}
  end

  def handle_soap_error(%{type: :http_error, status: 401}) do
    {:authentication_error, "Authentication failed",
     "Check your credentials and authentication method"}
  end

  def handle_soap_error(%{type: :http_error, status: 403}) do
    {:authorization_error, "Access denied",
     "Your account may not have permission for this operation"}
  end

  def handle_soap_error(%{type: :http_error, status: 500}) do
    {:server_error, "Internal server error",
     "The service is experiencing issues. Try again later"}
  end

  def handle_soap_error(%{type: :transport_error, reason: :timeout}) do
    {:timeout_error, "Request timed out",
     "The service didn't respond in time. Consider increasing timeout or retry"}
  end

  def handle_soap_error(%{type: :transport_error, reason: :nxdomain}) do
    {:connection_error, "Service not found", "Check the service URL and network connectivity"}
  end

  def handle_soap_error(_other) do
    {:unknown_error, "Unknown error occurred", "Check logs for more details"}
  end

  @doc """
  Decides whether attempt number `attempt` (zero-based) should be followed by a retry.

  Returns `{:retry, delay_ms}` with a delay that doubles on every attempt, or
  `{:stop, reason}` once the retry limit is reached or the error is not
  reported as recoverable by `Lather.Error.recoverable?/1`.
  """
  def retry_strategy(_error, attempt) when attempt >= @max_retries do
    {:stop, "Maximum retries exceeded"}
  end

  def retry_strategy(error, attempt) do
    if Lather.Error.recoverable?(error) do
      # Exponential backoff: base * 2^attempt
      {:retry, round(@base_delay_ms * :math.pow(2, attempt))}
    else
      {:stop, "Error is not recoverable"}
    end
  end

  @doc """
  Calls `operation` on `client`, retrying according to `retry_strategy/2`.

  Prints progress as it goes, sleeps between attempts, and returns the final
  `{:ok, response}` or `{:error, error}`.
  """
  def execute_with_retry(client, operation, params, attempt \\ 0) do
    client
    |> Lather.DynamicClient.call(operation, params)
    |> case do
      {:ok, _response} = success ->
        if attempt > 0 do
          IO.puts("✅ Operation succeeded after #{attempt} retries")
        end

        success

      {:error, error} = failure ->
        report_failure(error)

        case retry_strategy(error, attempt) do
          {:retry, delay} ->
            IO.puts("🔄 Retrying in #{delay}ms (attempt #{attempt + 1})...")
            Process.sleep(delay)
            execute_with_retry(client, operation, params, attempt + 1)

          {:stop, reason} ->
            IO.puts("🛑 Stopping retries: #{reason}")
            failure
        end
    end
  end

  # Prints the category, message and suggestion for a failed call.
  defp report_failure(error) do
    {error_type, message, suggestion} = handle_soap_error(error)
    IO.puts("❌ #{error_type}: #{message}")
    IO.puts("💡 Suggestion: #{suggestion}")
  end
end
# A handful of representative error maps to run through the classifier
sample_errors = [
  %{type: :soap_fault, fault_code: "Client", fault_string: "Invalid parameter"},
  %{type: :http_error, status: 401},
  %{type: :transport_error, reason: :timeout},
  %{type: :transport_error, reason: :nxdomain}
]

IO.puts("🚨 Error Handling Examples:")

# Print category, message and suggestion for each sample
sample_errors
|> Enum.map(&EnterpriseErrorHandling.handle_soap_error/1)
|> Enum.each(fn {category, message, suggestion} ->
  IO.puts("\n#{category}:")
  IO.puts(" Message: #{message}")
  IO.puts(" Suggestion: #{suggestion}")
end)
WS-Security Authentication Demo
For services requiring WS-Security, here’s how to set it up:
defmodule WSSecurityDemo do
  @moduledoc """
  Helpers showing how a WS-Security username token is attached to client options.
  """

  # Options passed to Lather.Auth.WSSecurity.username_token/3
  @token_options [password_type: :digest, include_nonce: true, include_created: true]

  @doc """
  Builds a WS-Security header containing a username token.

  The value returned by `Lather.Auth.WSSecurity.username_token/3` is used
  as-is as the security header.
  """
  def create_security_header(username, password) do
    Lather.Auth.WSSecurity.username_token(username, password, @token_options)
  end

  @doc """
  Returns `base_options` with a `:soap_headers` entry appended that carries the
  security header for `username` / `password`.
  """
  def wssecurity_client_options(username, password, base_options) do
    header = create_security_header(username, password)
    base_options ++ [soap_headers: [header]]
  end
end
# Client options extended with a WS-Security header for the configured user
wssecurity_config =
  WSSecurityDemo.wssecurity_client_options(
    enterprise_config.username,
    enterprise_config.password,
    EnterpriseSSL.client_options(enterprise_config)
  )

IO.puts("🔐 WS-Security Configuration Example:")

Enum.each(
  [
    " ✓ Username token with timestamp",
    " ✓ Nonce for replay protection",
    " ✓ Password digest (when supported)",
    " ✓ Security header in SOAP envelope"
  ],
  &IO.puts/1
)

# Inspect a header built from placeholder credentials rather than the real ones
sample_header = WSSecurityDemo.create_security_header("demo_user", "hidden")
IO.puts("\n📋 Security Header Structure:")
sample_header |> IO.inspect(pretty: true)
Performance Monitoring
Let’s create a performance monitoring dashboard for SOAP operations:
defmodule PerformanceMonitor do
  @moduledoc """
  Timing and summary statistics for SOAP operations.

  A metric is a map with `:operation`, `:duration_ms`, `:status`
  (`:success` or `:error`) and `:timestamp`.
  """

  @doc """
  Runs `fun`, measures its wall-clock duration and returns a metric map.

  `fun` must return an `:ok`/`:error` tagged tuple. Both the two-element form
  (`{:ok, value}`) and the three-element form carrying metadata
  (`{:ok, value, meta}`) are accepted; any other result raises
  `CaseClauseError`.
  """
  def track_operation(operation_name, fun) do
    start_time = System.monotonic_time(:millisecond)
    result = fun.()
    duration = System.monotonic_time(:millisecond) - start_time

    status =
      case result do
        {:ok, _} -> :success
        {:ok, _, _} -> :success
        {:error, _} -> :error
        {:error, _, _} -> :error
      end

    %{
      operation: operation_name,
      duration_ms: duration,
      status: status,
      timestamp: DateTime.utc_now()
    }
  end

  @doc "Returns 20 randomly generated metrics (durations 100..5000 ms, roughly 10% errors)."
  def simulate_performance_data do
    operations = ["SearchUsers", "CreateUser", "UpdateUser", "DeleteUser", "GetUserDetails"]

    Enum.map(1..20, fn _ ->
      operation = Enum.random(operations)
      # 100ms to 5s
      duration = Enum.random(100..5000)
      status = if :rand.uniform() > 0.1, do: :success, else: :error

      %{
        operation: operation,
        duration_ms: duration,
        status: status,
        timestamp: DateTime.utc_now()
      }
    end)
  end

  @doc """
  Summarises a list of metrics.

  Rates are percentages (floats). Duration statistics cover successful
  operations only and are `0` when there are none. An empty list yields
  zeroed statistics instead of dividing by zero.
  """
  def analyze_performance([]) do
    %{
      total_operations: 0,
      success_rate: 0.0,
      error_rate: 0.0,
      avg_duration_ms: 0,
      max_duration_ms: 0,
      min_duration_ms: 0
    }
  end

  def analyze_performance(metrics) do
    total_operations = length(metrics)

    durations =
      metrics
      |> Enum.filter(fn m -> m.status == :success end)
      |> Enum.map(fn m -> m.duration_ms end)

    successful_ops = length(durations)
    error_rate = (total_operations - successful_ops) / total_operations * 100

    {min_duration, max_duration, avg_duration} =
      case durations do
        [] ->
          {0, 0, 0}

        _ ->
          {fastest, slowest} = Enum.min_max(durations)
          {fastest, slowest, Enum.sum(durations) / successful_ops}
      end

    %{
      total_operations: total_operations,
      success_rate: 100 - error_rate,
      error_rate: error_rate,
      avg_duration_ms: round(avg_duration),
      max_duration_ms: max_duration,
      min_duration_ms: min_duration
    }
  end
end
# Produce simulated metrics and summarise them
performance_data = PerformanceMonitor.simulate_performance_data()
analysis = PerformanceMonitor.analyze_performance(performance_data)

IO.puts("📊 SOAP Operations Performance Analysis")
IO.puts(String.duplicate("=", 45))

Enum.each(
  [
    "Total Operations: #{analysis.total_operations}",
    "Success Rate: #{Float.round(analysis.success_rate, 1)}%",
    "Error Rate: #{Float.round(analysis.error_rate, 1)}%",
    "Average Duration: #{analysis.avg_duration_ms}ms",
    "Fastest Operation: #{analysis.min_duration_ms}ms",
    "Slowest Operation: #{analysis.max_duration_ms}ms"
  ],
  &IO.puts/1
)

# List the first five generated operations with a status marker
IO.puts("\n📈 Recent Operations:")

performance_data
|> Enum.take(5)
|> Enum.each(fn %{status: status, operation: operation, duration_ms: duration_ms} ->
  status_icon =
    case status do
      :success -> "✅"
      _ -> "❌"
    end

  IO.puts(" #{status_icon} #{operation}: #{duration_ms}ms")
end)
Enterprise Integration Checklist
Let’s create an interactive checklist for enterprise SOAP integration:
# Enterprise integration checklist
checklist_items = [
  "📋 WSDL Analysis Complete",
  "🔐 Authentication Configured",
  "🔒 SSL/TLS Security Verified",
  "⚡ Connection Pooling Optimized",
  "🛡️ Error Handling Implemented",
  "🔄 Retry Strategy Defined",
  "📊 Performance Monitoring Setup",
  "🧪 Integration Testing Complete",
  "📚 Documentation Updated",
  "🚀 Production Deployment Ready"
]

# One unchecked checkbox per checklist item
checklist_widgets =
  Enum.map(checklist_items, fn item ->
    Kino.Input.checkbox(item, default: false)
  end)

# Lay the checkboxes out in a single column. The inputs are handed to the grid
# directly: wrapping each one in a list would make every grid cell a plain
# list instead of a Kino input.
checklist_form = Kino.Layout.grid(checklist_widgets, columns: 1)

IO.puts("✅ Enterprise SOAP Integration Checklist")
checklist_form
# Count the ticked checkboxes and report overall progress
completed_items =
  Enum.count(checklist_widgets, fn widget ->
    Kino.Input.read(widget)
  end)

total_items = length(checklist_items)
completion_percentage = completed_items / total_items * 100

IO.puts(
  "📊 Integration Progress: #{completed_items}/#{total_items} (#{Float.round(completion_percentage, 1)}%)"
)

# Elixir has no `elsif`; `cond` picks the first branch whose condition is true.
cond do
  completion_percentage == 100.0 ->
    IO.puts("🎉 Congratulations! Your enterprise SOAP integration is ready for production!")

  completion_percentage >= 80.0 ->
    IO.puts("🚀 Almost there! Just a few more items to complete.")

  completion_percentage >= 50.0 ->
    IO.puts("⚡ Good progress! You're halfway to production readiness.")

  true ->
    IO.puts("🏗️ Keep going! There's still work to be done for a production-ready integration.")
end
Advanced Resilience Patterns
Enterprise integrations require robust resilience patterns to handle failures gracefully. This section covers circuit breakers, advanced retry strategies, bulkheads, and fallback mechanisms.
Circuit Breaker Pattern
The circuit breaker pattern prevents cascading failures by monitoring for failures and temporarily stopping requests to a failing service.
States:
- Closed: Normal operation, requests flow through
- Open: Service is failing, requests are rejected immediately
- Half-Open: Testing if service has recovered
defmodule CircuitBreaker do
  @moduledoc """
  A GenServer-based circuit breaker.

  * `:closed` - calls run normally; consecutive failures are counted.
  * `:open` - calls are rejected with `{:error, :circuit_open}` until
    `reset_timeout` milliseconds have passed since the circuit opened.
  * `:half_open` - calls run again; `success_threshold` successes close the
    circuit, a failure opens it again.

  The wrapped function is executed inside the breaker process, so calls
  through one breaker run one at a time.
  """
  use GenServer

  defstruct [
    :name,
    :state,
    :failure_count,
    :success_count,
    :failure_threshold,
    :success_threshold,
    :reset_timeout,
    :last_failure_time
  ]

  # Client API

  @doc """
  Starts a breaker registered under the required `:name` option.

  Optional settings: `:failure_threshold` (default 5), `:success_threshold`
  (default 2) and `:reset_timeout` in milliseconds (default 30_000).
  """
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Runs `fun` through the breaker.

  `fun` must return `{:ok, result}` or `{:error, reason}`; exceptions and any
  other return value are counted as failures and returned as
  `{:error, exception}`. Returns `{:error, :circuit_open}` without running
  `fun` while the circuit is open.
  """
  def call(breaker, fun) do
    GenServer.call(breaker, {:call, fun}, :infinity)
  end

  @doc "Returns `%{state: state, failures: failure_count}`."
  def state(breaker), do: GenServer.call(breaker, :state)

  @doc "Asynchronously forces the breaker back to `:closed` with cleared counters."
  def reset(breaker), do: GenServer.cast(breaker, :reset)

  # Server Implementation

  @impl true
  def init(opts) do
    state = %__MODULE__{
      name: Keyword.fetch!(opts, :name),
      state: :closed,
      failure_count: 0,
      success_count: 0,
      failure_threshold: Keyword.get(opts, :failure_threshold, 5),
      success_threshold: Keyword.get(opts, :success_threshold, 2),
      reset_timeout: Keyword.get(opts, :reset_timeout, 30_000),
      last_failure_time: nil
    }

    {:ok, state}
  end

  @impl true
  def handle_call(:state, _from, state) do
    {:reply, %{state: state.state, failures: state.failure_count}, state}
  end

  def handle_call({:call, fun}, _from, %{state: :open} = state) do
    # Once the reset timeout has elapsed, let this call through as a probe.
    if should_attempt_reset?(state) do
      attempt_call(fun, %{state | state: :half_open})
    else
      {:reply, {:error, :circuit_open}, state}
    end
  end

  def handle_call({:call, fun}, _from, state) do
    attempt_call(fun, state)
  end

  @impl true
  def handle_cast(:reset, state) do
    {:noreply, %{state | state: :closed, failure_count: 0, success_count: 0}}
  end

  # Runs the function and turns its outcome into a GenServer reply tuple.
  defp attempt_call(fun, state) do
    case fun.() do
      {:ok, result} -> handle_success(result, state)
      {:error, _} = error -> handle_failure(error, state)
    end
  rescue
    exception -> handle_failure({:error, exception}, state)
  end

  defp handle_success(result, %{state: :half_open} = state) do
    new_success_count = state.success_count + 1

    if new_success_count >= state.success_threshold do
      {:reply, {:ok, result}, %{state | state: :closed, failure_count: 0, success_count: 0}}
    else
      {:reply, {:ok, result}, %{state | success_count: new_success_count}}
    end
  end

  defp handle_success(result, state) do
    {:reply, {:ok, result}, %{state | failure_count: 0}}
  end

  defp handle_failure(error, state) do
    new_failure_count = state.failure_count + 1

    new_state =
      if new_failure_count >= state.failure_threshold do
        # Opening the circuit also discards successes collected while
        # half-open, so the next recovery attempt has to earn the full
        # success threshold again.
        %{
          state
          | state: :open,
            failure_count: new_failure_count,
            success_count: 0,
            last_failure_time: System.monotonic_time(:millisecond)
        }
      else
        %{state | failure_count: new_failure_count}
      end

    {:reply, error, new_state}
  end

  defp should_attempt_reset?(state) do
    elapsed = System.monotonic_time(:millisecond) - state.last_failure_time
    elapsed >= state.reset_timeout
  end
end
# Demo: Start a circuit breaker.
# The breaker is registered under a fixed name, so evaluating this cell a
# second time finds it already running; reuse it (reset to closed) instead of
# failing the match on {:ok, pid}.
breaker =
  case CircuitBreaker.start_link(
         name: :soap_breaker,
         failure_threshold: 3,
         success_threshold: 2,
         reset_timeout: 5_000
       ) do
    {:ok, pid} ->
      pid

    {:error, {:already_started, pid}} ->
      CircuitBreaker.reset(pid)
      pid
  end

IO.puts("Circuit Breaker initialized with:")
IO.puts(" Failure threshold: 3")
IO.puts(" Success threshold: 2")
IO.puts(" Reset timeout: 5 seconds")
IO.inspect(CircuitBreaker.state(breaker), label: "Initial state")
# Helpers that drive a circuit breaker and print what happens
defmodule CircuitBreakerDemo do
  @moduledoc "Sends canned failing or succeeding calls through a `CircuitBreaker` and logs the outcome."

  @doc "Sends one failing call per element of `1..count`, printing the result and breaker state after each."
  def simulate_failures(breaker, count) do
    always_fail = fn -> {:error, :service_unavailable} end

    for request_number <- 1..count do
      outcome = CircuitBreaker.call(breaker, always_fail)
      %{state: current, failures: failures} = CircuitBreaker.state(breaker)

      IO.puts(
        "Request #{request_number}: #{inspect(outcome)} | State: #{current}, Failures: #{failures}"
      )
    end

    :ok
  end

  @doc "Sends one succeeding call, prints the result and breaker state, and returns the call result."
  def simulate_success(breaker) do
    outcome = CircuitBreaker.call(breaker, fn -> {:ok, "Success!"} end)
    %{state: current} = CircuitBreaker.state(breaker)
    IO.puts("Success request: #{inspect(outcome)} | State: #{current}")
    outcome
  end
end
# Trip the breaker, show that further calls are rejected, then reset it
demo_breaker_name = :soap_breaker

IO.puts("\n--- Simulating failures to trip the circuit ---")
CircuitBreakerDemo.simulate_failures(demo_breaker_name, 4)

IO.puts("\n--- Circuit is now open, requests are rejected ---")
CircuitBreakerDemo.simulate_failures(demo_breaker_name, 2)

# Leave the breaker closed for the demos that follow
CircuitBreaker.reset(demo_breaker_name)
IO.puts("\nCircuit breaker reset for next demonstrations")
Advanced Retry Strategies
Simple retries can overwhelm a recovering service. Advanced strategies include exponential backoff, jitter, and retry budgets.
defmodule RetryStrategy do
  @moduledoc """
  Retry policy with exponential backoff and jitter.

  The `:retry_budget` and `:budget_window_ms` fields are stored on the struct
  but are not consulted by any function in this module.
  """

  defstruct [
    :max_retries,
    :base_delay_ms,
    :max_delay_ms,
    :jitter_factor,
    :retry_budget,
    :budget_window_ms,
    :retryable_errors
  ]

  @doc """
  Builds a strategy from keyword options.

  Defaults: 3 retries, 1000 ms base delay, 30_000 ms maximum delay, jitter
  factor 0.2, retry budget 0.1, 60_000 ms budget window, and
  `[:timeout, :server_error, :connection_error]` as retryable categories.
  """
  def new(opts \\ []) do
    %__MODULE__{
      max_retries: Keyword.get(opts, :max_retries, 3),
      base_delay_ms: Keyword.get(opts, :base_delay_ms, 1000),
      max_delay_ms: Keyword.get(opts, :max_delay_ms, 30_000),
      jitter_factor: Keyword.get(opts, :jitter_factor, 0.2),
      # 10% of requests can be retries
      retry_budget: Keyword.get(opts, :retry_budget, 0.1),
      budget_window_ms: Keyword.get(opts, :budget_window_ms, 60_000),
      retryable_errors:
        Keyword.get(opts, :retryable_errors, [:timeout, :server_error, :connection_error])
    }
  end

  @doc """
  Calculates the delay in milliseconds for zero-based `attempt`.

  The delay is `base_delay_ms * 2^attempt`, capped at `max_delay_ms`, then
  shifted by a random amount within `± jitter_factor` of the capped value.
  Because jitter is applied after capping, the result can exceed
  `max_delay_ms` by up to that fraction.
  """
  def calculate_delay(strategy, attempt) do
    # Exponential backoff: base * 2^attempt
    exponential_delay = strategy.base_delay_ms * :math.pow(2, attempt)

    # Cap at max delay
    capped_delay = min(exponential_delay, strategy.max_delay_ms)

    # Add jitter to prevent thundering herd
    jitter_range = capped_delay * strategy.jitter_factor
    jitter = :rand.uniform() * jitter_range * 2 - jitter_range

    round(max(0, capped_delay + jitter))
  end

  @doc """
  Decides whether `error` should be retried after zero-based `attempt`.

  Returns `{:yes, delay_ms}`, `{:no, :max_retries_exceeded}` or
  `{:no, :not_retryable}`.
  """
  def should_retry?(strategy, error, attempt) do
    cond do
      attempt >= strategy.max_retries ->
        {:no, :max_retries_exceeded}

      not retryable_error?(strategy, error) ->
        {:no, :not_retryable}

      true ->
        {:yes, calculate_delay(strategy, attempt)}
    end
  end

  defp retryable_error?(strategy, error) do
    categorize_error(error) in strategy.retryable_errors
  end

  # Maps an error map to a coarse category. The HTTP status guards require an
  # integer: in Erlang term ordering atoms and binaries compare greater than
  # any number, so without the check a status such as `nil` would satisfy
  # `status >= 500` and be retried as a server error.
  defp categorize_error(%{type: :transport_error, reason: :timeout}), do: :timeout
  defp categorize_error(%{type: :transport_error}), do: :connection_error

  defp categorize_error(%{type: :http_error, status: status})
       when is_integer(status) and status >= 500,
       do: :server_error

  defp categorize_error(%{type: :http_error, status: 429}), do: :rate_limited

  defp categorize_error(%{type: :http_error, status: status})
       when is_integer(status) and status >= 400,
       do: :client_error

  defp categorize_error(%{type: :soap_fault, fault_code: "Server"}), do: :server_error
  defp categorize_error(%{type: :soap_fault, fault_code: "Client"}), do: :client_error
  defp categorize_error(_other), do: :unknown
end
# Show how the delay grows per attempt and how much jitter moves it around
strategy =
  RetryStrategy.new(
    max_retries: 5,
    base_delay_ms: 500,
    max_delay_ms: 16_000,
    jitter_factor: 0.25
  )

IO.puts("Exponential Backoff with Jitter Demo:")
IO.puts(String.duplicate("=", 45))

Enum.each(0..5, fn attempt ->
  # Three independent samples per attempt
  delays = for _sample <- 1..3, do: RetryStrategy.calculate_delay(strategy, attempt)
  mean_delay = Enum.sum(delays) / length(delays)

  IO.puts("Attempt #{attempt}: ~#{round(mean_delay)}ms (samples: #{inspect(delays)})")
end)
defmodule RetryExecutor do
  @moduledoc "Execute operations with configurable retry logic"

  @doc """
  Runs `fun` and retries it according to `strategy`.

  `fun` must return `{:ok, result}` or `{:error, error}`.

  Options:

    * `:idempotent` - when `false`, a retryable failure is returned instead of
      retried (default `true`)
    * `:on_retry` - 3-arity callback invoked with `(retry_number, error, delay_ms)`
      before each sleep (default: no-op)

  Returns `{:ok, result, %{attempts: n}}` or
  `{:error, error, %{attempts: n, reason: reason}}`.
  """
  def execute(fun, strategy, opts \\ []) do
    idempotent = Keyword.get(opts, :idempotent, true)
    on_retry = Keyword.get(opts, :on_retry, fn _, _, _ -> :ok end)

    do_execute(fun, strategy, 0, idempotent, on_retry)
  end

  # `attempt` is zero-based; the reported `:attempts` count is one-based.
  defp do_execute(fun, strategy, attempt, idempotent, on_retry) do
    case fun.() do
      {:ok, result} ->
        {:ok, result, %{attempts: attempt + 1}}

      {:error, error} ->
        case RetryStrategy.should_retry?(strategy, error, attempt) do
          {:yes, delay} when idempotent ->
            on_retry.(attempt + 1, error, delay)
            Process.sleep(delay)
            do_execute(fun, strategy, attempt + 1, idempotent, on_retry)

          {:yes, _delay} ->
            # Non-idempotent operations should not be retried blindly
            {:error, error, %{attempts: attempt + 1, reason: :not_idempotent}}

          {:no, reason} ->
            {:error, error, %{attempts: attempt + 1, reason: reason}}
        end
    end
  end
end
# Run a deliberately flaky operation through the retry executor
IO.puts("\n--- Retry Execution Demo ---")

# The counter records how many times the operation has been invoked so far;
# the first two invocations fail, every later one succeeds.
call_count = :counters.new(1, [:atomics])

flaky_operation = fn ->
  calls_so_far = :counters.get(call_count, 1)
  :counters.add(call_count, 1, 1)

  case calls_so_far do
    seen when seen < 2 -> {:error, %{type: :transport_error, reason: :timeout}}
    _ -> {:ok, "Operation succeeded!"}
  end
end

strategy = RetryStrategy.new(max_retries: 3, base_delay_ms: 100)

log_retry = fn attempt, error, delay ->
  IO.puts("Retry #{attempt}: #{inspect(error.type)}, waiting #{delay}ms")
end

result = RetryExecutor.execute(flaky_operation, strategy, on_retry: log_retry)

IO.puts("Final result: #{inspect(result)}")
Bulkhead Pattern
The bulkhead pattern isolates failures by partitioning resources. Each service gets its own connection pool, preventing one failing service from consuming all resources.
defmodule BulkheadConfig do
  @moduledoc """
  Static configuration describing isolated connection pools and per-service
  concurrency limits.
  """

  @doc "Returns pool settings grouped by priority tier, keyed by service base URL."
  def pool_config do
    # Critical services get more resources
    critical = %{
      "https://payments.enterprise.com" => [size: 20, count: 4],
      "https://inventory.enterprise.com" => [size: 15, count: 3]
    }

    # Standard services
    standard = %{
      "https://notifications.enterprise.com" => [size: 5, count: 2],
      "https://analytics.enterprise.com" => [size: 5, count: 2]
    }

    # Low priority services - isolated to prevent resource starvation
    low_priority = %{
      "https://legacy-reports.enterprise.com" => [size: 2, count: 1]
    }

    %{critical: critical, standard: standard, low_priority: low_priority}
  end

  @doc """
  Flattens every tier of `pool_config/0` into one URL-keyed map and adds a
  `:default` pool of `[size: 3, count: 1]`.
  """
  def build_finch_pools do
    per_service =
      for {_tier, services} <- pool_config(),
          {url, pool_opts} <- services,
          into: %{} do
        {url, pool_opts}
      end

    Map.put(per_service, :default, size: 3, count: 1)
  end

  @doc "Returns the maximum concurrent requests and queue size for each named service."
  def service_limits do
    %{
      "payments" => limits(50, 100),
      "inventory" => limits(30, 50),
      "notifications" => limits(10, 20),
      "legacy-reports" => limits(5, 10)
    }
  end

  # Max concurrent requests per service plus the size of its wait queue
  defp limits(max_concurrent, queue_size) do
    %{max_concurrent: max_concurrent, queue_size: queue_size}
  end
end
# Print the pool layout tier by tier, followed by the per-service limits
IO.puts("Bulkhead Configuration:")
IO.puts(String.duplicate("=", 50))

Enum.each(BulkheadConfig.pool_config(), fn {priority, services} ->
  tier_label = priority |> to_string() |> String.upcase()
  IO.puts("\n#{tier_label} SERVICES:")

  Enum.each(services, fn {url, pool_opts} ->
    IO.puts(" #{url}")
    IO.puts(" Pool size: #{pool_opts[:size]}, Count: #{pool_opts[:count]}")
  end)
end)

IO.puts("\nService Limits:")

Enum.each(BulkheadConfig.service_limits(), fn {service, %{max_concurrent: max, queue_size: queue}} ->
  IO.puts(" #{service}: max #{max} concurrent, queue #{queue}")
end)
Fallback Strategies
When primary operations fail, fallback strategies provide graceful degradation.
defmodule FallbackStrategy do
  @moduledoc """
  Fallback chain for operations whose primary call failed.

  The struct carries a read-only in-memory cache, a map of default values and
  a list of zero-arity functions that try alternative endpoints.
  """

  # Simple in-memory cache for demo purposes
  defstruct [:cache, :default_values, :alternative_endpoints]

  @doc "Builds a configuration with an empty cache and optional `:default_values` / `:alternative_endpoints`."
  def new(opts \\ []) do
    defaults = Keyword.get(opts, :default_values, %{})
    alternatives = Keyword.get(opts, :alternative_endpoints, [])

    %__MODULE__{cache: %{}, default_values: defaults, alternative_endpoints: alternatives}
  end

  @doc """
  Runs `primary_fn` and falls back in the order cache -> alternative -> default.

  Returns `{:ok, value, source}` where `source` is `:primary`, `:cache`,
  `:alternative` or `:default`, or `{:error, :all_fallbacks_exhausted}`.
  A successful primary result is returned as-is; it is not written to the cache.
  """
  def execute_with_fallback(primary_fn, fallback_config, cache_key) do
    case primary_fn.() do
      {:error, _error} -> try_fallbacks(fallback_config, cache_key)
      {:ok, result} -> {:ok, result, :primary}
    end
  end

  # Step 1: cached response, if there is one for this key.
  defp try_fallbacks(config, cache_key) do
    config.cache
    |> Map.get(cache_key)
    |> from_cache(config, cache_key)
  end

  defp from_cache(nil, config, cache_key), do: try_alternatives_or_default(config, cache_key)
  defp from_cache(cached, _config, _cache_key), do: {:ok, cached, :cache}

  # Step 2: first alternative endpoint that succeeds; step 3: default value.
  defp try_alternatives_or_default(config, cache_key) do
    first_success =
      Enum.find_value(config.alternative_endpoints, fn endpoint_fn ->
        case endpoint_fn.() do
          {:ok, value} -> {:ok, value, :alternative}
          {:error, _} -> nil
        end
      end)

    first_success || default_for(config.default_values, cache_key)
  end

  defp default_for(default_values, cache_key) do
    case Map.get(default_values, cache_key) do
      nil -> {:error, :all_fallbacks_exhausted}
      default -> {:ok, default, :default}
    end
  end
end
# Walk through the three outcomes of the fallback chain
IO.puts("Fallback Strategy Demo:")
IO.puts(String.duplicate("=", 45))

# Cache knows "user_preferences"; defaults exist for both keys; no alternatives
fallback_config =
  struct!(FallbackStrategy,
    cache: %{
      "user_preferences" => %{theme: "light", language: "en"}
    },
    default_values: %{
      "user_preferences" => %{theme: "system", language: "en"},
      "feature_flags" => %{new_ui: false, beta_features: false}
    },
    alternative_endpoints: []
  )

# Scenario 1: the primary call works, so no fallback is consulted
IO.puts("\n1. Primary service succeeds:")

result =
  FallbackStrategy.execute_with_fallback(
    fn -> {:ok, %{theme: "dark", language: "es"}} end,
    fallback_config,
    "user_preferences"
  )

IO.puts(" Result: #{inspect(result)}")

# Scenario 2: the primary call fails and the key is present in the cache
IO.puts("\n2. Primary fails, using cache:")

result =
  FallbackStrategy.execute_with_fallback(
    fn -> {:error, :timeout} end,
    fallback_config,
    "user_preferences"
  )

IO.puts(" Result: #{inspect(result)}")

# Scenario 3: the primary call fails, the key is not cached, a default exists
IO.puts("\n3. Primary fails, no cache, using default:")

result =
  FallbackStrategy.execute_with_fallback(
    fn -> {:error, :timeout} end,
    fallback_config,
    "feature_flags"
  )

IO.puts(" Result: #{inspect(result)}")
Combining Patterns
Real-world applications combine these patterns for comprehensive resilience.
defmodule ResilientClient do
  @moduledoc """
  A resilient SOAP client wrapper combining circuit breaker, retry, and fallback patterns.
  """

  defstruct [:client, :circuit_breaker, :retry_strategy, :fallback_config, :name]

  @doc """
  Builds a resilient client.

  Requires `:client` and `:circuit_breaker`. `:retry_strategy` defaults to
  `RetryStrategy.new/0`, `:fallback_config` to `FallbackStrategy.new/0` and
  `:name` to `"resilient_client"`.
  """
  def new(opts) do
    %__MODULE__{
      client: Keyword.fetch!(opts, :client),
      circuit_breaker: Keyword.fetch!(opts, :circuit_breaker),
      retry_strategy: Keyword.get(opts, :retry_strategy, RetryStrategy.new()),
      fallback_config: Keyword.get(opts, :fallback_config, FallbackStrategy.new()),
      name: Keyword.get(opts, :name, "resilient_client")
    }
  end

  @doc """
  Execute a SOAP operation with full resilience:
  1. Check circuit breaker
  2. Execute with retries
  3. Apply fallback if all else fails

  Options: `:cache_key` (default `"\#{operation}_default"`) and `:idempotent`
  (default `true`).

  Returns `{:ok, response, %{source: source, duration_ms: ms}}` or
  `{:error, error, %{duration_ms: ms}}`. When the circuit is open and no
  fallback is available, `error` is `:circuit_open`.
  """
  def call(resilient_client, operation, params, opts \\ []) do
    cache_key = Keyword.get(opts, :cache_key, "#{operation}_default")
    idempotent = Keyword.get(opts, :idempotent, true)

    start_time = System.monotonic_time(:millisecond)

    # Wrap the operation in circuit breaker
    result =
      CircuitBreaker.call(resilient_client.circuit_breaker, fn ->
        # Execute with retry strategy; the breaker only understands 2-tuples
        case execute_with_retry(resilient_client, operation, params, idempotent) do
          {:ok, response, _meta} -> {:ok, response}
          {:error, error, _meta} -> {:error, error}
        end
      end)

    duration = System.monotonic_time(:millisecond) - start_time

    # Apply fallback if needed. Every failure, including a rejected call on an
    # open circuit, ends up as a 3-tuple so that log_result/3 and callers
    # always see the same shape.
    final_result =
      case result do
        {:ok, response} ->
          {:ok, response, %{source: :primary, duration_ms: duration}}

        {:error, error} ->
          case apply_fallback(resilient_client.fallback_config, cache_key, duration) do
            {:ok, _fallback_result, _meta} = recovered -> recovered
            {:error, _} -> {:error, error, %{duration_ms: duration}}
          end
      end

    log_result(resilient_client.name, operation, final_result)
    final_result
  end

  defp execute_with_retry(resilient_client, operation, params, idempotent) do
    RetryExecutor.execute(
      fn -> simulate_soap_call(resilient_client.client, operation, params) end,
      resilient_client.retry_strategy,
      idempotent: idempotent,
      on_retry: fn attempt, error, delay ->
        IO.puts(
          " [#{resilient_client.name}] Retry #{attempt} for #{operation}: #{inspect(error)}, waiting #{delay}ms"
        )
      end
    )
  end

  defp simulate_soap_call(_client, operation, _params) do
    # Simulate SOAP call - in real implementation, use Lather.DynamicClient.call
    # For demo, randomly succeed or fail
    if :rand.uniform() > 0.3 do
      {:ok, %{operation: operation, data: "simulated_response", timestamp: DateTime.utc_now()}}
    else
      {:error, %{type: :transport_error, reason: :timeout}}
    end
  end

  # Runs only the fallback chain by handing it a primary function that always fails.
  defp apply_fallback(fallback_config, cache_key, duration) do
    case FallbackStrategy.execute_with_fallback(
           fn -> {:error, :primary_failed} end,
           fallback_config,
           cache_key
         ) do
      {:ok, result, source} -> {:ok, result, %{source: source, duration_ms: duration}}
      error -> error
    end
  end

  defp log_result(client_name, operation, result) do
    case result do
      {:ok, _data, %{source: source, duration_ms: duration}} ->
        IO.puts(" [#{client_name}] #{operation} succeeded via #{source} in #{duration}ms")

      {:error, _error, %{duration_ms: duration}} ->
        IO.puts(" [#{client_name}] #{operation} failed after #{duration}ms")
    end
  end
end
# Create and demonstrate the resilient client
IO.puts("Resilient Client Demo:")
IO.puts(String.duplicate("=", 50))

# Start the demo circuit breaker, or reuse the one left over from a previous
# evaluation so that re-running this cell does not fail on the already
# registered name. A reused breaker keeps whatever state it had accumulated.
# NOTE(review): assumes CircuitBreaker.start_link/1 follows the GenServer
# convention of returning {:error, {:already_started, pid}} - confirm.
demo_breaker =
  case CircuitBreaker.start_link(
         name: :demo_resilient_breaker,
         failure_threshold: 3,
         reset_timeout: 5_000
       ) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

# Cached and default entries are keyed by "<operation>_default", the cache key
# ResilientClient.call/4 uses when no :cache_key option is given.
fallback_config = %FallbackStrategy{
  cache: %{
    "GetUserDetails_default" => %{id: "cached", name: "Cached User", status: "unknown"}
  },
  default_values: %{
    "GetUserDetails_default" => %{id: "default", name: "Default User", status: "offline"}
  },
  alternative_endpoints: []
}

resilient_client =
  ResilientClient.new(
    # Would be a real Lather.DynamicClient in production
    client: nil,
    # The breaker is referenced by its registered name
    circuit_breaker: :demo_resilient_breaker,
    retry_strategy: RetryStrategy.new(max_retries: 2, base_delay_ms: 100),
    fallback_config: fallback_config,
    name: "UserService"
  )

# Execute several calls to demonstrate the patterns working together
IO.puts("\nExecuting resilient calls (results will vary due to simulated failures):")

Enum.each(1..5, fn i ->
  IO.puts("\nCall #{i}:")
  ResilientClient.call(resilient_client, "GetUserDetails", %{user_id: "123"})
end)
# Summary: When to use each pattern
# Prints a static, human-readable cheat sheet for the resilience patterns
# (circuit breaker, retry with backoff, bulkhead, fallback) and the cases in
# which a retry should not be attempted. The text is fixed; nothing here is
# computed from the demo runs above.
IO.puts("""
RESILIENCE PATTERNS SUMMARY
===========================
CIRCUIT BREAKER - Use when:
* Calling external services that may become unavailable
* You want to fail fast rather than wait for timeouts
* Preventing cascade failures across services
* Configuration: failure_threshold, reset_timeout, success_threshold
RETRY WITH BACKOFF - Use when:
* Transient failures are expected (network blips, temporary overload)
* Operations are idempotent (safe to repeat)
* Configuration: max_retries, base_delay, max_delay, jitter
BULKHEAD - Use when:
* Multiple services share resources (connection pools, threads)
* One slow service shouldn't starve others
* Different services have different criticality levels
* Configuration: pool sizes per service, queue limits
FALLBACK - Use when:
* Degraded service is better than no service
* Cached/stale data is acceptable temporarily
* Default values provide reasonable behavior
* Configuration: cache TTL, default values, alternative endpoints
WHEN NOT TO RETRY:
* Non-idempotent operations (payments, order creation)
* Client errors (400-level HTTP, validation failures)
* Authentication/authorization failures
* Rate limiting (429) - respect backoff headers instead
""")
Best Practices Summary
Here are the key best practices for enterprise SOAP integration with Lather:
# Static checklist of integration best practices, grouped by concern
# (configuration, security, error handling, performance, testing, monitoring,
# deployment). It is bound to a variable first and then printed as-is.
best_practices = """
🏢 ENTERPRISE SOAP INTEGRATION BEST PRACTICES
🔧 Configuration:
• Use environment variables for credentials
• Configure appropriate timeouts (60s+ for enterprise)
• Enable SSL verification in production
• Set up connection pooling for high traffic
🛡️ Security:
• Always use HTTPS in production
• Implement proper certificate validation
• Use WS-Security for enhanced authentication
• Log security events but not sensitive data
🚨 Error Handling:
• Distinguish between client and server errors
• Implement exponential backoff for retries
• Set maximum retry limits
• Log errors with correlation IDs
⚡ Performance:
• Reuse client instances across requests
• Use connection pooling effectively
• Monitor response times and error rates
• Implement circuit breakers for failing services
🧪 Testing:
• Test with mock services first
• Validate complex parameter structures
• Test error scenarios and recovery
• Load test with realistic traffic patterns
📊 Monitoring:
• Track operation success rates
• Monitor response times and timeouts
• Set up alerting for service degradation
• Use correlation IDs for request tracing
🔄 Deployment:
• Use feature flags for gradual rollouts
• Keep WSDL versions synchronized
• Plan for service versioning
• Document API changes and dependencies
"""
# Emit the checklist to the cell output.
IO.puts(best_practices)
Next Steps
You’ve now seen how to build robust enterprise SOAP integrations with Lather! Here’s what to explore next:
- Custom Authentication: Implement custom authentication schemes for your specific enterprise requirements
- Service Mesh Integration: Integrate with service mesh technologies for enhanced observability
- Advanced Error Recovery: Build sophisticated error recovery and circuit breaker patterns
- Performance Optimization: Fine-tune connection pools and caching strategies for your workload
Happy enterprise SOAP integration! 🏢✨