Disaster Recovery
State file corruption, accidental deletions, and unmanaged infrastructure drift can escalate into disasters that put your entire infrastructure at risk. When prevention fails, you need robust recovery procedures that can restore your Terraform state and bring your infrastructure back under management.
This part covers disaster recovery strategies, state reconstruction techniques, and emergency procedures for the worst-case scenarios.
Automated State Backup
Implement comprehensive backup strategies:
#!/bin/bash
# scripts/state-backup.sh
# Back up the Terraform state of every workspace to S3 and prune old copies.
# Usage: state-backup.sh [backup_bucket] [state_bucket] [retention_days]
set -e
# S3 bucket that receives the backup copies (positional arg 1).
BACKUP_BUCKET=${1:-"terraform-state-backups"}
# S3 bucket holding the live Terraform state (positional arg 2).
# NOTE(review): STATE_BUCKET is never referenced below — confirm it is needed.
STATE_BUCKET=${2:-"terraform-state"}
# Backups older than this many days are deleted by cleanup_old_backups (arg 3).
RETENTION_DAYS=${3:-30}
# backup_state WORKSPACE
# Pull the state for WORKSPACE, upload it to $BACKUP_BUCKET together with a
# small JSON metadata document, then remove the local temp files.
backup_state() {
    local workspace=${1:-"default"}
    local timestamp
    timestamp=$(date +%Y%m%d-%H%M%S)
    local state_file="/tmp/terraform-${workspace}-${timestamp}.tfstate"
    local meta_file="/tmp/backup-metadata-${timestamp}.json"

    echo "Backing up state for workspace: $workspace"

    # Pull the current state ONCE and reuse the local copy below, so the
    # serial recorded in the metadata matches the uploaded snapshot exactly.
    # (The original pulled the state a second time for the metadata, which
    # could race with a concurrent apply and describe a different snapshot.)
    terraform workspace select "$workspace"
    terraform state pull > "$state_file"

    # Upload the snapshot to the backup bucket.
    aws s3 cp "$state_file" \
        "s3://$BACKUP_BUCKET/$workspace/terraform-${timestamp}.tfstate"

    # Record metadata describing the snapshot. The serial is read from the
    # local file; `tr -d ' '` strips the padding BSD wc adds to its count.
    cat > "$meta_file" << EOF
{
  "workspace": "$workspace",
  "timestamp": "$timestamp",
  "terraform_version": "$(terraform version -json | jq -r '.terraform_version')",
  "state_serial": $(jq '.serial' "$state_file"),
  "resource_count": $(terraform state list | wc -l | tr -d ' ')
}
EOF
    aws s3 cp "$meta_file" \
        "s3://$BACKUP_BUCKET/$workspace/metadata-${timestamp}.json"

    # Cleanup temp files
    rm -f "$state_file" "$meta_file"
    echo "✅ Backup completed: $workspace"
}
# cleanup_old_backups
# Delete objects in $BACKUP_BUCKET whose embedded YYYYMMDD-HHMMSS stamp is
# older than $RETENTION_DAYS days.
cleanup_old_backups() {
    echo "Cleaning up backups older than $RETENTION_DAYS days..."
    # NOTE: `date -d` is GNU-specific; on BSD/macOS use `date -v-${RETENTION_DAYS}d`.
    cutoff_date=$(date -d "$RETENTION_DAYS days ago" +%Y%m%d)
    aws s3 ls "s3://$BACKUP_BUCKET/" --recursive | while read -r line; do
        backup_date=$(echo "$line" | grep -o '[0-9]\{8\}-[0-9]\{6\}' | head -1 | cut -d'-' -f1)
        # Skip listing lines that carry no timestamp (e.g. directory markers);
        # the original passed an empty string to `-lt`, which is a test error.
        if [ -n "$backup_date" ] && [ "$backup_date" -lt "$cutoff_date" ]; then
            # Column 4 of `aws s3 ls` output is the object key.
            # NOTE(review): this breaks on keys containing spaces — confirm
            # backup keys never contain whitespace.
            file_path=$(echo "$line" | awk '{print $4}')
            echo "Deleting old backup: $file_path"
            aws s3 rm "s3://$BACKUP_BUCKET/$file_path"
        fi
    done
}
# Back up every workspace. `terraform workspace list` prefixes the currently
# selected workspace with "* "; the original filtered that line out entirely
# (grep -v '^\*'), so the ACTIVE workspace was never backed up. Stripping the
# marker characters instead keeps every workspace in the list.
terraform workspace list | sed 's/^[*[:space:]]*//' | while read -r workspace; do
    if [ -n "$workspace" ]; then
        backup_state "$workspace"
    fi
