Policy as Code

Policy as code transforms governance from manual reviews to automated enforcement, ensuring that infrastructure changes comply with organizational standards before they reach production. Open Policy Agent (OPA) and HashiCorp Sentinel provide powerful frameworks for implementing policy validation that integrates seamlessly with Terraform workflows.

This part covers implementing comprehensive policy frameworks that enforce security, compliance, and operational standards across your Terraform configurations.

Open Policy Agent (OPA) Fundamentals

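OPA uses the Rego language to define policies that evaluate JSON data. For Terraform, that data is the plan rendered as JSON with `terraform show -json`, which the policies below read as `input`: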

# policies/terraform/security.rego
package terraform.security

# Flag every ingress security group rule whose port range covers 22 (SSH)
# and whose IPv4 CIDR list contains 0.0.0.0/0.
deny[message] {
    some i, j
    sg_rule := input.planned_values.root_module.resources[i]
    sg_rule.type == "aws_security_group_rule"
    sg_rule.values.type == "ingress"

    # Port 22 lies inside [from_port, to_port]
    22 >= sg_rule.values.from_port
    22 <= sg_rule.values.to_port

    sg_rule.values.cidr_blocks[j] == "0.0.0.0/0"

    message := sprintf("Security group rule allows SSH from anywhere: %v", [sg_rule.address])
}

# Every S3 bucket in the root module must carry an inline
# server-side encryption configuration.
deny[message] {
    some i
    bucket := input.planned_values.root_module.resources[i]
    bucket.type == "aws_s3_bucket"
    not has_encryption(bucket)

    message := sprintf("S3 bucket must have encryption enabled: %v", [bucket.address])
}

# Holds when the bucket's planned values contain at least one
# server_side_encryption_configuration entry.
has_encryption(bucket) {
    some k
    bucket.values.server_side_encryption_configuration[k]
}

# Tag keys that every EC2 instance must define.
required_tags := ["Environment", "Project", "Owner"]

# Deny EC2 instances that lack one or more of the required tag keys.
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"

    # Rego's `-` operator only works on numbers and sets, so an array of
    # names cannot be subtracted from the tags object. Build the set of
    # required names that have no entry in the resource's tags instead.
    tags := object.get(resource.values, "tags", {})
    missing_tags := {tag | tag := required_tags[_]; not tags[tag]}
    count(missing_tags) > 0

    msg := sprintf("Resource missing required tags %v: %v", [missing_tags, resource.address])
}

# Cost control: deny EC2 instances that use one of the listed large
# instance types.
deny[msg] {
    resource := input.planned_values.root_module.resources[_]
    resource.type == "aws_instance"

    # Membership is tested by set lookup. This file does not import
    # future.keywords.in, so the `in` keyword is not available here.
    expensive_types := {"m5.4xlarge", "m5.8xlarge", "m5.12xlarge", "m5.16xlarge"}
    expensive_types[resource.values.instance_type]

    msg := sprintf("Instance type %v is not allowed: %v", [resource.values.instance_type, resource.address])
}

Advanced OPA Policies

Implement complex governance rules:

# policies/terraform/compliance.rego
package terraform.compliance

import future.keywords.in

# GDPR data residency: storage resources tagged as holding personal or
# sensitive data must be deployed through an AWS provider whose region
# starts with "eu-".
#
# The region is read from the literal value in the aws provider block; when
# no constant region is present there, this rule does not fire.
deny[message] {
    some i
    store := input.planned_values.root_module.resources[i]
    store.type in ["aws_s3_bucket", "aws_db_instance", "aws_rds_cluster"]

    # Only resources classified as personal/sensitive are in scope
    labels := object.get(store.values, "tags", {})
    labels.DataClassification in ["personal", "sensitive"]

    # The configured provider region must be an EU region
    aws_region := input.configuration.provider_config.aws.expressions.region.constant_value
    not startswith(aws_region, "eu-")

    message := sprintf("GDPR: Personal data resource must be in EU region: %v", [store.address])
}

# SOC2 audit logging: every S3 bucket must declare an access-logging block.
deny[message] {
    some i
    bucket := input.planned_values.root_module.resources[i]
    bucket.type == "aws_s3_bucket"
    not has_access_logging(bucket)

    message := sprintf("SOC2: S3 bucket must have access logging enabled: %v", [bucket.address])
}

# Holds when the bucket's planned values contain at least one logging entry.
has_access_logging(bucket) {
    some k
    bucket.values.logging[k]
}

# PCI DSS network segmentation: an ingress rule tagged PCIScope = "true"
# must not admit traffic from 0.0.0.0/0.
deny[message] {
    some i, j
    sg_rule := input.planned_values.root_module.resources[i]
    sg_rule.type == "aws_security_group_rule"
    sg_rule.values.type == "ingress"

    # NOTE(review): this reads tags from the rule resource itself; confirm
    # that aws_security_group_rule actually exposes tags in your provider
    # version, otherwise this condition can never hold.
    labels := object.get(sg_rule.values, "tags", {})
    labels.PCIScope == "true"

    # World-open CIDR present in the rule
    sg_rule.values.cidr_blocks[j] == "0.0.0.0/0"

    message := sprintf("PCI DSS: Broad network access not allowed in PCI scope: %v", [sg_rule.address])
}

# Data retention: buckets whose DataType tag is logs, backups or archives
# must also carry a non-empty DataRetention tag.
deny[message] {
    some i
    bucket := input.planned_values.root_module.resources[i]
    bucket.type == "aws_s3_bucket"

    labels := object.get(bucket.values, "tags", {})

    # Only these data types require an explicit retention policy
    kind := labels.DataType
    kind in ["logs", "backups", "archives"]

    # A missing tag and an empty tag are treated the same way
    object.get(labels, "DataRetention", "") == ""

    message := sprintf("Data retention policy required for %v: %v", [kind, bucket.address])
}

Testing OPA Policies

Create comprehensive tests for your policies:

# policies/terraform/security_test.rego
package terraform.security

# A world-open SSH ingress rule must yield at least one denial.
test_deny_ssh_from_anywhere {
    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_security_group_rule.bad_ssh",
        "type": "aws_security_group_rule",
        "values": {
            "type": "ingress",
            "from_port": 22,
            "to_port": 22,
            "cidr_blocks": ["0.0.0.0/0"]
        }
    }]}}}

    deny[_] with input as plan
}

# SSH restricted to a private CIDR must not yield any denial.
test_allow_ssh_from_specific_cidr {
    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_security_group_rule.good_ssh",
        "type": "aws_security_group_rule",
        "values": {
            "type": "ingress",
            "from_port": 22,
            "to_port": 22,
            "cidr_blocks": ["10.0.0.0/8"]
        }
    }]}}}

    violations := deny with input as plan
    count(violations) == 0
}

# A bucket without any encryption configuration must yield a denial.
test_deny_unencrypted_s3 {
    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_s3_bucket.unencrypted",
        "type": "aws_s3_bucket",
        "values": {"bucket": "my-bucket"}
    }]}}}

    deny[_] with input as plan
}

# A bucket with an inline AES256 encryption configuration must pass.
test_allow_encrypted_s3 {
    encryption := [{"rule": [{"apply_server_side_encryption_by_default": [{"sse_algorithm": "AES256"}]}]}]

    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_s3_bucket.encrypted",
        "type": "aws_s3_bucket",
        "values": {
            "bucket": "my-bucket",
            "server_side_encryption_configuration": encryption
        }
    }]}}}

    violations := deny with input as plan
    count(violations) == 0
}

# An instance that only has a Name tag must yield a denial.
test_deny_missing_required_tags {
    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_instance.no_tags",
        "type": "aws_instance",
        "values": {
            "instance_type": "t3.micro",
            "tags": {"Name": "test-instance"}
        }
    }]}}}

    deny[_] with input as plan
}

# An instance carrying Environment, Project and Owner tags must pass.
test_allow_all_required_tags {
    instance_tags := {
        "Name": "test-instance",
        "Environment": "dev",
        "Project": "test-project",
        "Owner": "team-name"
    }

    plan := {"planned_values": {"root_module": {"resources": [{
        "address": "aws_instance.with_tags",
        "type": "aws_instance",
        "values": {"instance_type": "t3.micro", "tags": instance_tags}
    }]}}}

    violations := deny with input as plan
    count(violations) == 0
}

Sentinel Policies

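HashiCorp Sentinel provides another powerful policy framework. It is evaluated by HashiCorp's hosted and enterprise Terraform offerings, which supply the `tfplan/v2` import used below: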

# policies/sentinel/aws-security.sentinel
import "tfplan/v2" as tfplan
import "strings"

# Managed aws_security_group_rule resources that this plan creates or updates
security_group_rules = filter tfplan.resource_changes as _, res {
    res.mode is "managed" and
    res.type is "aws_security_group_rule" and
    (res.change.actions contains "update" or res.change.actions contains "create")
}

# Report whether a security group rule change opens SSH to the world.
#
# sg_rule is one entry of tfplan.resource_changes. The result is true when
# the planned ("after") state is an ingress rule whose port range covers 22
# and whose cidr_blocks include 0.0.0.0/0.
#
# `rule` is a reserved word in Sentinel and cannot be used as a parameter
# name, hence sg_rule.
ssh_open_to_world = func(sg_rule) {
    after = sg_rule.change.after

    return after.type is "ingress" and
           after.from_port <= 22 and
           after.to_port >= 22 and
           "0.0.0.0/0" in after.cidr_blocks
}

# Main rule: the policy passes only when none of the selected security group
# rule changes opens SSH to the world.
#
# The iteration variable is named sg_rule because `rule` is a reserved word
# in Sentinel and cannot be used as an identifier.
main = rule {
    all security_group_rules as _, sg_rule {
        not ssh_open_to_world(sg_rule)
    }
}

# Violation messages: one {resource, message} entry per offending change.
#
# Sentinel has no Python-style list comprehension, so the list is built with
# an explicit loop and append(). The loop variable avoids the reserved word
# `rule`.
violations = []
for security_group_rules as _, sg_rule {
    if ssh_open_to_world(sg_rule) {
        append(violations, {
            "resource": sg_rule.address,
            "message":  "SSH (port 22) should not be open to 0.0.0.0/0",
        })
    }
}

# Print violations
print("SSH Security Violations:")
for violations as violation {
    print("  -", violation.resource, ":", violation.message)
}

Policy Integration with CI/CD

Integrate policy validation into your CI/CD pipeline:

#!/bin/bash
# scripts/policy-check.sh
#
# Usage: policy-check.sh [terraform_dir] [policy_dir]
#
# Plans every Terraform directory found under terraform_dir (default:
# "infrastructure"), evaluates the OPA policies in policy_dir (default:
# "policies") against each plan, and exits non-zero on the first directory
# that has policy violations or fails to plan.

set -euo pipefail

TERRAFORM_DIR=${1:-"infrastructure"}
POLICY_DIR=${2:-"policies"}

# Resolve the policy directory once, before changing directories, so it stays
# valid no matter how deeply a Terraform directory is nested.
POLICY_DIR_ABS=$(cd "$POLICY_DIR" && pwd)

# Each policy file declares its own package below data.terraform
# (terraform.security, terraform.compliance, ...), so collect the deny
# messages of every package into a single array.
QUERY='[msg | msg := data.terraform[_].deny[_]]'

echo "Running policy validation on Terraform configurations..."

# The directory list is read on file descriptor 3 so that commands inside the
# loop cannot consume it through stdin, and the loop runs in the main shell
# so that a failure stops the whole script. Downloaded modules under
# .terraform/ are not standalone configurations and are skipped.
while IFS= read -r dir <&3; do
    echo "Checking policies for $dir"

    (
        cd "$dir"

        # Generate Terraform plan
        terraform init -backend=false -input=false
        terraform plan -input=false -out=plan.tfplan
        terraform show -json plan.tfplan > plan.json

        # Run OPA policy evaluation
        echo "Running OPA policy checks..."
        violations=$(opa eval -d "$POLICY_DIR_ABS" -i plan.json --format json "$QUERY" \
            | jq -c '.result[0].expressions[0].value // []')

        # Check if there are any policy violations
        if [ "$violations" != "[]" ]; then
            echo "❌ Policy violations found in $dir"
            echo "$violations" | jq -r '.[]'
            exit 1
        fi

        echo "✅ No policy violations found in $dir"
    )
done 3< <(find "$TERRAFORM_DIR" -name "*.tf" -not -path "*/.terraform/*" -exec dirname {} \; | sort -u)

echo "All policy checks passed!"

Dynamic Policy Configuration

Create policies that adapt to different environments:

# policies/terraform/environment_policies.rego
package terraform.environment

import future.keywords.in

# Environment-specific configurations
#
# Keyed by environment name. Each entry holds:
#   allowed_instance_types - EC2 instance types permitted in that environment
#   max_instance_count     - upper bound on aws_instance resources in a plan
#   require_encryption     - whether S3 buckets must be encrypted
#
# An environment name that has no entry here makes the lookups in the rules
# of this package undefined, so those rules produce no denial for it.
environment_config := {
    "dev": {
        "allowed_instance_types": ["t3.micro", "t3.small"],
        "max_instance_count": 5,
        "require_encryption": false
    },
    "staging": {
        "allowed_instance_types": ["t3.small", "t3.medium", "m5.large"],
        "max_instance_count": 10,
        "require_encryption": true
    },
    "prod": {
        "allowed_instance_types": ["t3.medium", "t3.large", "m5.large", "m5.xlarge"],
        "max_instance_count": 50,
        "require_encryption": true
    }
}

# Resolve the environment name for a resource. The sources are tried in
# order: the resource's Environment tag, then the `environment` Terraform
# variable of the plan, and finally the literal "dev".
get_environment(resource) = tagged {
    tagged := resource.values.tags.Environment
} else = from_variable {
    from_variable := input.variables.environment.value
} else = "dev"

# Instance type restrictions: an EC2 instance must use one of the types that
# environment_config allows for its environment.
deny[message] {
    some i
    instance := input.planned_values.root_module.resources[i]
    instance.type == "aws_instance"

    env := get_environment(instance)
    allowed := environment_config[env].allowed_instance_types
    requested := instance.values.instance_type

    not requested in allowed

    message := sprintf("Instance type %v not allowed in %v environment. Allowed types: %v",
                       [requested, env, allowed])
}

# Instance count limits: the number of aws_instance resources in the root
# module must not exceed the limit of the environment named by the
# `environment` Terraform variable.
deny[message] {
    env := input.variables.environment.value
    limit := environment_config[env].max_instance_count

    planned := count([r |
        r := input.planned_values.root_module.resources[_]
        r.type == "aws_instance"
    ])
    planned > limit

    message := sprintf("Too many instances (%v) for %v environment. Maximum allowed: %v",
                       [planned, env, limit])
}

# Environment-specific encryption: S3 buckets must be encrypted in every
# environment whose configuration sets require_encryption to true.
deny[message] {
    some i
    bucket := input.planned_values.root_module.resources[i]
    bucket.type == "aws_s3_bucket"

    env := get_environment(bucket)
    environment_config[env].require_encryption == true

    not has_encryption(bucket)

    message := sprintf("S3 bucket encryption required in %v environment: %v", [env, bucket.address])
}

# Holds when the bucket's planned values contain at least one
# server_side_encryption_configuration entry.
has_encryption(bucket) {
    some k
    bucket.values.server_side_encryption_configuration[k]
}

Policy Reporting and Metrics

Generate comprehensive policy compliance reports:

#!/usr/bin/env python3
# scripts/policy_report.py

import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path

class PolicyReporter:
    """Run OPA policies against Terraform directories and report the outcome.

    Every directory below ``terraform_dir`` that contains ``*.tf`` files is
    planned with Terraform, the plan is rendered to ``plan.json`` and the
    policies in ``policy_dir`` are evaluated against it with ``opa eval``.
    """

    # The policy files declare separate packages below ``data.terraform``
    # (terraform.security, terraform.compliance, ...), so the deny messages
    # of every package are collected into one array.
    OPA_QUERY = "[msg | msg := data.terraform[_].deny[_]]"

    def __init__(self, terraform_dir, policy_dir):
        """Store the Terraform root and policy directory as ``Path`` objects."""
        self.terraform_dir = Path(terraform_dir)
        self.policy_dir = Path(policy_dir)
        self.results = []

    @staticmethod
    def _extract_violations(opa_stdout):
        """Return the violation messages found in ``opa eval --format json`` output.

        The JSON document has the shape
        ``{"result": [{"expressions": [{"value": ...}]}]}``; an undefined
        query produces a document without ``result``. List values are
        flattened into the returned list.
        """
        document = json.loads(opa_stdout)
        messages = []
        for entry in document.get("result") or []:
            for expression in entry.get("expressions", []):
                value = expression.get("value")
                if isinstance(value, list):
                    messages.extend(value)
                elif value is not None:
                    messages.append(value)
        return messages

    def run_policy_check(self, tf_dir):
        """Run OPA policy check on a Terraform directory.

        Returns a dict with ``directory`` (relative to the Terraform root),
        ``violations`` (list of messages) and ``status``: ``"passed"``,
        ``"failed"`` (violations found) or ``"error"`` (Terraform or OPA
        could not be run, or OPA's output could not be parsed).
        """
        directory = str(tf_dir.relative_to(self.terraform_dir))
        plan_json = tf_dir / "plan.json"

        try:
            # Generate plan. -input=false keeps Terraform from prompting,
            # since nothing is attached to answer the prompt.
            subprocess.run(
                ["terraform", "init", "-backend=false", "-input=false"],
                cwd=tf_dir,
                check=True,
                capture_output=True
            )

            subprocess.run(
                ["terraform", "plan", "-input=false", "-out=plan.tfplan"],
                cwd=tf_dir,
                check=True,
                capture_output=True
            )

            # capture_output cannot be combined with an explicit stdout, so
            # stdout goes to the file and only stderr is piped. The context
            # manager closes the file even when the command fails.
            with open(plan_json, "w") as plan_file:
                subprocess.run(
                    ["terraform", "show", "-json", "plan.tfplan"],
                    cwd=tf_dir,
                    check=True,
                    stdout=plan_file,
                    stderr=subprocess.PIPE
                )

            # Run OPA evaluation
            result = subprocess.run(
                ["opa", "eval", "-d", str(self.policy_dir),
                 "-i", str(plan_json),
                 self.OPA_QUERY, "--format", "json"],
                check=True,
                capture_output=True,
                text=True
            )

            violations = self._extract_violations(result.stdout)

        except (subprocess.CalledProcessError, json.JSONDecodeError, OSError) as e:
            # OSError covers a missing terraform/opa binary or an unwritable
            # plan.json; JSONDecodeError covers unparsable OPA output.
            return {
                "directory": directory,
                "violations": [f"Error running policy check: {e}"],
                "status": "error"
            }

        return {
            "directory": directory,
            "violations": violations,
            "status": "failed" if violations else "passed"
        }

    def generate_report(self):
        """Generate comprehensive policy compliance report.

        Returns a dict with an ISO 8601 ``timestamp`` (including the local
        UTC offset), a ``summary`` of counts and the per-directory
        ``results``.
        """
        # Find all Terraform directories. Directories below .terraform/ hold
        # downloaded modules rather than standalone configurations and are
        # skipped; sorting makes the report order deterministic.
        tf_dirs = sorted({
            tf_file.parent
            for tf_file in self.terraform_dir.rglob("*.tf")
            if ".terraform" not in tf_file.relative_to(self.terraform_dir).parts
        })

        # Start from a clean slate so that repeated calls do not accumulate
        # results from earlier runs.
        self.results = [self.run_policy_check(tf_dir) for tf_dir in tf_dirs]

        # Generate summary
        statuses = [r["status"] for r in self.results]
        total_dirs = len(self.results)
        passed_dirs = statuses.count("passed")
        failed_dirs = statuses.count("failed")
        error_dirs = statuses.count("error")

        total_violations = sum(len(r["violations"]) for r in self.results)

        report = {
            "timestamp": datetime.now().astimezone().isoformat(),
            "summary": {
                "total_directories": total_dirs,
                "passed": passed_dirs,
                "failed": failed_dirs,
                "errors": error_dirs,
                "total_violations": total_violations,
                "compliance_rate": (passed_dirs / total_dirs * 100) if total_dirs > 0 else 0
            },
            "results": self.results
        }

        return report

    def save_report(self, report, filename="policy_report.json"):
        """Save report to file as indented JSON."""
        with open(filename, "w") as f:
            json.dump(report, f, indent=2)

        print(f"Policy report saved to {filename}")

    def print_summary(self, report):
        """Print report summary to console, listing failed directories."""
        summary = report["summary"]

        print("\n" + "="*50)
        print("POLICY COMPLIANCE REPORT")
        print("="*50)
        print(f"Timestamp: {report['timestamp']}")
        print(f"Total Directories: {summary['total_directories']}")
        print(f"Passed: {summary['passed']}")
        print(f"Failed: {summary['failed']}")
        print(f"Errors: {summary['errors']}")
        print(f"Total Violations: {summary['total_violations']}")
        print(f"Compliance Rate: {summary['compliance_rate']:.1f}%")

        if summary['failed'] > 0:
            print("\nFAILED DIRECTORIES:")
            for result in report['results']:
                if result['status'] == 'failed':
                    print(f"  - {result['directory']}: {len(result['violations'])} violations")
                    for violation in result['violations']:
                        print(f"    • {violation}")

        print("="*50)

if __name__ == "__main__":
    # Exactly two positional arguments are expected after the script name
    if len(sys.argv) != 3:
        print("Usage: python3 policy_report.py <terraform_dir> <policy_dir>")
        sys.exit(1)

    _, terraform_dir_arg, policy_dir_arg = sys.argv

    reporter = PolicyReporter(terraform_dir_arg, policy_dir_arg)
    report = reporter.generate_report()
    reporter.save_report(report)
    reporter.print_summary(report)

    # Signal failure to the caller when any directory failed or errored
    outcome = report["summary"]
    if outcome["failed"] > 0 or outcome["errors"] > 0:
        sys.exit(1)

What’s Next

Policy as code transforms infrastructure governance from reactive reviews to proactive enforcement, ensuring compliance and security standards are met before resources are created. The combination of OPA and Sentinel provides powerful frameworks for implementing comprehensive governance that scales with your organization.

In the next part, we’ll explore security and compliance testing, including vulnerability scanning, compliance validation, and automated security assessments that complement policy enforcement with deeper security analysis.