Security and Compliance

Security testing goes beyond policy validation to identify vulnerabilities, misconfigurations, and compliance gaps in your infrastructure code. Automated security scanning catches issues that manual reviews might miss, while compliance testing ensures your infrastructure meets regulatory requirements like SOC 2, GDPR, and industry-specific standards.

This part covers comprehensive security testing strategies that integrate with your Terraform workflow to identify and remediate security issues before they reach production.

Infrastructure Security Scanning

Use specialized tools to scan for security vulnerabilities:

#!/bin/bash
# scripts/security-scan.sh

set -e

TERRAFORM_DIR=${1:-"infrastructure"}
REPORT_DIR=${2:-"security-reports"}

mkdir -p "$REPORT_DIR"

echo "Running comprehensive security scan on Terraform configurations..."

# Checkov - comprehensive security scanning
echo "Running Checkov security scan..."
checkov -d "$TERRAFORM_DIR" \
    --framework terraform \
    --output cli \
    --output json \
    --output-file-path console,"$REPORT_DIR/checkov-report.json" \
    --soft-fail

# TFSec - Terraform-specific security scanner
echo "Running TFSec security scan..."
tfsec "$TERRAFORM_DIR" \
    --format json \
    --out "$REPORT_DIR/tfsec-report.json" \
    --soft-fail

# Terrascan - policy-based security scanner
echo "Running Terrascan security scan..."
terrascan scan -t terraform \
    -d "$TERRAFORM_DIR" \
    -o json \
    --output "$REPORT_DIR/terrascan-report.json" \
    --non-recursive

# Custom security checks
echo "Running custom security validations..."
python3 scripts/custom_security_checks.py \
    --terraform-dir "$TERRAFORM_DIR" \
    --output "$REPORT_DIR/custom-security.json"

echo "Security scan complete. Reports saved to $REPORT_DIR/"

Custom Security Validation

Implement organization-specific security checks:

#!/usr/bin/env python3
# scripts/custom_security_checks.py

import json
import os
import re
import argparse
from pathlib import Path