done
cleanup_old_backups
echo "✅ All backups completed"
State Reconstruction
Rebuild state from existing infrastructure:
#!/usr/bin/env python3
# scripts/state_reconstructor.py
import boto3
import json
import subprocess
from typing import Dict, List, Tuple
class StateReconstructor:
    """Discover live AWS resources and generate Terraform import tooling.

    When a state file is lost, this class enumerates the supported resource
    types via the AWS APIs, writes a shell script of ``terraform import``
    commands, and emits a skeleton ``.tf`` configuration so the imported
    resources have addresses to land on.
    """

    # Skeleton HCL emitted per resource type by _generate_basic_config.
    # ``{name}`` is the Terraform resource name; ``{id}`` is the provider-side
    # identifier for arguments that must match the imported object.
    _CONFIG_TEMPLATES: Dict[str, str] = {
        'aws_instance': (
            'resource "aws_instance" "{name}" {{\n'
            '  # Configuration will be populated after import\n'
            '  lifecycle {{\n'
            '    ignore_changes = [ami, user_data]\n'
            '  }}\n'
            '}}'
        ),
        'aws_vpc': (
            'resource "aws_vpc" "{name}" {{\n'
            '  # Configuration will be populated after import\n'
            '}}'
        ),
        'aws_subnet': (
            'resource "aws_subnet" "{name}" {{\n'
            '  # Configuration will be populated after import\n'
            '}}'
        ),
        'aws_security_group': (
            'resource "aws_security_group" "{name}" {{\n'
            '  # Configuration will be populated after import\n'
            '}}'
        ),
        'aws_s3_bucket': (
            'resource "aws_s3_bucket" "{name}" {{\n'
            '  bucket = "{id}"\n'
            '}}'
        ),
        'aws_db_instance': (
            'resource "aws_db_instance" "{name}" {{\n'
            '  identifier          = "{id}"\n'
            '  skip_final_snapshot = true\n'
            '}}'
        ),
    }

    def __init__(self, region: str = "us-west-2"):
        """Create the AWS API clients used for discovery."""
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3')  # bucket listing is not region-scoped
        self.region = region

    def discover_infrastructure(self) -> Dict[str, List[Tuple[str, str]]]:
        """Discover existing infrastructure for reconstruction.

        Returns:
            Mapping of Terraform resource type to a list of
            ``(terraform_address, provider_resource_id)`` pairs.
        """
        return {
            'aws_instance': self._discover_instances(),
            'aws_vpc': self._discover_vpcs(),
            'aws_subnet': self._discover_subnets(),
            'aws_security_group': self._discover_security_groups(),
            'aws_s3_bucket': self._discover_s3_buckets(),
            'aws_db_instance': self._discover_rds_instances(),
        }

    @staticmethod
    def _dedupe_addresses(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """Suffix duplicate Terraform addresses so every address is unique.

        Untagged resources all sanitize to the same name (e.g. ``unnamed``),
        which previously produced colliding addresses and failing imports.
        The first occurrence keeps its name; later ones get ``_1``, ``_2``...
        """
        seen: Dict[str, int] = {}
        unique: List[Tuple[str, str]] = []
        for address, resource_id in pairs:
            count = seen.get(address, 0)
            seen[address] = count + 1
            unique.append((f"{address}_{count}" if count else address, resource_id))
        return unique

    def _discover_instances(self) -> List[Tuple[str, str]]:
        """List non-terminated EC2 instances as (address, instance_id) pairs."""
        # NOTE(review): describe_* results are not paginated here; accounts
        # with many resources will be truncated — consider boto3 paginators.
        instances = []
        response = self.ec2.describe_instances()
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] != 'terminated':
                    name = self._get_name_tag(instance.get('Tags', []))
                    instances.append((f"aws_instance.{name}", instance['InstanceId']))
        return self._dedupe_addresses(instances)

    def _discover_vpcs(self) -> List[Tuple[str, str]]:
        """List VPCs as (address, vpc_id) pairs."""
        vpcs = []
        response = self.ec2.describe_vpcs()
        for vpc in response['Vpcs']:
            name = self._get_name_tag(vpc.get('Tags', []))
            vpcs.append((f"aws_vpc.{name}", vpc['VpcId']))
        return self._dedupe_addresses(vpcs)

    def _discover_subnets(self) -> List[Tuple[str, str]]:
        """List subnets as (address, subnet_id) pairs."""
        subnets = []
        response = self.ec2.describe_subnets()
        for subnet in response['Subnets']:
            name = self._get_name_tag(subnet.get('Tags', []))
            subnets.append((f"aws_subnet.{name}", subnet['SubnetId']))
        return self._dedupe_addresses(subnets)

    def _discover_security_groups(self) -> List[Tuple[str, str]]:
        """List non-default security groups as (address, group_id) pairs."""
        sgs = []
        response = self.ec2.describe_security_groups()
        for sg in response['SecurityGroups']:
            if sg['GroupName'] != 'default':
                name = sg['GroupName'].replace('-', '_')
                sgs.append((f"aws_security_group.{name}", sg['GroupId']))
        # Group names repeat across VPCs, so dedup is required here too.
        return self._dedupe_addresses(sgs)

    def _discover_s3_buckets(self) -> List[Tuple[str, str]]:
        """List S3 buckets as (address, bucket_name) pairs."""
        buckets = []
        response = self.s3.list_buckets()
        for bucket in response['Buckets']:
            # Sanitizing '-' and '.' can make distinct buckets collide
            # ("a-b" and "a.b"); dedup below keeps addresses unique.
            name = bucket['Name'].replace('-', '_').replace('.', '_')
            buckets.append((f"aws_s3_bucket.{name}", bucket['Name']))
        return self._dedupe_addresses(buckets)

    def _discover_rds_instances(self) -> List[Tuple[str, str]]:
        """List RDS instances not being deleted as (address, identifier) pairs."""
        instances = []
        response = self.rds.describe_db_instances()
        for db in response['DBInstances']:
            if db['DBInstanceStatus'] != 'deleting':
                name = db['DBInstanceIdentifier'].replace('-', '_')
                instances.append((f"aws_db_instance.{name}", db['DBInstanceIdentifier']))
        return self._dedupe_addresses(instances)

    def _get_name_tag(self, tags: List[Dict]) -> str:
        """Derive a Terraform-safe resource name from the 'Name' tag.

        Falls back to ``unnamed`` when the tag is absent or empty. Names
        starting with a digit get a leading underscore, because Terraform
        identifiers may not start with a digit.
        """
        for tag in tags:
            if tag.get('Key') == 'Name':
                name = tag.get('Value', '').lower().replace(' ', '_').replace('-', '_')
                if name and name[0].isdigit():
                    name = f"_{name}"
                if name:
                    return name
        return 'unnamed'

    def generate_import_script(self, resources: Dict[str, List[Tuple[str, str]]]) -> str:
        """Generate a bash script that imports every discovered resource.

        Each import is followed by ``|| echo ...`` so a single failure does
        not abort the whole run despite ``set -e``.
        """
        script_lines = [
            "#!/bin/bash",
            "set -e",
            "",
            "echo 'Starting state reconstruction...'",
            "",
            "# Backup any existing state",
            "if [ -f terraform.tfstate ]; then",
            "  cp terraform.tfstate terraform.tfstate.backup.$(date +%Y%m%d-%H%M%S)",
            "fi",
            "",
        ]
        for resource_type, resource_list in resources.items():
            if resource_list:
                script_lines.append(f"# Import {resource_type} resources")
                for terraform_address, resource_id in resource_list:
                    script_lines.append(f"echo 'Importing {terraform_address}...'")
                    script_lines.append(
                        f"terraform import '{terraform_address}' '{resource_id}' "
                        f"|| echo 'Failed to import {terraform_address}'"
                    )
                script_lines.append("")
        script_lines.extend([
            "echo 'State reconstruction completed'",
            "terraform state list",
        ])
        return '\n'.join(script_lines)

    def reconstruct_state(self, output_dir: str = "."):
        """Full reconstruction: discover resources, write script and config."""
        import os

        print("🔍 Discovering existing infrastructure...")
        resources = self.discover_infrastructure()
        total_resources = sum(len(resource_list) for resource_list in resources.values())
        print(f"Found {total_resources} resources to reconstruct")

        # Generate and write the import script.
        script_path = os.path.join(output_dir, "reconstruct_state.sh")
        with open(script_path, 'w') as f:
            f.write(self.generate_import_script(resources))
        # Mark executable directly instead of shelling out to chmod.
        os.chmod(script_path, 0o755)

        # Generate the matching skeleton configuration.
        self._generate_basic_config(resources, output_dir)
        print(f"✅ Reconstruction files generated in {output_dir}")
        print("Run ./reconstruct_state.sh to import resources")

    def _generate_basic_config(self, resources: Dict[str, List[Tuple[str, str]]], output_dir: str):
        """Write a skeleton Terraform configuration for discovered resources."""
        import os

        blocks = []
        for resource_type, resource_list in resources.items():
            template = self._CONFIG_TEMPLATES.get(resource_type)
            if template is None:
                continue  # unknown type: nothing sensible to scaffold
            for terraform_address, resource_id in resource_list:
                resource_name = terraform_address.split('.', 1)[1]
                blocks.append(template.format(name=resource_name, id=resource_id))
        with open(os.path.join(output_dir, "reconstructed.tf"), 'w') as f:
            f.write('\n\n'.join(blocks))
