Infrastructure code needs the same quality assurance practices as application code, but testing infrastructure presents unique challenges. How do you unit test a VPC? How do you validate that your security policies actually work? How do you catch configuration errors before they reach production?

This guide covers the complete spectrum of Terraform testing and validation, from static analysis and policy validation to integration testing with real cloud resources.

Static Analysis and Linting

Static analysis catches errors before you even run Terraform, identifying syntax issues, security problems, and style inconsistencies that could cause problems later. Unlike application code, infrastructure code mistakes can be expensive—literally. A misconfigured security group or an oversized instance type can cost money and create security vulnerabilities.

The tools and practices in this part form the first line of defense against infrastructure code problems, catching issues in your editor and CI pipeline before they reach cloud resources.

Terraform Built-in Validation

Terraform includes several built-in validation commands that should be part of every workflow:

# Format code consistently
terraform fmt -recursive

# Check for syntax errors and validate configuration
terraform validate

# Generate and review execution plans
terraform plan -out=tfplan

# Show plan in human-readable format
terraform show tfplan

# Show plan in JSON for automated analysis
terraform show -json tfplan | jq '.planned_values'
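
The JSON output is what makes automated plan analysis possible. As a small sketch (assuming jq is installed), you can list every planned action or fail a pipeline if anything would be destroyed:

# List each planned action and the resource it applies to
terraform show -json tfplan | jq -r '.resource_changes[] | "\(.change.actions | join(",")) \(.address)"'

# Exit non-zero if the plan would delete any resource (useful as a CI guard)
terraform show -json tfplan | jq -e '[.resource_changes[] | select(.change.actions | index("delete"))] | length == 0'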

Automated formatting ensures consistent code style:

# Check if files need formatting (exits with code 3 if changes needed)
terraform fmt -check -recursive

# Format all files in current directory and subdirectories
terraform fmt -recursive

# Show what would be formatted without making changes
terraform fmt -diff -check

Configuration validation catches syntax and logic errors:

# Validate configuration syntax
terraform validate

# Machine-readable output for CI pipelines
terraform validate -json

# Validate without configuring a backend (initialize first; validate itself
# does not accept variable values—use terraform plan to test those)
terraform init -backend=false
terraform validate

TFLint for Advanced Linting

TFLint provides deeper analysis than Terraform’s built-in validation:

# Install TFLint
curl -s https://raw.githubusercontent.com/terraform-linters/tflint/master/install_linux.sh | bash

# Initialize TFLint with plugins
tflint --init

# Run linting
tflint

# Run with specific ruleset
tflint --enable-rule=terraform_unused_declarations

TFLint configuration (.tflint.hcl):

config {
  module = true
  force = false
}

plugin "aws" {
  enabled = true
  version = "0.24.1"
  source  = "github.com/terraform-linters/tflint-ruleset-aws"
}

rule "terraform_deprecated_interpolation" {
  enabled = true
}

rule "terraform_unused_declarations" {
  enabled = true
}

rule "terraform_comment_syntax" {
  enabled = true
}

rule "terraform_documented_outputs" {
  enabled = true
}

rule "terraform_documented_variables" {
  enabled = true
}

rule "terraform_typed_variables" {
  enabled = true
}

rule "terraform_module_pinned_source" {
  enabled = true
}

rule "terraform_naming_convention" {
  enabled = true
  format  = "snake_case"
}

rule "terraform_standard_module_structure" {
  enabled = true
}

AWS-specific rules catch cloud-specific issues:

# Check for deprecated instance types
tflint --enable-rule=aws_instance_previous_type

# Validate security group rules
tflint --enable-rule=aws_security_group_rule_description

# Check for invalid AMI IDs
tflint --enable-rule=aws_instance_invalid_ami

Checkov for Security Scanning

Checkov scans for security and compliance issues:

# Install Checkov
pip install checkov

# Scan Terraform files
checkov -f main.tf

# Scan entire directory
checkov -d .

# Output in different formats
checkov -d . --output json
checkov -d . --output sarif

# Skip specific checks
checkov -d . --skip-check CKV_AWS_23

# Run only specific frameworks
checkov -d . --framework terraform

Custom Checkov policies for organization-specific rules:

# custom_checks/RequireOwnerTag.py
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

class RequireOwnerTag(BaseResourceCheck):
    def __init__(self):
        name = "Ensure all resources have Owner tag"
        id = "CKV_CUSTOM_1"
        supported_resources = ['*']
        categories = [CheckCategories.GENERAL_SECURITY]
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

    def scan_resource_conf(self, conf):
        """
        Looks for Owner tag in resource configuration
        """
        if 'tags' in conf:
            tags = conf['tags'][0]
            if isinstance(tags, dict) and 'Owner' in tags:
                return CheckResult.PASSED
        return CheckResult.FAILED

check = RequireOwnerTag()
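
To run the custom policy alongside the built-in checks, point Checkov at the directory containing it (the directory name matches the example above):

# Load organization-specific checks from ./custom_checks
# Note: Checkov may require an empty __init__.py in custom_checks/ so the module is importable
checkov -d . --external-checks-dir ./custom_checks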

Terraform Docs for Documentation

Terraform-docs generates documentation from your code:

# Install terraform-docs
curl -sSLo ./terraform-docs.tar.gz https://terraform-docs.io/dl/v0.16.0/terraform-docs-v0.16.0-$(uname)-amd64.tar.gz
tar -xzf terraform-docs.tar.gz
chmod +x terraform-docs
sudo mv terraform-docs /usr/local/bin/terraform-docs

# Generate documentation
terraform-docs markdown table . > README.md

# Write the documentation directly into README.md
terraform-docs markdown table --output-file README.md .

Configuration file (.terraform-docs.yml):

formatter: "markdown table"
header-from: main.tf
footer-from: ""
recursive:
  enabled: false
  path: modules
sections:
  hide: []
  show: []
content: |-
  # {{ .Header }}
  
  {{ .Requirements }}
  
  {{ .Providers }}
  
  {{ .Modules }}
  
  {{ .Resources }}
  
  {{ .Inputs }}
  
  {{ .Outputs }}
output:
  file: "README.md"
  mode: inject
  template: |-
    <!-- BEGIN_TF_DOCS -->
    {{ .Content }}
    <!-- END_TF_DOCS -->
sort:
  enabled: true
  by: name
settings:
  anchor: true
  color: true
  default: true
  description: false
  escape: true
  hide-empty: false
  html: true
  indent: 2
  lockfile: true
  read-comments: true
  required: true
  sensitive: true
  type: true

Pre-commit Hooks

Pre-commit hooks run validation automatically before commits:

# Install pre-commit
pip install pre-commit

# Install hooks
pre-commit install

Pre-commit configuration (.pre-commit-config.yaml):

repos:
  - repo: https://github.com/antonbabenko/pre-commit-terraform
    rev: v1.81.0
    hooks:
      - id: terraform_fmt
      - id: terraform_validate
      - id: terraform_docs
        args:
          - --hook-config=--path-to-file=README.md
          - --hook-config=--add-to-existing-file=true
          - --hook-config=--create-file-if-not-exist=true
      - id: terraform_tflint
        args:
          - --args=--only=terraform_deprecated_interpolation
          - --args=--only=terraform_unused_declarations
          - --args=--only=terraform_comment_syntax
          - --args=--only=terraform_documented_outputs
          - --args=--only=terraform_documented_variables
          - --args=--only=terraform_typed_variables
          - --args=--only=terraform_module_pinned_source
          - --args=--only=terraform_naming_convention
          - --args=--only=terraform_standard_module_structure
      - id: terraform_tfsec
      - id: terraform_checkov
        args:
          - --args=--skip-check CKV2_AWS_6
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
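
Hooks normally run only against staged files. After adding the configuration, run them once across the whole repository:

# Run every configured hook against all files
pre-commit run --all-files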

TFSec for Security Analysis

TFSec focuses specifically on security issues:

# Install tfsec
curl -s https://raw.githubusercontent.com/aquasecurity/tfsec/master/scripts/install_linux.sh | bash

# Run security scan
tfsec .

# Output in different formats
tfsec --format json .
tfsec --format sarif .

# Exclude specific checks
tfsec --exclude aws-s3-enable-logging .

# Run with custom checks
tfsec --custom-check-dir ./custom-checks .

Custom TFSec rules:

package custom

import (
    "github.com/aquasecurity/tfsec/pkg/result"
    "github.com/aquasecurity/tfsec/pkg/severity"
    "github.com/aquasecurity/tfsec/pkg/state"
    "github.com/aquasecurity/tfsec/pkg/rule"
)

var RequireOwnerTag = rule.Rule{
    LegacyID: "CUS001",
    BadExample: []string{`
resource "aws_instance" "bad_example" {
  ami           = "ami-12345678"
  instance_type = "t2.micro"
}
`},
    GoodExample: []string{`
resource "aws_instance" "good_example" {
  ami           = "ami-12345678"
  instance_type = "t2.micro"
  
  tags = {
    Owner = "team-name"
  }
}
`},
    Links: []string{
        "https://example.com/tagging-policy",
    },
    RequiredTypes: []string{"resource"},
    RequiredLabels: []string{"aws_instance"},
    Base: rule.Base{
        Rule: result.Rule{
            AVDID:       "AVD-CUS-0001",
            Provider:    "aws",
            Service:     "ec2",
            ShortCode:   "require-owner-tag",
            Summary:     "Resource should have Owner tag",
            Impact:      "Resources without Owner tag cannot be tracked for cost allocation",
            Resolution:  "Add Owner tag to resource",
            Explanation: "All resources should have an Owner tag for cost allocation and management purposes",
            Severity:    severity.Medium,
        },
    },
}

Automated Quality Gates

Integrate static analysis into CI/CD pipelines:

# GitHub Actions workflow
name: Terraform Quality Gates
on:
  pull_request:
    paths: ['**.tf', '**.tfvars']

jobs:
  quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.6.0
      
      - name: Terraform Format Check
        run: terraform fmt -check -recursive
      
      - name: Terraform Validate
        run: |
          terraform init -backend=false
          terraform validate
      
      - name: Run TFLint
        uses: terraform-linters/setup-tflint@v3
        with:
          tflint_version: v0.47.0
      - run: tflint --init
      - run: tflint -f compact
      
      - name: Run Checkov
        uses: bridgecrewio/checkov-action@master
        with:
          directory: .
          framework: terraform
          output_format: sarif
          output_file_path: checkov.sarif
      
      - name: Run TFSec
        uses: aquasecurity/tfsec-sarif-action@master
        with:
          sarif_file: tfsec.sarif
      
      - name: Upload Checkov SARIF
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: checkov.sarif
          category: checkov
      
      - name: Upload TFSec SARIF
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: tfsec.sarif
          category: tfsec

Quality Metrics and Reporting

Track code quality metrics over time:

#!/bin/bash
# quality-report.sh

echo "=== Terraform Quality Report ==="
echo "Generated: $(date)"
echo

echo "=== Format Check ==="
terraform fmt -check -recursive
FORMAT_EXIT=$?

echo "=== Validation ==="
terraform validate
VALIDATE_EXIT=$?

echo "=== TFLint Results ==="
tflint --format compact
TFLINT_EXIT=$?

echo "=== Security Scan ==="
tfsec --format table
TFSEC_EXIT=$?

echo "=== Documentation Check ==="
terraform-docs markdown table . > /tmp/docs.md
if diff -q README.md /tmp/docs.md > /dev/null; then
    echo "Documentation is up to date"
    DOCS_EXIT=0
else
    echo "Documentation needs updating"
    DOCS_EXIT=1
fi

echo
echo "=== Summary ==="
echo "Format: $([ $FORMAT_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Validation: $([ $VALIDATE_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Linting: $([ $TFLINT_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Security: $([ $TFSEC_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Documentation: $([ $DOCS_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"

OVERALL_EXIT=$((FORMAT_EXIT + VALIDATE_EXIT + TFLINT_EXIT + TFSEC_EXIT + DOCS_EXIT))
exit $OVERALL_EXIT

What’s Next

Static analysis provides the foundation for infrastructure code quality, but it can only catch certain types of issues. To validate that your infrastructure actually works as intended, you need testing strategies that go beyond syntax checking.

In the next part, we’ll explore unit testing strategies for Terraform modules, including techniques for testing logic without creating real cloud resources.

Unit Testing Strategies

Unit testing Terraform modules presents unique challenges since infrastructure code ultimately creates real cloud resources. However, you can test much of your Terraform logic—variable validation, conditional expressions, and output calculations—without provisioning actual infrastructure. These techniques catch logic errors early and make your modules more reliable.

This part covers strategies for testing Terraform modules in isolation, validating configuration logic, and ensuring your modules behave correctly across different input scenarios.

Testing Module Logic with Validation

Terraform’s validation blocks provide the first line of defense for unit testing:

# modules/vpc/variables.tf
variable "cidr_block" {
  description = "CIDR block for the VPC"
  type        = string
  
  validation {
    condition = can(cidrhost(var.cidr_block, 0))
    error_message = "The cidr_block must be a valid CIDR block."
  }
  
  validation {
    condition = can(regex("^10\\.|^172\\.(1[6-9]|2[0-9]|3[0-1])\\.|^192\\.168\\.", var.cidr_block))
    error_message = "The cidr_block must use private IP address space (10.x.x.x, 172.16-31.x.x, or 192.168.x.x)."
  }
}

variable "availability_zones" {
  description = "List of availability zones"
  type        = list(string)
  
  validation {
    condition = length(var.availability_zones) >= 2
    error_message = "At least 2 availability zones must be specified for high availability."
  }
  
  validation {
    condition = length(var.availability_zones) <= 6
    error_message = "Maximum of 6 availability zones supported."
  }
}

variable "environment" {
  description = "Environment name"
  type        = string
  
  validation {
    condition = contains(["dev", "staging", "prod"], var.environment)
    error_message = "Environment must be one of: dev, staging, prod."
  }
}

variable "subnet_configuration" {
  description = "Subnet configuration"
  type = object({
    public_subnets  = list(string)
    private_subnets = list(string)
  })
  
  validation {
    condition = length(var.subnet_configuration.public_subnets) == length(var.subnet_configuration.private_subnets)
    error_message = "Number of public and private subnets must be equal."
  }
  
  validation {
    condition = alltrue([
      for cidr in concat(var.subnet_configuration.public_subnets, var.subnet_configuration.private_subnets) :
      can(cidrhost(cidr, 0))
    ])
    error_message = "All subnet CIDR blocks must be valid."
  }
}
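
Before wiring up a full test harness, you can exercise these rules directly from the module directory. A hedged sketch, assuming the variables shown above are the module's only required inputs:

# Run from modules/vpc; the invalid environment value should fail immediately
# with the custom error_message, before any resources are planned
terraform init -backend=false
terraform plan \
  -var 'cidr_block=10.0.0.0/16' \
  -var 'availability_zones=["us-west-2a","us-west-2b"]' \
  -var 'environment=qa' \
  -var 'subnet_configuration={public_subnets=["10.0.1.0/24"],private_subnets=["10.0.11.0/24"]}'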

Testing with Terraform Plan

Use terraform plan to test module logic without creating resources:

#!/bin/bash
# test-module.sh

set -e

MODULE_DIR="modules/vpc"
TEST_DIR="test/unit"

# Create isolated test directories (one per test case so the configurations don't interfere)
mkdir -p "$TEST_DIR/valid" "$TEST_DIR/invalid"

# Test case 1: Valid configuration
cat > "$TEST_DIR/valid-config.tf" << EOF
module "vpc_test" {
  source = "../../$MODULE_DIR"
  
  name               = "test-vpc"
  cidr_block         = "10.0.0.0/16"
  availability_zones = ["us-west-2a", "us-west-2b"]
  environment        = "dev"
  
  subnet_configuration = {
    public_subnets  = ["10.0.1.0/24", "10.0.2.0/24"]
    private_subnets = ["10.0.11.0/24", "10.0.12.0/24"]
  }
}

output "test_outputs" {
  value = {
    vpc_id = module.vpc_test.vpc_id
    public_subnet_ids = module.vpc_test.public_subnet_ids
    private_subnet_ids = module.vpc_test.private_subnet_ids
  }
}
EOF

# Test case 2: Invalid CIDR
cat > "$TEST_DIR/invalid-cidr.tf" << EOF
module "vpc_test_invalid" {
  source = "../../$MODULE_DIR"
  
  name               = "test-vpc"
  cidr_block         = "invalid-cidr"
  availability_zones = ["us-west-2a", "us-west-2b"]
  environment        = "dev"
  
  subnet_configuration = {
    public_subnets  = ["10.0.1.0/24", "10.0.2.0/24"]
    private_subnets = ["10.0.11.0/24", "10.0.12.0/24"]
  }
}
EOF

echo "Testing valid configuration..."
cd "$TEST_DIR"
terraform init -backend=false
terraform validate
terraform plan -out=valid.tfplan

echo "Testing invalid configuration..."
if terraform validate -var-file=<(echo 'cidr_block = "invalid-cidr"') 2>/dev/null; then
  echo "ERROR: Invalid configuration should have failed validation"
  exit 1
else
  echo "SUCCESS: Invalid configuration correctly rejected"
fi

echo "All unit tests passed!"

Testing Local Values and Expressions

Test complex local value calculations:

# modules/networking/locals-test.tf
locals {
  # Test subnet CIDR calculation
  test_vpc_cidr = "10.0.0.0/16"
  test_az_count = 3
  
  # Calculate subnet CIDRs
  public_subnet_cidrs = [
    for i in range(local.test_az_count) :
    cidrsubnet(local.test_vpc_cidr, 8, i + 1)
  ]
  
  private_subnet_cidrs = [
    for i in range(local.test_az_count) :
    cidrsubnet(local.test_vpc_cidr, 8, i + 11)
  ]
  
  # Test naming conventions
  resource_names = {
    for i in range(local.test_az_count) :
    "subnet-${i}" => {
      public  = "public-subnet-${i + 1}"
      private = "private-subnet-${i + 1}"
    }
  }
  
  # Test conditional logic
  environment_config = {
    dev = {
      instance_type = "t3.micro"
      min_size     = 1
      max_size     = 3
    }
    prod = {
      instance_type = "t3.large"
      min_size     = 3
      max_size     = 10
    }
  }
  
  selected_config = local.environment_config[var.environment]
}

# Output calculated values for testing
output "calculated_subnets" {
  value = {
    public_cidrs  = local.public_subnet_cidrs
    private_cidrs = local.private_subnet_cidrs
  }
}

output "resource_names" {
  value = local.resource_names
}

output "environment_config" {
  value = local.selected_config
}

Mock Testing with Null Resources

Use null resources to test logic without creating real infrastructure:

# test/unit/mock-test.tf
variable "test_scenarios" {
  description = "Test scenarios for validation"
  type = map(object({
    environment = string
    region     = string
    az_count   = number
  }))
  
  default = {
    scenario_1 = {
      environment = "dev"
      region     = "us-west-2"
      az_count   = 2
    }
    scenario_2 = {
      environment = "prod"
      region     = "us-east-1"
      az_count   = 3
    }
  }
}

# Mock data sources
locals {
  mock_availability_zones = {
    "us-west-2" = ["us-west-2a", "us-west-2b", "us-west-2c"]
    "us-east-1" = ["us-east-1a", "us-east-1b", "us-east-1c"]
  }
}

# Test module logic with null resources
resource "null_resource" "test_scenarios" {
  for_each = var.test_scenarios
  
  triggers = {
    environment = each.value.environment
    region     = each.value.region
    az_count   = each.value.az_count
    
    # Test subnet calculation
    vpc_cidr = "10.0.0.0/16"
    public_subnets = jsonencode([
      for i in range(each.value.az_count) :
      cidrsubnet("10.0.0.0/16", 8, i + 1)
    ])
    private_subnets = jsonencode([
      for i in range(each.value.az_count) :
      cidrsubnet("10.0.0.0/16", 8, i + 11)
    ])
    
    # Test naming
    resource_prefix = "${each.value.environment}-${each.value.region}"
    
    # Test availability zones
    selected_azs = jsonencode(slice(
      local.mock_availability_zones[each.value.region],
      0,
      each.value.az_count
    ))
  }
}

output "test_results" {
  value = {
    for scenario, resource in null_resource.test_scenarios :
    scenario => {
      environment     = resource.triggers.environment
      region         = resource.triggers.region
      vpc_cidr       = resource.triggers.vpc_cidr
      public_subnets = jsondecode(resource.triggers.public_subnets)
      private_subnets = jsondecode(resource.triggers.private_subnets)
      resource_prefix = resource.triggers.resource_prefix
      selected_azs   = jsondecode(resource.triggers.selected_azs)
    }
  }
}

Testing with Terraform Console

Use Terraform console for interactive testing:

#!/bin/bash
# test-console.sh

# Start terraform console with test variables
terraform console << 'EOF'
# Test CIDR calculations
cidrsubnet("10.0.0.0/16", 8, 1)
cidrsubnet("10.0.0.0/16", 8, 11)

# Test list operations
[for i in range(3) : "subnet-${i + 1}"]

# Test conditional expressions
"dev" == "prod" ? "t3.large" : "t3.micro"

# Test validation functions
can(cidrhost("10.0.0.0/16", 0))
can(cidrhost("invalid-cidr", 0))

# Test string operations
replace("my-resource-name", "-", "_")
upper("environment")
lower("PRODUCTION")

# Test map operations
merge({"a" = 1}, {"b" = 2})

# Test complex expressions
{
  for env in ["dev", "staging", "prod"] :
  env => {
    instance_type = env == "prod" ? "t3.large" : "t3.micro"
    min_size     = env == "prod" ? 3 : 1
  }
}
EOF
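
For one-off checks in scripts or CI, you can also pipe a single expression into the console and compare the result:

# terraform console prints strings with surrounding quotes
result=$(echo 'cidrsubnet("10.0.0.0/16", 8, 1)' | terraform console)
[ "$result" = '"10.0.1.0/24"' ] && echo "PASS" || echo "FAIL: got $result"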

Automated Unit Test Suite

Create an automated test suite for your modules:

#!/usr/bin/env python3
# test_terraform_modules.py

import subprocess
import json
import os
import tempfile
import shutil
from pathlib import Path

class TerraformModuleTester:
    def __init__(self, module_path):
        self.module_path = Path(module_path)
        self.test_dir = None
    
    def setup_test_environment(self):
        """Create temporary test directory"""
        self.test_dir = Path(tempfile.mkdtemp())
        return self.test_dir
    
    def cleanup_test_environment(self):
        """Clean up temporary test directory"""
        if self.test_dir and self.test_dir.exists():
            shutil.rmtree(self.test_dir)
    
    def create_test_config(self, config_content):
        """Create test configuration file"""
        config_file = self.test_dir / "test.tf"
        config_file.write_text(config_content)
        return config_file
    
    def run_terraform_command(self, command, cwd=None):
        """Run terraform command and return result"""
        if cwd is None:
            cwd = self.test_dir
        
        try:
            result = subprocess.run(
                ["terraform"] + command,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=True
            )
            return {"success": True, "stdout": result.stdout, "stderr": result.stderr}
        except subprocess.CalledProcessError as e:
            return {"success": False, "stdout": e.stdout, "stderr": e.stderr}
    
    def test_valid_configuration(self, config):
        """Test that valid configuration passes validation"""
        self.setup_test_environment()
        try:
            self.create_test_config(config)
            
            # Initialize
            init_result = self.run_terraform_command(["init", "-backend=false"])
            if not init_result["success"]:
                return False, f"Init failed: {init_result['stderr']}"
            
            # Validate
            validate_result = self.run_terraform_command(["validate"])
            if not validate_result["success"]:
                return False, f"Validation failed: {validate_result['stderr']}"
            
            # Plan
            plan_result = self.run_terraform_command(["plan", "-out=test.tfplan"])
            if not plan_result["success"]:
                return False, f"Plan failed: {plan_result['stderr']}"
            
            return True, "Configuration is valid"
        
        finally:
            self.cleanup_test_environment()
    
    def test_invalid_configuration(self, config, expected_error=None):
        """Test that invalid configuration fails validation"""
        self.setup_test_environment()
        try:
            self.create_test_config(config)
            
            # Initialize
            init_result = self.run_terraform_command(["init", "-backend=false"])
            if not init_result["success"]:
                return True, f"Init correctly failed: {init_result['stderr']}"
            
            # Validate
            validate_result = self.run_terraform_command(["validate"])
            if not validate_result["success"]:
                if expected_error and expected_error in validate_result["stderr"]:
                    return True, f"Validation correctly failed with expected error"
                return True, f"Validation correctly failed: {validate_result['stderr']}"
            
            return False, "Configuration should have failed validation"
        
        finally:
            self.cleanup_test_environment()
    
    def test_output_values(self, config, expected_outputs):
        """Test that outputs match expected values"""
        self.setup_test_environment()
        try:
            self.create_test_config(config)
            
            # Initialize and plan
            self.run_terraform_command(["init", "-backend=false"])
            plan_result = self.run_terraform_command(["plan", "-out=test.tfplan"])
            
            if not plan_result["success"]:
                return False, f"Plan failed: {plan_result['stderr']}"
            
            # Get planned outputs
            show_result = self.run_terraform_command(["show", "-json", "test.tfplan"])
            if not show_result["success"]:
                return False, f"Show failed: {show_result['stderr']}"
            
            plan_data = json.loads(show_result["stdout"])
            planned_outputs = plan_data.get("planned_values", {}).get("outputs", {})
            
            # Compare outputs
            for output_name, expected_value in expected_outputs.items():
                if output_name not in planned_outputs:
                    return False, f"Output '{output_name}' not found"
                
                actual_value = planned_outputs[output_name]["value"]
                if actual_value != expected_value:
                    return False, f"Output '{output_name}': expected {expected_value}, got {actual_value}"
            
            return True, "All outputs match expected values"
        
        finally:
            self.cleanup_test_environment()

# Test cases
def test_vpc_module():
    tester = TerraformModuleTester("modules/vpc")
    
    # Test valid configuration
    valid_config = '''
    module "vpc_test" {
      source = "../../modules/vpc"
      
      name               = "test-vpc"
      cidr_block         = "10.0.0.0/16"
      availability_zones = ["us-west-2a", "us-west-2b"]
      environment        = "dev"
    }
    
    output "vpc_cidr" {
      value = module.vpc_test.vpc_cidr_block
    }
    '''
    
    success, message = tester.test_valid_configuration(valid_config)
    print(f"Valid configuration test: {'PASS' if success else 'FAIL'} - {message}")
    
    # Test invalid CIDR
    invalid_config = '''
    module "vpc_test" {
      source = "../../modules/vpc"
      
      name               = "test-vpc"
      cidr_block         = "invalid-cidr"
      availability_zones = ["us-west-2a", "us-west-2b"]
      environment        = "dev"
    }
    '''
    
    success, message = tester.test_invalid_configuration(invalid_config, "valid CIDR block")
    print(f"Invalid CIDR test: {'PASS' if success else 'FAIL'} - {message}")

if __name__ == "__main__":
    test_vpc_module()

Property-Based Testing

Use property-based testing for comprehensive validation:

#!/usr/bin/env python3
# property_based_tests.py

import hypothesis
from hypothesis import given, strategies as st
import ipaddress
import subprocess
import tempfile
import json

# Property-based test for CIDR calculations
@given(
    vpc_cidr=st.from_regex(r"10\.\d{1,3}\.\d{1,3}\.0/16", fullmatch=True),
    subnet_count=st.integers(min_value=1, max_value=10)
)
def test_subnet_cidr_calculation(vpc_cidr, subnet_count):
    """Test that subnet CIDR calculations are valid"""
    
    # Validate VPC CIDR
    try:
        vpc_network = ipaddress.IPv4Network(vpc_cidr)
    except ValueError:
        return  # Skip invalid CIDR
    
    # Calculate subnet CIDRs (simulating Terraform logic)
    subnet_cidrs = []
    for i in range(subnet_count):
        try:
            subnet = list(vpc_network.subnets(new_prefix=24))[i]
            subnet_cidrs.append(str(subnet))
        except IndexError:
            break  # Not enough subnets available
    
    # Verify all subnets are within VPC CIDR
    for subnet_cidr in subnet_cidrs:
        subnet_network = ipaddress.IPv4Network(subnet_cidr)
        assert subnet_network.subnet_of(vpc_network), f"Subnet {subnet_cidr} not within VPC {vpc_cidr}"
    
    # Verify no subnet overlap
    for i, subnet1 in enumerate(subnet_cidrs):
        for subnet2 in subnet_cidrs[i+1:]:
            net1 = ipaddress.IPv4Network(subnet1)
            net2 = ipaddress.IPv4Network(subnet2)
            assert not net1.overlaps(net2), f"Subnets {subnet1} and {subnet2} overlap"

# Property-based test for resource naming
@given(
    environment=st.sampled_from(["dev", "staging", "prod"]),
    region=st.sampled_from(["us-west-2", "us-east-1", "eu-west-1"]),
    resource_type=st.sampled_from(["vpc", "subnet", "sg", "instance"])
)
def test_resource_naming_convention(environment, region, resource_type):
    """Test that resource names follow conventions"""
    
    # Simulate Terraform naming logic
    resource_name = f"{environment}-{region}-{resource_type}"
    
    # Verify naming conventions
    assert len(resource_name) <= 63, "Resource name too long"
    assert resource_name.replace("-", "").replace("_", "").isalnum(), "Resource name contains invalid characters"
    assert not resource_name.startswith("-"), "Resource name cannot start with hyphen"
    assert not resource_name.endswith("-"), "Resource name cannot end with hyphen"

if __name__ == "__main__":
    # Run property-based tests
    test_subnet_cidr_calculation()
    test_resource_naming_convention()
    print("All property-based tests passed!")

What’s Next

Unit testing strategies help you catch logic errors and validate module behavior without provisioning real infrastructure. However, some issues only surface when your modules interact with actual cloud services and real network conditions.

In the next part, we’ll explore integration testing with Terratest and other tools that provision real cloud resources to validate that your infrastructure works correctly in practice.

Integration Testing

Integration testing validates that your Terraform modules work correctly with real cloud resources, handling the complexity of actual API interactions, network configurations, and service dependencies. While unit tests catch logic errors, integration tests ensure your infrastructure actually functions as intended in real environments.

This part covers comprehensive integration testing strategies using Terratest, custom testing frameworks, and cloud-native testing approaches.

Terratest Fundamentals

Terratest is the most popular framework for testing Terraform modules with real infrastructure:

// test/integration/vpc_test.go
package test

import (
    "testing"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/gruntwork-io/terratest/modules/aws"
    "github.com/stretchr/testify/assert"
)

func TestVPCModule(t *testing.T) {
    t.Parallel()
    
    // Pick a random AWS region to test in
    awsRegion := aws.GetRandomStableRegion(t, nil, nil)
    
    terraformOptions := &terraform.Options{
        TerraformDir: "../modules/vpc",
        
        Vars: map[string]interface{}{
            "name":               "test-vpc",
            "cidr_block":         "10.0.0.0/16",
            "availability_zones": []string{awsRegion + "a", awsRegion + "b"},
            "environment":        "test",
        },
        
        EnvVars: map[string]string{
            "AWS_DEFAULT_REGION": awsRegion,
        },
    }
    
    // Clean up resources with "defer"
    defer terraform.Destroy(t, terraformOptions)
    
    // Deploy the infrastructure
    terraform.InitAndApply(t, terraformOptions)
    
    // Validate the infrastructure
    vpcId := terraform.Output(t, terraformOptions, "vpc_id")
    assert.NotEmpty(t, vpcId)
    
    publicSubnetIds := terraform.OutputList(t, terraformOptions, "public_subnet_ids")
    assert.Len(t, publicSubnetIds, 2)
    
    privateSubnetIds := terraform.OutputList(t, terraformOptions, "private_subnet_ids")
    assert.Len(t, privateSubnetIds, 2)
    
    // Validate VPC properties using AWS SDK
    vpc := aws.GetVpcById(t, vpcId, awsRegion)
    assert.Equal(t, "10.0.0.0/16", *vpc.CidrBlock)
    assert.True(t, *vpc.EnableDnsHostnames)
    assert.True(t, *vpc.EnableDnsSupport)
    
    // Validate subnets
    for _, subnetId := range publicSubnetIds {
        subnet := aws.GetSubnetById(t, subnetId, awsRegion)
        assert.True(t, *subnet.MapPublicIpOnLaunch)
        assert.Contains(t, []string{awsRegion + "a", awsRegion + "b"}, *subnet.AvailabilityZone)
    }
    
    for _, subnetId := range privateSubnetIds {
        subnet := aws.GetSubnetById(t, subnetId, awsRegion)
        assert.False(t, *subnet.MapPublicIpOnLaunch)
    }
}
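
Terratest tests run with the standard Go toolchain. A sketch of running this test follows; the module path is a placeholder, and because the test creates and destroys real AWS resources, valid credentials are required and charges may apply:

cd test/integration
go mod init example.com/terraform-tests   # placeholder module path
go mod tidy
go test -v -timeout 30m -run TestVPCModule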

Testing Complex Infrastructure

Test complete application stacks with multiple components:

// test/integration/complete_app_test.go
package test

import (
    "fmt"
    "testing"
    "time"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/gruntwork-io/terratest/modules/aws"
    "github.com/gruntwork-io/terratest/modules/http-helper"
    "github.com/gruntwork-io/terratest/modules/retry"
    "github.com/stretchr/testify/assert"
)

func TestCompleteApplication(t *testing.T) {
    t.Parallel()
    
    awsRegion := aws.GetRandomStableRegion(t, nil, nil)
    
    terraformOptions := &terraform.Options{
        TerraformDir: "../examples/complete-app",
        
        Vars: map[string]interface{}{
            "name":        "test-app",
            "environment": "test",
            "region":      awsRegion,
        },
        
        EnvVars: map[string]string{
            "AWS_DEFAULT_REGION": awsRegion,
        },
    }
    
    defer terraform.Destroy(t, terraformOptions)
    terraform.InitAndApply(t, terraformOptions)
    
    // Test VPC
    vpcId := terraform.Output(t, terraformOptions, "vpc_id")
    assert.NotEmpty(t, vpcId)
    
    // Test RDS
    dbEndpoint := terraform.Output(t, terraformOptions, "database_endpoint")
    assert.NotEmpty(t, dbEndpoint)
    
    // Test Load Balancer
    albDnsName := terraform.Output(t, terraformOptions, "load_balancer_dns_name")
    assert.NotEmpty(t, albDnsName)
    
    // Test application health endpoint
    url := fmt.Sprintf("http://%s/health", albDnsName)
    
    // Retry the health check as the application may take time to start
    retry.DoWithRetry(t, "Check application health", 30, 10*time.Second, func() (string, error) {
        statusCode, body := http_helper.HttpGet(t, url, nil)
        if statusCode != 200 {
            return "", fmt.Errorf("Expected status 200, got %d", statusCode)
        }
        
        assert.Contains(t, body, "healthy")
        return body, nil
    })
    
    // Test database connectivity through application
    dbTestUrl := fmt.Sprintf("http://%s/db-test", albDnsName)
    statusCode, body := http_helper.HttpGet(t, dbTestUrl, nil)
    assert.Equal(t, 200, statusCode)
    assert.Contains(t, body, "database_connected")
}

Testing with Multiple Environments

Test modules across different environment configurations:

// test/integration/multi_environment_test.go
package test

import (
    "fmt"
    "testing"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/gruntwork-io/terratest/modules/aws"
    "github.com/stretchr/testify/assert"
)

func TestMultiEnvironmentDeployment(t *testing.T) {
    environments := []struct {
        name         string
        instanceType string
        minSize      int
        maxSize      int
    }{
        {"dev", "t3.micro", 1, 3},
        {"staging", "t3.small", 2, 5},
        {"prod", "t3.medium", 3, 10},
    }
    
    for _, env := range environments {
        env := env // Capture range variable
        t.Run(env.name, func(t *testing.T) {
            t.Parallel()
            
            awsRegion := aws.GetRandomStableRegion(t, nil, nil)
            
            terraformOptions := &terraform.Options{
                TerraformDir: "../modules/auto-scaling-group",
                
                Vars: map[string]interface{}{
                    "name":          fmt.Sprintf("test-asg-%s", env.name),
                    "environment":   env.name,
                    "instance_type": env.instanceType,
                    "min_size":      env.minSize,
                    "max_size":      env.maxSize,
                    "vpc_id":        getTestVpcId(t, awsRegion),
                    "subnet_ids":    getTestSubnetIds(t, awsRegion),
                },
                
                EnvVars: map[string]string{
                    "AWS_DEFAULT_REGION": awsRegion,
                },
            }
            
            defer terraform.Destroy(t, terraformOptions)
            terraform.InitAndApply(t, terraformOptions)
            
            // Validate Auto Scaling Group
            asgName := terraform.Output(t, terraformOptions, "asg_name")
            asg := aws.GetAsgByName(t, asgName, awsRegion)
            
            assert.Equal(t, int64(env.minSize), *asg.MinSize)
            assert.Equal(t, int64(env.maxSize), *asg.MaxSize)
            
            // Validate Launch Template
            launchTemplateId := terraform.Output(t, terraformOptions, "launch_template_id")
            launchTemplate := aws.GetLaunchTemplate(t, launchTemplateId, awsRegion)
            
            assert.Equal(t, env.instanceType, *launchTemplate.LaunchTemplateData.InstanceType)
        })
    }
}

Testing Security Configurations

Validate security group rules and IAM policies:

// test/integration/security_test.go
package test

import (
    "testing"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/gruntwork-io/terratest/modules/aws"
    "github.com/stretchr/testify/assert"
)

func TestSecurityGroupConfiguration(t *testing.T) {
    t.Parallel()
    
    awsRegion := aws.GetRandomStableRegion(t, nil, nil)
    
    terraformOptions := &terraform.Options{
        TerraformDir: "../modules/web-security-group",
        
        Vars: map[string]interface{}{
            "name":   "test-web-sg",
            "vpc_id": getTestVpcId(t, awsRegion),
            "allowed_cidr_blocks": []string{"10.0.0.0/8"},
        },
        
        EnvVars: map[string]string{
            "AWS_DEFAULT_REGION": awsRegion,
        },
    }
    
    defer terraform.Destroy(t, terraformOptions)
    terraform.InitAndApply(t, terraformOptions)
    
    // Get security group
    sgId := terraform.Output(t, terraformOptions, "security_group_id")
    sg := aws.GetSecurityGroupById(t, sgId, awsRegion)
    
    // Validate ingress rules
    assert.Len(t, sg.IpPermissions, 2) // HTTP and HTTPS
    
    for _, rule := range sg.IpPermissions {
        if *rule.FromPort == 80 {
            assert.Equal(t, int64(80), *rule.ToPort)
            assert.Equal(t, "tcp", *rule.IpProtocol)
            assert.Len(t, rule.IpRanges, 1)
            assert.Equal(t, "10.0.0.0/8", *rule.IpRanges[0].CidrIp)
        } else if *rule.FromPort == 443 {
            assert.Equal(t, int64(443), *rule.ToPort)
            assert.Equal(t, "tcp", *rule.IpProtocol)
        }
    }
    
    // Validate egress rules
    assert.Len(t, sg.IpPermissionsEgress, 1)
    egressRule := sg.IpPermissionsEgress[0]
    assert.Equal(t, int64(0), *egressRule.FromPort)
    assert.Equal(t, int64(0), *egressRule.ToPort)
    assert.Equal(t, "-1", *egressRule.IpProtocol)
}

func TestIAMRoleConfiguration(t *testing.T) {
    t.Parallel()
    
    terraformOptions := &terraform.Options{
        TerraformDir: "../modules/iam-role",
        
        Vars: map[string]interface{}{
            "role_name": "test-role",
            "service":   "ec2.amazonaws.com",
            "policies": []string{
                "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
            },
        },
    }
    
    defer terraform.Destroy(t, terraformOptions)
    terraform.InitAndApply(t, terraformOptions)
    
    // Validate IAM role
    roleName := terraform.Output(t, terraformOptions, "role_name")
    role := aws.GetIamRole(t, roleName)
    
    assert.Equal(t, roleName, *role.RoleName)
    assert.Contains(t, *role.AssumeRolePolicyDocument, "ec2.amazonaws.com")
    
    // Validate attached policies
    attachedPolicies := aws.GetIamRoleAttachedPolicies(t, roleName)
    assert.Len(t, attachedPolicies, 1)
    assert.Equal(t, "AmazonS3ReadOnlyAccess", *attachedPolicies[0].PolicyName)
}

Performance and Load Testing

Test infrastructure under load:

// test/integration/performance_test.go
package test

import (
    "fmt"
    "testing"
    "time"
    "sync"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/gruntwork-io/terratest/modules/http-helper"
    "github.com/stretchr/testify/assert"
)

func TestLoadBalancerPerformance(t *testing.T) {
    t.Parallel()
    
    terraformOptions := &terraform.Options{
        TerraformDir: "../examples/load-balanced-app",
        
        Vars: map[string]interface{}{
            "name":         "perf-test-app",
            "environment":  "test",
            "min_capacity": 3,
            "max_capacity": 10,
        },
    }
    
    defer terraform.Destroy(t, terraformOptions)
    terraform.InitAndApply(t, terraformOptions)
    
    // Get load balancer DNS name
    albDnsName := terraform.Output(t, terraformOptions, "load_balancer_dns_name")
    url := fmt.Sprintf("http://%s/", albDnsName)
    
    // Wait for application to be ready
    http_helper.HttpGetWithRetry(t, url, nil, 200, "OK", 30, 10*time.Second)
    
    // Perform load test
    concurrentRequests := 50
    requestsPerWorker := 20
    
    var wg sync.WaitGroup
    results := make(chan int, concurrentRequests*requestsPerWorker)
    
    for i := 0; i < concurrentRequests; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < requestsPerWorker; j++ {
                statusCode, _ := http_helper.HttpGet(t, url, nil)
                results <- statusCode
            }
        }()
    }
    
    wg.Wait()
    close(results)
    
    // Analyze results
    successCount := 0
    totalRequests := 0
    
    for statusCode := range results {
        totalRequests++
        if statusCode == 200 {
            successCount++
        }
    }
    
    successRate := float64(successCount) / float64(totalRequests)
    assert.GreaterOrEqual(t, successRate, 0.95, "Success rate should be at least 95%")
    
    t.Logf("Load test results: %d/%d requests successful (%.2f%%)", 
           successCount, totalRequests, successRate*100)
}

Custom Testing Framework

Build a custom testing framework for specific needs:

#!/usr/bin/env python3
# custom_terraform_tester.py

import boto3
import subprocess
import json
import time
import requests
from typing import Dict, List, Any, Optional

class TerraformIntegrationTester:
    def __init__(self, terraform_dir: str, aws_region: str = "us-west-2"):
        self.terraform_dir = terraform_dir
        self.aws_region = aws_region
        self.aws_session = boto3.Session(region_name=aws_region)
        self.outputs = {}
        
    def deploy(self, variables: Dict[str, Any]) -> bool:
        """Deploy infrastructure with Terraform"""
        try:
            # Create tfvars file
            tfvars_content = "\n".join([
                f'{key} = {json.dumps(value)}' 
                for key, value in variables.items()
            ])
            
            with open(f"{self.terraform_dir}/test.tfvars", "w") as f:
                f.write(tfvars_content)
            
            # Initialize
            subprocess.run(
                ["terraform", "init"],
                cwd=self.terraform_dir,
                check=True,
                capture_output=True
            )
            
            # Apply
            result = subprocess.run(
                ["terraform", "apply", "-var-file=test.tfvars", "-auto-approve"],
                cwd=self.terraform_dir,
                check=True,
                capture_output=True,
                text=True
            )
            
            # Get outputs
            output_result = subprocess.run(
                ["terraform", "output", "-json"],
                cwd=self.terraform_dir,
                check=True,
                capture_output=True,
                text=True
            )
            
            self.outputs = json.loads(output_result.stdout)
            return True
            
        except subprocess.CalledProcessError as e:
            print(f"Terraform deployment failed: {e.stderr}")
            return False
    
    def destroy(self) -> bool:
        """Destroy infrastructure"""
        try:
            subprocess.run(
                ["terraform", "destroy", "-var-file=test.tfvars", "-auto-approve"],
                cwd=self.terraform_dir,
                check=True,
                capture_output=True
            )
            return True
        except subprocess.CalledProcessError as e:
            print(f"Terraform destroy failed: {e.stderr}")
            return False
    
    def get_output(self, key: str) -> Any:
        """Get Terraform output value"""
        return self.outputs.get(key, {}).get("value")
    
    def test_vpc_configuration(self, expected_cidr: str) -> bool:
        """Test VPC configuration"""
        vpc_id = self.get_output("vpc_id")
        if not vpc_id:
            return False
        
        ec2 = self.aws_session.client("ec2")
        response = ec2.describe_vpcs(VpcIds=[vpc_id])
        
        if not response["Vpcs"]:
            return False
        
        vpc = response["Vpcs"][0]
        return vpc["CidrBlock"] == expected_cidr
    
    def test_application_health(self, timeout: int = 300) -> bool:
        """Test application health endpoint"""
        load_balancer_dns = self.get_output("load_balancer_dns_name")
        if not load_balancer_dns:
            return False
        
        url = f"http://{load_balancer_dns}/health"
        
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    return True
            except requests.RequestException:
                pass
            
            time.sleep(10)
        
        return False
    
    def test_database_connectivity(self) -> bool:
        """Test database connectivity"""
        db_endpoint = self.get_output("database_endpoint")
        if not db_endpoint:
            return False
        
        # This would typically involve connecting to the database
        # and running a simple query
        rds = self.aws_session.client("rds")
        
        try:
            response = rds.describe_db_instances()
            for db in response["DBInstances"]:
                if db["Endpoint"]["Address"] == db_endpoint:
                    return db["DBInstanceStatus"] == "available"
        except Exception as e:
            print(f"Database connectivity test failed: {e}")
        
        return False

# Example usage
def test_complete_application():
    tester = TerraformIntegrationTester("../examples/complete-app")
    
    variables = {
        "name": "integration-test",
        "environment": "test",
        "instance_type": "t3.micro",
        "min_size": 2,
        "max_size": 4
    }
    
    try:
        # Deploy
        assert tester.deploy(variables), "Deployment failed"
        
        # Test VPC
        assert tester.test_vpc_configuration("10.0.0.0/16"), "VPC test failed"
        
        # Test application
        assert tester.test_application_health(), "Application health test failed"
        
        # Test database
        assert tester.test_database_connectivity(), "Database test failed"
        
        print("All integration tests passed!")
        
    finally:
        # Clean up
        tester.destroy()

if __name__ == "__main__":
    test_complete_application()
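
The framework depends only on boto3 and requests. Running the script deploys real infrastructure, so use a disposable or sandbox AWS account:

# Install dependencies and run the integration test end to end
pip install boto3 requests
python3 custom_terraform_tester.py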

What’s Next

Integration testing validates that your infrastructure works correctly with real cloud resources, but ensuring compliance and implementing governance requires policy-based validation that goes beyond functional testing.

In the next part, we’ll explore policy as code using Open Policy Agent (OPA) and Sentinel to implement automated governance, compliance validation, and security policy enforcement for your Terraform configurations.

Policy as Code

Policy as code transforms governance from manual reviews to automated enforcement, ensuring that infrastructure changes comply with organizational standards before they reach production. Open Policy Agent (OPA) and HashiCorp Sentinel provide powerful frameworks for implementing policy validation that integrates seamlessly with Terraform workflows.

This part covers implementing comprehensive policy frameworks that enforce security, compliance, and operational standards across your Terraform configurations.

Open Policy Agent (OPA) Fundamentals

OPA uses the Rego language to define policies that can evaluate JSON data:

# policies/terraform/security.rego
package terraform.security

import future.keywords.in

# Deny resources that allow unrestricted SSH access
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_security_group_rule"
    resource.values.type == "ingress"
    resource.values.from_port <= 22
    resource.values.to_port >= 22
    resource.values.cidr_blocks[_] == "0.0.0.0/0"
    
    msg := sprintf("Security group rule allows SSH from anywhere: %v", [resource.address])
}

# Require encryption for S3 buckets
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_s3_bucket"
    not has_encryption(resource)
    
    msg := sprintf("S3 bucket must have encryption enabled: %v", [resource.address])
}

has_encryption(resource) {
    resource.values.server_side_encryption_configuration[_]
}

# Require specific tags
required_tags := ["Environment", "Project", "Owner"]

deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"
    tags := object.get(resource.values, "tags", {})
    missing_tags := [tag | tag := required_tags[_]; not tags[tag]]
    count(missing_tags) > 0
    
    msg := sprintf("Resource missing required tags %v: %v", [missing_tags, resource.address])
}

# Cost control policies
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"
    expensive_types := ["m5.4xlarge", "m5.8xlarge", "m5.12xlarge", "m5.16xlarge"]
    resource.values.instance_type in expensive_types
    
    msg := sprintf("Instance type %v is not allowed: %v", [resource.values.instance_type, resource.address])
}
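
To evaluate these rules against a real plan, convert the plan to JSON and feed it to opa eval. The paths here assume the policy layout shown above:

terraform plan -out=tfplan
terraform show -json tfplan > plan.json
opa eval --data policies/terraform --input plan.json 'data.terraform.security.deny' --format pretty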

Advanced OPA Policies

Implement complex governance rules:

# policies/terraform/compliance.rego
package terraform.compliance

import future.keywords.in

# GDPR compliance - ensure data residency
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type in ["aws_s3_bucket", "aws_db_instance", "aws_rds_cluster"]
    
    # Check if resource is in EU region for GDPR compliance
    provider_config := input.configuration.provider_config.aws
    region := provider_config.expressions.region.constant_value
    not startswith(region, "eu-")
    
    # Check if resource handles personal data
    tags := object.get(resource.values, "tags", {})
    tags.DataClassification in ["personal", "sensitive"]
    
    msg := sprintf("GDPR: Personal data resource must be in EU region: %v", [resource.address])
}

# SOC2 compliance - audit logging
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_s3_bucket"
    not has_access_logging(resource)
    
    msg := sprintf("SOC2: S3 bucket must have access logging enabled: %v", [resource.address])
}

has_access_logging(resource) {
    resource.values.logging[_]
}

# PCI DSS compliance - network segmentation
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_security_group_rule"
    resource.values.type == "ingress"
    
    # Check if this is a PCI environment
    tags := object.get(resource.values, "tags", {})
    tags.PCIScope == "true"
    
    # Ensure no broad network access in PCI scope
    "0.0.0.0/0" in resource.values.cidr_blocks
    
    msg := sprintf("PCI DSS: Broad network access not allowed in PCI scope: %v", [resource.address])
}

# Data retention policies
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_s3_bucket"
    
    tags := object.get(resource.values, "tags", {})
    data_retention := object.get(tags, "DataRetention", "")
    
    # Require data retention policy for certain data types
    tags.DataType in ["logs", "backups", "archives"]
    data_retention == ""
    
    msg := sprintf("Data retention policy required for %v: %v", [tags.DataType, resource.address])
}

Testing OPA Policies

Create comprehensive tests for your policies:

# policies/terraform/security_test.rego
package terraform.security

# Test SSH restriction policy
test_deny_ssh_from_anywhere {
    deny[_] with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_security_group_rule.bad_ssh",
                    "type": "aws_security_group_rule",
                    "values": {
                        "type": "ingress",
                        "from_port": 22,
                        "to_port": 22,
                        "cidr_blocks": ["0.0.0.0/0"]
                    }
                }]
            }
        }
    }
}

test_allow_ssh_from_specific_cidr {
    count(deny) == 0 with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_security_group_rule.good_ssh",
                    "type": "aws_security_group_rule",
                    "values": {
                        "type": "ingress",
                        "from_port": 22,
                        "to_port": 22,
                        "cidr_blocks": ["10.0.0.0/8"]
                    }
                }]
            }
        }
    }
}

# Test S3 encryption policy
test_deny_unencrypted_s3 {
    deny[_] with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_s3_bucket.unencrypted",
                    "type": "aws_s3_bucket",
                    "values": {
                        "bucket": "my-bucket"
                    }
                }]
            }
        }
    }
}

test_allow_encrypted_s3 {
    count(deny) == 0 with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_s3_bucket.encrypted",
                    "type": "aws_s3_bucket",
                    "values": {
                        "bucket": "my-bucket",
                        "server_side_encryption_configuration": [{
                            "rule": [{
                                "apply_server_side_encryption_by_default": [{
                                    "sse_algorithm": "AES256"
                                }]
                            }]
                        }]
                    }
                }]
            }
        }
    }
}

# Test required tags policy
test_deny_missing_required_tags {
    deny[_] with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_instance.no_tags",
                    "type": "aws_instance",
                    "values": {
                        "instance_type": "t3.micro",
                        "tags": {
                            "Name": "test-instance"
                        }
                    }
                }]
            }
        }
    }
}

test_allow_all_required_tags {
    count(deny) == 0 with input as {
        "planned_values": {
            "root_module": {
                "resources": [{
                    "address": "aws_instance.with_tags",
                    "type": "aws_instance",
                    "values": {
                        "instance_type": "t3.micro",
                        "tags": {
                            "Name": "test-instance",
                            "Environment": "dev",
                            "Project": "test-project",
                            "Owner": "team-name"
                        }
                    }
                }]
            }
        }
    }
}
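
OPA discovers *_test.rego files automatically, so the whole suite runs with a single command:

# Run all policy unit tests with verbose output
opa test policies/terraform -v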

Sentinel Policies

HashiCorp Sentinel, the policy-as-code framework built into Terraform Cloud and Terraform Enterprise, provides another enforcement option:

# policies/sentinel/aws-security.sentinel
import "tfplan/v2" as tfplan
import "strings"

# Get all AWS security group rules
security_group_rules = filter tfplan.resource_changes as _, rc {
    rc.type is "aws_security_group_rule" and
    rc.mode is "managed" and
    (rc.change.actions contains "create" or rc.change.actions contains "update")
}

# Function to check if SSH is open to the world
ssh_open_to_world = func(sgr) {
    return sgr.change.after.type is "ingress" and
           sgr.change.after.from_port <= 22 and
           sgr.change.after.to_port >= 22 and
           "0.0.0.0/0" in sgr.change.after.cidr_blocks
}

# Rules that violate the policy ("rule" is a reserved word in Sentinel,
# so the bound variable is named sgr)
violators = filter security_group_rules as _, sgr {
    ssh_open_to_world(sgr)
}

# Main rule
main = rule {
    all security_group_rules as _, sgr {
        not ssh_open_to_world(sgr)
    }
}

# Print violation messages
print("SSH Security Violations:")
for violators as address, sgr {
    print("  -", address, ": SSH (port 22) should not be open to 0.0.0.0/0")
}
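
Outside Terraform Cloud, policies like this can be exercised locally with the open-source Sentinel CLI. A sketch, assuming mock plan data has been generated under test/aws-security/ in the policy directory:

# Run the policy's mock-based tests from the directory that contains it
cd policies/sentinel
sentinel test aws-security.sentinel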

Policy Integration with CI/CD

Integrate policy validation into your CI/CD pipeline:

#!/bin/bash
# scripts/policy-check.sh

set -e

TERRAFORM_DIR=${1:-"infrastructure"}
POLICY_DIR=${2:-"policies"}

echo "Running policy validation on Terraform configurations..."

# Find all Terraform directories
find "$TERRAFORM_DIR" -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
    echo "Checking policies for $dir"
    
    cd "$dir"
    
    # Generate Terraform plan
    terraform init -backend=false
    terraform plan -out=plan.tfplan
    terraform show -json plan.tfplan > plan.json
    
    # Run OPA policy evaluation
    echo "Running OPA policy checks..."
    opa eval -d "../../$POLICY_DIR" -i plan.json "data.terraform.deny[x]" --format pretty
    
    # Check if there are any policy violations (query the whole deny set so
    # the result is a plain JSON array of messages)
    violations=$(opa eval -d "../../$POLICY_DIR" -i plan.json "data.terraform.deny" --format json | jq -c '.result[0].expressions[0].value // []')
    
    if [ "$violations" != "[]" ]; then
        echo "❌ Policy violations found in $dir"
        echo "$violations" | jq -r '.[]'
        exit 1
    else
        echo "✅ No policy violations found in $dir"
    fi
    
    cd - > /dev/null
done

echo "All policy checks passed!"

Dynamic Policy Configuration

Create policies that adapt to different environments:

# policies/terraform/environment_policies.rego
package terraform.environment

import future.keywords.in

# Environment-specific configurations
environment_config := {
    "dev": {
        "allowed_instance_types": ["t3.micro", "t3.small"],
        "max_instance_count": 5,
        "require_encryption": false
    },
    "staging": {
        "allowed_instance_types": ["t3.small", "t3.medium", "m5.large"],
        "max_instance_count": 10,
        "require_encryption": true
    },
    "prod": {
        "allowed_instance_types": ["t3.medium", "t3.large", "m5.large", "m5.xlarge"],
        "max_instance_count": 50,
        "require_encryption": true
    }
}

# Get environment from tags or variables
get_environment(resource) = env {
    env := resource.values.tags.Environment
} else = env {
    # Fallback to terraform variables
    env := input.variables.environment.value
} else = "dev" {
    # Default to dev if no environment specified
    true
}

# Check instance type restrictions
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"
    
    env := get_environment(resource)
    config := environment_config[env]
    
    not resource.values.instance_type in config.allowed_instance_types
    
    msg := sprintf("Instance type %v not allowed in %v environment. Allowed types: %v", 
                   [resource.values.instance_type, env, config.allowed_instance_types])
}

# Check instance count limits
deny[msg] {
    env := input.variables.environment.value
    config := environment_config[env]
    
    instances := [r | r := input.planned_values.root_module.resources[_]; r.type == "aws_instance"]
    count(instances) > config.max_instance_count
    
    msg := sprintf("Too many instances (%v) for %v environment. Maximum allowed: %v", 
                   [count(instances), env, config.max_instance_count])
}

# Environment-specific encryption requirements
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_s3_bucket"
    
    env := get_environment(resource)
    config := environment_config[env]
    
    config.require_encryption == true
    not has_encryption(resource)
    
    msg := sprintf("S3 bucket encryption required in %v environment: %v", [env, resource.address])
}

has_encryption(resource) {
    resource.values.server_side_encryption_configuration[_]
}
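
Note that these rules live in the terraform.environment package rather than terraform, so they must be addressed explicitly when evaluating a plan. A quick check against a production plan might look like this (paths are assumptions based on the examples above):

# Plan with the target environment so input.variables.environment is populated
terraform plan -var="environment=prod" -out=plan.tfplan
terraform show -json plan.tfplan > plan.json

# Evaluate only the environment-specific package
opa eval -d policies/terraform/environment_policies.rego -i plan.json \
    "data.terraform.environment.deny[x]" --format pretty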

Policy Reporting and Metrics

Generate comprehensive policy compliance reports:

#!/usr/bin/env python3
# scripts/policy_report.py

import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path

class PolicyReporter:
    def __init__(self, terraform_dir, policy_dir):
        self.terraform_dir = Path(terraform_dir)
        self.policy_dir = Path(policy_dir)
        self.results = []
    
    def run_policy_check(self, tf_dir):
        """Run OPA policy check on a Terraform directory"""
        try:
            # Generate plan
            subprocess.run(
                ["terraform", "init", "-backend=false"],
                cwd=tf_dir,
                check=True,
                capture_output=True
            )
            
            subprocess.run(
                ["terraform", "plan", "-out=plan.tfplan"],
                cwd=tf_dir,
                check=True,
                capture_output=True
            )
            
            # capture_output cannot be combined with an explicit stdout,
            # so write the plan JSON through a file handle instead
            with open(tf_dir / "plan.json", "w") as plan_json:
                subprocess.run(
                    ["terraform", "show", "-json", "plan.tfplan"],
                    cwd=tf_dir,
                    check=True,
                    stdout=plan_json
                )
            
            # Run OPA evaluation
            result = subprocess.run(
                ["opa", "eval", "-d", str(self.policy_dir), 
                 "-i", str(tf_dir / "plan.json"), 
                 "data.terraform.deny[x]", "--format", "json"],
                capture_output=True,
                text=True
            )
            
            violations = json.loads(result.stdout)
            
            return {
                "directory": str(tf_dir.relative_to(self.terraform_dir)),
                "violations": violations.get("result", []),
                "status": "failed" if violations.get("result") else "passed"
            }
            
        except subprocess.CalledProcessError as e:
            return {
                "directory": str(tf_dir.relative_to(self.terraform_dir)),
                "violations": [f"Error running policy check: {e}"],
                "status": "error"
            }
    
    def generate_report(self):
        """Generate comprehensive policy compliance report"""
        # Find all Terraform directories
        tf_dirs = []
        for tf_file in self.terraform_dir.rglob("*.tf"):
            tf_dirs.append(tf_file.parent)
        
        tf_dirs = list(set(tf_dirs))  # Remove duplicates
        
        # Run policy checks
        for tf_dir in tf_dirs:
            result = self.run_policy_check(tf_dir)
            self.results.append(result)
        
        # Generate summary
        total_dirs = len(self.results)
        passed_dirs = len([r for r in self.results if r["status"] == "passed"])
        failed_dirs = len([r for r in self.results if r["status"] == "failed"])
        error_dirs = len([r for r in self.results if r["status"] == "error"])
        
        total_violations = sum(len(r["violations"]) for r in self.results)
        
        report = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total_directories": total_dirs,
                "passed": passed_dirs,
                "failed": failed_dirs,
                "errors": error_dirs,
                "total_violations": total_violations,
                "compliance_rate": (passed_dirs / total_dirs * 100) if total_dirs > 0 else 0
            },
            "results": self.results
        }
        
        return report
    
    def save_report(self, report, filename="policy_report.json"):
        """Save report to file"""
        with open(filename, "w") as f:
            json.dump(report, f, indent=2)
        
        print(f"Policy report saved to {filename}")
    
    def print_summary(self, report):
        """Print report summary to console"""
        summary = report["summary"]
        
        print("\n" + "="*50)
        print("POLICY COMPLIANCE REPORT")
        print("="*50)
        print(f"Timestamp: {report['timestamp']}")
        print(f"Total Directories: {summary['total_directories']}")
        print(f"Passed: {summary['passed']}")
        print(f"Failed: {summary['failed']}")
        print(f"Errors: {summary['errors']}")
        print(f"Total Violations: {summary['total_violations']}")
        print(f"Compliance Rate: {summary['compliance_rate']:.1f}%")
        
        if summary['failed'] > 0:
            print("\nFAILED DIRECTORIES:")
            for result in report['results']:
                if result['status'] == 'failed':
                    print(f"  - {result['directory']}: {len(result['violations'])} violations")
                    for violation in result['violations']:
                        print(f"    • {violation}")
        
        print("="*50)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python3 policy_report.py <terraform_dir> <policy_dir>")
        sys.exit(1)
    
    reporter = PolicyReporter(sys.argv[1], sys.argv[2])
    report = reporter.generate_report()
    reporter.save_report(report)
    reporter.print_summary(report)
    
    # Exit with error code if there are violations
    if report["summary"]["failed"] > 0 or report["summary"]["errors"] > 0:
        sys.exit(1)
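
The reporter takes the Terraform and policy directories as positional arguments and writes policy_report.json to the working directory, so wiring it into a pipeline is a single command:

# Generate the report; a non-zero exit code fails the build on violations
python3 scripts/policy_report.py infrastructure policies

# Pull just the summary for a quick look
jq '.summary' policy_report.json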

What’s Next

Policy as code transforms infrastructure governance from reactive reviews to proactive enforcement, ensuring compliance and security standards are met before resources are created. Together, OPA and Sentinel provide complementary frameworks for implementing governance that scales with your organization.

In the next part, we’ll explore security and compliance testing, including vulnerability scanning, compliance validation, and automated security assessments that complement policy enforcement with deeper security analysis.

Security and Compliance

Security testing goes beyond policy validation to identify vulnerabilities, misconfigurations, and compliance gaps in your infrastructure code. Automated security scanning catches issues that manual reviews might miss, while compliance testing ensures your infrastructure meets regulatory requirements like SOC 2, GDPR, and industry-specific standards.

This part covers comprehensive security testing strategies that integrate with your Terraform workflow to identify and remediate security issues before they reach production.

Infrastructure Security Scanning

Use specialized tools to scan for security vulnerabilities:

#!/bin/bash
# scripts/security-scan.sh

set -e

TERRAFORM_DIR=${1:-"infrastructure"}
REPORT_DIR=${2:-"security-reports"}

mkdir -p "$REPORT_DIR"

echo "Running comprehensive security scan on Terraform configurations..."

# Checkov - comprehensive security scanning
echo "Running Checkov security scan..."
checkov -d "$TERRAFORM_DIR" \
    --framework terraform \
    --output cli \
    --output json \
    --output-file-path console,"$REPORT_DIR/checkov-report.json" \
    --soft-fail

# TFSec - Terraform-specific security scanner
echo "Running TFSec security scan..."
tfsec "$TERRAFORM_DIR" \
    --format json \
    --out "$REPORT_DIR/tfsec-report.json" \
    --soft-fail

# Terrascan - policy-based security scanner
echo "Running Terrascan security scan..."
# terrascan's -o flag selects the output format and results go to stdout, so
# redirect them to a file; "|| true" keeps violations from aborting the script
terrascan scan -t terraform \
    -d "$TERRAFORM_DIR" \
    -o json \
    --non-recursive > "$REPORT_DIR/terrascan-report.json" || true

# Custom security checks
echo "Running custom security validations..."
python3 scripts/custom_security_checks.py \
    --terraform-dir "$TERRAFORM_DIR" \
    --output "$REPORT_DIR/custom-security.json"

echo "Security scan complete. Reports saved to $REPORT_DIR/"

Custom Security Validation

Implement organization-specific security checks:

#!/usr/bin/env python3
# scripts/custom_security_checks.py

import json
import os
import re
import argparse
from pathlib import Path

class SecurityValidator:
    def __init__(self, terraform_dir):
        self.terraform_dir = Path(terraform_dir)
        self.findings = []
    
    def check_hardcoded_secrets(self):
        """Check for hardcoded secrets in Terraform files"""
        secret_patterns = [
            (r'password\s*=\s*"[^"]{8,}"', "Hardcoded password detected"),
            (r'secret_key\s*=\s*"[A-Za-z0-9+/]{20,}"', "Hardcoded secret key detected"),
            (r'api_key\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded API key detected"),
            (r'token\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded token detected"),
        ]
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            for pattern, message in secret_patterns:
                matches = re.finditer(pattern, content, re.IGNORECASE)
                for match in matches:
                    line_num = content[:match.start()].count('\n') + 1
                    self.findings.append({
                        "type": "hardcoded_secret",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": message,
                        "code": match.group(0)
                    })
    
    def check_public_resources(self):
        """Check for resources that might be publicly accessible"""
        public_patterns = [
            (r'cidr_blocks\s*=\s*\["0\.0\.0\.0/0"\]', "Resource allows access from anywhere"),
            (r'publicly_accessible\s*=\s*true', "Resource is publicly accessible"),
            (r'public_read_access\s*=\s*true', "Resource allows public read access"),
        ]
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            for pattern, message in public_patterns:
                matches = re.finditer(pattern, content, re.IGNORECASE)
                for match in matches:
                    line_num = content[:match.start()].count('\n') + 1
                    self.findings.append({
                        "type": "public_access",
                        "severity": "MEDIUM",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": message,
                        "code": match.group(0)
                    })
    
    def check_encryption_settings(self):
        """Check for missing encryption configurations"""
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets without encryption
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_config = bucket.group(1)
                if "server_side_encryption_configuration" not in bucket_config:
                    line_num = content[:bucket.start()].count('\n') + 1
                    self.findings.append({
                        "type": "missing_encryption",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "S3 bucket missing encryption configuration",
                        "resource": bucket.group(0).split('"')[3]
                    })
            
            # Check RDS instances without encryption
            rds_instances = re.finditer(r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for instance in rds_instances:
                instance_config = instance.group(1)
                if "storage_encrypted" not in instance_config or "storage_encrypted = false" in instance_config:
                    line_num = content[:instance.start()].count('\n') + 1
                    self.findings.append({
                        "type": "missing_encryption",
                        "severity": "HIGH",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "RDS instance missing encryption",
                        "resource": instance.group(0).split('"')[3]
                    })
    
    def check_network_security(self):
        """Check for network security issues"""
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for overly permissive security groups
            sg_rules = re.finditer(r'resource\s+"aws_security_group_rule"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for rule in sg_rules:
                rule_config = rule.group(1)
                
                # Check for SSH open to world
                if ('from_port = 22' in rule_config and 
                    'to_port = 22' in rule_config and 
                    'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
                    
                    line_num = content[:rule.start()].count('\n') + 1
                    self.findings.append({
                        "type": "network_security",
                        "severity": "CRITICAL",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "SSH port open to the world",
                        "resource": rule.group(0).split('"')[3]
                    })
                
                # Check for RDP open to world
                if ('from_port = 3389' in rule_config and 
                    'to_port = 3389' in rule_config and 
                    'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
                    
                    line_num = content[:rule.start()].count('\n') + 1
                    self.findings.append({
                        "type": "network_security",
                        "severity": "CRITICAL",
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": line_num,
                        "message": "RDP port open to the world",
                        "resource": rule.group(0).split('"')[3]
                    })
    
    def run_all_checks(self):
        """Run all security checks"""
        self.check_hardcoded_secrets()
        self.check_public_resources()
        self.check_encryption_settings()
        self.check_network_security()
        
        return {
            "total_findings": len(self.findings),
            "critical": len([f for f in self.findings if f["severity"] == "CRITICAL"]),
            "high": len([f for f in self.findings if f["severity"] == "HIGH"]),
            "medium": len([f for f in self.findings if f["severity"] == "MEDIUM"]),
            "low": len([f for f in self.findings if f["severity"] == "LOW"]),
            "findings": self.findings
        }

def main():
    parser = argparse.ArgumentParser(description='Custom Terraform security validator')
    parser.add_argument('--terraform-dir', required=True, help='Terraform directory to scan')
    parser.add_argument('--output', required=True, help='Output file for results')
    
    args = parser.parse_args()
    
    validator = SecurityValidator(args.terraform_dir)
    results = validator.run_all_checks()
    
    with open(args.output, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"Security validation complete. Found {results['total_findings']} issues.")
    print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}, Low: {results['low']}")
    
    # Exit with error if critical or high severity issues found
    if results['critical'] > 0 or results['high'] > 0:
        exit(1)

if __name__ == "__main__":
    main()
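
The checker can also run on its own, and because the findings are plain JSON they are easy to slice with jq, for example to surface only the critical issues:

# Run the custom checks standalone
python3 scripts/custom_security_checks.py \
    --terraform-dir infrastructure \
    --output custom-security.json

# List only the critical findings
jq '.findings[] | select(.severity == "CRITICAL")' custom-security.json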

Compliance Framework Testing

Implement automated compliance validation:

#!/usr/bin/env python3
# scripts/compliance_validator.py

import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

@dataclass
class ComplianceCheck:
    framework: str
    control_id: str
    description: str
    severity: str
    check_function: Callable[[], List[Dict[str, Any]]]

class ComplianceValidator:
    def __init__(self, terraform_dir: str):
        self.terraform_dir = Path(terraform_dir)
        self.findings = []
        self.checks = self._initialize_checks()
    
    def _initialize_checks(self) -> List[ComplianceCheck]:
        """Initialize compliance checks for various frameworks"""
        return [
            # SOC 2 Type II checks
            ComplianceCheck(
                framework="SOC2",
                control_id="CC6.1",
                description="Logical and physical access controls",
                severity="HIGH",
                check_function=self._check_access_controls
            ),
            ComplianceCheck(
                framework="SOC2",
                control_id="CC6.7",
                description="Data transmission and disposal",
                severity="HIGH",
                check_function=self._check_data_encryption
            ),
            
            # GDPR checks
            ComplianceCheck(
                framework="GDPR",
                control_id="Art.32",
                description="Security of processing",
                severity="CRITICAL",
                check_function=self._check_data_security
            ),
            ComplianceCheck(
                framework="GDPR",
                control_id="Art.17",
                description="Right to erasure",
                severity="MEDIUM",
                check_function=self._check_data_retention
            ),
            
            # PCI DSS checks
            ComplianceCheck(
                framework="PCI_DSS",
                control_id="1.1.4",
                description="Network segmentation",
                severity="CRITICAL",
                check_function=self._check_network_segmentation
            ),
            ComplianceCheck(
                framework="PCI_DSS",
                control_id="3.4",
                description="Encryption of cardholder data",
                severity="CRITICAL",
                check_function=self._check_cardholder_data_encryption
            ),
        ]
    
    def _check_access_controls(self) -> List[Dict[str, Any]]:
        """SOC 2 - Check for proper access controls"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for IAM policies with overly broad permissions
            iam_policies = re.finditer(r'resource\s+"aws_iam_policy"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for policy in iam_policies:
                policy_config = policy.group(1)
                if '"*"' in policy_config and '"Action"' in policy_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:policy.start()].count('\n') + 1,
                        "message": "IAM policy grants overly broad permissions",
                        "resource": policy.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_encryption(self) -> List[Dict[str, Any]]:
        """SOC 2 - Check for data encryption in transit and at rest"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets for encryption
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_config = bucket.group(1)
                if "server_side_encryption_configuration" not in bucket_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:bucket.start()].count('\n') + 1,
                        "message": "S3 bucket lacks encryption at rest",
                        "resource": bucket.group(0).split('"')[3]
                    })
            
            # Check ALB listeners for HTTPS
            alb_listeners = re.finditer(r'resource\s+"aws_lb_listener"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for listener in alb_listeners:
                listener_config = listener.group(1)
                if 'protocol = "HTTP"' in listener_config and 'port = "80"' in listener_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:listener.start()].count('\n') + 1,
                        "message": "Load balancer listener uses unencrypted HTTP",
                        "resource": listener.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_security(self) -> List[Dict[str, Any]]:
        """GDPR - Check for data security measures"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for resources handling personal data without encryption
            resources_with_personal_data = re.finditer(
                r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*DataClassification\s*=\s*"personal"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for resource in resources_with_personal_data:
                resource_config = resource.group(1)
                if ("encryption" not in resource_config.lower() and 
                    "kms" not in resource_config.lower()):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:resource.start()].count('\n') + 1,
                        "message": "Resource handling personal data lacks encryption",
                        "resource": resource.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_data_retention(self) -> List[Dict[str, Any]]:
        """GDPR - Check for data retention policies"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check S3 buckets for lifecycle policies
            s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
            for bucket in s3_buckets:
                bucket_name = bucket.group(0).split('"')[3]
                
                # Look for corresponding lifecycle configuration
                lifecycle_pattern = f'resource\\s+"aws_s3_bucket_lifecycle_configuration"\\s+"[^"]*{bucket_name}[^"]*"'
                if not re.search(lifecycle_pattern, content):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:bucket.start()].count('\n') + 1,
                        "message": "S3 bucket lacks data retention policy",
                        "resource": bucket_name
                    })
        
        return findings
    
    def _check_network_segmentation(self) -> List[Dict[str, Any]]:
        """PCI DSS - Check for proper network segmentation"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check for PCI-scoped resources without proper network isolation
            pci_resources = re.finditer(
                r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*PCIScope\s*=\s*"true"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for resource in pci_resources:
                resource_config = resource.group(1)
                # Check if resource is in a dedicated VPC or subnet
                if ("vpc_id" not in resource_config and 
                    "subnet_id" not in resource_config):
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:resource.start()].count('\n') + 1,
                        "message": "PCI-scoped resource lacks network segmentation",
                        "resource": resource.group(0).split('"')[3]
                    })
        
        return findings
    
    def _check_cardholder_data_encryption(self) -> List[Dict[str, Any]]:
        """PCI DSS - Check encryption of cardholder data"""
        findings = []
        
        for tf_file in self.terraform_dir.rglob("*.tf"):
            content = tf_file.read_text()
            
            # Check databases that might store cardholder data
            db_instances = re.finditer(
                r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*CardholderData\s*=\s*"true"[^}]*}[^}]*)}', 
                content, re.DOTALL
            )
            
            for db in db_instances:
                db_config = db.group(1)
                if "storage_encrypted = true" not in db_config:
                    findings.append({
                        "file": str(tf_file.relative_to(self.terraform_dir)),
                        "line": content[:db.start()].count('\n') + 1,
                        "message": "Database storing cardholder data is not encrypted",
                        "resource": db.group(0).split('"')[3]
                    })
        
        return findings
    
    def run_compliance_checks(self) -> Dict[str, Any]:
        """Run all compliance checks"""
        results = {
            "frameworks": {},
            "total_findings": 0,
            "critical": 0,
            "high": 0,
            "medium": 0,
            "low": 0
        }
        
        for check in self.checks:
            framework_findings = check.check_function()
            
            if check.framework not in results["frameworks"]:
                results["frameworks"][check.framework] = {
                    "controls": {},
                    "total_findings": 0
                }
            
            results["frameworks"][check.framework]["controls"][check.control_id] = {
                "description": check.description,
                "severity": check.severity,
                "findings": framework_findings,
                "compliant": len(framework_findings) == 0
            }
            
            results["frameworks"][check.framework]["total_findings"] += len(framework_findings)
            results["total_findings"] += len(framework_findings)
            
            # Count by severity
            severity_count = len(framework_findings)
            if check.severity == "CRITICAL":
                results["critical"] += severity_count
            elif check.severity == "HIGH":
                results["high"] += severity_count
            elif check.severity == "MEDIUM":
                results["medium"] += severity_count
            else:
                results["low"] += severity_count
        
        return results

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Terraform compliance validator')
    parser.add_argument('--terraform-dir', required=True, help='Terraform directory to validate')
    parser.add_argument('--output', required=True, help='Output file for results')
    parser.add_argument('--frameworks', nargs='+', default=['SOC2', 'GDPR', 'PCI_DSS'], 
                       help='Compliance frameworks to check')
    
    args = parser.parse_args()
    
    validator = ComplianceValidator(args.terraform_dir)
    results = validator.run_compliance_checks()
    
    # Filter results by requested frameworks
    if args.frameworks:
        filtered_frameworks = {k: v for k, v in results["frameworks"].items() 
                             if k in args.frameworks}
        results["frameworks"] = filtered_frameworks
    
    with open(args.output, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"Compliance validation complete.")
    print(f"Total findings: {results['total_findings']}")
    print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}")
    
    for framework, data in results["frameworks"].items():
        compliant_controls = sum(1 for control in data["controls"].values() if control["compliant"])
        total_controls = len(data["controls"])
        compliance_rate = (compliant_controls / total_controls * 100) if total_controls > 0 else 0
        print(f"{framework}: {compliance_rate:.1f}% compliant ({compliant_controls}/{total_controls} controls)")

if __name__ == "__main__":
    main()
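
Because frameworks are selectable, teams can validate only the standards that apply to a given workload, for example SOC 2 and GDPR but not PCI DSS:

# Validate a subset of frameworks against the infrastructure tree
python3 scripts/compliance_validator.py \
    --terraform-dir infrastructure \
    --output compliance-report.json \
    --frameworks SOC2 GDPR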

What’s Next

Security and compliance testing provides deep validation of your infrastructure’s security posture and regulatory compliance. Combined with policy as code, these testing strategies create a comprehensive security validation framework that catches issues early in the development cycle.

In the next part, we’ll explore performance and cost testing techniques that validate not just the functionality and security of your infrastructure, but also its efficiency, scalability, and cost-effectiveness.

Performance and Cost Testing

Performance and cost testing ensure your infrastructure is not only functional and secure, but also efficient and cost-effective. These tests validate resource sizing, identify optimization opportunities, and prevent cost overruns before they impact your budget. Automated cost analysis and performance validation help maintain operational efficiency as your infrastructure scales.

This part covers comprehensive strategies for testing infrastructure performance characteristics and validating cost implications of your Terraform configurations.

Cost Impact Analysis

Analyze the cost implications of infrastructure changes:

#!/usr/bin/env python3
# scripts/cost_impact_analyzer.py

import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class ResourceCost:
    resource_type: str
    resource_name: str
    monthly_cost: float
    annual_cost: float
    cost_factors: Dict[str, Any]

class CostAnalyzer:
    def __init__(self):
        # AWS pricing data (simplified - in practice, use AWS Pricing API)
        self.pricing_data = {
            "aws_instance": {
                "t3.micro": {"hourly": 0.0104, "monthly": 7.59},
                "t3.small": {"hourly": 0.0208, "monthly": 15.18},
                "t3.medium": {"hourly": 0.0416, "monthly": 30.37},
                "t3.large": {"hourly": 0.0832, "monthly": 60.74},
                "m5.large": {"hourly": 0.096, "monthly": 70.08},
                "m5.xlarge": {"hourly": 0.192, "monthly": 140.16},
            },
            "aws_rds_instance": {
                "db.t3.micro": {"hourly": 0.017, "monthly": 12.41},
                "db.t3.small": {"hourly": 0.034, "monthly": 24.82},
                "db.r5.large": {"hourly": 0.24, "monthly": 175.20},
                "db.r5.xlarge": {"hourly": 0.48, "monthly": 350.40},
            },
            "aws_s3_bucket": {
                "standard": {"per_gb_monthly": 0.023},
                "ia": {"per_gb_monthly": 0.0125},
                "glacier": {"per_gb_monthly": 0.004},
            },
            "aws_ebs_volume": {
                "gp3": {"per_gb_monthly": 0.08},
                "gp2": {"per_gb_monthly": 0.10},
                "io1": {"per_gb_monthly": 0.125, "per_iops_monthly": 0.065},
            }
        }
    
    def analyze_terraform_plan(self, plan_file: str) -> Dict[str, Any]:
        """Analyze Terraform plan for cost implications"""
        with open(plan_file, 'r') as f:
            plan_data = json.load(f)
        
        resource_costs = []
        total_monthly_cost = 0
        
        # Analyze planned resources
        if 'planned_values' in plan_data and 'root_module' in plan_data['planned_values']:
            resources = plan_data['planned_values']['root_module'].get('resources', [])
            
            for resource in resources:
                cost = self._calculate_resource_cost(resource)
                if cost:
                    resource_costs.append(cost)
                    total_monthly_cost += cost.monthly_cost
        
        # Analyze resource changes
        cost_changes = self._analyze_cost_changes(plan_data.get('resource_changes', []))
        
        return {
            "total_monthly_cost": total_monthly_cost,
            "total_annual_cost": total_monthly_cost * 12,
            "resource_costs": [
                {
                    "resource_type": rc.resource_type,
                    "resource_name": rc.resource_name,
                    "monthly_cost": rc.monthly_cost,
                    "annual_cost": rc.annual_cost,
                    "cost_factors": rc.cost_factors
                }
                for rc in resource_costs
            ],
            "cost_changes": cost_changes,
            "cost_breakdown": self._generate_cost_breakdown(resource_costs)
        }
    
    def _calculate_resource_cost(self, resource: Dict) -> Optional[ResourceCost]:
        """Calculate cost for a specific resource"""
        resource_type = resource.get('type')
        resource_name = resource.get('name', 'unknown')
        values = resource.get('values', {})
        
        if resource_type == 'aws_instance':
            return self._calculate_ec2_cost(resource_name, values)
        elif resource_type == 'aws_db_instance':
            return self._calculate_rds_cost(resource_name, values)
        elif resource_type == 'aws_s3_bucket':
            return self._calculate_s3_cost(resource_name, values)
        elif resource_type == 'aws_ebs_volume':
            return self._calculate_ebs_cost(resource_name, values)
        
        return None
    
    def _calculate_ec2_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
        """Calculate EC2 instance cost"""
        instance_type = values.get('instance_type')
        if not instance_type or instance_type not in self.pricing_data['aws_instance']:
            return None
        
        pricing = self.pricing_data['aws_instance'][instance_type]
        monthly_cost = pricing['monthly']
        
        # Adjust for additional costs
        if values.get('ebs_optimized'):
            monthly_cost *= 1.1  # 10% premium for EBS optimization
        
        return ResourceCost(
            resource_type='aws_instance',
            resource_name=name,
            monthly_cost=monthly_cost,
            annual_cost=monthly_cost * 12,
            cost_factors={
                'instance_type': instance_type,
                'ebs_optimized': values.get('ebs_optimized', False),
                'hourly_rate': pricing['hourly']
            }
        )
    
    def _calculate_rds_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
        """Calculate RDS instance cost"""
        instance_class = values.get('instance_class')
        if not instance_class or instance_class not in self.pricing_data['aws_db_instance']:
            return None
        
        pricing = self.pricing_data['aws_db_instance'][instance_class]
        monthly_cost = pricing['monthly']
        
        # Adjust for Multi-AZ
        if values.get('multi_az'):
            monthly_cost *= 2
        
        # Add storage cost
        allocated_storage = values.get('allocated_storage', 20)
        storage_cost = allocated_storage * 0.115  # GP2 storage cost per GB
        monthly_cost += storage_cost
        
        return ResourceCost(
            resource_type='aws_db_instance',
            resource_name=name,
            monthly_cost=monthly_cost,
            annual_cost=monthly_cost * 12,
            cost_factors={
                'instance_class': instance_class,
                'multi_az': values.get('multi_az', False),
                'allocated_storage': allocated_storage,
                'storage_cost': storage_cost
            }
        )
    
    def _calculate_s3_cost(self, name: str, values: Dict) -> ResourceCost:
        """Calculate S3 bucket cost (estimated)"""
        # S3 cost depends on usage, so we provide estimates
        estimated_gb = 100  # Default estimate
        storage_class = 'standard'  # Default
        
        pricing = self.pricing_data['aws_s3_bucket'][storage_class]
        monthly_cost = estimated_gb * pricing['per_gb_monthly']
        
        return ResourceCost(
            resource_type='aws_s3_bucket',
            resource_name=name,
            monthly_cost=monthly_cost,
            annual_cost=monthly_cost * 12,
            cost_factors={
                'estimated_storage_gb': estimated_gb,
                'storage_class': storage_class,
                'per_gb_cost': pricing['per_gb_monthly']
            }
        )
    
    def _calculate_ebs_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
        """Calculate EBS volume cost"""
        volume_type = values.get('type', 'gp3')
        size = values.get('size', 8)
        
        if volume_type not in self.pricing_data['aws_ebs_volume']:
            return None
        
        pricing = self.pricing_data['aws_ebs_volume'][volume_type]
        monthly_cost = size * pricing['per_gb_monthly']
        
        # Add IOPS cost for io1 volumes
        if volume_type == 'io1':
            iops = values.get('iops', 100)
            monthly_cost += iops * pricing['per_iops_monthly']
        
        return ResourceCost(
            resource_type='aws_ebs_volume',
            resource_name=name,
            monthly_cost=monthly_cost,
            annual_cost=monthly_cost * 12,
            cost_factors={
                'volume_type': volume_type,
                'size_gb': size,
                'iops': values.get('iops') if volume_type == 'io1' else None
            }
        )
    
    def _analyze_cost_changes(self, resource_changes: List[Dict]) -> Dict[str, Any]:
        """Analyze cost impact of resource changes"""
        changes = {
            "new_resources": 0,
            "modified_resources": 0,
            "destroyed_resources": 0,
            "cost_increase": 0,
            "cost_decrease": 0
        }
        
        for change in resource_changes:
            actions = change.get('change', {}).get('actions', [])
            
            if 'create' in actions:
                changes["new_resources"] += 1
                # Estimate cost increase for new resources
                if change.get('type') == 'aws_instance':
                    instance_type = change.get('change', {}).get('after', {}).get('instance_type')
                    if instance_type in self.pricing_data['aws_instance']:
                        changes["cost_increase"] += self.pricing_data['aws_instance'][instance_type]['monthly']
            
            elif 'update' in actions:
                changes["modified_resources"] += 1
            
            elif 'delete' in actions:
                changes["destroyed_resources"] += 1
                # Estimate cost decrease for destroyed resources
                if change.get('type') == 'aws_instance':
                    instance_type = change.get('change', {}).get('before', {}).get('instance_type')
                    if instance_type in self.pricing_data['aws_instance']:
                        changes["cost_decrease"] += self.pricing_data['aws_instance'][instance_type]['monthly']
        
        changes["net_cost_change"] = changes["cost_increase"] - changes["cost_decrease"]
        
        return changes
    
    def _generate_cost_breakdown(self, resource_costs: List[ResourceCost]) -> Dict[str, Any]:
        """Generate cost breakdown by resource type"""
        breakdown = {}
        
        for cost in resource_costs:
            if cost.resource_type not in breakdown:
                breakdown[cost.resource_type] = {
                    "count": 0,
                    "monthly_cost": 0,
                    "annual_cost": 0
                }
            
            breakdown[cost.resource_type]["count"] += 1
            breakdown[cost.resource_type]["monthly_cost"] += cost.monthly_cost
            breakdown[cost.resource_type]["annual_cost"] += cost.annual_cost
        
        return breakdown

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Terraform cost impact analyzer')
    parser.add_argument('--plan-file', required=True, help='Terraform plan JSON file')
    parser.add_argument('--budget-limit', type=float, help='Monthly budget limit for validation')
    parser.add_argument('--output', required=True, help='Output file for cost analysis')
    
    args = parser.parse_args()
    
    analyzer = CostAnalyzer()
    analysis = analyzer.analyze_terraform_plan(args.plan_file)
    
    with open(args.output, 'w') as f:
        json.dump(analysis, f, indent=2)
    
    print(f"Cost Analysis Complete:")
    print(f"  Monthly Cost: ${analysis['total_monthly_cost']:.2f}")
    print(f"  Annual Cost: ${analysis['total_annual_cost']:.2f}")
    
    if args.budget_limit:
        if analysis['total_monthly_cost'] > args.budget_limit:
            print(f"  ⚠️  BUDGET EXCEEDED: ${analysis['total_monthly_cost']:.2f} > ${args.budget_limit:.2f}")
            exit(1)
        else:
            print(f"  ✅ Within budget: ${analysis['total_monthly_cost']:.2f} <= ${args.budget_limit:.2f}")
    
    print("\nCost Breakdown:")
    for resource_type, breakdown in analysis['cost_breakdown'].items():
        print(f"  {resource_type}: {breakdown['count']} resources, ${breakdown['monthly_cost']:.2f}/month")

if __name__ == "__main__":
    main()
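
The analyzer works from the JSON representation of a plan, so it slots in immediately after terraform plan. The budget value below is illustrative:

# Produce a machine-readable plan
terraform plan -out=plan.tfplan
terraform show -json plan.tfplan > plan.json

# Fail the pipeline if the estimated monthly cost exceeds the budget
python3 scripts/cost_impact_analyzer.py \
    --plan-file plan.json \
    --budget-limit 500 \
    --output cost-analysis.json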

Performance Testing Framework

Test infrastructure performance characteristics:

#!/usr/bin/env python3
# scripts/performance_tester.py

import json
import time
import concurrent.futures
import requests
from dataclasses import dataclass
from typing import List, Dict, Optional
import boto3

@dataclass
class PerformanceMetric:
    metric_name: str
    value: float
    unit: str
    threshold: Optional[float] = None
    passed: Optional[bool] = None

class InfrastructurePerformanceTester:
    def __init__(self, terraform_outputs: Dict):
        self.outputs = terraform_outputs
        self.metrics = []
    
    def test_web_application_performance(self, url: str, concurrent_users: int = 10, duration: int = 60):
        """Test web application performance under load"""
        print(f"Testing web application performance: {url}")
        
        def make_request():
            try:
                start_time = time.time()
                response = requests.get(url, timeout=30)
                end_time = time.time()
                
                return {
                    'response_time': end_time - start_time,
                    'status_code': response.status_code,
                    'success': response.status_code == 200
                }
            except Exception as e:
                return {
                    'response_time': 30.0,
                    'status_code': 0,
                    'success': False,
                    'error': str(e)
                }
        
        # Run load test
        results = []
        start_time = time.time()
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            while time.time() - start_time < duration:
                futures = [executor.submit(make_request) for _ in range(concurrent_users)]
                batch_results = [future.result() for future in concurrent.futures.as_completed(futures)]
                results.extend(batch_results)
                time.sleep(1)  # 1 second between batches
        
        # Analyze results
        successful_requests = [r for r in results if r['success']]
        response_times = [r['response_time'] for r in successful_requests]
        
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)
            p95_response_time = sorted(response_times)[int(len(response_times) * 0.95)]
        else:
            avg_response_time = max_response_time = min_response_time = p95_response_time = 0
        
        success_rate = len(successful_requests) / len(results) * 100 if results else 0
        
        # Add metrics
        self.metrics.extend([
            PerformanceMetric("avg_response_time", avg_response_time, "seconds", 2.0),
            PerformanceMetric("max_response_time", max_response_time, "seconds", 5.0),
            PerformanceMetric("p95_response_time", p95_response_time, "seconds", 3.0),
            PerformanceMetric("success_rate", success_rate, "percent", 95.0),
            PerformanceMetric("total_requests", len(results), "count"),
        ])
        
        return {
            'total_requests': len(results),
            'successful_requests': len(successful_requests),
            'success_rate': success_rate,
            'avg_response_time': avg_response_time,
            'max_response_time': max_response_time,
            'min_response_time': min_response_time,
            'p95_response_time': p95_response_time
        }
    
    def test_database_performance(self, db_endpoint: str, db_name: str):
        """Test database performance"""
        print(f"Testing database performance: {db_endpoint}")
        
        # This would typically involve connecting to the database
        # and running performance tests. For this example, we'll
        # use CloudWatch metrics instead.
        
        try:
            cloudwatch = boto3.client('cloudwatch')
            
            # Get recent database metrics
            end_time = time.time()
            start_time = end_time - 3600  # Last hour
            
            metrics_to_check = [
                ('CPUUtilization', 'AWS/RDS'),
                ('DatabaseConnections', 'AWS/RDS'),
                ('ReadLatency', 'AWS/RDS'),
                ('WriteLatency', 'AWS/RDS'),
            ]
            
            db_metrics = {}
            
            for metric_name, namespace in metrics_to_check:
                response = cloudwatch.get_metric_statistics(
                    Namespace=namespace,
                    MetricName=metric_name,
                    Dimensions=[
                        {
                            'Name': 'DBInstanceIdentifier',
                            'Value': db_name
                        }
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=300,
                    Statistics=['Average', 'Maximum']
                )
                
                if response['Datapoints']:
                    latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
                    db_metrics[metric_name] = {
                        'average': latest_datapoint['Average'],
                        'maximum': latest_datapoint['Maximum']
                    }
            
            # Add database performance metrics
            if 'CPUUtilization' in db_metrics:
                self.metrics.append(
                    PerformanceMetric("db_cpu_utilization", db_metrics['CPUUtilization']['average'], "percent", 80.0)
                )
            
            if 'ReadLatency' in db_metrics:
                self.metrics.append(
                    PerformanceMetric("db_read_latency", db_metrics['ReadLatency']['average'] * 1000, "milliseconds", 20.0)
                )
            
            if 'WriteLatency' in db_metrics:
                self.metrics.append(
                    PerformanceMetric("db_write_latency", db_metrics['WriteLatency']['average'] * 1000, "milliseconds", 50.0)
                )
            
            return db_metrics
            
        except Exception as e:
            print(f"Error testing database performance: {e}")
            return {}
    
    def test_auto_scaling_performance(self, asg_name: str):
        """Test auto scaling group performance"""
        print(f"Testing auto scaling performance: {asg_name}")
        
        try:
            autoscaling = boto3.client('autoscaling')
            cloudwatch = boto3.client('cloudwatch')
            
            # Get ASG details
            response = autoscaling.describe_auto_scaling_groups(
                AutoScalingGroupNames=[asg_name]
            )
            
            if not response['AutoScalingGroups']:
                return {}
            
            asg = response['AutoScalingGroups'][0]
            
            # Check scaling metrics
            end_time = time.time()
            start_time = end_time - 3600  # Last hour
            
            # Get CloudWatch metrics for the ASG
            response = cloudwatch.get_metric_statistics(
                Namespace='AWS/AutoScaling',
                MetricName='GroupTotalInstances',
                Dimensions=[
                    {
                        'Name': 'AutoScalingGroupName',
                        'Value': asg_name
                    }
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Average', 'Maximum', 'Minimum']
            )
            
            scaling_metrics = {}
            if response['Datapoints']:
                latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
                scaling_metrics = {
                    'current_instances': latest_datapoint['Average'],
                    'max_instances': latest_datapoint['Maximum'],
                    'min_instances': latest_datapoint['Minimum']
                }
            
            # Add scaling performance metrics
            self.metrics.extend([
                PerformanceMetric("asg_current_capacity", asg['DesiredCapacity'], "count"),
                PerformanceMetric("asg_min_size", asg['MinSize'], "count"),
                PerformanceMetric("asg_max_size", asg['MaxSize'], "count"),
            ])
            
            return {
                'asg_name': asg_name,
                'desired_capacity': asg['DesiredCapacity'],
                'min_size': asg['MinSize'],
                'max_size': asg['MaxSize'],
                'current_instances': len(asg['Instances']),
                'scaling_metrics': scaling_metrics
            }
            
        except Exception as e:
            print(f"Error testing auto scaling performance: {e}")
            return {}
    
    def evaluate_performance_thresholds(self):
        """Evaluate all metrics against their thresholds"""
        for metric in self.metrics:
            if metric.threshold is not None:
                if metric.metric_name.endswith('_rate'):
                    # For rates, higher is better
                    metric.passed = metric.value >= metric.threshold
                elif 'latency' in metric.metric_name or 'response_time' in metric.metric_name:
                    # For latency/response time, lower is better
                    metric.passed = metric.value <= metric.threshold
                elif 'utilization' in metric.metric_name:
                    # For utilization, lower is better (below threshold)
                    metric.passed = metric.value <= metric.threshold
                else:
                    # Default: lower is better
                    metric.passed = metric.value <= metric.threshold
    
    def generate_performance_report(self) -> Dict:
        """Generate comprehensive performance report"""
        self.evaluate_performance_thresholds()
        
        passed_metrics = [m for m in self.metrics if m.passed is True]
        failed_metrics = [m for m in self.metrics if m.passed is False]
        
        return {
            'timestamp': time.time(),
            'total_metrics': len(self.metrics),
            'passed_metrics': len(passed_metrics),
            'failed_metrics': len(failed_metrics),
            'success_rate': len(passed_metrics) / len(self.metrics) * 100 if self.metrics else 0,
            'metrics': [
                {
                    'name': m.metric_name,
                    'value': m.value,
                    'unit': m.unit,
                    'threshold': m.threshold,
                    'passed': m.passed
                }
                for m in self.metrics
            ],
            'failed_tests': [
                {
                    'name': m.metric_name,
                    'value': m.value,
                    'threshold': m.threshold,
                    'unit': m.unit
                }
                for m in failed_metrics
            ]
        }

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Infrastructure performance tester')
    parser.add_argument('--terraform-outputs', required=True, help='Terraform outputs JSON file')
    parser.add_argument('--output', required=True, help='Output file for performance report')
    parser.add_argument('--load-test-duration', type=int, default=60, help='Load test duration in seconds')
    parser.add_argument('--concurrent-users', type=int, default=10, help='Number of concurrent users for load testing')
    
    args = parser.parse_args()
    
    # Load Terraform outputs
    with open(args.terraform_outputs, 'r') as f:
        terraform_outputs = json.load(f)
    
    tester = InfrastructurePerformanceTester(terraform_outputs)
    
    # Run performance tests based on available outputs
    if 'load_balancer_dns_name' in terraform_outputs:
        url = f"http://{terraform_outputs['load_balancer_dns_name']['value']}"
        tester.test_web_application_performance(
            url, 
            args.concurrent_users, 
            args.load_test_duration
        )
    
    if 'database_endpoint' in terraform_outputs:
        db_endpoint = terraform_outputs['database_endpoint']['value']
        db_name = terraform_outputs.get('database_name', {}).get('value', 'main')
        tester.test_database_performance(db_endpoint, db_name)
    
    if 'asg_name' in terraform_outputs:
        asg_name = terraform_outputs['asg_name']['value']
        tester.test_auto_scaling_performance(asg_name)
    
    # Generate report
    report = tester.generate_performance_report()
    
    with open(args.output, 'w') as f:
        json.dump(report, f, indent=2)
    
    print(f"Performance testing complete:")
    print(f"  Total metrics: {report['total_metrics']}")
    print(f"  Passed: {report['passed_metrics']}")
    print(f"  Failed: {report['failed_metrics']}")
    print(f"  Success rate: {report['success_rate']:.1f}%")
    
    if report['failed_tests']:
        print("\nFailed performance tests:")
        for test in report['failed_tests']:
            print(f"  - {test['name']}: {test['value']} {test['unit']} (threshold: {test['threshold']})")
        exit(1)

if __name__ == "__main__":
    main()
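
Assuming the performance tester above is saved as scripts/performance_tester.py (the path is illustrative), it slots in right after terraform apply: export the outputs as JSON, run the script, and treat a non-zero exit code as a failed performance gate.

# Export Terraform outputs and run the performance tester (illustrative paths)
terraform output -json > terraform-outputs.json

python3 scripts/performance_tester.py \
    --terraform-outputs terraform-outputs.json \
    --output performance-report.json \
    --load-test-duration 120 \
    --concurrent-users 25

# A non-zero exit status means at least one metric breached its threshold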

Resource Optimization Analysis

Analyze resource configurations for optimization opportunities:

#!/bin/bash
# scripts/optimization-analyzer.sh

set -e

TERRAFORM_DIR=${1:-"infrastructure"}
OUTPUT_DIR=${2:-"optimization-reports"}

mkdir -p "$OUTPUT_DIR"

echo "Analyzing Terraform configurations for optimization opportunities..."

# Generate Terraform plans for analysis
find "$TERRAFORM_DIR" -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
    echo "Analyzing $dir for optimization opportunities..."
    
    cd "$dir"
    terraform init -backend=false
    terraform plan -out=optimization.tfplan
    terraform show -json optimization.tfplan > optimization-plan.json
    cd - > /dev/null
    
    # Run optimization analysis
    python3 scripts/resource_optimizer.py \
        --plan-file "$dir/optimization-plan.json" \
        --output "$OUTPUT_DIR/$(basename "$dir")-optimization.json"
done

# Generate consolidated optimization report
python3 scripts/consolidate_optimization_reports.py \
    --reports-dir "$OUTPUT_DIR" \
    --output "$OUTPUT_DIR/consolidated-optimization-report.json"

echo "Optimization analysis complete. Reports saved to $OUTPUT_DIR/"

What’s Next

Performance and cost testing complete the comprehensive testing strategy by validating the efficiency and economic impact of your infrastructure. Combined with functional, security, and policy testing, these techniques ensure your infrastructure meets all requirements for production deployment.

In the final part, we’ll integrate all these testing strategies into comprehensive CI/CD pipelines that automate the entire testing workflow, from static analysis through performance validation, creating a complete quality assurance framework for infrastructure as code.

CI/CD Integration

Integrating Terraform testing into CI/CD pipelines ensures that every infrastructure change is validated before reaching production. A well-designed pipeline combines static analysis, unit testing, integration testing, and policy validation to create a comprehensive quality gate that prevents infrastructure failures and security issues.

This final part demonstrates how to build robust CI/CD pipelines that automate Terraform testing and deployment workflows.

GitHub Actions Pipeline

A comprehensive GitHub Actions workflow for Terraform testing:

# .github/workflows/terraform-test.yml
name: Terraform Test and Deploy

on:
  pull_request:
    paths: ['infrastructure/**', 'modules/**']
  push:
    branches: [main]
    paths: ['infrastructure/**', 'modules/**']

env:
  TF_VERSION: 1.6.0
  AWS_REGION: us-west-2

jobs:
  static-analysis:
    name: Static Analysis
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Terraform Format Check
        run: terraform fmt -check -recursive
      
      - name: Terraform Validate
        run: |
          find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
            echo "Validating $dir"
            cd "$dir"
            terraform init -backend=false
            terraform validate
            cd - > /dev/null
          done
      
      - name: Setup TFLint
        uses: terraform-linters/setup-tflint@v4
        with:
          tflint_version: v0.50.0
      
      - name: Run TFLint
        run: |
          tflint --init
          tflint --recursive
      
      - name: Run Checkov
        uses: bridgecrewio/checkov-action@master
        with:
          directory: .
          framework: terraform
          output_format: sarif
          output_file_path: checkov.sarif
      
      - name: Upload SARIF file
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: checkov.sarif

  unit-tests:
    name: Unit Tests
    runs-on: ubuntu-latest
    needs: static-analysis
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Run Unit Tests
        run: |
          cd test/unit
          for test_dir in */; do
            echo "Running unit tests in $test_dir"
            cd "$test_dir"
            terraform init -backend=false
            terraform plan -out=test.tfplan
            terraform show -json test.tfplan > plan.json
            # Add custom validation logic here (see the plan-validation sketch after this workflow)
            cd ..
          done

  integration-tests:
    name: Integration Tests
    runs-on: ubuntu-latest
    needs: unit-tests
    if: github.event_name == 'pull_request'
    environment: testing
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.21'
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Run Integration Tests
        run: |
          cd test/integration
          go mod download
          go test -v -timeout 30m ./...
        env:
          AWS_DEFAULT_REGION: ${{ env.AWS_REGION }}

  policy-validation:
    name: Policy Validation
    runs-on: ubuntu-latest
    needs: static-analysis
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup OPA
        uses: open-policy-agent/setup-opa@v2
        with:
          version: latest
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Generate Terraform Plans
        run: |
          find infrastructure -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
            echo "Generating plan for $dir"
            cd "$dir"
            terraform init -backend=false
            terraform plan -out=plan.tfplan
            terraform show -json plan.tfplan > plan.json
            cd - > /dev/null
          done
      
      - name: Run Policy Tests
        run: |
          find infrastructure -name "plan.json" | while read plan; do
            echo "Validating policy for $plan"
            opa eval --fail-defined -d policies/ -i "$plan" "data.terraform.deny[x]"
          done

  deploy-staging:
    name: Deploy to Staging
    runs-on: ubuntu-latest
    needs: [unit-tests, policy-validation]
    if: github.ref == 'refs/heads/main'
    environment: staging
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_STAGING_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Terraform Plan
        run: |
          cd infrastructure/staging
          terraform init
          terraform plan -out=staging.tfplan
      
      - name: Terraform Apply
        run: |
          cd infrastructure/staging
          terraform apply staging.tfplan
      
      - name: Run Smoke Tests
        run: |
          cd test/smoke
          go test -v -timeout 10m ./...

  deploy-production:
    name: Deploy to Production
    runs-on: ubuntu-latest
    needs: deploy-staging
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TF_VERSION }}
      
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_PRODUCTION_ROLE_ARN }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Terraform Plan
        run: |
          cd infrastructure/production
          terraform init
          terraform plan -out=production.tfplan
      
      - name: Manual Approval
        uses: trstringer/manual-approval@v1
        with:
          secret: ${{ github.TOKEN }}
          approvers: platform-team
          minimum-approvals: 2
          issue-title: "Production Deployment Approval"
      
      - name: Terraform Apply
        run: |
          cd infrastructure/production
          terraform apply production.tfplan
      
      - name: Run Production Tests
        run: |
          cd test/production
          go test -v -timeout 15m ./...
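
The unit-tests job above leaves a placeholder for custom validation against plan.json. A minimal sketch of such a check, assuming tagging rules of your own (the required tags and resource types below are illustrative); the same kind of script can back the validate-plan.py call that appears in the pipeline-optimization script later in this part:

#!/usr/bin/env python3
# Illustrative plan validation, e.g. invoked as: python3 validate-plan.py plan.json
"""Fail the build if planned resources are missing required tags."""

import json
import sys

REQUIRED_TAGS = {"Environment", "Owner", "Project"}  # illustrative policy
TAGGABLE_TYPES = {"aws_instance", "aws_s3_bucket", "aws_db_instance", "aws_lb"}


def iter_resources(module):
    """Yield resources from a module and all of its child modules."""
    yield from module.get("resources", [])
    for child in module.get("child_modules", []) or []:
        yield from iter_resources(child)


def main(plan_path):
    with open(plan_path) as f:
        plan = json.load(f)

    root = plan.get("planned_values", {}).get("root_module", {})
    errors = []

    for resource in iter_resources(root):
        if resource.get("type") not in TAGGABLE_TYPES:
            continue
        tags = (resource.get("values") or {}).get("tags") or {}
        missing = REQUIRED_TAGS - set(tags)
        if missing:
            errors.append(f"{resource.get('address')}: missing tags {sorted(missing)}")

    if errors:
        print("Plan validation failed:")
        for error in errors:
            print(f"  - {error}")
        sys.exit(1)

    print("Plan validation passed")


if __name__ == "__main__":
    main(sys.argv[1])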

GitLab CI Pipeline

A comprehensive GitLab CI pipeline with multiple stages:

# .gitlab-ci.yml
stages:
  - validate
  - test
  - security
  - deploy-staging
  - deploy-production

variables:
  TF_VERSION: "1.6.0"
  TF_ROOT: ${CI_PROJECT_DIR}
  TF_ADDRESS: ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/terraform/state/${CI_COMMIT_REF_SLUG}

cache:
  key: "${TF_ROOT}"
  paths:
    - ${TF_ROOT}/.terraform

before_script:
  - apt-get update -qq && apt-get install -y -qq git curl unzip
  - curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
  - unzip terraform.zip && mv terraform /usr/local/bin/
  - terraform --version

validate:
  stage: validate
  script:
    - terraform fmt -check -recursive
    - |
      find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
        cd "$dir"
        terraform init -backend=false
        terraform validate
        cd - > /dev/null
      done
  rules:
    - changes:
      - "**/*.tf"
      - "**/*.tfvars"

unit-test:
  stage: test
  script:
    - cd test/unit
    - |
      for test_dir in */; do
        echo "Testing $test_dir"
        cd "$test_dir"
        terraform init -backend=false
        # Exit code 2 means the plan succeeded with pending changes, which is expected here
        terraform plan -detailed-exitcode -out=test.tfplan || [ $? -eq 2 ]
        cd ..
      done
  rules:
    - changes:
      - "**/*.tf"
      - "**/*.tfvars"

integration-test:
  stage: test
  image: golang:1.21
  services:
    - docker:dind
  before_script:
    - apt-get update -qq && apt-get install -y -qq curl unzip
    - curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
    - unzip terraform.zip && mv terraform /usr/local/bin/
  script:
    - cd test/integration
    - go mod download
    - go test -v -timeout 30m ./...
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      changes:
        - "**/*.tf"
        - "**/*.tfvars"

security-scan:
  stage: security
  image: bridgecrew/checkov:latest
  script:
    - checkov -d . --framework terraform --output cli --output gitlab_sast --output-file-path console,checkov-report.json
  artifacts:
    reports:
      sast: checkov-report.json
    expire_in: 1 week
  rules:
    - changes:
      - "**/*.tf"
      - "**/*.tfvars"

policy-check:
  stage: security
  image: alpine:latest
  before_script:
    - apk add --no-cache curl unzip
    - curl -fsSL https://openpolicyagent.org/downloads/latest/opa_linux_amd64_static -o /usr/local/bin/opa
    - chmod +x /usr/local/bin/opa
    - curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
    - unzip terraform.zip && mv terraform /usr/local/bin/
  script:
    - |
      find infrastructure -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
        cd "$dir"
        terraform init -backend=false
        terraform plan -out=plan.tfplan
        terraform show -json plan.tfplan > plan.json
        opa eval --fail-defined -d ../../policies/ -i plan.json "data.terraform.deny[x]"
        cd - > /dev/null
      done
  rules:
    - changes:
      - "**/*.tf"
      - "**/*.tfvars"
      - "policies/**/*.rego"

deploy-staging:
  stage: deploy-staging
  environment:
    name: staging
    url: https://staging.example.com
  before_script:
    - mkdir -p ~/.aws && echo $AWS_STAGING_CREDENTIALS | base64 -d > ~/.aws/credentials
  script:
    - cd infrastructure/staging
    - terraform init -backend-config="address=${TF_ADDRESS}-staging"
    - terraform plan -out=staging.tfplan
    - terraform apply staging.tfplan
  after_script:
    - cd test/smoke
    - go test -v -timeout 10m ./...
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "**/*.tf"
        - "**/*.tfvars"

deploy-production:
  stage: deploy-production
  environment:
    name: production
    url: https://production.example.com
  before_script:
    - mkdir -p ~/.aws && echo $AWS_PRODUCTION_CREDENTIALS | base64 -d > ~/.aws/credentials
  script:
    - cd infrastructure/production
    - terraform init -backend-config="address=${TF_ADDRESS}-production"
    - terraform plan -out=production.tfplan
    - terraform apply production.tfplan
  after_script:
    - cd test/production
    - go test -v -timeout 15m ./...
  when: manual
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "**/*.tf"
        - "**/*.tfvars"

Azure DevOps Pipeline

A comprehensive Azure DevOps pipeline:

# azure-pipelines.yml
trigger:
  branches:
    include:
    - main
  paths:
    include:
    - infrastructure/*
    - modules/*

pr:
  branches:
    include:
    - main
  paths:
    include:
    - infrastructure/*
    - modules/*

variables:
  terraformVersion: '1.6.0'
  awsRegion: 'us-west-2'

stages:
- stage: Validate
  displayName: 'Validate and Test'
  jobs:
  - job: StaticAnalysis
    displayName: 'Static Analysis'
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: TerraformInstaller@0
      displayName: 'Install Terraform'
      inputs:
        terraformVersion: $(terraformVersion)
    
    - script: |
        terraform fmt -check -recursive
      displayName: 'Terraform Format Check'
    
    - script: |
        find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
          echo "Validating $dir"
          cd "$dir"
          terraform init -backend=false
          terraform validate
          cd - > /dev/null
        done
      displayName: 'Terraform Validate'
    
    - script: |
        curl -s https://raw.githubusercontent.com/terraform-linters/tflint/master/install_linux.sh | bash
        tflint --init
        tflint --recursive
      displayName: 'TFLint'
    
    - script: |
        pip install checkov
        checkov -d . --framework terraform --output cli --output junitxml --output-file-path console,checkov-results.xml
      displayName: 'Checkov Security Scan'
    
    - task: PublishTestResults@2
      condition: always()
      inputs:
        testResultsFormat: 'JUnit'
        testResultsFiles: 'checkov-results.xml'
        testRunTitle: 'Security Scan Results'

  - job: UnitTests
    displayName: 'Unit Tests'
    dependsOn: StaticAnalysis
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: TerraformInstaller@0
      inputs:
        terraformVersion: $(terraformVersion)
    
    - script: |
        cd test/unit
        for test_dir in */; do
          echo "Running unit tests in $test_dir"
          cd "$test_dir"
          terraform init -backend=false
          terraform plan -out=test.tfplan
          cd ..
        done
      displayName: 'Run Unit Tests'

- stage: IntegrationTest
  displayName: 'Integration Testing'
  condition: eq(variables['Build.Reason'], 'PullRequest')
  dependsOn: Validate
  jobs:
  - job: IntegrationTests
    displayName: 'Integration Tests'
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: GoTool@0
      inputs:
        version: '1.21'
    
    - task: TerraformInstaller@0
      inputs:
        terraformVersion: $(terraformVersion)
    
    - task: AWSShellScript@1
      inputs:
        awsCredentials: 'AWS-Testing'
        regionName: $(awsRegion)
        scriptType: 'inline'
        inlineScript: |
          cd test/integration
          go mod download
          go test -v -timeout 30m ./...
      displayName: 'Run Integration Tests'

- stage: DeployStaging
  displayName: 'Deploy to Staging'
  condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
  dependsOn: Validate
  jobs:
  - deployment: DeployStaging
    displayName: 'Deploy to Staging'
    environment: 'staging'
    pool:
      vmImage: 'ubuntu-latest'
    strategy:
      runOnce:
        deploy:
          steps:
          - task: TerraformInstaller@0
            inputs:
              terraformVersion: $(terraformVersion)
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'init'
              workingDirectory: 'infrastructure/staging'
              backendServiceAWS: 'AWS-Staging'
              backendAWSBucketName: 'terraform-state-staging'
              backendAWSKey: 'infrastructure/terraform.tfstate'
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'plan'
              workingDirectory: 'infrastructure/staging'
              environmentServiceNameAWS: 'AWS-Staging'
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'apply'
              workingDirectory: 'infrastructure/staging'
              environmentServiceNameAWS: 'AWS-Staging'
          
          - script: |
              cd test/smoke
              go test -v -timeout 10m ./...
            displayName: 'Run Smoke Tests'

- stage: DeployProduction
  displayName: 'Deploy to Production'
  condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
  dependsOn: DeployStaging
  jobs:
  - deployment: DeployProduction
    displayName: 'Deploy to Production'
    environment: 'production'
    pool:
      vmImage: 'ubuntu-latest'
    strategy:
      runOnce:
        deploy:
          steps:
          - task: TerraformInstaller@0
            inputs:
              terraformVersion: $(terraformVersion)
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'init'
              workingDirectory: 'infrastructure/production'
              backendServiceAWS: 'AWS-Production'
              backendAWSBucketName: 'terraform-state-production'
              backendAWSKey: 'infrastructure/terraform.tfstate'
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'plan'
              workingDirectory: 'infrastructure/production'
              environmentServiceNameAWS: 'AWS-Production'
          
          # Manual approval: the ManualValidation@0 task only runs in agentless (server)
          # jobs, so gate this stage instead with an Approvals check configured on the
          # 'production' environment in Azure DevOps.
          
          - task: TerraformTaskV4@4
            inputs:
              provider: 'aws'
              command: 'apply'
              workingDirectory: 'infrastructure/production'
              environmentServiceNameAWS: 'AWS-Production'
          
          - script: |
              cd test/production
              go test -v -timeout 15m ./...
            displayName: 'Run Production Tests'

Testing Pipeline Optimization

Optimize pipeline performance and reliability:

#!/bin/bash
# scripts/optimize-pipeline.sh

# Parallel test execution
run_tests_parallel() {
    local test_dirs=("$@")
    local pids=()
    
    for dir in "${test_dirs[@]}"; do
        (
            echo "Running tests in $dir"
            cd "$dir"
            terraform init -backend=false
            terraform plan -out=test.tfplan
            terraform show -json test.tfplan > plan.json
            # Run custom validations
            python3 ../../scripts/validate-plan.py plan.json
        ) &
        pids+=($!)
    done
    
    # Wait for all tests to complete
    for pid in "${pids[@]}"; do
        wait $pid || exit 1
    done
}

# Cache Terraform providers
cache_providers() {
    local cache_dir="$HOME/.terraform.d/plugin-cache"
    mkdir -p "$cache_dir"
    export TF_PLUGIN_CACHE_DIR="$cache_dir"
    
    # Pre-populate a local provider mirror (run from a directory that contains
    # Terraform configuration so the required providers are known)
    terraform providers mirror "$cache_dir"
}

# Selective testing based on changes
selective_testing() {
    local changed_files=$(git diff --name-only HEAD~1)
    local test_modules=()
    
    for file in $changed_files; do
        if [[ $file == modules/* ]]; then
            module_name=$(echo "$file" | cut -d'/' -f2)
            test_modules+=("test/unit/$module_name")
        fi
    done
    
    if [ ${#test_modules[@]} -gt 0 ]; then
        run_tests_parallel "${test_modules[@]}"
    else
        echo "No module changes detected, running full test suite"
        run_tests_parallel test/unit/*/
    fi
}

# Main execution
main() {
    cache_providers
    selective_testing
}

main "$@"

Conclusion

Comprehensive CI/CD integration ensures that every infrastructure change is thoroughly tested before reaching production. The combination of static analysis, unit testing, integration testing, policy validation, and automated deployment creates a robust quality gate that prevents infrastructure failures and maintains security standards.

The key to successful Terraform testing in CI/CD is balancing thoroughness with speed, using parallel execution, caching, and selective testing to maintain fast feedback cycles while ensuring comprehensive validation of your infrastructure code.