Terraform Testing and Validation: Quality Infrastructure Code
Infrastructure code needs the same quality assurance practices as application code, but testing infrastructure presents unique challenges. How do you unit test a VPC? How do you validate that your security policies actually work? How do you catch configuration errors before they reach production?
This guide covers the complete spectrum of Terraform testing and validation, from static analysis and policy validation to integration testing with real cloud resources.
Static Analysis and Linting
Static analysis catches errors before you even run Terraform, identifying syntax issues, security problems, and style inconsistencies that could cause problems later. Unlike application code, infrastructure code mistakes can be expensive—literally. A misconfigured security group or an oversized instance type can cost money and create security vulnerabilities.
The tools and practices in this part form the first line of defense against infrastructure code problems, catching issues in your editor and CI pipeline before they reach cloud resources.
Terraform Built-in Validation
Terraform includes several built-in validation commands that should be part of every workflow:
# Format code consistently
terraform fmt -recursive
# Check for syntax errors and validate configuration
terraform validate
# Generate and review execution plans
terraform plan -out=tfplan
# Show plan in human-readable format
terraform show tfplan
# Show plan in JSON for automated analysis
terraform show -json tfplan | jq '.planned_values'
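The JSON representation is also handy for quick ad-hoc checks. As a small illustrative example (the jq filter is an assumption, not part of any tool), this lists every resource the saved plan would destroy:
# List resources the plan would delete
terraform show -json tfplan | jq -r '.resource_changes[]? | select(.change.actions | index("delete")) | .address'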
Automated formatting ensures consistent code style:
# Check if files need formatting (exits with code 3 if changes needed)
terraform fmt -check -recursive
# Format all files in current directory and subdirectories
terraform fmt -recursive
# Show what would be formatted without making changes
terraform fmt -diff -check
Configuration validation catches syntax and logic errors:
# Validate configuration syntax
terraform validate
# Note: terraform validate does not accept -var or -var-file; variable
# values are only checked later by terraform plan
# Validate without configuring a backend (useful in CI)
terraform init -backend=false
terraform validate
TFLint for Advanced Linting
TFLint provides deeper analysis than Terraform’s built-in validation:
# Install TFLint
curl -s https://raw.githubusercontent.com/terraform-linters/tflint/master/install_linux.sh | bash
# Initialize TFLint with plugins
tflint --init
# Run linting
tflint
# Run with specific ruleset
tflint --enable-rule=terraform_unused_declarations
TFLint configuration (.tflint.hcl):
config {
module = true
force = false
}
plugin "aws" {
enabled = true
version = "0.24.1"
source = "github.com/terraform-linters/tflint-ruleset-aws"
}
rule "terraform_deprecated_interpolation" {
enabled = true
}
rule "terraform_unused_declarations" {
enabled = true
}
rule "terraform_comment_syntax" {
enabled = true
}
rule "terraform_documented_outputs" {
enabled = true
}
rule "terraform_documented_variables" {
enabled = true
}
rule "terraform_typed_variables" {
enabled = true
}
rule "terraform_module_pinned_source" {
enabled = true
}
rule "terraform_naming_convention" {
enabled = true
format = "snake_case"
}
rule "terraform_standard_module_structure" {
enabled = true
}
AWS-specific rules catch cloud-specific issues:
# Check for deprecated instance types
tflint --enable-rule=aws_instance_previous_type
# Validate security group rules
tflint --enable-rule=aws_security_group_rule_description
# Check for invalid AMI IDs
tflint --enable-rule=aws_instance_invalid_ami
Checkov for Security Scanning
Checkov scans for security and compliance issues:
# Install Checkov
pip install checkov
# Scan Terraform files
checkov -f main.tf
# Scan entire directory
checkov -d .
# Output in different formats
checkov -d . --output json
checkov -d . --output sarif
# Skip specific checks
checkov -d . --skip-check CKV_AWS_23
# Run only specific frameworks
checkov -d . --framework terraform
Custom Checkov policies for organization-specific rules:
# custom_checks/RequireOwnerTag.py
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class RequireOwnerTag(BaseResourceCheck):
    def __init__(self):
        name = "Ensure all resources have Owner tag"
        id = "CKV_CUSTOM_1"
        supported_resources = ['*']
        categories = [CheckCategories.GENERAL_SECURITY]
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

    def scan_resource_conf(self, conf):
        """Looks for an Owner tag in the resource configuration"""
        if 'tags' in conf:
            tags = conf['tags'][0]
            if isinstance(tags, dict) and 'Owner' in tags:
                return CheckResult.PASSED
        return CheckResult.FAILED


check = RequireOwnerTag()
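To load the custom check, point Checkov at the directory that contains it with --external-checks-dir (the custom_checks/ path below matches the hypothetical file above):
# Run Checkov together with the custom check directory
checkov -d . --external-checks-dir ./custom_checks
# Run only the custom check
checkov -d . --external-checks-dir ./custom_checks --check CKV_CUSTOM_1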
Terraform Docs for Documentation
Terraform-docs generates documentation from your code:
# Install terraform-docs
curl -sSLo ./terraform-docs.tar.gz https://terraform-docs.io/dl/v0.16.0/terraform-docs-v0.16.0-$(uname)-amd64.tar.gz
tar -xzf terraform-docs.tar.gz
chmod +x terraform-docs
sudo mv terraform-docs /usr/local/bin/terraform-docs
# Generate documentation
terraform-docs markdown table . > README.md
# Generate with custom template
terraform-docs markdown table --output-file README.md .
Configuration file (.terraform-docs.yml):
formatter: "markdown table"
header-from: main.tf
footer-from: ""
recursive:
enabled: false
path: modules
sections:
hide: []
show: []
content: |-
# {{ .Header }}
{{ .Requirements }}
{{ .Providers }}
{{ .Modules }}
{{ .Resources }}
{{ .Inputs }}
{{ .Outputs }}
output:
file: "README.md"
mode: inject
template: |-
<!-- BEGIN_TF_DOCS -->
{{ .Content }}
<!-- END_TF_DOCS -->
sort:
enabled: true
by: name
settings:
anchor: true
color: true
default: true
description: false
escape: true
hide-empty: false
html: true
indent: 2
lockfile: true
read-comments: true
required: true
sensitive: true
type: true
Pre-commit Hooks
Pre-commit hooks run validation automatically before commits:
# Install pre-commit
pip install pre-commit
# Install hooks
pre-commit install
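Once installed, the hooks run automatically on git commit, but you can also trigger them manually, which is useful the first time you adopt them on an existing repository:
# Run all hooks against every file
pre-commit run --all-files
# Run a single hook
pre-commit run terraform_fmt --all-files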
Pre-commit configuration (.pre-commit-config.yaml):
repos:
- repo: https://github.com/antonbabenko/pre-commit-terraform
rev: v1.81.0
hooks:
- id: terraform_fmt
- id: terraform_validate
- id: terraform_docs
args:
- --hook-config=--path-to-file=README.md
- --hook-config=--add-to-existing-file=true
- --hook-config=--create-file-if-not-exist=true
- id: terraform_tflint
args:
- --args=--only=terraform_deprecated_interpolation
- --args=--only=terraform_unused_declarations
- --args=--only=terraform_comment_syntax
- --args=--only=terraform_documented_outputs
- --args=--only=terraform_documented_variables
- --args=--only=terraform_typed_variables
- --args=--only=terraform_module_pinned_source
- --args=--only=terraform_naming_convention
- --args=--only=terraform_standard_module_structure
- id: terraform_tfsec
- id: terraform_checkov
args:
- --args=--skip-check CKV2_AWS_6
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
TFSec for Security Analysis
TFSec focuses specifically on security issues:
# Install tfsec
curl -s https://raw.githubusercontent.com/aquasecurity/tfsec/master/scripts/install_linux.sh | bash
# Run security scan
tfsec .
# Output in different formats
tfsec --format json .
tfsec --format sarif .
# Exclude specific checks
tfsec --exclude aws-s3-enable-logging .
# Run with custom checks
tfsec --custom-check-dir ./custom-checks .
Custom TFSec rules:
package custom
import (
"github.com/aquasecurity/tfsec/pkg/result"
"github.com/aquasecurity/tfsec/pkg/severity"
"github.com/aquasecurity/tfsec/pkg/state"
"github.com/aquasecurity/tfsec/pkg/rule"
)
var RequireOwnerTag = rule.Rule{
LegacyID: "CUS001",
BadExample: []string{`
resource "aws_instance" "bad_example" {
ami = "ami-12345678"
instance_type = "t2.micro"
}
`},
GoodExample: []string{`
resource "aws_instance" "good_example" {
ami = "ami-12345678"
instance_type = "t2.micro"
tags = {
Owner = "team-name"
}
}
`},
Links: []string{
"https://example.com/tagging-policy",
},
RequiredTypes: []string{"resource"},
RequiredLabels: []string{"aws_instance"},
Base: rule.Base{
Rule: result.Rule{
AVDID: "AVD-CUS-0001",
Provider: "aws",
Service: "ec2",
ShortCode: "require-owner-tag",
Summary: "Resource should have Owner tag",
Impact: "Resources without Owner tag cannot be tracked for cost allocation",
Resolution: "Add Owner tag to resource",
Explanation: "All resources should have an Owner tag for cost allocation and management purposes",
Severity: severity.Medium,
},
},
}
Automated Quality Gates
Integrate static analysis into CI/CD pipelines:
# GitHub Actions workflow
name: Terraform Quality Gates
on:
pull_request:
paths: ['**.tf', '**.tfvars']
jobs:
quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Terraform
uses: hashicorp/setup-terraform@v2
with:
terraform_version: 1.6.0
- name: Terraform Format Check
run: terraform fmt -check -recursive
- name: Terraform Validate
run: |
terraform init -backend=false
terraform validate
- name: Run TFLint
uses: terraform-linters/setup-tflint@v3
with:
tflint_version: v0.47.0
- run: tflint --init
- run: tflint -f compact
- name: Run Checkov
uses: bridgecrewio/checkov-action@master
with:
directory: .
framework: terraform
output_format: sarif
output_file_path: checkov.sarif
- name: Run TFSec
        uses: aquasecurity/tfsec-sarif-action@master
with:
sarif_file: tfsec.sarif
      - name: Upload SARIF files
        uses: github/codeql-action/upload-sarif@v2
        with:
          # upload-sarif takes a single file or a directory containing SARIF files
          sarif_file: .
Quality Metrics and Reporting
Track code quality metrics over time:
#!/bin/bash
# quality-report.sh
echo "=== Terraform Quality Report ==="
echo "Generated: $(date)"
echo
echo "=== Format Check ==="
terraform fmt -check -recursive
FORMAT_EXIT=$?
echo "=== Validation ==="
terraform validate
VALIDATE_EXIT=$?
echo "=== TFLint Results ==="
tflint --format compact
TFLINT_EXIT=$?
echo "=== Security Scan ==="
tfsec --format table
TFSEC_EXIT=$?
echo "=== Documentation Check ==="
terraform-docs markdown table . > /tmp/docs.md
if diff -q README.md /tmp/docs.md > /dev/null; then
echo "Documentation is up to date"
DOCS_EXIT=0
else
echo "Documentation needs updating"
DOCS_EXIT=1
fi
echo
echo "=== Summary ==="
echo "Format: $([ $FORMAT_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Validation: $([ $VALIDATE_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Linting: $([ $TFLINT_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Security: $([ $TFSEC_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
echo "Documentation: $([ $DOCS_EXIT -eq 0 ] && echo "PASS" || echo "FAIL")"
OVERALL_EXIT=$((FORMAT_EXIT + VALIDATE_EXIT + TFLINT_EXIT + TFSEC_EXIT + DOCS_EXIT))
exit $OVERALL_EXIT
What’s Next
Static analysis provides the foundation for infrastructure code quality, but it can only catch certain types of issues. To validate that your infrastructure actually works as intended, you need testing strategies that go beyond syntax checking.
In the next part, we’ll explore unit testing strategies for Terraform modules, including techniques for testing logic without creating real cloud resources.
Unit Testing Strategies
Unit testing Terraform modules presents unique challenges since infrastructure code ultimately creates real cloud resources. However, you can test much of your Terraform logic—variable validation, conditional expressions, and output calculations—without provisioning actual infrastructure. These techniques catch logic errors early and make your modules more reliable.
This part covers strategies for testing Terraform modules in isolation, validating configuration logic, and ensuring your modules behave correctly across different input scenarios.
Testing Module Logic with Validation
Terraform’s validation blocks provide the first line of defense for unit testing:
# modules/vpc/variables.tf
variable "cidr_block" {
description = "CIDR block for the VPC"
type = string
validation {
condition = can(cidrhost(var.cidr_block, 0))
error_message = "The cidr_block must be a valid CIDR block."
}
validation {
condition = can(regex("^10\\.|^172\\.(1[6-9]|2[0-9]|3[0-1])\\.|^192\\.168\\.", var.cidr_block))
error_message = "The cidr_block must use private IP address space (10.x.x.x, 172.16-31.x.x, or 192.168.x.x)."
}
}
variable "availability_zones" {
description = "List of availability zones"
type = list(string)
validation {
condition = length(var.availability_zones) >= 2
error_message = "At least 2 availability zones must be specified for high availability."
}
validation {
condition = length(var.availability_zones) <= 6
error_message = "Maximum of 6 availability zones supported."
}
}
variable "environment" {
description = "Environment name"
type = string
validation {
condition = contains(["dev", "staging", "prod"], var.environment)
error_message = "Environment must be one of: dev, staging, prod."
}
}
variable "subnet_configuration" {
description = "Subnet configuration"
type = object({
public_subnets = list(string)
private_subnets = list(string)
})
validation {
condition = length(var.subnet_configuration.public_subnets) == length(var.subnet_configuration.private_subnets)
error_message = "Number of public and private subnets must be equal."
}
validation {
condition = alltrue([
for cidr in concat(var.subnet_configuration.public_subnets, var.subnet_configuration.private_subnets) :
can(cidrhost(cidr, 0))
])
error_message = "All subnet CIDR blocks must be valid."
}
}
Testing with Terraform Plan
Use terraform plan to test module logic without creating resources:
#!/bin/bash
# test-module.sh
set -e
MODULE_DIR="modules/vpc"
TEST_DIR="test/unit"
# Create test directory
mkdir -p "$TEST_DIR"
# Test case 1: Valid configuration
cat > "$TEST_DIR/valid-config.tf" << EOF
module "vpc_test" {
source = "../../$MODULE_DIR"
name = "test-vpc"
cidr_block = "10.0.0.0/16"
availability_zones = ["us-west-2a", "us-west-2b"]
environment = "dev"
subnet_configuration = {
public_subnets = ["10.0.1.0/24", "10.0.2.0/24"]
private_subnets = ["10.0.11.0/24", "10.0.12.0/24"]
}
}
output "test_outputs" {
value = {
vpc_id = module.vpc_test.vpc_id
public_subnet_ids = module.vpc_test.public_subnet_ids
private_subnet_ids = module.vpc_test.private_subnet_ids
}
}
EOF
echo "Testing valid configuration..."
cd "$TEST_DIR"
terraform init -backend=false
terraform validate
terraform plan -out=valid.tfplan
cd - > /dev/null

# Test case 2: Invalid CIDR (written after the valid run so the two cases don't interfere)
cat > "$TEST_DIR/invalid-cidr.tf" << EOF
module "vpc_test_invalid" {
  source = "../../$MODULE_DIR"

  name               = "test-vpc"
  cidr_block         = "invalid-cidr"
  availability_zones = ["us-west-2a", "us-west-2b"]
  environment        = "dev"

  subnet_configuration = {
    public_subnets  = ["10.0.1.0/24", "10.0.2.0/24"]
    private_subnets = ["10.0.11.0/24", "10.0.12.0/24"]
  }
}
EOF

echo "Testing invalid configuration..."
cd "$TEST_DIR"
if terraform plan -out=invalid.tfplan > /dev/null 2>&1; then
  echo "ERROR: Invalid configuration should have failed validation"
  exit 1
else
  echo "SUCCESS: Invalid configuration correctly rejected"
fi
rm invalid-cidr.tf
cd - > /dev/null

echo "All unit tests passed!"
Testing Local Values and Expressions
Test complex local value calculations:
# modules/networking/locals-test.tf
locals {
# Test subnet CIDR calculation
test_vpc_cidr = "10.0.0.0/16"
test_az_count = 3
# Calculate subnet CIDRs
public_subnet_cidrs = [
for i in range(local.test_az_count) :
cidrsubnet(local.test_vpc_cidr, 8, i + 1)
]
private_subnet_cidrs = [
for i in range(local.test_az_count) :
cidrsubnet(local.test_vpc_cidr, 8, i + 11)
]
# Test naming conventions
resource_names = {
for i in range(local.test_az_count) :
"subnet-${i}" => {
public = "public-subnet-${i + 1}"
private = "private-subnet-${i + 1}"
}
}
# Test conditional logic
environment_config = {
dev = {
instance_type = "t3.micro"
min_size = 1
max_size = 3
}
prod = {
instance_type = "t3.large"
min_size = 3
max_size = 10
}
}
selected_config = local.environment_config[var.environment]
}
# Output calculated values for testing
output "calculated_subnets" {
value = {
public_cidrs = local.public_subnet_cidrs
private_cidrs = local.private_subnet_cidrs
}
}
output "resource_names" {
value = local.resource_names
}
output "environment_config" {
value = local.selected_config
}
Mock Testing with Null Resources
Use null resources to test logic without creating real infrastructure:
# test/unit/mock-test.tf
variable "test_scenarios" {
description = "Test scenarios for validation"
type = map(object({
environment = string
region = string
az_count = number
}))
default = {
scenario_1 = {
environment = "dev"
region = "us-west-2"
az_count = 2
}
scenario_2 = {
environment = "prod"
region = "us-east-1"
az_count = 3
}
}
}
# Mock data sources
locals {
mock_availability_zones = {
"us-west-2" = ["us-west-2a", "us-west-2b", "us-west-2c"]
"us-east-1" = ["us-east-1a", "us-east-1b", "us-east-1c"]
}
}
# Test module logic with null resources
resource "null_resource" "test_scenarios" {
for_each = var.test_scenarios
triggers = {
environment = each.value.environment
region = each.value.region
az_count = each.value.az_count
# Test subnet calculation
vpc_cidr = "10.0.0.0/16"
public_subnets = jsonencode([
for i in range(each.value.az_count) :
cidrsubnet("10.0.0.0/16", 8, i + 1)
])
private_subnets = jsonencode([
for i in range(each.value.az_count) :
cidrsubnet("10.0.0.0/16", 8, i + 11)
])
# Test naming
resource_prefix = "${each.value.environment}-${each.value.region}"
# Test availability zones
selected_azs = jsonencode(slice(
local.mock_availability_zones[each.value.region],
0,
each.value.az_count
))
}
}
output "test_results" {
value = {
for scenario, resource in null_resource.test_scenarios :
scenario => {
environment = resource.triggers.environment
region = resource.triggers.region
vpc_cidr = resource.triggers.vpc_cidr
public_subnets = jsondecode(resource.triggers.public_subnets)
private_subnets = jsondecode(resource.triggers.private_subnets)
resource_prefix = resource.triggers.resource_prefix
selected_azs = jsondecode(resource.triggers.selected_azs)
}
}
}
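Because the scenarios only create null_resource objects, applying this test configuration is cheap and leaves nothing behind in your cloud account. A possible workflow, assuming the file lives in test/unit/:
cd test/unit
terraform init
terraform apply -auto-approve
terraform output -json test_results | jq .
terraform destroy -auto-approve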
Testing with Terraform Console
Use Terraform console for interactive testing:
#!/bin/bash
# test-console.sh
# Start terraform console with test variables
terraform console << 'EOF'
# Test CIDR calculations
cidrsubnet("10.0.0.0/16", 8, 1)
cidrsubnet("10.0.0.0/16", 8, 11)
# Test list operations
[for i in range(3) : "subnet-${i + 1}"]
# Test conditional expressions
"dev" == "prod" ? "t3.large" : "t3.micro"
# Test validation functions
can(cidrhost("10.0.0.0/16", 0))
can(cidrhost("invalid-cidr", 0))
# Test string operations
replace("my-resource-name", "-", "_")
upper("environment")
lower("PRODUCTION")
# Test map operations
merge({"a" = 1}, {"b" = 2})
# Test complex expressions
{
for env in ["dev", "staging", "prod"] :
env => {
instance_type = env == "prod" ? "t3.large" : "t3.micro"
min_size = env == "prod" ? 3 : 1
}
}
EOF
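terraform console also accepts expressions on stdin, so single checks can run non-interactively in CI. A couple of illustrative one-liners:
echo 'cidrsubnet("10.0.0.0/16", 8, 1)' | terraform console
echo 'contains(["dev", "staging", "prod"], "qa")' | terraform console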
Automated Unit Test Suite
Create an automated test suite for your modules:
#!/usr/bin/env python3
# test_terraform_modules.py
import subprocess
import json
import os
import tempfile
import shutil
from pathlib import Path
class TerraformModuleTester:
def __init__(self, module_path):
self.module_path = Path(module_path)
self.test_dir = None
def setup_test_environment(self):
"""Create temporary test directory"""
self.test_dir = Path(tempfile.mkdtemp())
return self.test_dir
def cleanup_test_environment(self):
"""Clean up temporary test directory"""
if self.test_dir and self.test_dir.exists():
shutil.rmtree(self.test_dir)
def create_test_config(self, config_content):
"""Create test configuration file"""
config_file = self.test_dir / "test.tf"
config_file.write_text(config_content)
return config_file
def run_terraform_command(self, command, cwd=None):
"""Run terraform command and return result"""
if cwd is None:
cwd = self.test_dir
try:
result = subprocess.run(
["terraform"] + command,
cwd=cwd,
capture_output=True,
text=True,
check=True
)
return {"success": True, "stdout": result.stdout, "stderr": result.stderr}
except subprocess.CalledProcessError as e:
return {"success": False, "stdout": e.stdout, "stderr": e.stderr}
def test_valid_configuration(self, config):
"""Test that valid configuration passes validation"""
self.setup_test_environment()
try:
self.create_test_config(config)
# Initialize
init_result = self.run_terraform_command(["init", "-backend=false"])
if not init_result["success"]:
return False, f"Init failed: {init_result['stderr']}"
# Validate
validate_result = self.run_terraform_command(["validate"])
if not validate_result["success"]:
return False, f"Validation failed: {validate_result['stderr']}"
# Plan
plan_result = self.run_terraform_command(["plan", "-out=test.tfplan"])
if not plan_result["success"]:
return False, f"Plan failed: {plan_result['stderr']}"
return True, "Configuration is valid"
finally:
self.cleanup_test_environment()
def test_invalid_configuration(self, config, expected_error=None):
"""Test that invalid configuration fails validation"""
self.setup_test_environment()
try:
self.create_test_config(config)
# Initialize
init_result = self.run_terraform_command(["init", "-backend=false"])
if not init_result["success"]:
return True, f"Init correctly failed: {init_result['stderr']}"
# Validate
validate_result = self.run_terraform_command(["validate"])
if not validate_result["success"]:
if expected_error and expected_error in validate_result["stderr"]:
return True, f"Validation correctly failed with expected error"
return True, f"Validation correctly failed: {validate_result['stderr']}"
return False, "Configuration should have failed validation"
finally:
self.cleanup_test_environment()
def test_output_values(self, config, expected_outputs):
"""Test that outputs match expected values"""
self.setup_test_environment()
try:
self.create_test_config(config)
# Initialize and plan
self.run_terraform_command(["init", "-backend=false"])
plan_result = self.run_terraform_command(["plan", "-out=test.tfplan"])
if not plan_result["success"]:
return False, f"Plan failed: {plan_result['stderr']}"
# Get planned outputs
show_result = self.run_terraform_command(["show", "-json", "test.tfplan"])
if not show_result["success"]:
return False, f"Show failed: {show_result['stderr']}"
plan_data = json.loads(show_result["stdout"])
planned_outputs = plan_data.get("planned_values", {}).get("outputs", {})
# Compare outputs
for output_name, expected_value in expected_outputs.items():
if output_name not in planned_outputs:
return False, f"Output '{output_name}' not found"
actual_value = planned_outputs[output_name]["value"]
if actual_value != expected_value:
return False, f"Output '{output_name}': expected {expected_value}, got {actual_value}"
return True, "All outputs match expected values"
finally:
self.cleanup_test_environment()
# Test cases
def test_vpc_module():
    tester = TerraformModuleTester("modules/vpc")
    # Use an absolute module path because tests run from a temporary directory
    module_source = str(tester.module_path.resolve())

    # Test valid configuration
    valid_config = f'''
module "vpc_test" {{
  source = "{module_source}"

  name               = "test-vpc"
  cidr_block         = "10.0.0.0/16"
  availability_zones = ["us-west-2a", "us-west-2b"]
  environment        = "dev"
}}

output "vpc_cidr" {{
  value = module.vpc_test.vpc_cidr_block
}}
'''
    success, message = tester.test_valid_configuration(valid_config)
    print(f"Valid configuration test: {'PASS' if success else 'FAIL'} - {message}")

    # Test invalid CIDR
    invalid_config = f'''
module "vpc_test" {{
  source = "{module_source}"

  name               = "test-vpc"
  cidr_block         = "invalid-cidr"
  availability_zones = ["us-west-2a", "us-west-2b"]
  environment        = "dev"
}}
'''
    success, message = tester.test_invalid_configuration(invalid_config, "valid CIDR block")
    print(f"Invalid CIDR test: {'PASS' if success else 'FAIL'} - {message}")

if __name__ == "__main__":
    test_vpc_module()
Property-Based Testing
Use property-based testing for comprehensive validation:
#!/usr/bin/env python3
# property_based_tests.py
import hypothesis
from hypothesis import given, strategies as st
import ipaddress
import subprocess
import tempfile
import json
# Property-based test for CIDR calculations
@given(
    vpc_cidr=st.from_regex(r"10\.\d{1,3}\.\d{1,3}\.0/16", fullmatch=True),
    subnet_count=st.integers(min_value=1, max_value=10)
)
def test_subnet_cidr_calculation(vpc_cidr, subnet_count):
"""Test that subnet CIDR calculations are valid"""
# Validate VPC CIDR
try:
vpc_network = ipaddress.IPv4Network(vpc_cidr)
except ValueError:
return # Skip invalid CIDR
# Calculate subnet CIDRs (simulating Terraform logic)
subnet_cidrs = []
for i in range(subnet_count):
try:
subnet = list(vpc_network.subnets(new_prefix=24))[i]
subnet_cidrs.append(str(subnet))
except IndexError:
break # Not enough subnets available
# Verify all subnets are within VPC CIDR
for subnet_cidr in subnet_cidrs:
subnet_network = ipaddress.IPv4Network(subnet_cidr)
assert subnet_network.subnet_of(vpc_network), f"Subnet {subnet_cidr} not within VPC {vpc_cidr}"
# Verify no subnet overlap
for i, subnet1 in enumerate(subnet_cidrs):
for subnet2 in subnet_cidrs[i+1:]:
net1 = ipaddress.IPv4Network(subnet1)
net2 = ipaddress.IPv4Network(subnet2)
assert not net1.overlaps(net2), f"Subnets {subnet1} and {subnet2} overlap"
# Property-based test for resource naming
@given(
environment=st.sampled_from(["dev", "staging", "prod"]),
region=st.sampled_from(["us-west-2", "us-east-1", "eu-west-1"]),
resource_type=st.sampled_from(["vpc", "subnet", "sg", "instance"])
)
def test_resource_naming_convention(environment, region, resource_type):
"""Test that resource names follow conventions"""
# Simulate Terraform naming logic
resource_name = f"{environment}-{region}-{resource_type}"
# Verify naming conventions
assert len(resource_name) <= 63, "Resource name too long"
assert resource_name.replace("-", "").replace("_", "").isalnum(), "Resource name contains invalid characters"
assert not resource_name.startswith("-"), "Resource name cannot start with hyphen"
assert not resource_name.endswith("-"), "Resource name cannot end with hyphen"
if __name__ == "__main__":
# Run property-based tests
test_subnet_cidr_calculation()
test_resource_naming_convention()
print("All property-based tests passed!")
What’s Next
Unit testing strategies help you catch logic errors and validate module behavior without provisioning real infrastructure. However, some issues only surface when your modules interact with actual cloud services and real network conditions.
In the next part, we’ll explore integration testing with Terratest and other tools that provision real cloud resources to validate that your infrastructure works correctly in practice.
Integration Testing
Integration testing validates that your Terraform modules work correctly with real cloud resources, handling the complexity of actual API interactions, network configurations, and service dependencies. While unit tests catch logic errors, integration tests ensure your infrastructure actually functions as intended in real environments.
This part covers comprehensive integration testing strategies using Terratest, custom testing frameworks, and cloud-native testing approaches.
Terratest Fundamentals
Terratest is the most popular framework for testing Terraform modules with real infrastructure:
// test/integration/vpc_test.go
package test
import (
"testing"
"github.com/gruntwork-io/terratest/modules/terraform"
"github.com/gruntwork-io/terratest/modules/aws"
"github.com/stretchr/testify/assert"
)
func TestVPCModule(t *testing.T) {
t.Parallel()
// Pick a random AWS region to test in
awsRegion := aws.GetRandomStableRegion(t, nil, nil)
terraformOptions := &terraform.Options{
TerraformDir: "../modules/vpc",
Vars: map[string]interface{}{
"name": "test-vpc",
"cidr_block": "10.0.0.0/16",
"availability_zones": []string{awsRegion + "a", awsRegion + "b"},
"environment": "test",
},
EnvVars: map[string]string{
"AWS_DEFAULT_REGION": awsRegion,
},
}
// Clean up resources with "defer"
defer terraform.Destroy(t, terraformOptions)
// Deploy the infrastructure
terraform.InitAndApply(t, terraformOptions)
// Validate the infrastructure
vpcId := terraform.Output(t, terraformOptions, "vpc_id")
assert.NotEmpty(t, vpcId)
publicSubnetIds := terraform.OutputList(t, terraformOptions, "public_subnet_ids")
assert.Len(t, publicSubnetIds, 2)
privateSubnetIds := terraform.OutputList(t, terraformOptions, "private_subnet_ids")
assert.Len(t, privateSubnetIds, 2)
// Validate VPC properties using AWS SDK
vpc := aws.GetVpcById(t, vpcId, awsRegion)
assert.Equal(t, "10.0.0.0/16", *vpc.CidrBlock)
assert.True(t, *vpc.EnableDnsHostnames)
assert.True(t, *vpc.EnableDnsSupport)
// Validate subnets
for _, subnetId := range publicSubnetIds {
subnet := aws.GetSubnetById(t, subnetId, awsRegion)
assert.True(t, *subnet.MapPublicIpOnLaunch)
assert.Contains(t, []string{awsRegion + "a", awsRegion + "b"}, *subnet.AvailabilityZone)
}
for _, subnetId := range privateSubnetIds {
subnet := aws.GetSubnetById(t, subnetId, awsRegion)
assert.False(t, *subnet.MapPublicIpOnLaunch)
}
}
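Terratest tests are ordinary Go tests, so they run with the standard Go toolchain. The commands below are a typical invocation; the module name and timeout are illustrative assumptions:
cd test/integration
go mod init terraform-tests
go mod tidy
go test -v -timeout 60m -run TestVPCModule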
Testing Complex Infrastructure
Test complete application stacks with multiple components:
// test/integration/complete_app_test.go
package test
import (
"fmt"
"testing"
"time"
"github.com/gruntwork-io/terratest/modules/terraform"
"github.com/gruntwork-io/terratest/modules/aws"
"github.com/gruntwork-io/terratest/modules/http-helper"
"github.com/gruntwork-io/terratest/modules/retry"
"github.com/stretchr/testify/assert"
)
func TestCompleteApplication(t *testing.T) {
t.Parallel()
awsRegion := aws.GetRandomStableRegion(t, nil, nil)
terraformOptions := &terraform.Options{
TerraformDir: "../examples/complete-app",
Vars: map[string]interface{}{
"name": "test-app",
"environment": "test",
"region": awsRegion,
},
EnvVars: map[string]string{
"AWS_DEFAULT_REGION": awsRegion,
},
}
defer terraform.Destroy(t, terraformOptions)
terraform.InitAndApply(t, terraformOptions)
// Test VPC
vpcId := terraform.Output(t, terraformOptions, "vpc_id")
assert.NotEmpty(t, vpcId)
// Test RDS
dbEndpoint := terraform.Output(t, terraformOptions, "database_endpoint")
assert.NotEmpty(t, dbEndpoint)
// Test Load Balancer
albDnsName := terraform.Output(t, terraformOptions, "load_balancer_dns_name")
assert.NotEmpty(t, albDnsName)
// Test application health endpoint
url := fmt.Sprintf("http://%s/health", albDnsName)
// Retry the health check as the application may take time to start
retry.DoWithRetry(t, "Check application health", 30, 10*time.Second, func() (string, error) {
statusCode, body := http_helper.HttpGet(t, url, nil)
if statusCode != 200 {
return "", fmt.Errorf("Expected status 200, got %d", statusCode)
}
assert.Contains(t, body, "healthy")
return body, nil
})
// Test database connectivity through application
dbTestUrl := fmt.Sprintf("http://%s/db-test", albDnsName)
statusCode, body := http_helper.HttpGet(t, dbTestUrl, nil)
assert.Equal(t, 200, statusCode)
assert.Contains(t, body, "database_connected")
}
Testing with Multiple Environments
Test modules across different environment configurations:
// test/integration/multi_environment_test.go
package test
import (
	"fmt"
	"testing"

	"github.com/gruntwork-io/terratest/modules/aws"
	"github.com/gruntwork-io/terratest/modules/terraform"
	"github.com/stretchr/testify/assert"
)
func TestMultiEnvironmentDeployment(t *testing.T) {
environments := []struct {
name string
instanceType string
minSize int
maxSize int
}{
{"dev", "t3.micro", 1, 3},
{"staging", "t3.small", 2, 5},
{"prod", "t3.medium", 3, 10},
}
for _, env := range environments {
env := env // Capture range variable
t.Run(env.name, func(t *testing.T) {
t.Parallel()
awsRegion := aws.GetRandomStableRegion(t, nil, nil)
terraformOptions := &terraform.Options{
TerraformDir: "../modules/auto-scaling-group",
Vars: map[string]interface{}{
"name": fmt.Sprintf("test-asg-%s", env.name),
"environment": env.name,
"instance_type": env.instanceType,
"min_size": env.minSize,
"max_size": env.maxSize,
"vpc_id": getTestVpcId(t, awsRegion),
"subnet_ids": getTestSubnetIds(t, awsRegion),
},
EnvVars: map[string]string{
"AWS_DEFAULT_REGION": awsRegion,
},
}
defer terraform.Destroy(t, terraformOptions)
terraform.InitAndApply(t, terraformOptions)
// Validate Auto Scaling Group
asgName := terraform.Output(t, terraformOptions, "asg_name")
asg := aws.GetAsgByName(t, asgName, awsRegion)
assert.Equal(t, int64(env.minSize), *asg.MinSize)
assert.Equal(t, int64(env.maxSize), *asg.MaxSize)
// Validate Launch Template
launchTemplateId := terraform.Output(t, terraformOptions, "launch_template_id")
launchTemplate := aws.GetLaunchTemplate(t, launchTemplateId, awsRegion)
assert.Equal(t, env.instanceType, *launchTemplate.LaunchTemplateData.InstanceType)
})
}
}
Testing Security Configurations
Validate security group rules and IAM policies:
// test/integration/security_test.go
package test
import (
"testing"
"github.com/gruntwork-io/terratest/modules/terraform"
"github.com/gruntwork-io/terratest/modules/aws"
"github.com/stretchr/testify/assert"
)
func TestSecurityGroupConfiguration(t *testing.T) {
t.Parallel()
awsRegion := aws.GetRandomStableRegion(t, nil, nil)
terraformOptions := &terraform.Options{
TerraformDir: "../modules/web-security-group",
Vars: map[string]interface{}{
"name": "test-web-sg",
"vpc_id": getTestVpcId(t, awsRegion),
"allowed_cidr_blocks": []string{"10.0.0.0/8"},
},
EnvVars: map[string]string{
"AWS_DEFAULT_REGION": awsRegion,
},
}
defer terraform.Destroy(t, terraformOptions)
terraform.InitAndApply(t, terraformOptions)
// Get security group
sgId := terraform.Output(t, terraformOptions, "security_group_id")
sg := aws.GetSecurityGroupById(t, sgId, awsRegion)
// Validate ingress rules
assert.Len(t, sg.IpPermissions, 2) // HTTP and HTTPS
for _, rule := range sg.IpPermissions {
if *rule.FromPort == 80 {
assert.Equal(t, int64(80), *rule.ToPort)
assert.Equal(t, "tcp", *rule.IpProtocol)
assert.Len(t, rule.IpRanges, 1)
assert.Equal(t, "10.0.0.0/8", *rule.IpRanges[0].CidrIp)
} else if *rule.FromPort == 443 {
assert.Equal(t, int64(443), *rule.ToPort)
assert.Equal(t, "tcp", *rule.IpProtocol)
}
}
// Validate egress rules
assert.Len(t, sg.IpPermissionsEgress, 1)
egressRule := sg.IpPermissionsEgress[0]
assert.Equal(t, int64(0), *egressRule.FromPort)
assert.Equal(t, int64(0), *egressRule.ToPort)
assert.Equal(t, "-1", *egressRule.IpProtocol)
}
func TestIAMRoleConfiguration(t *testing.T) {
t.Parallel()
terraformOptions := &terraform.Options{
TerraformDir: "../modules/iam-role",
Vars: map[string]interface{}{
"role_name": "test-role",
"service": "ec2.amazonaws.com",
"policies": []string{
"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
},
},
}
defer terraform.Destroy(t, terraformOptions)
terraform.InitAndApply(t, terraformOptions)
// Validate IAM role
roleName := terraform.Output(t, terraformOptions, "role_name")
role := aws.GetIamRole(t, roleName)
assert.Equal(t, roleName, *role.RoleName)
assert.Contains(t, *role.AssumeRolePolicyDocument, "ec2.amazonaws.com")
// Validate attached policies
attachedPolicies := aws.GetIamRoleAttachedPolicies(t, roleName)
assert.Len(t, attachedPolicies, 1)
assert.Equal(t, "AmazonS3ReadOnlyAccess", *attachedPolicies[0].PolicyName)
}
Performance and Load Testing
Test infrastructure under load:
// test/integration/performance_test.go
package test
import (
"fmt"
"testing"
"time"
"sync"
"github.com/gruntwork-io/terratest/modules/terraform"
"github.com/gruntwork-io/terratest/modules/http-helper"
"github.com/stretchr/testify/assert"
)
func TestLoadBalancerPerformance(t *testing.T) {
t.Parallel()
terraformOptions := &terraform.Options{
TerraformDir: "../examples/load-balanced-app",
Vars: map[string]interface{}{
"name": "perf-test-app",
"environment": "test",
"min_capacity": 3,
"max_capacity": 10,
},
}
defer terraform.Destroy(t, terraformOptions)
terraform.InitAndApply(t, terraformOptions)
// Get load balancer DNS name
albDnsName := terraform.Output(t, terraformOptions, "load_balancer_dns_name")
url := fmt.Sprintf("http://%s/", albDnsName)
// Wait for application to be ready
http_helper.HttpGetWithRetry(t, url, nil, 200, "OK", 30, 10*time.Second)
// Perform load test
concurrentRequests := 50
requestsPerWorker := 20
var wg sync.WaitGroup
results := make(chan int, concurrentRequests*requestsPerWorker)
for i := 0; i < concurrentRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < requestsPerWorker; j++ {
statusCode, _ := http_helper.HttpGet(t, url, nil)
results <- statusCode
}
}()
}
wg.Wait()
close(results)
// Analyze results
successCount := 0
totalRequests := 0
for statusCode := range results {
totalRequests++
if statusCode == 200 {
successCount++
}
}
successRate := float64(successCount) / float64(totalRequests)
assert.GreaterOrEqual(t, successRate, 0.95, "Success rate should be at least 95%")
t.Logf("Load test results: %d/%d requests successful (%.2f%%)",
successCount, totalRequests, successRate*100)
}
Custom Testing Framework
Build a custom testing framework for specific needs:
#!/usr/bin/env python3
# custom_terraform_tester.py
import boto3
import subprocess
import json
import time
import requests
from typing import Dict, List, Any, Optional
class TerraformIntegrationTester:
def __init__(self, terraform_dir: str, aws_region: str = "us-west-2"):
self.terraform_dir = terraform_dir
self.aws_region = aws_region
self.aws_session = boto3.Session(region_name=aws_region)
self.outputs = {}
def deploy(self, variables: Dict[str, Any]) -> bool:
"""Deploy infrastructure with Terraform"""
try:
# Create tfvars file
tfvars_content = "\n".join([
f'{key} = {json.dumps(value)}'
for key, value in variables.items()
])
with open(f"{self.terraform_dir}/test.tfvars", "w") as f:
f.write(tfvars_content)
# Initialize
subprocess.run(
["terraform", "init"],
cwd=self.terraform_dir,
check=True,
capture_output=True
)
# Apply
result = subprocess.run(
["terraform", "apply", "-var-file=test.tfvars", "-auto-approve"],
cwd=self.terraform_dir,
check=True,
capture_output=True,
text=True
)
# Get outputs
output_result = subprocess.run(
["terraform", "output", "-json"],
cwd=self.terraform_dir,
check=True,
capture_output=True,
text=True
)
self.outputs = json.loads(output_result.stdout)
return True
except subprocess.CalledProcessError as e:
print(f"Terraform deployment failed: {e.stderr}")
return False
def destroy(self) -> bool:
"""Destroy infrastructure"""
try:
subprocess.run(
["terraform", "destroy", "-var-file=test.tfvars", "-auto-approve"],
cwd=self.terraform_dir,
check=True,
capture_output=True
)
return True
except subprocess.CalledProcessError as e:
print(f"Terraform destroy failed: {e.stderr}")
return False
def get_output(self, key: str) -> Any:
"""Get Terraform output value"""
return self.outputs.get(key, {}).get("value")
def test_vpc_configuration(self, expected_cidr: str) -> bool:
"""Test VPC configuration"""
vpc_id = self.get_output("vpc_id")
if not vpc_id:
return False
ec2 = self.aws_session.client("ec2")
response = ec2.describe_vpcs(VpcIds=[vpc_id])
if not response["Vpcs"]:
return False
vpc = response["Vpcs"][0]
return vpc["CidrBlock"] == expected_cidr
def test_application_health(self, timeout: int = 300) -> bool:
"""Test application health endpoint"""
load_balancer_dns = self.get_output("load_balancer_dns_name")
if not load_balancer_dns:
return False
url = f"http://{load_balancer_dns}/health"
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return True
except requests.RequestException:
pass
time.sleep(10)
return False
def test_database_connectivity(self) -> bool:
"""Test database connectivity"""
db_endpoint = self.get_output("database_endpoint")
if not db_endpoint:
return False
# This would typically involve connecting to the database
# and running a simple query
rds = self.aws_session.client("rds")
try:
response = rds.describe_db_instances()
for db in response["DBInstances"]:
if db["Endpoint"]["Address"] == db_endpoint:
return db["DBInstanceStatus"] == "available"
except Exception as e:
print(f"Database connectivity test failed: {e}")
return False
# Example usage
def test_complete_application():
tester = TerraformIntegrationTester("../examples/complete-app")
variables = {
"name": "integration-test",
"environment": "test",
"instance_type": "t3.micro",
"min_size": 2,
"max_size": 4
}
try:
# Deploy
assert tester.deploy(variables), "Deployment failed"
# Test VPC
assert tester.test_vpc_configuration("10.0.0.0/16"), "VPC test failed"
# Test application
assert tester.test_application_health(), "Application health test failed"
# Test database
assert tester.test_database_connectivity(), "Database test failed"
print("All integration tests passed!")
finally:
# Clean up
tester.destroy()
if __name__ == "__main__":
test_complete_application()
What’s Next
Integration testing validates that your infrastructure works correctly with real cloud resources, but ensuring compliance and implementing governance requires policy-based validation that goes beyond functional testing.
In the next part, we’ll explore policy as code using Open Policy Agent (OPA) and Sentinel to implement automated governance, compliance validation, and security policy enforcement for your Terraform configurations.
Policy as Code
Policy as code transforms governance from manual reviews to automated enforcement, ensuring that infrastructure changes comply with organizational standards before they reach production. Open Policy Agent (OPA) and HashiCorp Sentinel provide powerful frameworks for implementing policy validation that integrates seamlessly with Terraform workflows.
This part covers implementing comprehensive policy frameworks that enforce security, compliance, and operational standards across your Terraform configurations.
Open Policy Agent (OPA) Fundamentals
OPA uses the Rego language to define policies that can evaluate JSON data:
# policies/terraform/security.rego
package terraform.security

import future.keywords.in
# Deny resources that allow unrestricted SSH access
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_security_group_rule"
resource.values.type == "ingress"
resource.values.from_port <= 22
resource.values.to_port >= 22
resource.values.cidr_blocks[_] == "0.0.0.0/0"
msg := sprintf("Security group rule allows SSH from anywhere: %v", [resource.address])
}
# Require encryption for S3 buckets
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_s3_bucket"
not has_encryption(resource)
msg := sprintf("S3 bucket must have encryption enabled: %v", [resource.address])
}
has_encryption(resource) {
resource.values.server_side_encryption_configuration[_]
}
# Require specific tags
required_tags := ["Environment", "Project", "Owner"]
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"
    tags := object.get(resource.values, "tags", {})
    missing_tags := [t | t := required_tags[_]; not tags[t]]
    count(missing_tags) > 0
    msg := sprintf("Resource missing required tags %v: %v", [missing_tags, resource.address])
}
# Cost control policies
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_instance"
expensive_types := ["m5.4xlarge", "m5.8xlarge", "m5.12xlarge", "m5.16xlarge"]
resource.values.instance_type in expensive_types
msg := sprintf("Instance type %v is not allowed: %v", [resource.values.instance_type, resource.address])
}
Advanced OPA Policies
Implement complex governance rules:
# policies/terraform/compliance.rego
package terraform.compliance
import future.keywords.in
# GDPR compliance - ensure data residency
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type in ["aws_s3_bucket", "aws_db_instance", "aws_rds_cluster"]
# Check if resource is in EU region for GDPR compliance
provider_config := input.configuration.provider_config.aws
region := provider_config.expressions.region.constant_value
not startswith(region, "eu-")
# Check if resource handles personal data
tags := object.get(resource.values, "tags", {})
tags.DataClassification in ["personal", "sensitive"]
msg := sprintf("GDPR: Personal data resource must be in EU region: %v", [resource.address])
}
# SOC2 compliance - audit logging
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_s3_bucket"
not has_access_logging(resource)
msg := sprintf("SOC2: S3 bucket must have access logging enabled: %v", [resource.address])
}
has_access_logging(resource) {
resource.values.logging[_]
}
# PCI DSS compliance - network segmentation
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_security_group_rule"
resource.values.type == "ingress"
# Check if this is a PCI environment
tags := object.get(resource.values, "tags", {})
tags.PCIScope == "true"
# Ensure no broad network access in PCI scope
"0.0.0.0/0" in resource.values.cidr_blocks
msg := sprintf("PCI DSS: Broad network access not allowed in PCI scope: %v", [resource.address])
}
# Data retention policies
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_s3_bucket"
tags := object.get(resource.values, "tags", {})
data_retention := object.get(tags, "DataRetention", "")
# Require data retention policy for certain data types
tags.DataType in ["logs", "backups", "archives"]
data_retention == ""
msg := sprintf("Data retention policy required for %v: %v", [tags.DataType, resource.address])
}
Testing OPA Policies
Create comprehensive tests for your policies:
# policies/terraform/security_test.rego
package terraform.security
# Test SSH restriction policy
test_deny_ssh_from_anywhere {
deny[_] with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_security_group_rule.bad_ssh",
"type": "aws_security_group_rule",
"values": {
"type": "ingress",
"from_port": 22,
"to_port": 22,
"cidr_blocks": ["0.0.0.0/0"]
}
}]
}
}
}
}
test_allow_ssh_from_specific_cidr {
count(deny) == 0 with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_security_group_rule.good_ssh",
"type": "aws_security_group_rule",
"values": {
"type": "ingress",
"from_port": 22,
"to_port": 22,
"cidr_blocks": ["10.0.0.0/8"]
}
}]
}
}
}
}
# Test S3 encryption policy
test_deny_unencrypted_s3 {
deny[_] with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_s3_bucket.unencrypted",
"type": "aws_s3_bucket",
"values": {
"bucket": "my-bucket"
}
}]
}
}
}
}
test_allow_encrypted_s3 {
count(deny) == 0 with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_s3_bucket.encrypted",
"type": "aws_s3_bucket",
"values": {
"bucket": "my-bucket",
"server_side_encryption_configuration": [{
"rule": [{
"apply_server_side_encryption_by_default": [{
"sse_algorithm": "AES256"
}]
}]
}]
}
}]
}
}
}
}
# Test required tags policy
test_deny_missing_required_tags {
deny[_] with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_instance.no_tags",
"type": "aws_instance",
"values": {
"instance_type": "t3.micro",
"tags": {
"Name": "test-instance"
}
}
}]
}
}
}
}
test_allow_all_required_tags {
count(deny) == 0 with input as {
"planned_values": {
"root_module": {
"resources": [{
"address": "aws_instance.with_tags",
"type": "aws_instance",
"values": {
"instance_type": "t3.micro",
"tags": {
"Name": "test-instance",
"Environment": "dev",
"Project": "test-project",
"Owner": "team-name"
}
}
}]
}
}
}
}
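These tests run with the OPA CLI; -v prints every test result and --coverage shows which rules the tests exercise:
opa test policies/terraform -v
opa test policies/terraform --coverage --format json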
Sentinel Policies
HashiCorp Sentinel provides another powerful policy framework:
# policies/sentinel/aws-security.sentinel
import "tfplan/v2" as tfplan
import "strings"
# Get all AWS security group rules
security_group_rules = filter tfplan.resource_changes as _, rc {
rc.type is "aws_security_group_rule" and
rc.mode is "managed" and
(rc.change.actions contains "create" or rc.change.actions contains "update")
}
# Function to check if SSH is open to the world
ssh_open_to_world = func(rule) {
return rule.change.after.type is "ingress" and
rule.change.after.from_port <= 22 and
rule.change.after.to_port >= 22 and
"0.0.0.0/0" in rule.change.after.cidr_blocks
}
# Main rule
main = rule {
all security_group_rules as _, rule {
not ssh_open_to_world(rule)
}
}
# Collect the violating rules (Sentinel uses filter expressions rather than
# Python-style list comprehensions)
violations = filter security_group_rules as _, rule {
    ssh_open_to_world(rule)
}

# Print violations
print("SSH Security Violations:")
for violations as _, rule {
    print(" -", rule.address, ": SSH (port 22) should not be open to 0.0.0.0/0")
}
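The Sentinel CLI can exercise a policy locally: sentinel apply evaluates a single policy and sentinel test runs any test cases, both relying on mock data for the tfplan/v2 import (mock setup is not shown here):
sentinel fmt aws-security.sentinel
sentinel apply aws-security.sentinel
sentinel test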
Policy Integration with CI/CD
Integrate policy validation into your CI/CD pipeline:
#!/bin/bash
# scripts/policy-check.sh
set -e
TERRAFORM_DIR=${1:-"infrastructure"}
POLICY_DIR=${2:-"policies"}
echo "Running policy validation on Terraform configurations..."
# Find all Terraform directories
find "$TERRAFORM_DIR" -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Checking policies for $dir"
cd "$dir"
# Generate Terraform plan
terraform init -backend=false
terraform plan -out=plan.tfplan
terraform show -json plan.tfplan > plan.json
  # Run OPA policy evaluation (deny rules live under data.terraform.<package>)
  echo "Running OPA policy checks..."
  opa eval -d "../../$POLICY_DIR" -i plan.json "data.terraform[_].deny[_]" --format pretty

  # Check if there are any policy violations
  violations=$(opa eval -d "../../$POLICY_DIR" -i plan.json "data.terraform[_].deny[_]" --format json | jq '[.result[]?.expressions[]?.value] | length')
  if [ "$violations" -gt 0 ]; then
    echo "❌ Policy violations found in $dir"
    exit 1
  else
    echo "✅ No policy violations found in $dir"
  fi
cd - > /dev/null
done
echo "All policy checks passed!"
Dynamic Policy Configuration
Create policies that adapt to different environments:
# policies/terraform/environment_policies.rego
package terraform.environment
import future.keywords.in
# Environment-specific configurations
environment_config := {
"dev": {
"allowed_instance_types": ["t3.micro", "t3.small"],
"max_instance_count": 5,
"require_encryption": false
},
"staging": {
"allowed_instance_types": ["t3.small", "t3.medium", "m5.large"],
"max_instance_count": 10,
"require_encryption": true
},
"prod": {
"allowed_instance_types": ["t3.medium", "t3.large", "m5.large", "m5.xlarge"],
"max_instance_count": 50,
"require_encryption": true
}
}
# Get environment from tags or variables
get_environment(resource) = env {
env := resource.values.tags.Environment
} else = env {
# Fallback to terraform variables
env := input.variables.environment.value
} else = "dev" {
# Default to dev if no environment specified
true
}
# Check instance type restrictions
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_instance"
env := get_environment(resource)
config := environment_config[env]
not resource.values.instance_type in config.allowed_instance_types
msg := sprintf("Instance type %v not allowed in %v environment. Allowed types: %v",
[resource.values.instance_type, env, config.allowed_instance_types])
}
# Check instance count limits
deny[msg] {
env := input.variables.environment.value
config := environment_config[env]
instances := [r | r := input.planned_values.root_module.resources[_]; r.type == "aws_instance"]
count(instances) > config.max_instance_count
msg := sprintf("Too many instances (%v) for %v environment. Maximum allowed: %v",
[count(instances), env, config.max_instance_count])
}
# Environment-specific encryption requirements
deny[msg] {
resource := input.planned_values.root_module.resources[_]
resource.type == "aws_s3_bucket"
env := get_environment(resource)
config := environment_config[env]
config.require_encryption == true
not has_encryption(resource)
msg := sprintf("S3 bucket encryption required in %v environment: %v", [env, resource.address])
}
has_encryption(resource) {
resource.values.server_side_encryption_configuration[_]
}
Policy Reporting and Metrics
Generate comprehensive policy compliance reports:
#!/usr/bin/env python3
# scripts/policy_report.py
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
class PolicyReporter:
def __init__(self, terraform_dir, policy_dir):
self.terraform_dir = Path(terraform_dir)
self.policy_dir = Path(policy_dir)
self.results = []
def run_policy_check(self, tf_dir):
"""Run OPA policy check on a Terraform directory"""
try:
# Generate plan
subprocess.run(
["terraform", "init", "-backend=false"],
cwd=tf_dir,
check=True,
capture_output=True
)
subprocess.run(
["terraform", "plan", "-out=plan.tfplan"],
cwd=tf_dir,
check=True,
capture_output=True
)
            # Write the plan JSON to a file (capture_output cannot be combined with stdout)
            with open(tf_dir / "plan.json", "w") as plan_json:
                subprocess.run(
                    ["terraform", "show", "-json", "plan.tfplan"],
                    cwd=tf_dir,
                    check=True,
                    stdout=plan_json
                )
            # Run OPA evaluation (deny rules live under data.terraform.<package>)
            result = subprocess.run(
                ["opa", "eval", "-d", str(self.policy_dir),
                 "-i", str(tf_dir / "plan.json"),
                 "data.terraform[_].deny[_]", "--format", "json"],
                capture_output=True,
                text=True
            )
violations = json.loads(result.stdout)
return {
"directory": str(tf_dir.relative_to(self.terraform_dir)),
"violations": violations.get("result", []),
"status": "failed" if violations.get("result") else "passed"
}
except subprocess.CalledProcessError as e:
return {
"directory": str(tf_dir.relative_to(self.terraform_dir)),
"violations": [f"Error running policy check: {e}"],
"status": "error"
}
def generate_report(self):
"""Generate comprehensive policy compliance report"""
# Find all Terraform directories
tf_dirs = []
for tf_file in self.terraform_dir.rglob("*.tf"):
tf_dirs.append(tf_file.parent)
tf_dirs = list(set(tf_dirs)) # Remove duplicates
# Run policy checks
for tf_dir in tf_dirs:
result = self.run_policy_check(tf_dir)
self.results.append(result)
# Generate summary
total_dirs = len(self.results)
passed_dirs = len([r for r in self.results if r["status"] == "passed"])
failed_dirs = len([r for r in self.results if r["status"] == "failed"])
error_dirs = len([r for r in self.results if r["status"] == "error"])
total_violations = sum(len(r["violations"]) for r in self.results)
report = {
"timestamp": datetime.now().isoformat(),
"summary": {
"total_directories": total_dirs,
"passed": passed_dirs,
"failed": failed_dirs,
"errors": error_dirs,
"total_violations": total_violations,
"compliance_rate": (passed_dirs / total_dirs * 100) if total_dirs > 0 else 0
},
"results": self.results
}
return report
def save_report(self, report, filename="policy_report.json"):
"""Save report to file"""
with open(filename, "w") as f:
json.dump(report, f, indent=2)
print(f"Policy report saved to {filename}")
def print_summary(self, report):
"""Print report summary to console"""
summary = report["summary"]
print("\n" + "="*50)
print("POLICY COMPLIANCE REPORT")
print("="*50)
print(f"Timestamp: {report['timestamp']}")
print(f"Total Directories: {summary['total_directories']}")
print(f"Passed: {summary['passed']}")
print(f"Failed: {summary['failed']}")
print(f"Errors: {summary['errors']}")
print(f"Total Violations: {summary['total_violations']}")
print(f"Compliance Rate: {summary['compliance_rate']:.1f}%")
if summary['failed'] > 0:
print("\nFAILED DIRECTORIES:")
for result in report['results']:
if result['status'] == 'failed':
print(f" - {result['directory']}: {len(result['violations'])} violations")
for violation in result['violations']:
print(f" • {violation}")
print("="*50)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: python3 policy_report.py <terraform_dir> <policy_dir>")
sys.exit(1)
reporter = PolicyReporter(sys.argv[1], sys.argv[2])
report = reporter.generate_report()
reporter.save_report(report)
reporter.print_summary(report)
# Exit with error code if there are violations
if report["summary"]["failed"] > 0 or report["summary"]["errors"] > 0:
sys.exit(1)
What’s Next
Policy as code transforms infrastructure governance from reactive reviews to proactive enforcement, ensuring compliance and security standards are met before resources are created. The combination of OPA and Sentinel provides powerful frameworks for implementing comprehensive governance that scales with your organization.
In the next part, we’ll explore security and compliance testing, including vulnerability scanning, compliance validation, and automated security assessments that complement policy enforcement with deeper security analysis.
Security and Compliance
Security testing goes beyond policy validation to identify vulnerabilities, misconfigurations, and compliance gaps in your infrastructure code. Automated security scanning catches issues that manual reviews might miss, while compliance testing ensures your infrastructure meets regulatory requirements like SOC 2, GDPR, and industry-specific standards.
This part covers comprehensive security testing strategies that integrate with your Terraform workflow to identify and remediate security issues before they reach production.
Infrastructure Security Scanning
Use specialized tools to scan for security vulnerabilities:
#!/bin/bash
# scripts/security-scan.sh
set -e
TERRAFORM_DIR=${1:-"infrastructure"}
REPORT_DIR=${2:-"security-reports"}
mkdir -p "$REPORT_DIR"
echo "Running comprehensive security scan on Terraform configurations..."
# Checkov - comprehensive security scanning
echo "Running Checkov security scan..."
checkov -d "$TERRAFORM_DIR" \
--framework terraform \
--output cli \
--output json \
--output-file-path console,"$REPORT_DIR/checkov-report.json" \
--soft-fail
# TFSec - Terraform-specific security scanner
echo "Running TFSec security scan..."
tfsec "$TERRAFORM_DIR" \
--format json \
--out "$REPORT_DIR/tfsec-report.json" \
--soft-fail
# Terrascan - policy-based security scanner
echo "Running Terrascan security scan..."
terrascan scan -i terraform \
  -d "$TERRAFORM_DIR" \
  -o json \
  --non-recursive > "$REPORT_DIR/terrascan-report.json" || true
# Custom security checks
echo "Running custom security validations..."
python3 scripts/custom_security_checks.py \
--terraform-dir "$TERRAFORM_DIR" \
--output "$REPORT_DIR/custom-security.json"
echo "Security scan complete. Reports saved to $REPORT_DIR/"
Custom Security Validation
Implement organization-specific security checks:
#!/usr/bin/env python3
# scripts/custom_security_checks.py
import json
import os
import re
import argparse
from pathlib import Path
class SecurityValidator:
def __init__(self, terraform_dir):
self.terraform_dir = Path(terraform_dir)
self.findings = []
def check_hardcoded_secrets(self):
"""Check for hardcoded secrets in Terraform files"""
secret_patterns = [
(r'password\s*=\s*"[^"]{8,}"', "Hardcoded password detected"),
(r'secret_key\s*=\s*"[A-Za-z0-9+/]{20,}"', "Hardcoded secret key detected"),
(r'api_key\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded API key detected"),
(r'token\s*=\s*"[A-Za-z0-9]{20,}"', "Hardcoded token detected"),
]
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
for pattern, message in secret_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
self.findings.append({
"type": "hardcoded_secret",
"severity": "HIGH",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": message,
"code": match.group(0)
})
def check_public_resources(self):
"""Check for resources that might be publicly accessible"""
public_patterns = [
(r'cidr_blocks\s*=\s*\["0\.0\.0\.0/0"\]', "Resource allows access from anywhere"),
(r'publicly_accessible\s*=\s*true', "Resource is publicly accessible"),
(r'public_read_access\s*=\s*true', "Resource allows public read access"),
]
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
for pattern, message in public_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
self.findings.append({
"type": "public_access",
"severity": "MEDIUM",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": message,
"code": match.group(0)
})
def check_encryption_settings(self):
"""Check for missing encryption configurations"""
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check S3 buckets without encryption
s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for bucket in s3_buckets:
bucket_config = bucket.group(1)
if "server_side_encryption_configuration" not in bucket_config:
line_num = content[:bucket.start()].count('\n') + 1
self.findings.append({
"type": "missing_encryption",
"severity": "HIGH",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": "S3 bucket missing encryption configuration",
"resource": bucket.group(0).split('"')[3]
})
# Check RDS instances without encryption
rds_instances = re.finditer(r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for instance in rds_instances:
instance_config = instance.group(1)
if "storage_encrypted" not in instance_config or "storage_encrypted = false" in instance_config:
line_num = content[:instance.start()].count('\n') + 1
self.findings.append({
"type": "missing_encryption",
"severity": "HIGH",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": "RDS instance missing encryption",
"resource": instance.group(0).split('"')[3]
})
def check_network_security(self):
"""Check for network security issues"""
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check for overly permissive security groups
sg_rules = re.finditer(r'resource\s+"aws_security_group_rule"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for rule in sg_rules:
rule_config = rule.group(1)
# Check for SSH open to world
if ('from_port = 22' in rule_config and
'to_port = 22' in rule_config and
'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
line_num = content[:rule.start()].count('\n') + 1
self.findings.append({
"type": "network_security",
"severity": "CRITICAL",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": "SSH port open to the world",
"resource": rule.group(0).split('"')[3]
})
# Check for RDP open to world
if ('from_port = 3389' in rule_config and
'to_port = 3389' in rule_config and
'cidr_blocks = ["0.0.0.0/0"]' in rule_config):
line_num = content[:rule.start()].count('\n') + 1
self.findings.append({
"type": "network_security",
"severity": "CRITICAL",
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": line_num,
"message": "RDP port open to the world",
"resource": rule.group(0).split('"')[3]
})
def run_all_checks(self):
"""Run all security checks"""
self.check_hardcoded_secrets()
self.check_public_resources()
self.check_encryption_settings()
self.check_network_security()
return {
"total_findings": len(self.findings),
"critical": len([f for f in self.findings if f["severity"] == "CRITICAL"]),
"high": len([f for f in self.findings if f["severity"] == "HIGH"]),
"medium": len([f for f in self.findings if f["severity"] == "MEDIUM"]),
"low": len([f for f in self.findings if f["severity"] == "LOW"]),
"findings": self.findings
}
def main():
parser = argparse.ArgumentParser(description='Custom Terraform security validator')
parser.add_argument('--terraform-dir', required=True, help='Terraform directory to scan')
parser.add_argument('--output', required=True, help='Output file for results')
args = parser.parse_args()
validator = SecurityValidator(args.terraform_dir)
results = validator.run_all_checks()
with open(args.output, 'w') as f:
json.dump(results, f, indent=2)
print(f"Security validation complete. Found {results['total_findings']} issues.")
print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}, Low: {results['low']}")
# Exit with error if critical or high severity issues found
if results['critical'] > 0 or results['high'] > 0:
exit(1)
if __name__ == "__main__":
main()
Compliance Framework Testing
Implement automated compliance validation:
#!/usr/bin/env python3
# scripts/compliance_validator.py
import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class ComplianceCheck:
framework: str
control_id: str
description: str
severity: str
check_function: callable
class ComplianceValidator:
def __init__(self, terraform_dir: str):
self.terraform_dir = Path(terraform_dir)
self.findings = []
self.checks = self._initialize_checks()
def _initialize_checks(self) -> List[ComplianceCheck]:
"""Initialize compliance checks for various frameworks"""
return [
# SOC 2 Type II checks
ComplianceCheck(
framework="SOC2",
control_id="CC6.1",
description="Logical and physical access controls",
severity="HIGH",
check_function=self._check_access_controls
),
ComplianceCheck(
framework="SOC2",
control_id="CC6.7",
description="Data transmission and disposal",
severity="HIGH",
check_function=self._check_data_encryption
),
# GDPR checks
ComplianceCheck(
framework="GDPR",
control_id="Art.32",
description="Security of processing",
severity="CRITICAL",
check_function=self._check_data_security
),
ComplianceCheck(
framework="GDPR",
control_id="Art.17",
description="Right to erasure",
severity="MEDIUM",
check_function=self._check_data_retention
),
# PCI DSS checks
ComplianceCheck(
framework="PCI_DSS",
control_id="1.1.4",
description="Network segmentation",
severity="CRITICAL",
check_function=self._check_network_segmentation
),
ComplianceCheck(
framework="PCI_DSS",
control_id="3.4",
description="Encryption of cardholder data",
severity="CRITICAL",
check_function=self._check_cardholder_data_encryption
),
]
def _check_access_controls(self) -> List[Dict[str, Any]]:
"""SOC 2 - Check for proper access controls"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check for IAM policies with overly broad permissions
iam_policies = re.finditer(r'resource\s+"aws_iam_policy"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for policy in iam_policies:
policy_config = policy.group(1)
if '"*"' in policy_config and '"Action"' in policy_config:
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:policy.start()].count('\n') + 1,
"message": "IAM policy grants overly broad permissions",
"resource": policy.group(0).split('"')[3]
})
return findings
def _check_data_encryption(self) -> List[Dict[str, Any]]:
"""SOC 2 - Check for data encryption in transit and at rest"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check S3 buckets for encryption
s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for bucket in s3_buckets:
bucket_config = bucket.group(1)
if "server_side_encryption_configuration" not in bucket_config:
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:bucket.start()].count('\n') + 1,
"message": "S3 bucket lacks encryption at rest",
"resource": bucket.group(0).split('"')[3]
})
# Check ALB listeners for HTTPS
alb_listeners = re.finditer(r'resource\s+"aws_lb_listener"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for listener in alb_listeners:
listener_config = listener.group(1)
if 'protocol = "HTTP"' in listener_config and 'port = "80"' in listener_config:
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:listener.start()].count('\n') + 1,
"message": "Load balancer listener uses unencrypted HTTP",
"resource": listener.group(0).split('"')[3]
})
return findings
def _check_data_security(self) -> List[Dict[str, Any]]:
"""GDPR - Check for data security measures"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check for resources handling personal data without encryption
resources_with_personal_data = re.finditer(
r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*DataClassification\s*=\s*"personal"[^}]*}[^}]*)}',
content, re.DOTALL
)
for resource in resources_with_personal_data:
resource_config = resource.group(1)
if ("encryption" not in resource_config.lower() and
"kms" not in resource_config.lower()):
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:resource.start()].count('\n') + 1,
"message": "Resource handling personal data lacks encryption",
"resource": resource.group(0).split('"')[3]
})
return findings
def _check_data_retention(self) -> List[Dict[str, Any]]:
"""GDPR - Check for data retention policies"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check S3 buckets for lifecycle policies
s3_buckets = re.finditer(r'resource\s+"aws_s3_bucket"\s+"[^"]+"\s*{([^}]+)}', content, re.DOTALL)
for bucket in s3_buckets:
bucket_name = bucket.group(0).split('"')[3]
# Look for corresponding lifecycle configuration
lifecycle_pattern = f'resource\\s+"aws_s3_bucket_lifecycle_configuration"\\s+"[^"]*{bucket_name}[^"]*"'
if not re.search(lifecycle_pattern, content):
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:bucket.start()].count('\n') + 1,
"message": "S3 bucket lacks data retention policy",
"resource": bucket_name
})
return findings
def _check_network_segmentation(self) -> List[Dict[str, Any]]:
"""PCI DSS - Check for proper network segmentation"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check for PCI-scoped resources without proper network isolation
pci_resources = re.finditer(
r'resource\s+"[^"]+"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*PCIScope\s*=\s*"true"[^}]*}[^}]*)}',
content, re.DOTALL
)
for resource in pci_resources:
resource_config = resource.group(1)
# Check if resource is in a dedicated VPC or subnet
if ("vpc_id" not in resource_config and
"subnet_id" not in resource_config):
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:resource.start()].count('\n') + 1,
"message": "PCI-scoped resource lacks network segmentation",
"resource": resource.group(0).split('"')[3]
})
return findings
def _check_cardholder_data_encryption(self) -> List[Dict[str, Any]]:
"""PCI DSS - Check encryption of cardholder data"""
findings = []
for tf_file in self.terraform_dir.rglob("*.tf"):
content = tf_file.read_text()
# Check databases that might store cardholder data
db_instances = re.finditer(
r'resource\s+"aws_db_instance"\s+"[^"]+"\s*{([^}]+tags\s*=\s*{[^}]*CardholderData\s*=\s*"true"[^}]*}[^}]*)}',
content, re.DOTALL
)
for db in db_instances:
db_config = db.group(1)
if "storage_encrypted = true" not in db_config:
findings.append({
"file": str(tf_file.relative_to(self.terraform_dir)),
"line": content[:db.start()].count('\n') + 1,
"message": "Database storing cardholder data is not encrypted",
"resource": db.group(0).split('"')[3]
})
return findings
def run_compliance_checks(self) -> Dict[str, Any]:
"""Run all compliance checks"""
results = {
"frameworks": {},
"total_findings": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
}
for check in self.checks:
framework_findings = check.check_function()
if check.framework not in results["frameworks"]:
results["frameworks"][check.framework] = {
"controls": {},
"total_findings": 0
}
results["frameworks"][check.framework]["controls"][check.control_id] = {
"description": check.description,
"severity": check.severity,
"findings": framework_findings,
"compliant": len(framework_findings) == 0
}
results["frameworks"][check.framework]["total_findings"] += len(framework_findings)
results["total_findings"] += len(framework_findings)
# Count by severity
severity_count = len(framework_findings)
if check.severity == "CRITICAL":
results["critical"] += severity_count
elif check.severity == "HIGH":
results["high"] += severity_count
elif check.severity == "MEDIUM":
results["medium"] += severity_count
else:
results["low"] += severity_count
return results
def main():
import argparse
parser = argparse.ArgumentParser(description='Terraform compliance validator')
parser.add_argument('--terraform-dir', required=True, help='Terraform directory to validate')
parser.add_argument('--output', required=True, help='Output file for results')
parser.add_argument('--frameworks', nargs='+', default=['SOC2', 'GDPR', 'PCI_DSS'],
help='Compliance frameworks to check')
args = parser.parse_args()
validator = ComplianceValidator(args.terraform_dir)
results = validator.run_compliance_checks()
# Filter results by requested frameworks
if args.frameworks:
filtered_frameworks = {k: v for k, v in results["frameworks"].items()
if k in args.frameworks}
results["frameworks"] = filtered_frameworks
with open(args.output, 'w') as f:
json.dump(results, f, indent=2)
print(f"Compliance validation complete.")
print(f"Total findings: {results['total_findings']}")
print(f"Critical: {results['critical']}, High: {results['high']}, Medium: {results['medium']}")
for framework, data in results["frameworks"].items():
compliant_controls = sum(1 for control in data["controls"].values() if control["compliant"])
total_controls = len(data["controls"])
compliance_rate = (compliant_controls / total_controls * 100) if total_controls > 0 else 0
print(f"{framework}: {compliance_rate:.1f}% compliant ({compliant_controls}/{total_controls} controls)")
if __name__ == "__main__":
main()
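To validate only a subset of frameworks, pass them explicitly; the directory and file names below are illustrative:
# Check only the SOC 2 and GDPR controls for the staging configuration
python3 scripts/compliance_validator.py \
  --terraform-dir infrastructure/staging \
  --frameworks SOC2 GDPR \
  --output compliance-report.json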
What’s Next
Security and compliance testing provides deep validation of your infrastructure’s security posture and regulatory compliance. Combined with policy as code, these testing strategies create a comprehensive security validation framework that catches issues early in the development cycle.
In the next part, we’ll explore performance and cost testing techniques that validate not just the functionality and security of your infrastructure, but also its efficiency, scalability, and cost-effectiveness.
Performance and Cost Testing
Performance and cost testing ensure your infrastructure is not only functional and secure, but also efficient and cost-effective. These tests validate resource sizing, identify optimization opportunities, and prevent cost overruns before they impact your budget. Automated cost analysis and performance validation help maintain operational efficiency as your infrastructure scales.
This part covers comprehensive strategies for testing infrastructure performance characteristics and validating cost implications of your Terraform configurations.
Cost Impact Analysis
Analyze the cost implications of infrastructure changes:
#!/usr/bin/env python3
# scripts/cost_impact_analyzer.py
import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class ResourceCost:
resource_type: str
resource_name: str
monthly_cost: float
annual_cost: float
    cost_factors: Dict[str, Any]
class CostAnalyzer:
def __init__(self):
# AWS pricing data (simplified - in practice, use AWS Pricing API)
self.pricing_data = {
"aws_instance": {
"t3.micro": {"hourly": 0.0104, "monthly": 7.59},
"t3.small": {"hourly": 0.0208, "monthly": 15.18},
"t3.medium": {"hourly": 0.0416, "monthly": 30.37},
"t3.large": {"hourly": 0.0832, "monthly": 60.74},
"m5.large": {"hourly": 0.096, "monthly": 70.08},
"m5.xlarge": {"hourly": 0.192, "monthly": 140.16},
},
"aws_rds_instance": {
"db.t3.micro": {"hourly": 0.017, "monthly": 12.41},
"db.t3.small": {"hourly": 0.034, "monthly": 24.82},
"db.r5.large": {"hourly": 0.24, "monthly": 175.20},
"db.r5.xlarge": {"hourly": 0.48, "monthly": 350.40},
},
"aws_s3_bucket": {
"standard": {"per_gb_monthly": 0.023},
"ia": {"per_gb_monthly": 0.0125},
"glacier": {"per_gb_monthly": 0.004},
},
"aws_ebs_volume": {
"gp3": {"per_gb_monthly": 0.08},
"gp2": {"per_gb_monthly": 0.10},
"io1": {"per_gb_monthly": 0.125, "per_iops_monthly": 0.065},
}
}
    def analyze_terraform_plan(self, plan_file: str) -> Dict[str, Any]:
"""Analyze Terraform plan for cost implications"""
with open(plan_file, 'r') as f:
plan_data = json.load(f)
resource_costs = []
total_monthly_cost = 0
# Analyze planned resources
if 'planned_values' in plan_data and 'root_module' in plan_data['planned_values']:
resources = plan_data['planned_values']['root_module'].get('resources', [])
for resource in resources:
cost = self._calculate_resource_cost(resource)
if cost:
resource_costs.append(cost)
total_monthly_cost += cost.monthly_cost
# Analyze resource changes
cost_changes = self._analyze_cost_changes(plan_data.get('resource_changes', []))
return {
"total_monthly_cost": total_monthly_cost,
"total_annual_cost": total_monthly_cost * 12,
"resource_costs": [
{
"resource_type": rc.resource_type,
"resource_name": rc.resource_name,
"monthly_cost": rc.monthly_cost,
"annual_cost": rc.annual_cost,
"cost_factors": rc.cost_factors
}
for rc in resource_costs
],
"cost_changes": cost_changes,
"cost_breakdown": self._generate_cost_breakdown(resource_costs)
}
def _calculate_resource_cost(self, resource: Dict) -> Optional[ResourceCost]:
"""Calculate cost for a specific resource"""
resource_type = resource.get('type')
resource_name = resource.get('name', 'unknown')
values = resource.get('values', {})
if resource_type == 'aws_instance':
return self._calculate_ec2_cost(resource_name, values)
        elif resource_type == 'aws_db_instance':  # Terraform's resource type for RDS
return self._calculate_rds_cost(resource_name, values)
elif resource_type == 'aws_s3_bucket':
return self._calculate_s3_cost(resource_name, values)
elif resource_type == 'aws_ebs_volume':
return self._calculate_ebs_cost(resource_name, values)
return None
def _calculate_ec2_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate EC2 instance cost"""
instance_type = values.get('instance_type')
if not instance_type or instance_type not in self.pricing_data['aws_instance']:
return None
pricing = self.pricing_data['aws_instance'][instance_type]
monthly_cost = pricing['monthly']
# Adjust for additional costs
if values.get('ebs_optimized'):
monthly_cost *= 1.1 # 10% premium for EBS optimization
return ResourceCost(
resource_type='aws_instance',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'instance_type': instance_type,
'ebs_optimized': values.get('ebs_optimized', False),
'hourly_rate': pricing['hourly']
}
)
def _calculate_rds_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate RDS instance cost"""
instance_class = values.get('instance_class')
if not instance_class or instance_class not in self.pricing_data['aws_rds_instance']:
return None
pricing = self.pricing_data['aws_rds_instance'][instance_class]
monthly_cost = pricing['monthly']
# Adjust for Multi-AZ
if values.get('multi_az'):
monthly_cost *= 2
# Add storage cost
allocated_storage = values.get('allocated_storage', 20)
storage_cost = allocated_storage * 0.115 # GP2 storage cost per GB
monthly_cost += storage_cost
return ResourceCost(
            resource_type='aws_db_instance',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'instance_class': instance_class,
'multi_az': values.get('multi_az', False),
'allocated_storage': allocated_storage,
'storage_cost': storage_cost
}
)
def _calculate_s3_cost(self, name: str, values: Dict) -> ResourceCost:
"""Calculate S3 bucket cost (estimated)"""
# S3 cost depends on usage, so we provide estimates
estimated_gb = 100 # Default estimate
storage_class = 'standard' # Default
pricing = self.pricing_data['aws_s3_bucket'][storage_class]
monthly_cost = estimated_gb * pricing['per_gb_monthly']
return ResourceCost(
resource_type='aws_s3_bucket',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'estimated_storage_gb': estimated_gb,
'storage_class': storage_class,
'per_gb_cost': pricing['per_gb_monthly']
}
)
def _calculate_ebs_cost(self, name: str, values: Dict) -> Optional[ResourceCost]:
"""Calculate EBS volume cost"""
volume_type = values.get('type', 'gp3')
size = values.get('size', 8)
if volume_type not in self.pricing_data['aws_ebs_volume']:
return None
pricing = self.pricing_data['aws_ebs_volume'][volume_type]
monthly_cost = size * pricing['per_gb_monthly']
# Add IOPS cost for io1 volumes
if volume_type == 'io1':
iops = values.get('iops', 100)
monthly_cost += iops * pricing['per_iops_monthly']
return ResourceCost(
resource_type='aws_ebs_volume',
resource_name=name,
monthly_cost=monthly_cost,
annual_cost=monthly_cost * 12,
cost_factors={
'volume_type': volume_type,
'size_gb': size,
'iops': values.get('iops') if volume_type == 'io1' else None
}
)
    def _analyze_cost_changes(self, resource_changes: List[Dict]) -> Dict[str, Any]:
"""Analyze cost impact of resource changes"""
changes = {
"new_resources": 0,
"modified_resources": 0,
"destroyed_resources": 0,
"cost_increase": 0,
"cost_decrease": 0
}
for change in resource_changes:
actions = change.get('change', {}).get('actions', [])
if 'create' in actions:
changes["new_resources"] += 1
# Estimate cost increase for new resources
if change.get('type') == 'aws_instance':
instance_type = change.get('change', {}).get('after', {}).get('instance_type')
if instance_type in self.pricing_data['aws_instance']:
changes["cost_increase"] += self.pricing_data['aws_instance'][instance_type]['monthly']
elif 'update' in actions:
changes["modified_resources"] += 1
elif 'delete' in actions:
changes["destroyed_resources"] += 1
# Estimate cost decrease for destroyed resources
if change.get('type') == 'aws_instance':
instance_type = change.get('change', {}).get('before', {}).get('instance_type')
if instance_type in self.pricing_data['aws_instance']:
changes["cost_decrease"] += self.pricing_data['aws_instance'][instance_type]['monthly']
changes["net_cost_change"] = changes["cost_increase"] - changes["cost_decrease"]
return changes
    def _generate_cost_breakdown(self, resource_costs: List[ResourceCost]) -> Dict[str, Any]:
"""Generate cost breakdown by resource type"""
breakdown = {}
for cost in resource_costs:
if cost.resource_type not in breakdown:
breakdown[cost.resource_type] = {
"count": 0,
"monthly_cost": 0,
"annual_cost": 0
}
breakdown[cost.resource_type]["count"] += 1
breakdown[cost.resource_type]["monthly_cost"] += cost.monthly_cost
breakdown[cost.resource_type]["annual_cost"] += cost.annual_cost
return breakdown
def main():
import argparse
parser = argparse.ArgumentParser(description='Terraform cost impact analyzer')
parser.add_argument('--plan-file', required=True, help='Terraform plan JSON file')
parser.add_argument('--budget-limit', type=float, help='Monthly budget limit for validation')
parser.add_argument('--output', required=True, help='Output file for cost analysis')
args = parser.parse_args()
analyzer = CostAnalyzer()
analysis = analyzer.analyze_terraform_plan(args.plan_file)
with open(args.output, 'w') as f:
json.dump(analysis, f, indent=2)
print(f"Cost Analysis Complete:")
print(f" Monthly Cost: ${analysis['total_monthly_cost']:.2f}")
print(f" Annual Cost: ${analysis['total_annual_cost']:.2f}")
if args.budget_limit:
if analysis['total_monthly_cost'] > args.budget_limit:
print(f" ⚠️ BUDGET EXCEEDED: ${analysis['total_monthly_cost']:.2f} > ${args.budget_limit:.2f}")
exit(1)
else:
print(f" ✅ Within budget: ${analysis['total_monthly_cost']:.2f} <= ${args.budget_limit:.2f}")
print("\nCost Breakdown:")
for resource_type, breakdown in analysis['cost_breakdown'].items():
print(f" {resource_type}: {breakdown['count']} resources, ${breakdown['monthly_cost']:.2f}/month")
if __name__ == "__main__":
main()
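The analyzer works on the JSON representation of a plan, so export that first. A sample run against an assumed $500 monthly budget (paths and the limit are illustrative):
# Export the plan as JSON, then validate it against a monthly budget
terraform plan -out=tfplan
terraform show -json tfplan > plan.json
python3 scripts/cost_impact_analyzer.py \
  --plan-file plan.json \
  --budget-limit 500 \
  --output cost-analysis.json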
Performance Testing Framework
Test infrastructure performance characteristics:
#!/usr/bin/env python3
# scripts/performance_tester.py
import json
import time
import concurrent.futures
import requests
from dataclasses import dataclass
from typing import List, Dict, Optional
import boto3
@dataclass
class PerformanceMetric:
metric_name: str
value: float
unit: str
threshold: Optional[float] = None
passed: Optional[bool] = None
class InfrastructurePerformanceTester:
def __init__(self, terraform_outputs: Dict):
self.outputs = terraform_outputs
self.metrics = []
def test_web_application_performance(self, url: str, concurrent_users: int = 10, duration: int = 60):
"""Test web application performance under load"""
print(f"Testing web application performance: {url}")
def make_request():
try:
start_time = time.time()
response = requests.get(url, timeout=30)
end_time = time.time()
return {
'response_time': end_time - start_time,
'status_code': response.status_code,
'success': response.status_code == 200
}
except Exception as e:
return {
'response_time': 30.0,
'status_code': 0,
'success': False,
'error': str(e)
}
# Run load test
results = []
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
while time.time() - start_time < duration:
futures = [executor.submit(make_request) for _ in range(concurrent_users)]
batch_results = [future.result() for future in concurrent.futures.as_completed(futures)]
results.extend(batch_results)
time.sleep(1) # 1 second between batches
# Analyze results
successful_requests = [r for r in results if r['success']]
response_times = [r['response_time'] for r in successful_requests]
if response_times:
avg_response_time = sum(response_times) / len(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
p95_response_time = sorted(response_times)[int(len(response_times) * 0.95)]
else:
avg_response_time = max_response_time = min_response_time = p95_response_time = 0
success_rate = len(successful_requests) / len(results) * 100 if results else 0
# Add metrics
self.metrics.extend([
PerformanceMetric("avg_response_time", avg_response_time, "seconds", 2.0),
PerformanceMetric("max_response_time", max_response_time, "seconds", 5.0),
PerformanceMetric("p95_response_time", p95_response_time, "seconds", 3.0),
PerformanceMetric("success_rate", success_rate, "percent", 95.0),
PerformanceMetric("total_requests", len(results), "count"),
])
return {
'total_requests': len(results),
'successful_requests': len(successful_requests),
'success_rate': success_rate,
'avg_response_time': avg_response_time,
'max_response_time': max_response_time,
'min_response_time': min_response_time,
'p95_response_time': p95_response_time
}
def test_database_performance(self, db_endpoint: str, db_name: str):
"""Test database performance"""
print(f"Testing database performance: {db_endpoint}")
# This would typically involve connecting to the database
# and running performance tests. For this example, we'll
# use CloudWatch metrics instead.
try:
cloudwatch = boto3.client('cloudwatch')
# Get recent database metrics
end_time = time.time()
start_time = end_time - 3600 # Last hour
metrics_to_check = [
('CPUUtilization', 'AWS/RDS'),
('DatabaseConnections', 'AWS/RDS'),
('ReadLatency', 'AWS/RDS'),
('WriteLatency', 'AWS/RDS'),
]
db_metrics = {}
for metric_name, namespace in metrics_to_check:
response = cloudwatch.get_metric_statistics(
Namespace=namespace,
MetricName=metric_name,
Dimensions=[
{
'Name': 'DBInstanceIdentifier',
'Value': db_name
}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average', 'Maximum']
)
if response['Datapoints']:
latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
db_metrics[metric_name] = {
'average': latest_datapoint['Average'],
'maximum': latest_datapoint['Maximum']
}
# Add database performance metrics
if 'CPUUtilization' in db_metrics:
self.metrics.append(
PerformanceMetric("db_cpu_utilization", db_metrics['CPUUtilization']['average'], "percent", 80.0)
)
if 'ReadLatency' in db_metrics:
self.metrics.append(
PerformanceMetric("db_read_latency", db_metrics['ReadLatency']['average'] * 1000, "milliseconds", 20.0)
)
if 'WriteLatency' in db_metrics:
self.metrics.append(
PerformanceMetric("db_write_latency", db_metrics['WriteLatency']['average'] * 1000, "milliseconds", 50.0)
)
return db_metrics
except Exception as e:
print(f"Error testing database performance: {e}")
return {}
def test_auto_scaling_performance(self, asg_name: str):
"""Test auto scaling group performance"""
print(f"Testing auto scaling performance: {asg_name}")
try:
autoscaling = boto3.client('autoscaling')
cloudwatch = boto3.client('cloudwatch')
# Get ASG details
response = autoscaling.describe_auto_scaling_groups(
AutoScalingGroupNames=[asg_name]
)
if not response['AutoScalingGroups']:
return {}
asg = response['AutoScalingGroups'][0]
# Check scaling metrics
end_time = time.time()
start_time = end_time - 3600 # Last hour
# Get CloudWatch metrics for the ASG
response = cloudwatch.get_metric_statistics(
Namespace='AWS/AutoScaling',
MetricName='GroupTotalInstances',
Dimensions=[
{
'Name': 'AutoScalingGroupName',
'Value': asg_name
}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average', 'Maximum', 'Minimum']
)
scaling_metrics = {}
if response['Datapoints']:
latest_datapoint = max(response['Datapoints'], key=lambda x: x['Timestamp'])
scaling_metrics = {
'current_instances': latest_datapoint['Average'],
'max_instances': latest_datapoint['Maximum'],
'min_instances': latest_datapoint['Minimum']
}
# Add scaling performance metrics
self.metrics.extend([
PerformanceMetric("asg_current_capacity", asg['DesiredCapacity'], "count"),
PerformanceMetric("asg_min_size", asg['MinSize'], "count"),
PerformanceMetric("asg_max_size", asg['MaxSize'], "count"),
])
return {
'asg_name': asg_name,
'desired_capacity': asg['DesiredCapacity'],
'min_size': asg['MinSize'],
'max_size': asg['MaxSize'],
'current_instances': len(asg['Instances']),
'scaling_metrics': scaling_metrics
}
except Exception as e:
print(f"Error testing auto scaling performance: {e}")
return {}
def evaluate_performance_thresholds(self):
"""Evaluate all metrics against their thresholds"""
for metric in self.metrics:
if metric.threshold is not None:
if metric.metric_name.endswith('_rate'):
# For rates, higher is better
metric.passed = metric.value >= metric.threshold
elif 'latency' in metric.metric_name or 'response_time' in metric.metric_name:
# For latency/response time, lower is better
metric.passed = metric.value <= metric.threshold
elif 'utilization' in metric.metric_name:
# For utilization, lower is better (below threshold)
metric.passed = metric.value <= metric.threshold
else:
# Default: lower is better
metric.passed = metric.value <= metric.threshold
def generate_performance_report(self) -> Dict:
"""Generate comprehensive performance report"""
self.evaluate_performance_thresholds()
passed_metrics = [m for m in self.metrics if m.passed is True]
failed_metrics = [m for m in self.metrics if m.passed is False]
return {
'timestamp': time.time(),
'total_metrics': len(self.metrics),
'passed_metrics': len(passed_metrics),
'failed_metrics': len(failed_metrics),
'success_rate': len(passed_metrics) / len(self.metrics) * 100 if self.metrics else 0,
'metrics': [
{
'name': m.metric_name,
'value': m.value,
'unit': m.unit,
'threshold': m.threshold,
'passed': m.passed
}
for m in self.metrics
],
'failed_tests': [
{
'name': m.metric_name,
'value': m.value,
'threshold': m.threshold,
'unit': m.unit
}
for m in failed_metrics
]
}
def main():
import argparse
parser = argparse.ArgumentParser(description='Infrastructure performance tester')
parser.add_argument('--terraform-outputs', required=True, help='Terraform outputs JSON file')
parser.add_argument('--output', required=True, help='Output file for performance report')
parser.add_argument('--load-test-duration', type=int, default=60, help='Load test duration in seconds')
parser.add_argument('--concurrent-users', type=int, default=10, help='Number of concurrent users for load testing')
args = parser.parse_args()
# Load Terraform outputs
with open(args.terraform_outputs, 'r') as f:
terraform_outputs = json.load(f)
tester = InfrastructurePerformanceTester(terraform_outputs)
# Run performance tests based on available outputs
if 'load_balancer_dns_name' in terraform_outputs:
url = f"http://{terraform_outputs['load_balancer_dns_name']['value']}"
tester.test_web_application_performance(
url,
args.concurrent_users,
args.load_test_duration
)
if 'database_endpoint' in terraform_outputs:
db_endpoint = terraform_outputs['database_endpoint']['value']
db_name = terraform_outputs.get('database_name', {}).get('value', 'main')
tester.test_database_performance(db_endpoint, db_name)
if 'asg_name' in terraform_outputs:
asg_name = terraform_outputs['asg_name']['value']
tester.test_auto_scaling_performance(asg_name)
# Generate report
report = tester.generate_performance_report()
with open(args.output, 'w') as f:
json.dump(report, f, indent=2)
print(f"Performance testing complete:")
print(f" Total metrics: {report['total_metrics']}")
print(f" Passed: {report['passed_metrics']}")
print(f" Failed: {report['failed_metrics']}")
print(f" Success rate: {report['success_rate']:.1f}%")
if report['failed_tests']:
print("\nFailed performance tests:")
for test in report['failed_tests']:
print(f" - {test['name']}: {test['value']} {test['unit']} (threshold: {test['threshold']})")
exit(1)
if __name__ == "__main__":
main()
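The tester keys off output names such as load_balancer_dns_name, database_endpoint, and asg_name, so expose those from your root module. A typical run looks like this (the concurrency and duration values are illustrative):
# Capture Terraform outputs, then run the performance suite against them
terraform output -json > outputs.json
python3 scripts/performance_tester.py \
  --terraform-outputs outputs.json \
  --concurrent-users 25 \
  --load-test-duration 120 \
  --output performance-report.json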
Resource Optimization Analysis
Analyze resource configurations for optimization opportunities:
#!/bin/bash
# scripts/optimization-analyzer.sh
set -e
TERRAFORM_DIR=${1:-"infrastructure"}
OUTPUT_DIR=${2:-"optimization-reports"}
mkdir -p "$OUTPUT_DIR"
echo "Analyzing Terraform configurations for optimization opportunities..."
# Generate Terraform plans for analysis
find "$TERRAFORM_DIR" -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Analyzing $dir for optimization opportunities..."
cd "$dir"
terraform init -backend=false
terraform plan -out=optimization.tfplan
terraform show -json optimization.tfplan > optimization-plan.json
cd - > /dev/null
# Run optimization analysis
python3 scripts/resource_optimizer.py \
--plan-file "$dir/optimization-plan.json" \
--output "$OUTPUT_DIR/$(basename "$dir")-optimization.json"
done
# Generate consolidated optimization report
python3 scripts/consolidate_optimization_reports.py \
--reports-dir "$OUTPUT_DIR" \
--output "$OUTPUT_DIR/consolidated-optimization-report.json"
echo "Optimization analysis complete. Reports saved to $OUTPUT_DIR/"
What’s Next
Performance and cost testing complete the comprehensive testing strategy by validating the efficiency and economic impact of your infrastructure. Combined with functional, security, and policy testing, these techniques ensure your infrastructure meets all requirements for production deployment.
In the final part, we’ll integrate all these testing strategies into comprehensive CI/CD pipelines that automate the entire testing workflow, from static analysis through performance validation, creating a complete quality assurance framework for infrastructure as code.
CI/CD Integration
Integrating Terraform testing into CI/CD pipelines ensures that every infrastructure change is validated before reaching production. A well-designed pipeline combines static analysis, unit testing, integration testing, and policy validation to create a comprehensive quality gate that prevents infrastructure failures and security issues.
This final part demonstrates how to build robust CI/CD pipelines that automate Terraform testing and deployment workflows.
GitHub Actions Pipeline
A comprehensive GitHub Actions workflow for Terraform testing:
# .github/workflows/terraform-test.yml
name: Terraform Test and Deploy
on:
pull_request:
paths: ['infrastructure/**', 'modules/**']
push:
branches: [main]
paths: ['infrastructure/**', 'modules/**']
env:
TF_VERSION: 1.6.0
AWS_REGION: us-west-2
jobs:
static-analysis:
name: Static Analysis
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Terraform Format Check
run: terraform fmt -check -recursive
- name: Terraform Validate
run: |
find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Validating $dir"
cd "$dir"
terraform init -backend=false
terraform validate
cd - > /dev/null
done
- name: Setup TFLint
uses: terraform-linters/setup-tflint@v4
with:
tflint_version: v0.50.0
- name: Run TFLint
run: |
tflint --init
tflint --recursive
- name: Run Checkov
uses: bridgecrewio/checkov-action@master
with:
directory: .
framework: terraform
output_format: sarif
output_file_path: checkov.sarif
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v3
if: always()
with:
sarif_file: checkov.sarif
unit-tests:
name: Unit Tests
runs-on: ubuntu-latest
needs: static-analysis
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Run Unit Tests
run: |
cd test/unit
for test_dir in */; do
echo "Running unit tests in $test_dir"
cd "$test_dir"
terraform init -backend=false
terraform plan -out=test.tfplan
terraform show -json test.tfplan > plan.json
# Add custom validation logic here
cd ..
done
integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
needs: unit-tests
if: github.event_name == 'pull_request'
environment: testing
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.21'
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
aws-region: ${{ env.AWS_REGION }}
- name: Run Integration Tests
run: |
cd test/integration
go mod download
go test -v -timeout 30m ./...
env:
AWS_DEFAULT_REGION: ${{ env.AWS_REGION }}
policy-validation:
name: Policy Validation
runs-on: ubuntu-latest
needs: static-analysis
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup OPA
uses: open-policy-agent/setup-opa@v2
with:
version: latest
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Generate Terraform Plans
run: |
find infrastructure -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Generating plan for $dir"
cd "$dir"
terraform init -backend=false
terraform plan -out=plan.tfplan
terraform show -json plan.tfplan > plan.json
cd - > /dev/null
done
- name: Run Policy Tests
run: |
find infrastructure -name "plan.json" | while read plan; do
echo "Validating policy for $plan"
opa eval -d policies/ -i "$plan" "data.terraform.deny[x]"
done
deploy-staging:
name: Deploy to Staging
runs-on: ubuntu-latest
needs: [unit-tests, policy-validation]
if: github.ref == 'refs/heads/main'
environment: staging
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_STAGING_ROLE_ARN }}
aws-region: ${{ env.AWS_REGION }}
- name: Terraform Plan
run: |
cd infrastructure/staging
terraform init
terraform plan -out=staging.tfplan
- name: Terraform Apply
run: |
cd infrastructure/staging
terraform apply staging.tfplan
- name: Run Smoke Tests
run: |
cd test/smoke
go test -v -timeout 10m ./...
deploy-production:
name: Deploy to Production
runs-on: ubuntu-latest
needs: deploy-staging
if: github.ref == 'refs/heads/main'
environment: production
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: ${{ env.TF_VERSION }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_PRODUCTION_ROLE_ARN }}
aws-region: ${{ env.AWS_REGION }}
- name: Terraform Plan
run: |
cd infrastructure/production
terraform init
terraform plan -out=production.tfplan
- name: Manual Approval
uses: trstringer/manual-approval@v1
with:
secret: ${{ github.TOKEN }}
approvers: platform-team
minimum-approvals: 2
issue-title: "Production Deployment Approval"
- name: Terraform Apply
run: |
cd infrastructure/production
terraform apply production.tfplan
- name: Run Production Tests
run: |
cd test/production
go test -v -timeout 15m ./...
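One caveat with the policy validation job above: opa eval exits 0 even when the deny set is non-empty, so the step passes regardless of violations. Recent OPA releases support a --fail-defined flag that returns a non-zero exit code whenever the query produces a result, which turns the loop into a real gate:
# Inside the existing loop, fail the step when any deny rule fires
opa eval --fail-defined -d policies/ -i "$plan" "data.terraform.deny[x]"
The same adjustment applies to the policy-check job in the GitLab pipeline below.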
GitLab CI Pipeline
A comprehensive GitLab CI pipeline with multiple stages:
# .gitlab-ci.yml
stages:
- validate
- test
- security
- deploy-staging
- deploy-production
variables:
TF_VERSION: "1.6.0"
TF_ROOT: ${CI_PROJECT_DIR}
TF_ADDRESS: ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/terraform/state/${CI_COMMIT_REF_SLUG}
cache:
key: "${TF_ROOT}"
paths:
- ${TF_ROOT}/.terraform
before_script:
- apt-get update -qq && apt-get install -y -qq git curl unzip
- curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
- unzip terraform.zip && mv terraform /usr/local/bin/
- terraform --version
validate:
stage: validate
script:
- terraform fmt -check -recursive
    - |
      find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
        cd "$dir"
        terraform init -backend=false
        terraform validate
        cd - > /dev/null
      done
rules:
- changes:
- "**/*.tf"
- "**/*.tfvars"
unit-test:
stage: test
script:
- cd test/unit
    - |
      for test_dir in */; do
        echo "Testing $test_dir"
        cd "$test_dir"
        terraform init -backend=false
        terraform plan -detailed-exitcode
        cd ..
      done
rules:
- changes:
- "**/*.tf"
- "**/*.tfvars"
integration-test:
stage: test
image: golang:1.21
services:
- docker:dind
before_script:
- apt-get update -qq && apt-get install -y -qq curl unzip
- curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
- unzip terraform.zip && mv terraform /usr/local/bin/
script:
- cd test/integration
- go mod download
- go test -v -timeout 30m ./...
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- "**/*.tf"
- "**/*.tfvars"
security-scan:
stage: security
image: bridgecrew/checkov:latest
script:
- checkov -d . --framework terraform --output cli --output json --output-file-path console,checkov-report.json
artifacts:
reports:
sast: checkov-report.json
expire_in: 1 week
rules:
- changes:
- "**/*.tf"
- "**/*.tfvars"
policy-check:
stage: security
image: openpolicyagent/opa:latest
before_script:
- apk add --no-cache curl unzip
- curl -fsSL https://releases.hashicorp.com/terraform/${TF_VERSION}/terraform_${TF_VERSION}_linux_amd64.zip -o terraform.zip
- unzip terraform.zip && mv terraform /usr/local/bin/
script:
    - |
      find infrastructure -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
        cd "$dir"
        terraform init -backend=false
        terraform plan -out=plan.tfplan
        terraform show -json plan.tfplan > plan.json
        opa eval -d "$CI_PROJECT_DIR/policies/" -i plan.json "data.terraform.deny[x]"
        cd - > /dev/null
      done
rules:
- changes:
- "**/*.tf"
- "**/*.tfvars"
- "policies/**/*.rego"
deploy-staging:
stage: deploy-staging
environment:
name: staging
url: https://staging.example.com
before_script:
    - mkdir -p ~/.aws
    - echo "$AWS_STAGING_CREDENTIALS" | base64 -d > ~/.aws/credentials
script:
- cd infrastructure/staging
- terraform init -backend-config="address=${TF_ADDRESS}-staging"
- terraform plan -out=staging.tfplan
- terraform apply staging.tfplan
after_script:
- cd test/smoke
- go test -v -timeout 10m ./...
rules:
- if: $CI_COMMIT_BRANCH == "main"
changes:
- "**/*.tf"
- "**/*.tfvars"
deploy-production:
stage: deploy-production
environment:
name: production
url: https://production.example.com
before_script:
    - mkdir -p ~/.aws
    - echo "$AWS_PRODUCTION_CREDENTIALS" | base64 -d > ~/.aws/credentials
script:
- cd infrastructure/production
- terraform init -backend-config="address=${TF_ADDRESS}-production"
- terraform plan -out=production.tfplan
- terraform apply production.tfplan
after_script:
- cd test/production
- go test -v -timeout 15m ./...
when: manual
rules:
- if: $CI_COMMIT_BRANCH == "main"
changes:
- "**/*.tf"
- "**/*.tfvars"
Azure DevOps Pipeline
A comprehensive Azure DevOps pipeline:
# azure-pipelines.yml
trigger:
branches:
include:
- main
paths:
include:
- infrastructure/*
- modules/*
pr:
branches:
include:
- main
paths:
include:
- infrastructure/*
- modules/*
variables:
terraformVersion: '1.6.0'
awsRegion: 'us-west-2'
stages:
- stage: Validate
displayName: 'Validate and Test'
jobs:
- job: StaticAnalysis
displayName: 'Static Analysis'
pool:
vmImage: 'ubuntu-latest'
steps:
- task: TerraformInstaller@0
displayName: 'Install Terraform'
inputs:
terraformVersion: $(terraformVersion)
- script: |
terraform fmt -check -recursive
displayName: 'Terraform Format Check'
- script: |
find . -name "*.tf" -exec dirname {} \; | sort -u | while read dir; do
echo "Validating $dir"
cd "$dir"
terraform init -backend=false
terraform validate
cd - > /dev/null
done
displayName: 'Terraform Validate'
- script: |
curl -s https://raw.githubusercontent.com/terraform-linters/tflint/master/install_linux.sh | bash
tflint --init
tflint --recursive
displayName: 'TFLint'
- script: |
pip install checkov
checkov -d . --framework terraform --output cli --output sarif --output-file-path console,checkov.sarif
displayName: 'Checkov Security Scan'
- task: PublishTestResults@2
condition: always()
inputs:
testResultsFormat: 'JUnit'
testResultsFiles: 'checkov.sarif'
testRunTitle: 'Security Scan Results'
- job: UnitTests
displayName: 'Unit Tests'
dependsOn: StaticAnalysis
pool:
vmImage: 'ubuntu-latest'
steps:
- task: TerraformInstaller@0
inputs:
terraformVersion: $(terraformVersion)
- script: |
cd test/unit
for test_dir in */; do
echo "Running unit tests in $test_dir"
cd "$test_dir"
terraform init -backend=false
terraform plan -out=test.tfplan
cd ..
done
displayName: 'Run Unit Tests'
- stage: IntegrationTest
displayName: 'Integration Testing'
condition: eq(variables['Build.Reason'], 'PullRequest')
dependsOn: Validate
jobs:
- job: IntegrationTests
displayName: 'Integration Tests'
pool:
vmImage: 'ubuntu-latest'
steps:
- task: GoTool@0
inputs:
version: '1.21'
- task: TerraformInstaller@0
inputs:
terraformVersion: $(terraformVersion)
- task: AWSShellScript@1
inputs:
awsCredentials: 'AWS-Testing'
regionName: $(awsRegion)
scriptType: 'inline'
inlineScript: |
cd test/integration
go mod download
go test -v -timeout 30m ./...
displayName: 'Run Integration Tests'
- stage: DeployStaging
displayName: 'Deploy to Staging'
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
dependsOn: Validate
jobs:
- deployment: DeployStaging
displayName: 'Deploy to Staging'
environment: 'staging'
pool:
vmImage: 'ubuntu-latest'
strategy:
runOnce:
deploy:
steps:
- task: TerraformInstaller@0
inputs:
terraformVersion: $(terraformVersion)
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'init'
workingDirectory: 'infrastructure/staging'
backendServiceAWS: 'AWS-Staging'
backendAWSBucketName: 'terraform-state-staging'
backendAWSKey: 'infrastructure/terraform.tfstate'
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'plan'
workingDirectory: 'infrastructure/staging'
environmentServiceNameAWS: 'AWS-Staging'
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'apply'
workingDirectory: 'infrastructure/staging'
environmentServiceNameAWS: 'AWS-Staging'
- script: |
cd test/smoke
go test -v -timeout 10m ./...
displayName: 'Run Smoke Tests'
- stage: DeployProduction
displayName: 'Deploy to Production'
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
dependsOn: DeployStaging
jobs:
- deployment: DeployProduction
displayName: 'Deploy to Production'
environment: 'production'
pool:
vmImage: 'ubuntu-latest'
strategy:
runOnce:
deploy:
steps:
- task: TerraformInstaller@0
inputs:
terraformVersion: $(terraformVersion)
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'init'
workingDirectory: 'infrastructure/production'
backendServiceAWS: 'AWS-Production'
backendAWSBucketName: 'terraform-state-production'
backendAWSKey: 'infrastructure/terraform.tfstate'
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'plan'
workingDirectory: 'infrastructure/production'
environmentServiceNameAWS: 'AWS-Production'
- task: ManualValidation@0
inputs:
notifyUsers: '[email protected]'
instructions: 'Please review the Terraform plan and approve the production deployment'
- task: TerraformTaskV4@4
inputs:
provider: 'aws'
command: 'apply'
workingDirectory: 'infrastructure/production'
environmentServiceNameAWS: 'AWS-Production'
- script: |
cd test/production
go test -v -timeout 15m ./...
displayName: 'Run Production Tests'
Testing Pipeline Optimization
Optimize pipeline performance and reliability:
#!/bin/bash
# scripts/optimize-pipeline.sh
# Parallel test execution
run_tests_parallel() {
local test_dirs=("$@")
local pids=()
for dir in "${test_dirs[@]}"; do
(
echo "Running tests in $dir"
cd "$dir"
terraform init -backend=false
terraform plan -out=test.tfplan
terraform show -json test.tfplan > plan.json
# Run custom validations
python3 ../../scripts/validate-plan.py plan.json
) &
pids+=($!)
done
# Wait for all tests to complete
for pid in "${pids[@]}"; do
wait $pid || exit 1
done
}
# Cache Terraform providers between runs
cache_providers() {
    local cache_dir="$HOME/.terraform.d/plugin-cache"
    mkdir -p "$cache_dir"
    export TF_PLUGIN_CACHE_DIR="$cache_dir"
    # Providers downloaded by `terraform init` are cached here; persist this
    # directory between pipeline runs to avoid re-downloading plugins.
}
# Selective testing based on changes
selective_testing() {
local changed_files=$(git diff --name-only HEAD~1)
local test_modules=()
for file in $changed_files; do
if [[ $file == modules/* ]]; then
module_name=$(echo "$file" | cut -d'/' -f2)
test_modules+=("test/unit/$module_name")
fi
done
if [ ${#test_modules[@]} -gt 0 ]; then
run_tests_parallel "${test_modules[@]}"
else
echo "No module changes detected, running full test suite"
run_tests_parallel test/unit/*/
fi
}
# Main execution
main() {
cache_providers
selective_testing
}
main "$@"
Conclusion
Comprehensive CI/CD integration ensures that every infrastructure change is thoroughly tested before reaching production. The combination of static analysis, unit testing, integration testing, policy validation, and automated deployment creates a robust quality gate that prevents infrastructure failures and maintains security standards.
The key to successful Terraform testing in CI/CD is balancing thoroughness with speed, using parallel execution, caching, and selective testing to maintain fast feedback cycles while ensuring comprehensive validation of your infrastructure code.