def main():
    """CLI entry point: parse arguments and run a full reconstruction."""
    import argparse

    arg_parser = argparse.ArgumentParser(description='Terraform State Reconstructor')
    arg_parser.add_argument('--region', default='us-west-2', help='AWS region')
    arg_parser.add_argument('--output-dir', default='.', help='Output directory')
    options = arg_parser.parse_args()

    StateReconstructor(options.region).reconstruct_state(options.output_dir)


if __name__ == "__main__":
    main()
Emergency Recovery Procedures
Handle critical state corruption scenarios:
#!/bin/bash
# scripts/emergency-recovery.sh
# Interactive recovery helpers for Terraform state disasters.
# Usage: emergency-recovery.sh <backup_bucket> <workspace> [emergency|partial|drift]
set -e
# S3 bucket holding the state backups (positional arg 1).
BACKUP_BUCKET=${1:-"terraform-state-backups"}
# Terraform workspace to operate on (positional arg 2).
WORKSPACE=${2:-"default"}
# emergency_restore
# Interactively restore a full state snapshot from $BACKUP_BUCKET into the
# selected workspace, keeping a safety copy of the current local state.
emergency_restore() {
    echo "🚨 EMERGENCY STATE RECOVERY"
    echo "Workspace: $WORKSPACE"

    # List available backups
    echo "Available backups:"
    aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -10

    read -p "Enter backup filename (or 'latest' for most recent): " backup_choice
    if [ "$backup_choice" = "latest" ]; then
        BACKUP_FILE=$(aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -1 | awk '{print $4}')
    else
        BACKUP_FILE="$WORKSPACE/$backup_choice"
    fi
    echo "Restoring from: $BACKUP_FILE"

    # Download backup
    aws s3 cp "s3://$BACKUP_BUCKET/$BACKUP_FILE" "/tmp/restore.tfstate"

    # Validate the backup parses as JSON before pushing it anywhere.
    if ! jq empty "/tmp/restore.tfstate" 2>/dev/null; then
        echo "❌ Invalid backup file"
        exit 1
    fi

    # Create safety backup of current state
    if [ -f "terraform.tfstate" ]; then
        cp terraform.tfstate "terraform.tfstate.emergency-backup.$(date +%Y%m%d-%H%M%S)"
    fi

    # Restore state
    terraform workspace select "$WORKSPACE"
    terraform state push "/tmp/restore.tfstate"

    # Verify restoration. `-detailed-exitcode` exits 2 when changes are
    # pending, so the command is tested directly in the `if`: the original
    # ran it bare under `set -e`, which aborted the script on any drift
    # before the `$?` check could ever run.
    echo "Verifying restored state..."
    if terraform plan -detailed-exitcode; then
        echo "✅ Emergency recovery successful"
    else
        echo "⚠️ Recovery completed but state may need adjustment"
    fi
    rm -f "/tmp/restore.tfstate"
}
# partial_recovery
# Recover individual resources by re-importing them, using IDs taken from the
# most recent state backup for $WORKSPACE.
partial_recovery() {
    echo "🔧 PARTIAL STATE RECOVERY"

    read -p "Enter resource addresses to recover (space-separated): " resources

    # Fetch the most recent backup for this workspace.
    BACKUP_FILE=$(aws s3 ls "s3://$BACKUP_BUCKET/$WORKSPACE/" --recursive | grep '\.tfstate$' | tail -1 | awk '{print $4}')
    aws s3 cp "s3://$BACKUP_BUCKET/$BACKUP_FILE" "/tmp/backup.tfstate"

    for resource in $resources; do
        echo "Recovering resource: $resource"
        # Extract the matching resource ("type.name") from the backup.
        # --arg passes the values as jq variables instead of splicing them
        # into the program text (the original interpolation broke on any
        # special characters in the address).
        jq --arg name "${resource##*.}" --arg type "${resource%.*}" \
            '.resources[] | select(.name == $name and .type == $type)' \
            "/tmp/backup.tfstate" > "/tmp/resource.json"
        if [ -s "/tmp/resource.json" ]; then
            # Prefer the `id` attribute, falling back to `arn`.
            RESOURCE_ID=$(jq -r '.instances[0].attributes.id // .instances[0].attributes.arn // empty' "/tmp/resource.json")
            if [ -n "$RESOURCE_ID" ]; then
                echo "Importing $resource with ID: $RESOURCE_ID"
                # `|| echo` keeps one failed import from aborting the rest
                # of the loop under `set -e`.
                terraform import "$resource" "$RESOURCE_ID" || echo "⚠️ Import failed for $resource"
            else
                echo "⚠️ Could not determine resource ID for $resource"
            fi
        else
            echo "❌ Resource $resource not found in backup"
        fi
    done
    rm -f "/tmp/backup.tfstate" "/tmp/resource.json"
}
# drift_recovery
# Plan against live infrastructure, summarize drifted resources, and
# optionally apply the saved plan to reconcile them.
drift_recovery() {
    echo "🔄 INFRASTRUCTURE DRIFT RECOVERY"

    # Detect drift
    echo "Detecting infrastructure drift..."
    terraform plan -out=drift.tfplan

    # Show drift summary. `any()` emits exactly one line per changed
    # resource; the original iterated `.actions[]` inside select(), which
    # printed a resource once per matching action (e.g. twice for a
    # delete+create replacement).
    terraform show -json drift.tfplan | jq -r '
        .resource_changes[]
        | select(any(.change.actions[]; . == "update" or . == "delete" or . == "create"))
        | "\(.change.actions | join(",")): \(.address)"
    '

    read -p "Apply changes to fix drift? (y/N): " apply_changes
    if [[ $apply_changes =~ ^[Yy]$ ]]; then
        terraform apply drift.tfplan
        echo "✅ Drift recovery completed"
    else
        echo "Drift recovery cancelled"
    fi
    rm -f drift.tfplan
}
# Dispatch on the recovery mode given as the third argument; anything
# unrecognized (including the default "help") prints usage and exits.
MODE="${3:-help}"
case "$MODE" in
    emergency)
        emergency_restore
        ;;
    partial)
        partial_recovery
        ;;
    drift)
        drift_recovery
        ;;
    *)
        echo "Usage: $0 <backup_bucket> <workspace> [emergency|partial|drift]"
        echo ""
        echo "Recovery modes:"
        echo " emergency - Full state restoration from backup"
        echo " partial - Recover specific resources from backup"
        echo " drift - Detect and fix infrastructure drift"
        exit 1
        ;;