class SecurityValidator:
    def __init__(self, terraform_dir):
        self.terraform_dir = Path(terraform_dir)
        self.findings = []
    
    def check_hardcoded_secrets(self):
        """Check for hardcoded secrets in Terraform files"""
        secret_patterns = [
            (r'password\s*=\s*"[^"]{8,}"', "Hardcoded password detected"),
            (r'secret_key\s*=\s*"[A-Za-z0-9+/]{20,}"', "Hardcoded secret key detected"),
            (r'api_key\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded API key detected"),
            (r'token\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded token detected"),
        ]
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            for pattern, message in secret_patterns:
                matches = re.finditer(pattern, content, re.IGNORECASE)
                for match in matches:
                    line_num = content[:match.start()].count('\n') + 1
                    self.findings.append({
                        "type": "hardcoded_secret",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": message,
                        "code": match.group(0)
                    })
    
    def check_public_resources(self):
        """Check for resources that might be publicly accessible"""
        public_patterns = [
            (r'cidr_blocks\s*=\s*\["0\.0\.0\.0/0"\]', "Resource allows access from anywhere"),
            (r'publicly_accessible\s*=\s*true', "Resource is publicly accessible"),
            (r'public_read_access\s*=\s*true', "Resource allows public read access"),
        ]
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            for pattern, message in public_patterns:
                matches = re.finditer(pattern, content, re.IGNORECASE)
                for match in matches:
                    line_num = content[:match.start()].count('\n') + 1
                    self.findings.append({
                        "type": "public_access",
                        "severity": "MEDIUM",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": message,
                        "code": match.group(0)
                    })
    
    def check_encryption_settings(self):
        """Check for missing encryption configurations"""
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets without encryption
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_config = bucket.group(1)
                if "server_side_encryption_configuration" not in bucket_config:
                    line_num = content[:bucket.start()].count('\n') + 1
                    self.findings.append({
                        "type": "missing_encryption",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "S3 bucket missing encryption configuration",
                        "resource": bucket.group(0).split('"')[3]
                    })
            
            # Check RDS instances without encryption
            rds_instances = re.finditer(r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for instance in rds_instances:
                instance_config = instance.group(1)
                if "storage_encrypted" not in instance_config or "storage_encrypted = false" in instance_config:
                    line_num = content[:instance.start()].count('\n') + 1
                    self.findings.append({
                        "type": "missing_encryption",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "RDS instance missing encryption",
                        "resource": instance.group(0).split('"')[3]
                    })
    
    def check_network_security(self):
        """Check for network security issues"""
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for overly permissive security groups
            sg_rules = re.finditer(r'resource\s+"aws_security_group_rule"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for rule in sg_rules:
                rule_config = rule.group(1)
                
                # Check for SSH open to world
                if ('from_port = 22' in rule_config and 
                    'to_port = 22' in rule_config and 
                    'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
                    
                    line_num = content[:rule.start()].count('\n') + 1
                    self.findings.append({
                        "type": "network_security",
                        "severity": "CRITICAL",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "SSH port open to the world",
                        "resource": rule.group(0).split('"')[3]
                    })
                
                # Check for RDP open to world
                if ('from_port = 3389' in rule_config and 
                    'to_port = 3389' in rule_config and 
                    'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
                    
                    line_num = content[:rule.start()].count('\n') + 1
                    self.findings.append({
                        "type": "network_security",
                        "severity": "CRITICAL",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "RDP port open to the world",
                        "resource": rule.group(0).split('"')[3]
                    })
    
    def run_all_checks(self):
        """Run all security checks"""
        self.check_hardcoded_secrets()
        self.check_public_resources()
        self.check_encryption_settings()
        self.check_network_security()
        
        return {
            "total_findings": len(self.findings),
            "critical": len([f for f in self.findings if f["severity"] == "CRITICAL"]),
            "high": len([f for f in self.findings if f["severity"] == "HIGH"]),
            "medium": len([f for f in self.findings if f["severity"] == "MEDIUM"]),
            "low": len([f for f in self.findings if f["severity"] == "LOW"]),
            "findings": self.findings
        }

def main():
    parser = argparse.ArgumentParser(description='Custom Terraform security validator')
    parser.add_argument('--terraform-dir', required=True, help='Terraform directory to scan')
    parser.add_argument('--output', required=True, help='Output file for results')
    
    args = parser.parse_args()
    
    validator = SecurityValidator(args.terraform_dir)
    results = validator.run_all_checks()
    
    with open(args.output, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"Security validation complete. Found {results['total_findings']} issues.")
    print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}, Low: {results['low']}")
    
    # Exit with error if critical or high severity issues found
    if results['critical'] > 0 or results['high'] > 0:
        exit(1)

if __name__ == "__main__":
    main()

Compliance Framework Testing

Implement automated compliance validation:

#!/usr/bin/env python3
# scripts/compliance_validator.py

import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class ComplianceCheck:
    framework: str
    control_id: str
    description: str
    severity: str
    check_function: callable

class ComplianceValidator:
    def __init__(self, terraform_dir: str):
        self.terraform_dir = Path(terraform_dir)
        self.findings = []
        self.checks = self._initialize_checks()
    
    def _initialize_checks(self) -> List[ComplianceCheck]:
        """Initialize compliance checks for various frameworks"""
        return [
            # SOC 2 Type II checks
            ComplianceCheck(
                framework="SOC2",
                control_id="CC6.1",
                description="Logical and physical access controls",
                severity="HIGH",
                check_function=self._check_access_controls
            ),
            ComplianceCheck(
                framework="SOC2",
                control_id="CC6.7",
                description="Data transmission and disposal",
                severity="HIGH",
                check_function=self._check_data_encryption
            ),
            
            # GDPR checks
            ComplianceCheck(
                framework="GDPR",
                control_id="Art.32",
                description="Security of processing",
                severity="CRITICAL",
                check_function=self._check_data_security
            ),
            ComplianceCheck(
                framework="GDPR",
                control_id="Art.17",
                description="Right to erasure",
                severity="MEDIUM",
                check_function=self._check_data_retention
            ),
            
            # PCI DSS checks
            ComplianceCheck(
                framework="PCI_DSS",
                control_id="1.1.4",
                description="Network segmentation",
                severity="CRITICAL",
                check_function=self._check_network_segmentation
            ),
            ComplianceCheck(
                framework="PCI_DSS",
                control_id="3.4",
                description="Encryption of cardholder data",
                severity="CRITICAL",
                check_function=self._check_cardholder_data_encryption
            ),
        ]
    
    def _check_access_controls(self) -> List[Dict[str, Any]]:
        """SOC 2 - Check for proper access controls"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for IAM policies with overly broad permissions
            iam_policies = re.finditer(r'resource\s+"aws_iam_policy"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for policy in iam_policies:
                policy_config = policy.group(1)
                if '"*"' in policy_config and '"Action"' in policy_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:policy.start()].count('\n') + 1,
                        "message": "IAM policy grants overly broad permissions",
                        "resource": policy.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_encryption(self) -> List[Dict[str, Any]]:
        """SOC 2 - Check for data encryption in transit and at rest"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets for encryption
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_config = bucket.group(1)
                if "server_side_encryption_configuration" not in bucket_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:bucket.start()].count('\n') + 1,
                        "message": "S3 bucket lacks encryption at rest",
                        "resource": bucket.group(0).split('"')[3]
                    })
            
            # Check ALB listeners for HTTPS
            alb_listeners = re.finditer(r'resource\s+"aws_lb_listener"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for listener in alb_listeners:
                listener_config = listener.group(1)
                if 'protocol = "HTTP"' in listener_config and 'port = "80"' in listener_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:listener.start()].count('\n') + 1,
                        "message": "Load balancer listener uses unencrypted HTTP",
                        "resource": listener.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_security(self) -> List[Dict[str, Any]]:
        """GDPR - Check for data security measures"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for resources handling personal data without encryption
            resources_with_personal_data = re.finditer(
                r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*DataClassification\s*=\s*"personal"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for resource in resources_with_personal_data:
                resource_config = resource.group(1)
                if ("encryption" not in resource_config.lower() and 
                    "kms" not in resource_config.lower()):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:resource.start()].count('\n') + 1,
                        "message": "Resource handling personal data lacks encryption",
                        "resource": resource.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_retention(self) -> List[Dict[str, Any]]:
        """GDPR - Check for data retention policies"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets for lifecycle policies
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_name = bucket.group(0).split('"')[3]
                
                # Look for corresponding lifecycle configuration
                lifecycle_pattern = f'resource\\s+"aws_s3_bucket_lifecycle_configuration"\\s+"[^"]*{bucket_name}[^"]*"'
                if not re.search(lifecycle_pattern, content):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:bucket.start()].count('\n') + 1,
                        "message": "S3 bucket lacks data retention policy",
                        "resource": bucket_name
                    })
        
        return findings
    
    def _check_network_segmentation(self) -> List[Dict[str, Any]]:
        """PCI DSS - Check for proper network segmentation"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for PCI-scoped resources without proper network isolation
            pci_resources = re.finditer(
                r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*PCIScope\s*=\s*"true"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for resource in pci_resources:
                resource_config = resource.group(1)
                # Check if resource is in a dedicated VPC or subnet
                if ("vpc_id" not in resource_config and 
                    "subnet_id" not in resource_config):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:resource.start()].count('\n') + 1,
                        "message": "PCI-scoped resource lacks network segmentation",
                        "resource": resource.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_cardholder_data_encryption(self) -> List[Dict[str, Any]]:
        """PCI DSS - Check encryption of cardholder data"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check databases that might store cardholder data
            db_instances = re.finditer(
                r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*CardholderData\s*=\s*"true"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for db in db_instances:
                db_config = db.group(1)
                if "storage_encrypted = true" not in db_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:db.start()].count('\n') + 1,
                        "message": "Database storing cardholder data is not encrypted",
                        "resource": db.group(0).split('"')[3]
                    })
        
        return findings
    
    def run_compliance_checks(self) -> Dict[str, Any]:
        """Run all compliance checks"""
        results = {
            "frameworks": {},
            "total_findings": 0,
            "critical": 0,
            "high": 0,
            "medium": 0,
            "low": 0
        }
        
        for check in self.checks:
            framework_findings = check.check_function()
            
            if check.framework not in results["frameworks"]:
                results["frameworks"][check.framework] = {
                    "controls": {},
                    "total_findings": 0
                }
            
            results["frameworks"][check.framework]["controls"][check.control_id] = {
                "description": check.description,
                "severity": check.severity,
                "findings": framework_findings,
                "compliant": len(framework_findings) == 0
            }
            
            results["frameworks"][check.framework]["total_findings"] += len(framework_findings)
            results["total_findings"] += len(framework_findings)
            
            # Count by severity
            severity_count = len(framework_findings)
            if check.severity == "CRITICAL":
                results["critical"] += severity_count
            elif check.severity == "HIGH":
                results["high"] += severity_count
            elif check.severity == "MEDIUM":
                results["medium"] += severity_count
            else:
                results["low"] += severity_count
        
        return results

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Terraform compliance validator')
    parser.add_argument('--terraform-dir', required=True, help='Terraform directory to validate')
    parser.add_argument('--output', required=True, help='Output file for results')
    parser.add_argument('--frameworks', nargs='+', default=['SOC2', 'GDPR', 'PCI_DSS'], 
                       help='Compliance frameworks to check')
    
    args = parser.parse_args()
    
    validator = ComplianceValidator(args.terraform_dir)
    results = validator.run_compliance_checks()
    
    # Filter results by requested frameworks
    if args.frameworks:
        filtered_frameworks = {k: v for k, v in results["frameworks"].items() 
                             if k in args.frameworks}
        results["frameworks"] = filtered_frameworks
    
    with open(args.output, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"Compliance validation complete.")
    print(f"Total findings: {results['total_findings']}")
    print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}")
    
    for framework, data in results["frameworks"].items():
        compliant_controls = sum(1 for control in data["controls"].values() if control["compliant"])
        total_controls = len(data["controls"])
        compliance_rate = (compliant_controls / total_controls * 100) if total_controls > 0 else 0
        print(f"{framework}: {compliance_rate:.1f}% compliant ({compliant_controls}/{total_controls} controls)")

if __name__ == "__main__":
    main()

What’s Next

Security and compliance testing provides deep validation of your infrastructure’s security posture and regulatory compliance. Combined with policy as code, these testing strategies create a comprehensive security validation framework that catches issues early in the development cycle.

In the next part, we’ll explore performance and cost testing techniques that validate not just the functionality and security of your infrastructure, but also its efficiency, scalability, and cost-effectiveness.