Disaster Recovery

State file corruption, accidental deletions, and unmanaged infrastructure drift can escalate into disasters that threaten your entire infrastructure. When prevention fails, you need robust recovery procedures to restore your Terraform state and bring your infrastructure back under management.

This part covers disaster recovery strategies, state reconstruction techniques, and emergency procedures for the worst-case scenarios.

Automated State Backup

Implement comprehensive backup strategies:

#!/bin/bash
# scripts/state-backup.sh

set -e

BACKUP_BUCKET=${1:-"terraform-state-backups"}
STATE_BUCKET=${2:-"terraform-state"}
RETENTION_DAYS=${3:-30}

backup_state() {
    local workspace=${1:-"default"}
    local timestamp=$(date +%Y%m%d-%H%M%S)
    
    echo "Backing up state for workspace: $workspace"
    
    # Pull current state
    terraform workspace select "$workspace"
    terraform state pull > "/tmp/terraform-${workspace}-${timestamp}.tfstate"
    
    # Upload to backup bucket
    aws s3 cp "/tmp/terraform-${workspace}-${timestamp}.tfstate" \
        "s3://$BACKUP_BUCKET/$workspace/terraform-${timestamp}.tfstate"
    
    # Create metadata
    cat > "/tmp/backup-metadata-${timestamp}.json" << EOF
{
    "workspace": "$workspace",
    "timestamp": "$timestamp",
    "terraform_version": "$(terraform version -json | jq -r '.terraform_version')",
    "state_serial": $(terraform state pull | jq '.serial'),
    "resource_count": $(terraform state list | wc -l)
}
EOF
    
    aws s3 cp "/tmp/backup-metadata-${timestamp}.json" \
        "s3://$BACKUP_BUCKET/$workspace/metadata-${timestamp}.json"
    
    # Cleanup temp files
    rm -f "/tmp/terraform-${workspace}-${timestamp}.tfstate"
    rm -f "/tmp/backup-metadata-${timestamp}.json"
    
    echo "✅ Backup completed: $workspace"
}

cleanup_old_backups() {
    echo "Cleaning up backups older than $RETENTION_DAYS days..."
    
    # GNU date; on macOS/BSD use: date -v-"${RETENTION_DAYS}"d +%Y%m%d
    cutoff_date=$(date -d "$RETENTION_DAYS days ago" +%Y%m%d)
    
    aws s3 ls "s3://$BACKUP_BUCKET/" --recursive | while read -r line; do
        backup_date=$(echo "$line" | grep -o '[0-9]\{8\}-[0-9]\{6\}' | head -1 | cut -d'-' -f1)
        
        # Skip lines without a timestamped filename
        if [ -n "$backup_date" ] && [ "$backup_date" -lt "$cutoff_date" ]; then
            file_path=$(echo "$line" | awk '{print $4}')
            echo "Deleting old backup: $file_path"
            aws s3 rm "s3://$BACKUP_BUCKET/$file_path"
        fi
    done
}

# Backup all workspaces (sed strips the "*" marker and indentation,
# so the currently selected workspace is backed up too)
terraform workspace list | sed 's/^[*[:space:]]*//' | while read -r workspace; do
    if [ -n "$workspace" ]; then
        backup_state "$workspace"
    fi
done

cleanup_old_backups
echo "✅ All backups completed"

State Reconstruction

Rebuild state from existing infrastructure:

#!/usr/bin/env python3
# scripts/state_reconstructor.py

import boto3
import json
import subprocess
from typing import Dict, List, Tuple

class StateReconstructor:
    def __init__(self, region: str = "us-west-2"):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3')
        self.region = region
    
    def discover_infrastructure(self) -> Dict[str, List[Tuple[str, str]]]:
        """Discover existing infrastructure for reconstruction"""
        
        resources = {
            'aws_instance': self._discover_instances(),
            'aws_vpc': self._discover_vpcs(),
            'aws_subnet': self._discover_subnets(),
            'aws_security_group': self._discover_security_groups(),
            'aws_s3_bucket': self._discover_s3_buckets(),
            'aws_db_instance': self._discover_rds_instances()
        }
        
        return resources
    
    def _discover_instances(self) -> List[Tuple[str, str]]:
        instances = []
        response = self.ec2.describe_instances()
        
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] != 'terminated':
                    name = self._get_name_tag(instance.get('Tags', []))
                    instances.append((f"aws_instance.{name}", instance['InstanceId']))
        
        return instances
    
    def _discover_vpcs(self) -> List[Tuple[str, str]]:
        vpcs = []
        response = self.ec2.describe_vpcs()
        
        for vpc in response['Vpcs']:
            name = self._get_name_tag(vpc.get('Tags', []))
            vpcs.append((f"aws_vpc.{name}", vpc['VpcId']))
        
        return vpcs
    
    def _discover_subnets(self) -> List[Tuple[str, str]]:
        subnets = []
        response = self.ec2.describe_subnets()
        
        for subnet in response['Subnets']:
            name = self._get_name_tag(subnet.get('Tags', []))
            subnets.append((f"aws_subnet.{name}", subnet['SubnetId']))
        
        return subnets
    
    def _discover_security_groups(self) -> List[Tuple[str, str]]:
        sgs = []
        response = self.ec2.describe_security_groups()
        
        for sg in response['SecurityGroups']:
            if sg['GroupName'] != 'default':
                name = sg['GroupName'].replace('-', '_')
                sgs.append((f"aws_security_group.{name}", sg['GroupId']))
        
        return sgs
    
    def _discover_s3_buckets(self) -> List[Tuple[str, str]]:
        buckets = []
        response = self.s3.list_buckets()
        
        for bucket in response['Buckets']:
            name = bucket['Name'].replace('-', '_').replace('.', '_')
            buckets.append((f"aws_s3_bucket.{name}", bucket['Name']))
        
        return buckets
    
    def _discover_rds_instances(self) -> List[Tuple[str, str]]:
        instances = []
        response = self.rds.describe_db_instances()
        
        for db in response['DBInstances']:
            if db['DBInstanceStatus'] != 'deleting':
                name = db['DBInstanceIdentifier'].replace('-', '_')
                instances.append((f"aws_db_instance.{name}", db['DBInstanceIdentifier']))
        
        return instances
    
    def _get_name_tag(self, tags: List[Dict]) -> str:
        for tag in tags:
            if tag['Key'] == 'Name':
                return tag['Value'].lower().replace(' ', '_').replace('-', '_')
        return 'unnamed'
    
    def generate_import_script(self, resources: Dict[str, List[Tuple[str, str]]]) -> str:
        """Generate import script for discovered resources"""
        
        script_lines = [
            "#!/bin/bash",
            "set -e",
            "",
            "echo 'Starting state reconstruction...'",
            "",
            "# Backup any existing state",
            "if [ -f terraform.tfstate ]; then",
            "    cp terraform.tfstate terraform.tfstate.backup.$(date +%Y%m%d-%H%M%S)",
            "fi",
            ""
        ]
        
        for resource_type, resource_list in resources.items():
            if resource_list:
                script_lines.append(f"# Import {resource_type} resources")
                
                for terraform_address, resource_id in resource_list:
                    script_lines.append(f"echo 'Importing {terraform_address}...'")
                    script_lines.append(f"terraform import '{terraform_address}' '{resource_id}' || echo 'Failed to import {terraform_address}'")
                
                script_lines.append("")
        
        script_lines.extend([
            "echo 'State reconstruction completed'",
            "terraform state list"
        ])
        
        return '\n'.join(script_lines)
    
    def reconstruct_state(self, output_dir: str = "."):
        """Full state reconstruction process"""
        
        print("🔍 Discovering existing infrastructure...")
        resources = self.discover_infrastructure()
        
        total_resources = sum(len(resource_list) for resource_list in resources.values())
        print(f"Found {total_resources} resources to reconstruct")
        
        # Generate import script
        import_script = self.generate_import_script(resources)
        
        with open(f"{output_dir}/reconstruct_state.sh", 'w') as f:
            f.write(import_script)
        
        # Make script executable
        subprocess.run(['chmod', '+x', f"{output_dir}/reconstruct_state.sh"])
        
        # Generate basic Terraform configuration
        self._generate_basic_config(resources, output_dir)
        
        print(f"✅ Reconstruction files generated in {output_dir}")
        print("Run ./reconstruct_state.sh to import resources")
    
    def _generate_basic_config(self, resources: Dict[str, List[Tuple[str, str]]], output_dir: str):
        """Generate basic Terraform configuration for discovered resources"""
        
        config_lines = []
        
        for resource_type, resource_list in resources.items():
            for terraform_address, resource_id in resource_list:
                resource_name = terraform_address.split('.')[1]
                
                if resource_type == "aws_instance":
                    config_lines.append(f'''
resource "aws_instance" "{resource_name}" {{
  # Configuration will be populated after import
  lifecycle {{
    ignore_changes = [ami, user_data]
  }}
}}''')
                
                elif resource_type == "aws_vpc":
                    config_lines.append(f'''
resource "aws_vpc" "{resource_name}" {{
  # Configuration will be populated after import
}}''')
                
                elif resource_type == "aws_subnet":
                    config_lines.append(f'''
resource "aws_subnet" "{resource_name}" {{
  # Configuration will be populated after import
}}''')
                
                elif resource_type == "aws_security_group":
                    config_lines.append(f'''
resource "aws_security_group" "{resource_name}" {{
  # Configuration will be populated after import
}}''')
                
                elif resource_type == "aws_s3_bucket":
                    config_lines.append(f'''
resource "aws_s3_bucket" "{resource_name}" {{
  bucket = "{resource_id}"
}}''')
                
                elif resource_type == "aws_db_instance":
                    config_lines.append(f'''
resource "aws_db_instance" "{resource_name}" {{
  identifier = "{resource_id}"
  skip_final_snapshot = true
}}''')
        
        with open(f"{output_dir}/reconstructed.tf", 'w') as f:
            f.write('\n'.join(config_lines))

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Terraform State Reconstructor')
    parser.add_argument('--region', default='us-west-2', help='AWS region')
    parser.add_argument('--output-dir', default='.', help='Output directory')
    
    args = parser.parse_args()
    
    reconstructor = StateReconstructor(args.region)
    reconstructor.reconstruct_state(args.output_dir)

if __name__ == "__main__":
    main()
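One limitation of the discovery pass above: Terraform addresses are derived from Name tags, so two instances tagged "web" — or several untagged ones — all land on the same `aws_instance.unnamed`-style address, and the later imports would fail. A sketch of a deduplication pass (the `dedupe_addresses` helper is hypothetical, not part of the class) that suffixes duplicates before the import script is generated:

```python
from collections import Counter
from typing import Dict, List, Tuple

def dedupe_addresses(
    resources: Dict[str, List[Tuple[str, str]]]
) -> Dict[str, List[Tuple[str, str]]]:
    """Suffix duplicate Terraform addresses (_2, _3, ...) so that
    `terraform import` never targets the same address twice."""
    seen: Counter = Counter()
    result = {}
    for rtype, pairs in resources.items():
        deduped = []
        for address, rid in pairs:
            seen[address] += 1
            if seen[address] > 1:
                address = f"{address}_{seen[address]}"
            deduped.append((address, rid))
        result[rtype] = deduped
    return result
```

Running this over the `discover_infrastructure()` output before `generate_import_script()` keeps every import target unique.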

Emergency Recovery Procedures

Handle critical state corruption scenarios:

#!/bin/bash
# scripts/emergency-recovery.sh

set -e

BACKUP_BUCKET=${1:-"terraform-state-backups"}
WORKSPACE=${2:-"default"}

emergency_restore() {
    echo "🚨 EMERGENCY STATE RECOVERY"
    echo "Workspace: $WORKSPACE"
    
    # List available backups
    echo "Available backups:"
    aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -10
    
    read -p "Enter backup filename (or 'latest' for most recent): " backup_choice
    
    if [ "$backup_choice" = "latest" ]; then
        BACKUP_FILE=$(aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -1 | awk '{print $4}')
    else
        BACKUP_FILE="$WORKSPACE/$backup_choice"
    fi
    
    echo "Restoring from: $BACKUP_FILE"
    
    # Download backup
    aws s3 cp "s3://$BACKUP_BUCKET/$BACKUP_FILE" "/tmp/restore.tfstate"
    
    # Validate backup
    if ! jq empty "/tmp/restore.tfstate" 2>/dev/null; then
        echo "❌ Invalid backup file"
        exit 1
    fi
    
    # Create safety backup of current state
    if [ -f "terraform.tfstate" ]; then
        cp terraform.tfstate "terraform.tfstate.emergency-backup.$(date +%Y%m%d-%H%M%S)"
    fi
    
    # Restore state
    terraform workspace select "$WORKSPACE"
    terraform state push "/tmp/restore.tfstate"
    
    # Verify restoration. With set -e, a bare `terraform plan -detailed-exitcode`
    # would abort the script on exit code 2 (changes pending), so test it directly.
    echo "Verifying restored state..."
    if terraform plan -detailed-exitcode; then
        echo "✅ Emergency recovery successful"
    else
        echo "⚠️  Recovery completed but state may need adjustment"
    fi
    
    rm -f "/tmp/restore.tfstate"
}

partial_recovery() {
    echo "🔧 PARTIAL STATE RECOVERY"
    
    # Extract specific resources from backup
    read -p "Enter resource addresses to recover (space-separated): " resources
    
    BACKUP_FILE=$(aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -1 | awk '{print $4}')
    aws s3 cp "s3://$BACKUP_BUCKET/$BACKUP_FILE" "/tmp/backup.tfstate"
    
    for resource in $resources; do
        echo "Recovering resource: $resource"
        
        # Extract resource from backup
        jq ".resources[] | select(.name == \"${resource##*.}\" and .type == \"${resource%.*}\")" "/tmp/backup.tfstate" > "/tmp/resource.json"
        
        if [ -s "/tmp/resource.json" ]; then
            # Get resource ID for import
            RESOURCE_ID=$(jq -r '.instances[0].attributes.id // .instances[0].attributes.arn // empty' "/tmp/resource.json")
            
            if [ -n "$RESOURCE_ID" ]; then
                echo "Importing $resource with ID: $RESOURCE_ID"
                terraform import "$resource" "$RESOURCE_ID"
            else
                echo "⚠️  Could not determine resource ID for $resource"
            fi
        else
            echo "❌ Resource $resource not found in backup"
        fi
    done
    
    rm -f "/tmp/backup.tfstate" "/tmp/resource.json"
}

drift_recovery() {
    echo "🔄 INFRASTRUCTURE DRIFT RECOVERY"
    
    # Detect drift
    echo "Detecting infrastructure drift..."
    terraform plan -out=drift.tfplan
    
    # Show drift summary
    terraform show -json drift.tfplan | jq -r '
        .resource_changes[]
        | select(.change.actions | any(. == "create" or . == "update" or . == "delete"))
        | "\(.change.actions | join(",")): \(.address)"
    '
    
    read -p "Apply changes to fix drift? (y/N): " apply_changes
    
    if [[ $apply_changes =~ ^[Yy]$ ]]; then
        terraform apply drift.tfplan
        echo "✅ Drift recovery completed"
    else
        echo "Drift recovery cancelled"
    fi
    
    rm -f drift.tfplan
}

case "${3:-help}" in
    "emergency")
        emergency_restore
        ;;
    "partial")
        partial_recovery
        ;;
    "drift")
        drift_recovery
        ;;
    *)
        echo "Usage: $0 <backup_bucket> <workspace> [emergency|partial|drift]"
        echo ""
        echo "Recovery modes:"
        echo "  emergency - Full state restoration from backup"
        echo "  partial   - Recover specific resources from backup"
        echo "  drift     - Detect and fix infrastructure drift"
        exit 1
        ;;
esac
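The jq extraction in partial_recovery() splits the address on its last dot, which works for plain `type.name` addresses. For reference, the same lookup in Python against a parsed state document (state format version 4 layout assumed; `extract_resource_id` is an illustrative name):

```python
def extract_resource_id(state, address):
    """Find `address` ("type.name") in a parsed state-v4 document and
    return the first instance's id (falling back to arn), mirroring
    the jq filter used in partial_recovery()."""
    rtype, _, name = address.partition(".")
    for resource in state.get("resources", []):
        if resource.get("type") == rtype and resource.get("name") == name:
            attrs = resource.get("instances", [{}])[0].get("attributes", {})
            return attrs.get("id") or attrs.get("arn")
    return None
```

Note that neither version handles module-scoped addresses (`module.x.aws_instance.web`); those would need the module path stripped first.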

State Validation and Repair

Validate and repair corrupted state files:

#!/usr/bin/env python3
# scripts/state_validator.py

import json
import sys
from typing import Any, Dict, List, Tuple

class StateValidator:
    def __init__(self, state_file: str):
        with open(state_file, 'r') as f:
            self.state = json.load(f)
        self.errors = []
        self.warnings = []
    
    def validate_structure(self) -> bool:
        """Validate basic state file structure"""
        
        required_fields = ['version', 'terraform_version', 'serial', 'resources']
        
        for field in required_fields:
            if field not in self.state:
                self.errors.append(f"Missing required field: {field}")
        
        if 'resources' in self.state:
            if not isinstance(self.state['resources'], list):
                self.errors.append("Resources field must be a list")
        
        return len(self.errors) == 0
    
    def validate_resources(self) -> bool:
        """Validate resource definitions"""
        
        if 'resources' not in self.state:
            return False
        
        for i, resource in enumerate(self.state['resources']):
            resource_path = f"resources[{i}]"
            
            # Check required resource fields
            required_fields = ['mode', 'type', 'name', 'instances']
            for field in required_fields:
                if field not in resource:
                    self.errors.append(f"{resource_path}: Missing field '{field}'")
            
            # Validate instances
            if 'instances' in resource:
                for j, instance in enumerate(resource['instances']):
                    instance_path = f"{resource_path}.instances[{j}]"
                    
                    if 'attributes' not in instance:
                        self.errors.append(f"{instance_path}: Missing attributes")
                    
                    if 'schema_version' not in instance:
                        self.warnings.append(f"{instance_path}: Missing schema_version")
        
        return len(self.errors) == 0
    
    def check_dependencies(self) -> bool:
        """Check for broken dependencies"""
        
        resource_addresses = set()
        
        # Collect all resource addresses (data sources carry a "data." prefix)
        for resource in self.state.get('resources', []):
            prefix = "data." if resource.get('mode') == 'data' else ""
            resource_addresses.add(f"{prefix}{resource['type']}.{resource['name']}")
        
        # Check dependencies
        for resource in self.state.get('resources', []):
            for instance in resource.get('instances', []):
                deps = instance.get('dependencies', [])
                for dep in deps:
                    if dep not in resource_addresses:
                        self.errors.append(f"Broken dependency: {dep}")
        
        return len(self.errors) == 0
    
    def repair_state(self) -> "Tuple[Dict[str, Any], List[str]]":
        """Attempt to repair common state issues"""
        
        # Deep copy via a JSON round-trip so repairs don't mutate self.state
        repaired_state = json.loads(json.dumps(self.state))
        repairs = []
        
        # Fix missing serial
        if 'serial' not in repaired_state:
            repaired_state['serial'] = 1
            repairs.append("Added missing serial number")
        
        # Fix missing version
        if 'version' not in repaired_state:
            repaired_state['version'] = 4
            repairs.append("Added missing version")
        
        # Remove broken dependencies
        for resource in repaired_state.get('resources', []):
            for instance in resource.get('instances', []):
                if 'dependencies' in instance:
                    valid_deps = []
                    for dep in instance['dependencies']:
                        # Check if the dependency exists (data sources carry a "data." prefix)
                        dep_exists = any(
                            f"{'data.' if r.get('mode') == 'data' else ''}{r['type']}.{r['name']}" == dep
                            for r in repaired_state.get('resources', [])
                        )
                        if dep_exists:
                            valid_deps.append(dep)
                        else:
                            repairs.append(f"Removed broken dependency: {dep}")
                    
                    instance['dependencies'] = valid_deps
        
        return repaired_state, repairs
    
    def generate_report(self) -> str:
        """Generate validation report"""
        
        report = ["Terraform State Validation Report", "=" * 40, ""]
        
        if self.errors:
            report.extend(["ERRORS:", ""])
            for error in self.errors:
                report.append(f"  ❌ {error}")
            report.append("")
        
        if self.warnings:
            report.extend(["WARNINGS:", ""])
            for warning in self.warnings:
                report.append(f"  ⚠️  {warning}")
            report.append("")
        
        if not self.errors and not self.warnings:
            report.append("✅ State file is valid")
        
        return "\n".join(report)

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Terraform State Validator')
    parser.add_argument('state_file', help='Path to state file')
    parser.add_argument('--repair', action='store_true', help='Attempt to repair issues')
    parser.add_argument('--output', help='Output file for repaired state')
    
    args = parser.parse_args()
    
    try:
        validator = StateValidator(args.state_file)
        
        # Run validation
        validator.validate_structure()
        validator.validate_resources()
        validator.check_dependencies()
        
        # Print report
        print(validator.generate_report())
        
        # Repair if requested
        if args.repair and validator.errors:
            print("\nAttempting repairs...")
            repaired_state, repairs = validator.repair_state()
            
            output_file = args.output or f"{args.state_file}.repaired"
            
            with open(output_file, 'w') as f:
                json.dump(repaired_state, f, indent=2)
            
            print(f"\nRepairs made:")
            for repair in repairs:
                print(f"  🔧 {repair}")
            
            print(f"\nRepaired state saved to: {output_file}")
        
        # Exit with error code if validation failed
        sys.exit(1 if validator.errors else 0)
        
    except Exception as e:
        print(f"❌ Error validating state file: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
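One caveat when pushing a repaired file back: `terraform state push` refuses a state whose serial is lower than the backend's copy (short of `-force`). A small pre-push sketch — `prepare_for_push` is a hypothetical helper, and the remote serial can be read with `terraform state pull | jq .serial`:

```python
import json

def prepare_for_push(repaired_path, remote_serial):
    """Bump the repaired state's serial above the backend's so that
    `terraform state push` accepts it without -force."""
    with open(repaired_path) as f:
        state = json.load(f)
    state["serial"] = max(state.get("serial", 0), remote_serial) + 1
    with open(repaired_path, "w") as f:
        json.dump(state, f, indent=2)
    return state["serial"]
```

The lineage field must still match the backend's copy; a lineage mismatch always requires an explicit `-force`.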

What’s Next

Disaster recovery capabilities ensure that even catastrophic state failures don’t result in permanent infrastructure loss. These tools and procedures provide multiple layers of protection and recovery options for different failure scenarios.

In the next part, we’ll explore performance optimization techniques that help manage large state files efficiently and reduce the time required for Terraform operations in complex environments.