esac
State Validation and Repair
Validate and repair corrupted state files:
#!/usr/bin/env python3
# scripts/state_validator.py
import json
import sys
from typing import Dict, List, Any
class StateValidator:
    """Validate a Terraform state file and optionally repair common issues.

    Validation methods append messages to ``self.errors`` (fatal) and
    ``self.warnings`` (informational) and return True when no errors have
    accumulated so far.
    """

    def __init__(self, state_file: str):
        """Load the state JSON; raises OSError/JSONDecodeError on bad input."""
        with open(state_file, 'r') as f:
            self.state = json.load(f)
        self.errors: List[str] = []    # fatal structural problems
        self.warnings: List[str] = []  # non-fatal issues

    def validate_structure(self) -> bool:
        """Validate basic state file structure (required top-level fields)."""
        required_fields = ['version', 'terraform_version', 'serial', 'resources']
        for field in required_fields:
            if field not in self.state:
                self.errors.append(f"Missing required field: {field}")
        if 'resources' in self.state:
            if not isinstance(self.state['resources'], list):
                self.errors.append("Resources field must be a list")
        return len(self.errors) == 0

    def validate_resources(self) -> bool:
        """Validate each resource entry and its instances.

        Returns False immediately if the state has no ``resources`` key.
        """
        if 'resources' not in self.state:
            return False
        for i, resource in enumerate(self.state['resources']):
            resource_path = f"resources[{i}]"
            # Check required resource fields
            required_fields = ['mode', 'type', 'name', 'instances']
            for field in required_fields:
                if field not in resource:
                    self.errors.append(f"{resource_path}: Missing field '{field}'")
            # Validate instances
            for j, instance in enumerate(resource.get('instances', [])):
                instance_path = f"{resource_path}.instances[{j}]"
                if 'attributes' not in instance:
                    self.errors.append(f"{instance_path}: Missing attributes")
                if 'schema_version' not in instance:
                    self.warnings.append(f"{instance_path}: Missing schema_version")
        return len(self.errors) == 0

    def check_dependencies(self) -> bool:
        """Check that every declared dependency refers to a known resource."""
        # Build the address set once: set membership instead of the original
        # list rescan, and .get() so a malformed resource (missing type/name)
        # is reported by validate_resources rather than raising KeyError here.
        resource_addresses = {
            f"{resource.get('type')}.{resource.get('name')}"
            for resource in self.state.get('resources', [])
        }
        for resource in self.state.get('resources', []):
            for instance in resource.get('instances', []):
                for dep in instance.get('dependencies', []):
                    if dep not in resource_addresses:
                        self.errors.append(f"Broken dependency: {dep}")
        return len(self.errors) == 0

    def repair_state(self) -> 'tuple[Dict[str, Any], List[str]]':
        """Attempt to repair common state issues.

        Returns:
            ``(repaired_state, repairs)`` — the repaired copy plus a list of
            human-readable descriptions of each change made.

        Fixes relative to the original: the return annotation said
        ``Dict[str, Any]`` but a tuple was returned; and ``self.state.copy()``
        was a *shallow* copy, so pruning dependencies mutated the caller's
        state in place. A deep copy keeps ``self.state`` untouched.
        """
        import copy

        repaired_state = copy.deepcopy(self.state)
        repairs: List[str] = []

        # Fix missing serial
        if 'serial' not in repaired_state:
            repaired_state['serial'] = 1
            repairs.append("Added missing serial number")
        # Fix missing version (4 is the modern state format version)
        if 'version' not in repaired_state:
            repaired_state['version'] = 4
            repairs.append("Added missing version")

        # Remove broken dependencies. Precompute the known-address set once
        # instead of rescanning the resource list per dependency (was O(n^2)).
        known_addresses = {
            f"{r.get('type')}.{r.get('name')}"
            for r in repaired_state.get('resources', [])
        }
        for resource in repaired_state.get('resources', []):
            for instance in resource.get('instances', []):
                if 'dependencies' in instance:
                    valid_deps = []
                    for dep in instance['dependencies']:
                        if dep in known_addresses:
                            valid_deps.append(dep)
                        else:
                            repairs.append(f"Removed broken dependency: {dep}")
                    instance['dependencies'] = valid_deps
        return repaired_state, repairs

    def generate_report(self) -> str:
        """Generate a human-readable validation report."""
        report = ["Terraform State Validation Report", "=" * 40, ""]
        if self.errors:
            report.extend(["ERRORS:", ""])
            for error in self.errors:
                report.append(f" ❌ {error}")
            report.append("")
        if self.warnings:
            report.extend(["WARNINGS:", ""])
            for warning in self.warnings:
                report.append(f" ⚠️ {warning}")
            report.append("")
        if not self.errors and not self.warnings:
            report.append("✅ State file is valid")
        return "\n".join(report)
