Performance and Cost Testing
Performance and cost testing ensures that your infrastructure is not only functional and secure but also efficient and cost-effective. These tests validate resource sizing, identify optimization opportunities, and catch cost overruns before they reach your bill. Automated cost analysis and performance validation help maintain operational efficiency as your infrastructure scales.
This part covers strategies for testing the performance characteristics of your infrastructure and for validating the cost implications of your Terraform configurations.
Cost Impact Analysis
Analyze the cost implications of infrastructure changes:
#!/usr/bin/env python3
# scripts/cost_impact_analyzer.py
import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class ResourceCost:
resource_type: str
resource_name: str
monthly_cost: float
annual_cost: float
cost_factors: Dict[str, Any]
class CostAnalyzer:
def __init__(self):
# AWS pricing data (simplified - in practice, use AWS Pricing API)
self.pricing_data = {
"aws_instance": {
"t3.micro": {"hourly": 0.0104, "monthly": 7.59},
"t3.small": {"hourly": 0.0208, "monthly": 15.18},
"t3.medium": {"hourly": 0.0416, "monthly": 30.37},
"t3.large": {"hourly": 0.0832, "monthly": 60.74},
"m5.large": {"hourly": 0.096, "monthly": 70.08},
"m5.xlarge": {"hourly": 0.192, "monthly": 140.16},
},
"aws_rds_instance": {
"db.t3.micro": {"hourly": 0.017, "monthly": 12.41},
"db.t3.small": {"hourly": 0.034, "monthly": 24.82},
"db.r5.large": {"hourly": 0.24, "monthly": 175.20},
"db.r5.xlarge": {"hourly": 0.48, "monthly": 350.40},
},
"aws_s3_bucket": {
"standard": {"per_gb_monthly": 0.023},
"ia": {"per_gb_monthly": 0.0125},
"glacier": {"per_gb_monthly": 0.004},
},
"aws_ebs_volume": {
"gp3": {"per_gb_monthly": 0.08},
"gp2": {"per_gb_monthly": 0.10},
"io1": {"per_gb_monthly": 0.125, "per_iops_monthly": 0.065},
}
}
def analyze_terraform_plan(self, plan_file: str) -> Dict[str, Any]:
"""Analyze Terraform plan for cost implications"""
with open(plan_file, 'r') as f:
plan_data = json.load(f)
resource_costs = []
total_monthly_cost = 0
# Analyze planned resources
if 'planned_values' in plan_data and 'root_module' in plan_data['planned_values']:
resources = plan_data['planned_values']['root_module'].get('resources', [])
for resource in resources:
cost = self._calculate_resource_cost(resource)
if cost:
resource_costs.append(cost)
total_monthly_cost += cost.monthly_cost
# Analyze resource changes
cost_changes = self._analyze_cost_changes(plan_data.get('resource_changes', []))
return {
"total_monthly_cost": total_monthly_cost,
"total_annual_cost": total_monthly_cost * 12,
"resource_costs": [
{
"resource_type": rc.resource_type,
"resource_name": rc.resource_name,
"monthly_cost": rc.monthly_cost,
"annual_cost": rc.annual_cost,
"cost_factors": rc.cost_factors
}
for rc in resource_costs
],
"cost_changes": cost_changes,
"cost_breakdown": self._generate_cost_breakdown(resource_costs)
}
def _calculate_resource_cost(self, resource: Dict) -> Optional[ResourceCost]:
"""Calculate cost for a specific resource"""
resource_type = resource.get('type')
resource_name = resource.get('name', 'unknown')
values = resource.get('values', {})
if resource_type == 'aws_instance':
return self._calculate_ec2_cost(resource_name, values)
elif resource_type == 'aws_db_instance':
return self._calculate_rds_cost(resource_name, values)
elif resource_type == 'aws_s3_bucket':
return self._calculate_s3_cost(resource_name, values)
elif resource_type == 'aws_ebs_volume':
return self._calculate_ebs_cost(resource_name, values)
return None
def _calculate_ec2_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate EC2 instance cost"""
instance_type = values.get('instance_type')
if not instance_type or instance_type not in self.pricing_data['aws_instance']:
return None
pricing = self.pricing_data['aws_instance'][instance_type]
monthly_cost = pricing['monthly']
# Adjust for additional costs
if values.get('ebs_optimized'):
monthly_cost *= 1.1  # simplified 10% premium; EBS optimization is included at no charge on most current-generation types
return ResourceCost(
resource_type='aws_instance',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'instance_type': instance_type,
'ebs_optimized': values.get('ebs_optimized', False),
'hourly_rate': pricing['hourly']
}
)
def _calculate_rds_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate RDS instance cost"""
instance_class = values.get('instance_class')
if not instance_class or instance_class not in self.pricing_data['aws_db_instance']:
return None
pricing = self.pricing_data['aws_db_instance'][instance_class]
monthly_cost = pricing['monthly']
# Adjust for Multi-AZ
if values.get('multi_az'):
monthly_cost *= 2
# Add storage cost
allocated_storage = values.get('allocated_storage', 20)
storage_cost = allocated_storage * 0.115  # gp2 storage, $0.115 per GB-month
monthly_cost += storage_cost
return ResourceCost(
resource_type='aws_db_instance',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'instance_class': instance_class,
'multi_az': values.get('multi_az', False),
'allocated_storage': allocated_storage,
'storage_cost': storage_cost
}
)
def _calculate_s3_cost(self, name: str, values: Dict) -> ResourceCost:
"""Calculate S3 bucket cost (estimated)"""
# S3 cost depends on usage, so we provide estimates
estimated_gb = 100 # Default estimate
storage_class = 'standard' # Default
pricing = self.pricing_data['aws_s3_bucket'][storage_class]
monthly_cost = estimated_gb * pricing['per_gb_monthly']
return ResourceCost(
resource_type='aws_s3_bucket',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'estimated_storage_gb': estimated_gb,
'storage_class': storage_class,
'per_gb_cost': pricing['per_gb_monthly']
}
)
def _calculate_ebs_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate EBS volume cost"""
volume_type = values.get('type', 'gp3')
size = values.get('size', 8)
if volume_type not in self.pricing_data['aws_ebs_volume']:
return None
pricing = self.pricing_data['aws_ebs_volume'][volume_type]
monthly_cost = size * pricing['per_gb_monthly']
# Add IOPS cost for io1 volumes
if volume_type == 'io1':
iops = values.get('iops', 100)
monthly_cost += iops * pricing['per_iops_monthly']
return ResourceCost(
resource_type='aws_ebs_volume',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'volume_type': volume_type,
'size_gb': size,
'iops': values.get('iops') if volume_type == 'io1' else None
}
)
def _analyze_cost_changes(self, resource_changes: List[Dict]) -> Dict[str, Any]:
"""Analyze cost impact of resource changes"""
changes = {
"new_resources": 0,
"modified_resources": 0,
"destroyed_resources": 0,
"cost_increase": 0,
"cost_decrease": 0
}
for change in resource_changes:
actions = change.get('change', {}).get('actions', [])
if 'create' in actions:
changes["new_resources"] += 1
# Estimate cost increase for new resources
if change.get('type') == 'aws_instance':
instance_type = change.get('change', {}).get('after', {}).get('instance_type')
if instance_type in self.pricing_data['aws_instance']:
changes["cost_increase"] += self.pricing_data['aws_instance'][instance_type]['monthly']
elif 'update' in actions:
changes["modified_resources"] += 1
elif 'delete' in actions:
changes["destroyed_resources"] += 1
# Estimate cost decrease for destroyed resources
if change.get('type') == 'aws_instance':
instance_type = change.get('change', {}).get('before', {}).get('instance_type')
if instance_type in self.pricing_data['aws_instance']:
changes["cost_decrease"] += self.pricing_data['aws_instance'][instance_type]['monthly']
changes["net_cost_change"] = changes["cost_increase"] - changes["cost_decrease"]
return changes
def _generate_cost_breakdown(self, resource_costs: List[ResourceCost]) -> Dict[str, Any]:
"""Generate cost breakdown by resource type"""
breakdown = {}
for cost in resource_costs:
if cost.resource_type not in breakdown:
breakdown[cost.resource_type] = {
"count": 0,
"monthly_cost": 0,
"annual_cost": 0
}
breakdown[cost.resource_type]["count"] += 1
breakdown[cost.resource_type]["monthly_cost"] += cost.monthly_cost
breakdown[cost.resource_type]["annual_cost"] += cost.annual_cost
return breakdown
def main():
import argparse
parser = argparse.ArgumentParser(description='Terraform cost impact analyzer')
parser.add_argument('--plan-file', required=True, help='Terraform plan JSON file')
parser.add_argument('--budget-limit', type=float, help='Monthly budget limit for validation')
parser.add_argument('--output', required=True, help='Output file for cost analysis')
args = parser.parse_args()
analyzer = CostAnalyzer()
analysis = analyzer.analyze_terraform_plan(args.plan_file)
with open(args.output, 'w') as f:
json.dump(analysis, f, indent=2)
print(f"Cost Analysis Complete:")
print(f" Monthly Cost: ${analysis['total_monthly_cost']:.2f}")
print(f" Annual Cost: ${analysis['total_annual_cost']:.2f}")
if args.budget_limit:
if analysis['total_monthly_cost'] > args.budget_limit:
print(f" ⚠️ BUDGET EXCEEDED: ${analysis['total_monthly_cost']:.2f} > ${args.budget_limit:.2f}")
exit(1)
else:
print(f" ✅ Within budget: ${analysis['total_monthly_cost']:.2f} <= ${args.budget_limit:.2f}")
print("\nCost Breakdown:")
for resource_type, breakdown in analysis['cost_breakdown'].items():
print(f" {resource_type}: {breakdown['count']} resources, ${breakdown['monthly_cost']:.2f}/month")
if __name__ == "__main__":
main()
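To run the analyzer, first convert a Terraform plan to JSON with terraform show. The following invocation is a minimal sketch; the file names and the $500 budget limit are illustrative:
# Produce a machine-readable plan for the analyzer
terraform plan -out=tfplan
terraform show -json tfplan > plan.json
# Fail the run (exit 1) if the estimated monthly cost exceeds the budget
python3 scripts/cost_impact_analyzer.py \
  --plan-file plan.json \
  --budget-limit 500 \
  --output cost-analysis.json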
Performance Testing Framework
Test infrastructure performance characteristics:
#!/usr/bin/env python3
# scripts/performance_tester.py
import json
import time
import concurrent.futures
import requests
from dataclasses import dataclass
from typing import List, Dict, Optional
import boto3
@dataclass
class PerformanceMetric:
metric_name: str
value: float
unit: str
threshold: Optional[float] = None
passed: Optional[bool] = None
class InfrastructurePerformanceTester:
def __init__(self, terraform_outputs: Dict):
self.outputs = terraform_outputs
self.metrics = []
def test_web_application_performance(self, url: str, concurrent_users: int = 10, duration: int = 60):
"""Test web application performance under load"""
print(f"Testing web application performance: {url}")
def make_request():
try:
start_time = time.time()
response = requests.get(url, timeout=30)
end_time = time.time()
return {
'response_time': end_time - start_time,
'status_code': response.status_code,
'success': response.status_code == 200
}
except Exception as e:
return {
'response_time': 30.0,
'status_code': 0,
'success': False,
'error': str(e)
}
# Run load test
results = []
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
while time.time() - start_time < duration:
futures = [executor.submit(make_request) for _ in range(concurrent_users)]
batch_results = [future.result() for future in concurrent.futures.as_completed(futures)]
results.extend(batch_results)
time.sleep(1) # 1 second between batches
# Analyze results
successful_requests = [r for r in results if r['success']]
response_times = [r['response_time'] for r in successful_requests]
if response_times:
avg_response_time = sum(response_times) / len(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
p95_response_time = sorted(response_times)[int(len(response_times) * 0.95)]
else:
avg_response_time = max_response_time = min_response_time = p95_response_time = 0
success_rate = len(successful_requests) / len(results) * 100 if results else 0
# Add metrics
self.metrics.extend([
PerformanceMetric("avg_response_time", avg_response_time, "seconds", 2.0),
PerformanceMetric("max_response_time", max_response_time, "seconds", 5.0),
PerformanceMetric("p95_response_time", p95_response_time, "seconds", 3.0),
PerformanceMetric("success_rate", success_rate, "percent", 95.0),
PerformanceMetric("total_requests", len(results), "count"),
])
return {
'total_requests': len(results),
'successful_requests': len(successful_requests),
'success_rate': success_rate,
'avg_response_time': avg_response_time,
'max_response_time': max_response_time,
'min_response_time': min_response_time,
'p95_response_time': p95_response_time
}
def test_database_performance(self, db_endpoint: str, db_name: str):
"""Test database performance"""
print(f"Testing database performance: {db_endpoint}")
# This would typically involve connecting to the database
# and running performance tests. For this example, we'll
# use CloudWatch metrics instead.
try:
cloudwatch = boto3.client('cloudwatch')
# Get recent database metrics
end_time = time.time()
start_time = end_time - 3600 # Last hour
metrics_to_check = [
('CPUUtilization', 'AWS/RDS'),
('DatabaseConnections', 'AWS/RDS'),
('ReadLatency', 'AWS/RDS'),
('WriteLatency', 'AWS/RDS'),
]
db_metrics = {}
for metric_name, namespace in metrics_to_check:
response = cloudwatch.get_metric_statistics(
Namespace=namespace,
MetricName=metric_name,
Dimensions=[
{
'Name': 'DBInstanceIdentifier',
'Value': db_name
}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average', 'Maximum']
)
if response['Datapoints']:
latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
db_metrics[metric_name] = {
'average': latest_datapoint['Average'],
'maximum': latest_datapoint['Maximum']
}
# Add database performance metrics
if 'CPUUtilization' in db_metrics:
self.metrics.append(
PerformanceMetric("db_cpu_utilization", db_metrics['CPUUtilization']['average'], "percent", 80.0)
)
if 'ReadLatency' in db_metrics:
self.metrics.append(
PerformanceMetric("db_read_latency", db_metrics['ReadLatency']['average'] * 1000, "milliseconds", 20.0)
)
if 'WriteLatency' in db_metrics:
self.metrics.append(
PerformanceMetric("db_write_latency", db_metrics['WriteLatency']['average'] * 1000, "milliseconds", 50.0)
)
return db_metrics
except Exception as e:
print(f"Error testing database performance: {e}")
return {}
def test_auto_scaling_performance(self, asg_name: str):
"""Test auto scaling group performance"""
print(f"Testing auto scaling performance: {asg_name}")
try:
autoscaling = boto3.client('autoscaling')
cloudwatch = boto3.client('cloudwatch')
# Get ASG details
response = autoscaling.describe_auto_scaling_groups(
AutoScalingGroupNames=[asg_name]
)
if not response['AutoScalingGroups']:
return {}
asg = response['AutoScalingGroups'][0]
# Check scaling metrics
end_time = time.time()
start_time = end_time - 3600 # Last hour
# Get CloudWatch metrics for the ASG (GroupTotalInstances requires group metrics collection to be enabled on the ASG)
response = cloudwatch.get_metric_statistics(
Namespace='AWS/AutoScaling',
MetricName='GroupTotalInstances',
Dimensions=[
{
'Name': 'AutoScalingGroupName',
'Value': asg_name
}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average', 'Maximum', 'Minimum']
)
scaling_metrics = {}
if response['Datapoints']:
latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
scaling_metrics = {
'current_instances': latest_datapoint['Average'],
'max_instances': latest_datapoint['Maximum'],
'min_instances': latest_datapoint['Minimum']
}
# Add scaling performance metrics
self.metrics.extend([
PerformanceMetric("asg_current_capacity", asg['DesiredCapacity'], "count"),
PerformanceMetric("asg_min_size", asg['MinSize'], "count"),
PerformanceMetric("asg_max_size", asg['MaxSize'], "count"),
])
return {
'asg_name': asg_name,
'desired_capacity': asg['DesiredCapacity'],
'min_size': asg['MinSize'],
'max_size': asg['MaxSize'],
'current_instances': len(asg['Instances']),
'scaling_metrics': scaling_metrics
}
except Exception as e:
print(f"Error testing auto scaling performance: {e}")
return {}
def evaluate_performance_thresholds(self):
"""Evaluate all metrics against their thresholds"""
for metric in self.metrics:
if metric.threshold is not None:
if metric.metric_name.endswith('_rate'):
# For rates, higher is better
metric.passed = metric.value >= metric.threshold
elif 'latency' in metric.metric_name or 'response_time' in metric.metric_name:
# For latency/response time, lower is better
metric.passed = metric.value <= metric.threshold
elif 'utilization' in metric.metric_name:
# For utilization, lower is better (below threshold)
metric.passed = metric.value <= metric.threshold
else:
# Default: lower is better
metric.passed = metric.value <= metric.threshold
def generate_performance_report(self) -> Dict:
"""Generate comprehensive performance report"""
self.evaluate_performance_thresholds()
passed_metrics = [m for m in self.metrics if m.passed is True]
failed_metrics = [m for m in self.metrics if m.passed is False]
return {
'timestamp': time.time(),
'total_metrics': len(self.metrics),
'passed_metrics': len(passed_metrics),
'failed_metrics': len(failed_metrics),
'success_rate': len(passed_metrics) / (len(passed_metrics) + len(failed_metrics)) * 100 if (passed_metrics or failed_metrics) else 0,
'metrics': [
{
'name': m.metric_name,
'value': m.value,
'unit': m.unit,
'threshold': m.threshold,
'passed': m.passed
}
for m in self.metrics
],
'failed_tests': [
{
'name': m.metric_name,
'value': m.value,
'threshold': m.threshold,
'unit': m.unit
}
for m in failed_metrics
]
}
def main():
import argparse
parser = argparse.ArgumentParser(description='Infrastructure performance tester')
parser.add_argument('--terraform-outputs', required=True, help='Terraform outputs JSON file')
parser.add_argument('--output', required=True, help='Output file for performance report')
parser.add_argument('--load-test-duration', type=int, default=60, help='Load test duration in seconds')
parser.add_argument('--concurrent-users', type=int, default=10, help='Number of concurrent users for load testing')
args = parser.parse_args()
# Load Terraform outputs
with open(args.terraform_outputs, 'r') as f:
terraform_outputs = json.load(f)
tester = InfrastructurePerformanceTester(terraform_outputs)
# Run performance tests based on available outputs
if 'load_balancer_dns_name' in terraform_outputs:
url = f"http://{terraform_outputs['load_balancer_dns_name']['value']}"
tester.test_web_application_performance(
url,
args.concurrent_users,
args.load_test_duration
)
if 'database_endpoint' in terraform_outputs:
db_endpoint = terraform_outputs['database_endpoint']['value']
db_name = terraform_outputs.get('database_name', {}).get('value', 'main')
tester.test_database_performance(db_endpoint, db_name)
if 'asg_name' in terraform_outputs:
asg_name = terraform_outputs['asg_name']['value']
tester.test_auto_scaling_performance(asg_name)
# Generate report
report = tester.generate_performance_report()
with open(args.output, 'w') as f:
json.dump(report, f, indent=2)
print(f"Performance testing complete:")
print(f" Total metrics: {report['total_metrics']}")
print(f" Passed: {report['passed_metrics']}")
print(f" Failed: {report['failed_metrics']}")
print(f" Success rate: {report['success_rate']:.1f}%")
if report['failed_tests']:
print("\nFailed performance tests:")
for test in report['failed_tests']:
print(f" - {test['name']}: {test['value']} {test['unit']} (threshold: {test['threshold']})")
exit(1)
if __name__ == "__main__":
main()
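The tester reads outputs in the format produced by terraform output -json and keys its tests off output names such as load_balancer_dns_name, database_endpoint, and asg_name. A minimal invocation sketch, with illustrative file names and load parameters:
# Export outputs from an applied configuration
terraform output -json > terraform-outputs.json
# Run a 2-minute load test with 25 concurrent users; exits non-zero if any threshold fails
python3 scripts/performance_tester.py \
  --terraform-outputs terraform-outputs.json \
  --output performance-report.json \
  --load-test-duration 120 \
  --concurrent-users 25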
Resource Optimization Analysis
Analyze resource configurations for optimization opportunities:
#!/bin/bash
# scripts/optimization-analyzer.sh
set -euo pipefail
TERRAFORM_DIR=${1:-"infrastructure"}
OUTPUT_DIR=${2:-"optimization-reports"}
mkdir -p "$OUTPUT_DIR"
echo "Analyzing Terraform configurations for optimization opportunities..."
# Generate Terraform plans for analysis
find "$TERRAFORM_DIR" -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Analyzing $dir for optimization opportunities..."
cd "$dir"
terraform init -backend=false -input=false
terraform plan -input=false -out=optimization.tfplan
terraform show -json optimization.tfplan > optimization-plan.json
cd - > /dev/null
# Run optimization analysis
python3 scripts/resource_optimizer.py \
--plan-file "$dir/optimization-plan.json" \
--output "$OUTPUT_DIR/$(basename "$dir")-optimization.json"
done
# Generate consolidated optimization report
python3 scripts/consolidate_optimization_reports.py \
--reports-dir "$OUTPUT_DIR" \
--output "$OUTPUT_DIR/consolidated-optimization-report.json"
echo "Optimization analysis complete. Reports saved to $OUTPUT_DIR/"
What’s Next
Performance and cost testing complete the comprehensive testing strategy by validating the efficiency and economic impact of your infrastructure. Combined with functional, security, and policy testing, these techniques ensure your infrastructure meets all requirements for production deployment.
In the final part, we’ll integrate all these testing strategies into comprehensive CI/CD pipelines that automate the entire testing workflow, from static analysis through performance validation, creating a complete quality assurance framework for infrastructure as code.