Data and Storage Strategies

Multi-cloud data strategies require careful planning for replication, backup, disaster recovery, and compliance. Each cloud provider offers different storage services with varying performance characteristics, pricing models, and integration capabilities. This part covers patterns for managing data across multiple clouds effectively.

Cross-Cloud Data Replication

Set up automated data replication between cloud providers:

# Primary storage in AWS
resource "aws_s3_bucket" "primary" {
  bucket = "company-data-primary"
  
  tags = {
    Environment = "production"
    Role        = "primary"
  }
}

resource "aws_s3_bucket_versioning" "primary" {
  bucket = aws_s3_bucket.primary.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_notification" "replication_trigger" {
  bucket = aws_s3_bucket.primary.id
  
  lambda_function {
    lambda_function_arn = aws_lambda_function.cross_cloud_replicator.arn
    events              = ["s3:ObjectCreated:*"]
  }
}
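
# S3 needs explicit permission to invoke the Lambda; without it, the bucket
# notification above fails to apply.
resource "aws_lambda_permission" "allow_s3_invoke" {
  statement_id  = "AllowS3InvokeReplicator"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cross_cloud_replicator.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.primary.arn
}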

# Backup storage in Azure
resource "azurerm_storage_account" "backup" {
  name                     = "companydatabackup"
  resource_group_name      = azurerm_resource_group.main.name
  location                 = azurerm_resource_group.main.location
  account_tier             = "Standard"
  account_replication_type = "GRS"
  
  tags = {
    Environment = "production"
    Role        = "backup"
  }
}

resource "azurerm_storage_container" "backup" {
  name                  = "data-backup"
  storage_account_name  = azurerm_storage_account.backup.name
  container_access_type = "private"
}

# Archive storage in GCP
resource "google_storage_bucket" "archive" {
  name     = "company-data-archive"
  location = "US"
  
  storage_class = "COLDLINE"
  
  lifecycle_rule {
    condition {
      age = 90
    }
    action {
      type          = "SetStorageClass"
      storage_class = "ARCHIVE"
    }
  }
  
  labels = {
    environment = "production"
    role        = "archive"
  }
}

# Cross-cloud replication Lambda
resource "aws_lambda_function" "cross_cloud_replicator" {
  filename      = "replicator.zip"
  function_name = "cross-cloud-replicator"
  role          = aws_iam_role.replicator.arn
  handler       = "index.handler"
  runtime       = "python3.9"
  timeout       = 300
  
  environment {
    variables = {
      AZURE_STORAGE_ACCOUNT = azurerm_storage_account.backup.name
      AZURE_CONTAINER       = azurerm_storage_container.backup.name
      GCP_BUCKET            = google_storage_bucket.archive.name
    }
  }
}

resource "aws_iam_role" "replicator" {
  name = "cross-cloud-replicator-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "replicator" {
  name = "cross-cloud-replicator-policy"
  role = aws_iam_role.replicator.id
  
  # Note: scope Resource below to the specific buckets and log groups in production
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}
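
The Terraform above packages the function code from replicator.zip but does not show the handler itself. The sketch below is one way index.handler could be written; it assumes the environment variables defined on the function, an additional AZURE_STORAGE_KEY variable (or a secret fetched from AWS Secrets Manager), Google credentials available to the function, and that the Azure and GCP SDKs are bundled into the deployment package:

# replicator/index.py (illustrative sketch)
import os
from urllib.parse import unquote_plus

import boto3
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs

s3 = boto3.client("s3")

def handler(event, context):
    """Mirror each newly created S3 object to Azure Blob Storage and GCS."""
    azure_client = BlobServiceClient(
        account_url=f"https://{os.environ['AZURE_STORAGE_ACCOUNT']}.blob.core.windows.net",
        credential=os.environ["AZURE_STORAGE_KEY"],  # assumed to be injected securely
    )
    gcp_bucket = gcs.Client().bucket(os.environ["GCP_BUCKET"])

    records = event.get("Records", [])
    for record in records:
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])  # keys arrive URL-encoded
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

        # Copy to the Azure backup container
        azure_client.get_blob_client(
            container=os.environ["AZURE_CONTAINER"], blob=key
        ).upload_blob(body, overwrite=True)

        # Copy to the GCP archive bucket
        gcp_bucket.blob(key).upload_from_string(body)

    return {"replicated": len(records)}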

Database Replication Strategy

Implement cross-cloud database replication:

# Primary database in AWS RDS
resource "aws_db_instance" "primary" {
  identifier = "company-db-primary"
  
  engine         = "postgres"
  engine_version = "13.7"
  instance_class = "db.t3.medium"
  
  allocated_storage     = 100
  max_allocated_storage = 1000
  storage_encrypted     = true
  
  db_name  = "companydb"
  username = "dbadmin"
  password = var.db_password
  
  vpc_security_group_ids = [aws_security_group.rds.id]
  db_subnet_group_name   = aws_db_subnet_group.main.name
  
  backup_retention_period = 7
  backup_window          = "03:00-04:00"
  maintenance_window     = "sun:04:00-sun:05:00"
  
  skip_final_snapshot       = false
  final_snapshot_identifier = "company-db-final-snapshot"
  
  tags = {
    Environment = "production"
    Role        = "primary"
  }
}
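
# Assumed addition: logical replication from RDS (used for the cross-cloud
# standby below) requires rds.logical_replication to be enabled; attach this
# parameter group to the primary via parameter_group_name.
resource "aws_db_parameter_group" "logical_replication" {
  name   = "company-db-logical-replication"
  family = "postgres13"

  parameter {
    name         = "rds.logical_replication"
    value        = "1"
    apply_method = "pending-reboot"
  }
}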

# Secondary PostgreSQL server in Azure (cross-cloud managed read replicas are not
# available from RDS; data is kept in sync via logical replication - see the
# sketch after the Cloud SQL instance below)
resource "azurerm_postgresql_server" "replica" {
  name                = "company-db-replica"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  
  administrator_login          = "dbadmin"
  administrator_login_password = var.db_password
  
  sku_name   = "GP_Gen5_2"
  version    = "11"
  storage_mb = 102400
  
  backup_retention_days        = 7
  geo_redundant_backup_enabled = true
  auto_grow_enabled            = true
  
  ssl_enforcement_enabled = true
  
  tags = {
    Environment = "production"
    Role        = "replica"
  }
}

# Backup database in GCP Cloud SQL
resource "google_sql_database_instance" "backup" {
  name             = "company-db-backup"
  database_version = "POSTGRES_13"
  region          = "us-central1"
  
  settings {
    tier = "db-custom-2-7680"
    
    backup_configuration {
      enabled    = true
      start_time = "03:00"
      
      point_in_time_recovery_enabled = true
    }
    
    ip_configuration {
      ipv4_enabled    = false
      private_network = google_compute_network.main.id
    }
    
    database_flags {
      name  = "log_statement"
      value = "all"
    }
  }
  
  deletion_protection = true
}
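
Neither Azure Database for PostgreSQL nor Cloud SQL can be attached to RDS as a managed read replica, so keeping the secondary instances in sync typically relies on PostgreSQL logical replication (or a replication service such as AWS DMS). The script below is a hedged sketch of wiring up a publication and subscription with psycopg2; the table names, hostnames, and passwords are placeholders, matching schemas are assumed to already exist on the subscriber, cross-cloud network connectivity must be in place, and the RDS parameter group shown earlier must be applied:

#!/usr/bin/env python3
# scripts/setup_logical_replication.py - illustrative only; connection strings
# and table names are placeholders.

import psycopg2

PUBLICATION = "company_pub"
SUBSCRIPTION = "company_sub"
TABLES = ["orders", "customers"]  # example tables to replicate


def run(dsn: str, sql: str) -> None:
    """Execute a single statement outside a transaction block."""
    conn = psycopg2.connect(dsn)
    conn.autocommit = True  # CREATE SUBSCRIPTION cannot run inside a transaction
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
    finally:
        conn.close()


if __name__ == "__main__":
    primary_dsn = "host=<rds-endpoint> dbname=companydb user=dbadmin password=<secret>"
    replica_dsn = "host=<azure-endpoint> dbname=companydb user=dbadmin password=<secret>"

    # Publish the selected tables from the RDS primary
    run(primary_dsn, f"CREATE PUBLICATION {PUBLICATION} FOR TABLE {', '.join(TABLES)}")

    # Subscribe from the Azure standby, pulling changes over the cross-cloud link
    run(
        replica_dsn,
        f"CREATE SUBSCRIPTION {SUBSCRIPTION} "
        f"CONNECTION '{primary_dsn}' PUBLICATION {PUBLICATION}",
    )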

Data Synchronization Pipeline

Create automated data synchronization across clouds:

#!/usr/bin/env python3
# scripts/data_sync_pipeline.py

import boto3
import asyncio
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs
from typing import Any, Dict, List, Optional
import json
from datetime import datetime

class MultiCloudDataSync:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        
        # Initialize cloud clients
        self.s3_client = boto3.client('s3')
        self.azure_client = BlobServiceClient(
            account_url=f"https://{config['azure']['account_name']}.blob.core.windows.net",
            credential=config['azure']['access_key']
        )
        self.gcp_client = gcs.Client(project=config['gcp']['project_id'])
    
    async def sync_all_data(self) -> Dict[str, Any]:
        """Synchronize data across all cloud providers"""
        
        sync_results = {
            'timestamp': datetime.utcnow().isoformat(),
            'synced_objects': 0,
            'errors': [],
            'providers': {}
        }
        
        # Get source data inventory
        source_objects = await self._get_source_inventory()
        
        # Sync to each target provider
        for target_provider in self.config['sync_targets']:
            provider_results = await self._sync_to_provider(
                source_objects, 
                target_provider
            )
            sync_results['providers'][target_provider] = provider_results
            sync_results['synced_objects'] += provider_results.get('synced_count', 0)
            sync_results['errors'].extend(provider_results.get('errors', []))
        
        return sync_results
    
    async def _get_source_inventory(self) -> List[Dict[str, Any]]:
        """Get inventory of objects from source provider"""
        
        source_config = self.config['source']
        objects = []
        
        if source_config['provider'] == 'aws':
            paginator = self.s3_client.get_paginator('list_objects_v2')
            
            for page in paginator.paginate(Bucket=source_config['bucket']):
                for obj in page.get('Contents', []):
                    objects.append({
                        'key': obj['Key'],
                        'size': obj['Size'],
                        'etag': obj['ETag'].strip('"'),
                        'last_modified': obj['LastModified'].isoformat(),
                        'provider': 'aws'
                    })
        
        return objects
    
    async def _sync_to_provider(self, source_objects: List[Dict], target_provider: str) -> Dict[str, Any]:
        """Sync objects to target provider"""
        
        results = {
            'synced_count': 0,
            'skipped_count': 0,
            'errors': []
        }
        
        target_config = self.config['targets'][target_provider]
        
        # Get existing objects in target
        existing_objects = await self._get_target_inventory(target_provider, target_config)
        existing_keys = {obj['key']: obj for obj in existing_objects}
        
        for source_obj in source_objects:
            try:
                # Check if object needs sync
                if await self._needs_sync(source_obj, existing_keys.get(source_obj['key'])):
                    await self._copy_object(source_obj, target_provider, target_config)
                    results['synced_count'] += 1
                else:
                    results['skipped_count'] += 1
                    
            except Exception as e:
                results['errors'].append({
                    'object': source_obj['key'],
                    'error': str(e)
                })
        
        return results
    
    async def _get_target_inventory(self, provider: str, config: Dict) -> List[Dict[str, Any]]:
        """Get inventory from target provider"""
        
        objects = []
        
        if provider == 'azure':
            container_client = self.azure_client.get_container_client(config['container'])
            
            for blob in container_client.list_blobs():
                objects.append({
                    'key': blob.name,
                    'size': blob.size,
                    'etag': blob.etag.strip('"'),
                    'last_modified': blob.last_modified.isoformat()
                })
        
        elif provider == 'gcp':
            bucket = self.gcp_client.bucket(config['bucket'])
            
            for blob in bucket.list_blobs():
                objects.append({
                    'key': blob.name,
                    'size': blob.size,
                    'etag': blob.etag.strip('"'),
                    'last_modified': blob.time_created.isoformat()
                })
        
        return objects
    
    async def _needs_sync(self, source_obj: Dict, target_obj: Optional[Dict]) -> bool:
        """Determine if object needs synchronization"""
        
        if not target_obj:
            return True
        
        # Compare ETags. Caveat: ETag formats are provider-specific (S3 uses an
        # MD5-based ETag only for single-part uploads; Azure and GCS return
        # opaque entity tags), so cross-provider comparison can report false
        # mismatches. Storing a content hash as object metadata is more reliable.
        if source_obj['etag'] != target_obj['etag']:
            return True
        
        # Compare sizes
        if source_obj['size'] != target_obj['size']:
            return True
        
        return False
    
    async def _copy_object(self, source_obj: Dict, target_provider: str, target_config: Dict):
        """Copy object from source to target provider"""
        
        # Download from source
        source_config = self.config['source']
        
        if source_config['provider'] == 'aws':
            response = self.s3_client.get_object(
                Bucket=source_config['bucket'],
                Key=source_obj['key']
            )
            data = response['Body'].read()
        else:
            raise NotImplementedError(
                f"Unsupported source provider: {source_config['provider']}"
            )
        
        # Upload to target
        if target_provider == 'azure':
            blob_client = self.azure_client.get_blob_client(
                container=target_config['container'],
                blob=source_obj['key']
            )
            blob_client.upload_blob(data, overwrite=True)
        
        elif target_provider == 'gcp':
            bucket = self.gcp_client.bucket(target_config['bucket'])
            blob = bucket.blob(source_obj['key'])
            blob.upload_from_string(data)
    
    def generate_sync_report(self, results: Dict[str, Any]) -> str:
        """Generate human-readable sync report"""
        
        report_lines = [
            "Multi-Cloud Data Sync Report",
            "=" * 40,
            f"Timestamp: {results['timestamp']}",
            f"Total Objects Synced: {results['synced_objects']}",
            f"Total Errors: {len(results['errors'])}",
            ""
        ]
        
        for provider, provider_results in results['providers'].items():
            report_lines.extend([
                f"{provider.upper()} Results:",
                f"  Synced: {provider_results['synced_count']}",
                f"  Skipped: {provider_results['skipped_count']}",
                f"  Errors: {len(provider_results['errors'])}",
                ""
            ])
        
        if results['errors']:
            report_lines.extend(["Errors:", ""])
            for error in results['errors'][:10]:  # Show first 10 errors
                report_lines.append(f"  {error['object']}: {error['error']}")
        
        return "\n".join(report_lines)

async def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Multi-Cloud Data Sync')
    parser.add_argument('--config', required=True, help='Sync configuration file')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be synced')
    
    args = parser.parse_args()
    
    with open(args.config, 'r') as f:
        config = json.load(f)
    
    sync_manager = MultiCloudDataSync(config)
    
    if args.dry_run:
        print("DRY RUN - No data will be copied")
        source_objects = await sync_manager._get_source_inventory()
        print(f"{len(source_objects)} source object(s) would be considered for sync")
        return
    
    results = await sync_manager.sync_all_data()
    report = sync_manager.generate_sync_report(results)
    
    print(report)

if __name__ == "__main__":
    asyncio.run(main())
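
The pipeline reads its settings from a JSON file. A hypothetical sync-config.json matching the keys the script expects (the bucket, container, and account names follow the Terraform above; the access key and project ID are placeholders) might look like:

{
  "source": { "provider": "aws", "bucket": "company-data-primary" },
  "sync_targets": ["azure", "gcp"],
  "targets": {
    "azure": { "container": "data-backup" },
    "gcp": { "bucket": "company-data-archive" }
  },
  "azure": {
    "account_name": "companydatabackup",
    "access_key": "<storage-account-access-key>"
  },
  "gcp": { "project_id": "<gcp-project-id>" }
}

Run it with --dry-run first to preview what would be synchronized:

python scripts/data_sync_pipeline.py --config sync-config.json --dry-run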

Disaster Recovery Automation

Implement automated disaster recovery across clouds:

#!/bin/bash
# scripts/disaster-recovery.sh

set -e

DR_CONFIG_FILE=${1:-"dr-config.json"}
RECOVERY_TYPE=${2:-"full"}  # full, partial, test

execute_disaster_recovery() {
    echo "🚨 Executing disaster recovery: $RECOVERY_TYPE"
    
    # Load DR configuration
    if [ ! -f "$DR_CONFIG_FILE" ]; then
        echo "❌ DR configuration file not found: $DR_CONFIG_FILE"
        exit 1
    fi
    
    PRIMARY_PROVIDER=$(jq -r '.primary_provider' "$DR_CONFIG_FILE")
    DR_PROVIDER=$(jq -r '.dr_provider' "$DR_CONFIG_FILE")
    
    echo "Primary: $PRIMARY_PROVIDER"
    echo "DR Target: $DR_PROVIDER"
    
    # Check primary provider health
    if check_provider_health "$PRIMARY_PROVIDER"; then
        echo "⚠️  Primary provider is healthy. Are you sure you want to proceed?"
        read -p "Continue with DR? (y/N): " -n 1 -r
        echo
        if [[ ! $REPLY =~ ^[Yy]$ ]]; then
            exit 1
        fi
    fi
    
    # Execute recovery steps
    case "$RECOVERY_TYPE" in
        "full")
            execute_full_recovery
            ;;
        "partial")
            execute_partial_recovery
            ;;
        "test")
            execute_test_recovery
            ;;
        *)
            echo "❌ Unknown recovery type: $RECOVERY_TYPE"
            exit 1
            ;;
    esac
}

check_provider_health() {
    # Lightweight check: verifies CLI credentials/reachability for the provider.
    # A production runbook would also probe service and application health.
    local provider=$1
    
    case "$provider" in
        "aws")
            aws sts get-caller-identity >/dev/null 2>&1
            ;;
        "azure")
            az account show >/dev/null 2>&1
            ;;
        "gcp")
            gcloud auth list --filter=status:ACTIVE --format="value(account)" | head -1 >/dev/null 2>&1
            ;;
    esac
}

execute_full_recovery() {
    echo "🔄 Executing full disaster recovery..."
    
    # 1. Activate DR infrastructure
    activate_dr_infrastructure
    
    # 2. Restore data from backups
    restore_data_from_backups
    
    # 3. Update DNS to point to DR site
    update_dns_to_dr
    
    # 4. Validate recovery
    validate_recovery
    
    echo "✅ Full disaster recovery completed"
}

activate_dr_infrastructure() {
    echo "Activating DR infrastructure..."
    
    DR_TERRAFORM_DIR=$(jq -r '.dr_terraform_dir' "$DR_CONFIG_FILE")
    
    cd "$DR_TERRAFORM_DIR"
    
    # Initialize and apply the DR infrastructure from a saved plan
    terraform init -input=false
    terraform plan -input=false -var="dr_mode=active" -out=dr.tfplan
    terraform apply dr.tfplan
    
    # Give newly provisioned resources time to become ready
    sleep 60
}

restore_data_from_backups() {
    echo "Restoring data from backups..."
    
    BACKUP_LOCATIONS=$(jq -r '.backup_locations[]' "$DR_CONFIG_FILE")
    
    for backup_location in $BACKUP_LOCATIONS; do
        echo "Restoring from: $backup_location"
        
        # This would call provider-specific restore scripts
        case "$DR_PROVIDER" in
            "aws")
                restore_from_s3_backup "$backup_location"
                ;;
            "azure")
                restore_from_azure_backup "$backup_location"
                ;;
            "gcp")
                restore_from_gcs_backup "$backup_location"
                ;;
        esac
    done
}

update_dns_to_dr() {
    echo "Updating DNS to point to DR site..."
    
    DR_ENDPOINT=$(jq -r '.dr_endpoint' "$DR_CONFIG_FILE")
    DNS_ZONE=$(jq -r '.dns_zone' "$DR_CONFIG_FILE")
    
    # Update the DNS record to point to the DR endpoint (assumes DNS is hosted
    # in Route 53 and remains reachable independently of the failed provider)
    aws route53 change-resource-record-sets \
        --hosted-zone-id "$DNS_ZONE" \
        --change-batch "{
            \"Changes\": [{
                \"Action\": \"UPSERT\",
                \"ResourceRecordSet\": {
                    \"Name\": \"$(jq -r '.primary_domain' "$DR_CONFIG_FILE")\",
                    \"Type\": \"A\",
                    \"TTL\": 60,
                    \"ResourceRecords\": [{\"Value\": \"$DR_ENDPOINT\"}]
                }
            }]
        }"
}

validate_recovery() {
    echo "Validating disaster recovery..."
    
    HEALTH_CHECK_URL=$(jq -r '.health_check_url' "$DR_CONFIG_FILE")
    
    # Wait for application to be healthy
    for i in {1..30}; do
        if curl -f "$HEALTH_CHECK_URL" >/dev/null 2>&1; then
            echo "✅ Application is healthy"
            return 0
        fi
        
        echo "Waiting for application to be healthy... ($i/30)"
        sleep 10
    done
    
    echo "❌ Application health check failed"
    return 1
}
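
# Placeholder implementations for the remaining recovery modes and the
# provider-specific restore helpers - replace these with environment-specific
# logic before relying on them.
execute_partial_recovery() {
    echo "🔄 Executing partial disaster recovery (data restore only)..."
    restore_data_from_backups
    validate_recovery
    echo "✅ Partial disaster recovery completed"
}

execute_test_recovery() {
    echo "🧪 Executing DR test (infrastructure and data only, no DNS change)..."
    activate_dr_infrastructure
    restore_data_from_backups
    validate_recovery
    echo "✅ DR test completed - remember to tear down test infrastructure"
}

restore_from_s3_backup()    { echo "TODO: restore from S3 backup: $1"; }
restore_from_azure_backup() { echo "TODO: restore from Azure Blob backup: $1"; }
restore_from_gcs_backup()   { echo "TODO: restore from GCS backup: $1"; }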

# Execute based on parameters
case "${3:-execute}" in
    "execute")
        execute_disaster_recovery
        ;;
    "test")
        echo "🧪 Testing disaster recovery procedures..."
        RECOVERY_TYPE="test"
        execute_disaster_recovery
        ;;
    *)
        echo "Usage: $0 <dr_config_file> <recovery_type> [execute|test]"
        echo ""
        echo "Recovery types: full, partial, test"
        exit 1
        ;;
esac
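
The script reads its settings from a JSON file. A hypothetical dr-config.json containing the fields referenced above (all values are placeholders) might look like:

{
  "primary_provider": "aws",
  "dr_provider": "gcp",
  "dr_terraform_dir": "terraform/dr",
  "backup_locations": ["gs://company-data-archive"],
  "dr_endpoint": "203.0.113.10",
  "dns_zone": "Z1234567890ABC",
  "primary_domain": "app.example.com",
  "health_check_url": "https://app.example.com/health"
}

A rehearsal can then be run with ./scripts/disaster-recovery.sh dr-config.json test before any real failover is needed.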

What’s Next

Data and storage strategies provide the foundation for reliable multi-cloud operations, but monitoring and observability across multiple providers requires unified approaches. In the next part, we’ll explore how to implement comprehensive monitoring and observability that gives you visibility into your entire multi-cloud infrastructure from a single pane of glass.