def main():
    """CLI entry point: validate a state file and optionally repair it."""
    import argparse

    cli = argparse.ArgumentParser(description='Terraform State Validator')
    cli.add_argument('state_file', help='Path to state file')
    cli.add_argument('--repair', action='store_true', help='Attempt to repair issues')
    cli.add_argument('--output', help='Output file for repaired state')
    opts = cli.parse_args()

    try:
        validator = StateValidator(opts.state_file)

        # Run all validation passes, then print the combined report.
        validator.validate_structure()
        validator.validate_resources()
        validator.check_dependencies()
        print(validator.generate_report())

        # Repair only when requested and something is actually broken.
        if opts.repair and validator.errors:
            print("\nAttempting repairs...")
            repaired_state, repairs = validator.repair_state()
            output_file = opts.output if opts.output else f"{opts.state_file}.repaired"
            with open(output_file, 'w') as f:
                json.dump(repaired_state, f, indent=2)
            print(f"\nRepairs made:")
            for repair in repairs:
                print(f" 🔧 {repair}")
            print(f"\nRepaired state saved to: {output_file}")

        # Non-zero exit when validation found errors.
        sys.exit(1 if validator.errors else 0)
    except Exception as e:
        print(f"❌ Error validating state file: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
What’s Next
Disaster recovery capabilities ensure that even catastrophic state failures don’t result in permanent infrastructure loss. These tools and procedures provide multiple layers of protection and recovery options for different failure scenarios.
In the next part, we’ll explore performance optimization techniques that help manage large state files efficiently and reduce the time required for Terraform operations in complex environments.