Data and Storage Strategies
Multi-cloud data strategies require careful planning for replication, backup, disaster recovery, and compliance. Each cloud provider offers different storage services with varying performance characteristics, pricing models, and integration capabilities. This part covers patterns for managing data across multiple clouds effectively.
Cross-Cloud Data Replication
Set up automated data replication between cloud providers:
# Primary storage in AWS
resource "aws_s3_bucket" "primary" {
bucket = "company-data-primary"
tags = {
Environment = "production"
Role = "primary"
}
}
resource "aws_s3_bucket_versioning" "primary" {
bucket = aws_s3_bucket.primary.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_notification" "replication_trigger" {
bucket = aws_s3_bucket.primary.id
lambda_function {
lambda_function_arn = aws_lambda_function.cross_cloud_replicator.arn
events = ["s3:ObjectCreated:*"]
}
}
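# Without this permission, creating the bucket notification fails because S3
# cannot validate the Lambda destination
resource "aws_lambda_permission" "allow_s3_invoke" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cross_cloud_replicator.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.primary.arn
}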
# Backup storage in Azure
resource "azurerm_storage_account" "backup" {
name = "companydatabackup"
resource_group_name = azurerm_resource_group.main.name
location = azurerm_resource_group.main.location
account_tier = "Standard"
account_replication_type = "GRS"
tags = {
Environment = "production"
Role = "backup"
}
}
resource "azurerm_storage_container" "backup" {
name = "data-backup"
storage_account_name = azurerm_storage_account.backup.name
container_access_type = "private"
}
# Archive storage in GCP
resource "google_storage_bucket" "archive" {
name = "company-data-archive"
location = "US"
storage_class = "COLDLINE"
lifecycle_rule {
condition {
age = 90
}
action {
type = "SetStorageClass"
storage_class = "ARCHIVE"
}
}
labels = {
environment = "production"
role = "archive"
}
}
# Cross-cloud replication Lambda
resource "aws_lambda_function" "cross_cloud_replicator" {
filename = "replicator.zip"
function_name = "cross-cloud-replicator"
role = aws_iam_role.replicator.arn
handler = "index.handler"
runtime = "python3.9"
timeout = 300
environment {
variables = {
AZURE_STORAGE_ACCOUNT = azurerm_storage_account.backup.name
AZURE_CONTAINER = azurerm_storage_container.backup.name
GCP_BUCKET = google_storage_bucket.archive.name
}
}
}
resource "aws_iam_role" "replicator" {
name = "cross-cloud-replicator-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "replicator" {
name = "cross-cloud-replicator-policy"
role = aws_iam_role.replicator.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:PutObject",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "*"
}
]
})
}
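The Terraform above packages the replicator as replicator.zip with an index.handler entry point but never shows the function itself. The sketch below is one way it could look. It assumes the azure-storage-blob and google-cloud-storage libraries are bundled into the deployment package, that an AZURE_STORAGE_KEY variable is supplied alongside the environment variables defined above, and that GCP service-account credentials are available to the function; the Terraform as written configures none of these.
#!/usr/bin/env python3
# index.py - sketch of the cross-cloud replicator handler (index.handler)
import os
from urllib.parse import unquote_plus

import boto3
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs

s3 = boto3.client("s3")
# AZURE_STORAGE_KEY is an assumed extra environment variable
azure_client = BlobServiceClient(
    account_url=f"https://{os.environ['AZURE_STORAGE_ACCOUNT']}.blob.core.windows.net",
    credential=os.environ["AZURE_STORAGE_KEY"],
)
# Assumes GCP credentials are available, e.g. via GOOGLE_APPLICATION_CREDENTIALS
# pointing at a key file bundled with the deployment package
gcp_bucket = gcs.Client().bucket(os.environ["GCP_BUCKET"])

def handler(event, context):
    replicated = 0
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])  # S3 URL-encodes keys
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

        # Copy to the Azure backup container
        azure_client.get_blob_client(
            container=os.environ["AZURE_CONTAINER"], blob=key
        ).upload_blob(body, overwrite=True)

        # Copy to the GCP archive bucket
        gcp_bucket.blob(key).upload_from_string(body)
        replicated += 1

    return {"replicated": replicated}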
Database Replication Strategy
Provision the database tier in each cloud. Managed database services do not support native cross-cloud read replicas, so the secondary instances below are kept in sync with PostgreSQL logical replication or scheduled dump-and-restore jobs; a sketch of the latter follows the Terraform:
# Primary database in AWS RDS
resource "aws_db_instance" "primary" {
identifier = "company-db-primary"
engine = "postgres"
engine_version = "13.7"
instance_class = "db.t3.medium"
allocated_storage = 100
max_allocated_storage = 1000
storage_encrypted = true
db_name = "companydb"
username = "dbadmin"
password = var.db_password
vpc_security_group_ids = [aws_security_group.rds.id]
db_subnet_group_name = aws_db_subnet_group.main.name
backup_retention_period = 7
backup_window = "03:00-04:00"
maintenance_window = "sun:04:00-sun:05:00"
skip_final_snapshot = false
final_snapshot_identifier = "company-db-final-snapshot"
tags = {
Environment = "production"
Role = "primary"
}
}
# Replication target in Azure Database for PostgreSQL. Single Server tops out at
# PostgreSQL 11, so the schema must stay compatible with the version 13 primary;
# this instance is kept in sync by a scheduled replication job (see the sketch below)
resource "azurerm_postgresql_server" "replica" {
name = "company-db-replica"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
administrator_login = "dbadmin"
administrator_login_password = var.db_password
sku_name = "GP_Gen5_2"
version = "11"
storage_mb = 102400
backup_retention_days = 7
geo_redundant_backup_enabled = true
auto_grow_enabled = true
ssl_enforcement_enabled = true
tags = {
Environment = "production"
Role = "replica"
}
}
# Backup database in GCP Cloud SQL
resource "google_sql_database_instance" "backup" {
name = "company-db-backup"
database_version = "POSTGRES_13"
region = "us-central1"
settings {
tier = "db-custom-2-7680"
backup_configuration {
enabled = true
start_time = "03:00"
point_in_time_recovery_enabled = true
}
ip_configuration {
ipv4_enabled = false
private_network = google_compute_network.main.id
}
database_flags {
name = "log_statement"
value = "all"
}
}
deletion_protection = true
}
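The Terraform above only provisions the instances; nothing yet keeps them in sync. Since cross-cloud read replicas are not available, a scheduled job typically moves the data, either with PostgreSQL logical replication (where the managed service's privileges allow it) or with a dump-and-restore cycle. Below is a minimal dump-and-restore sketch; the connection strings are placeholders, it assumes pg_dump/pg_restore clients at least as new as the primary engine plus network reachability to both endpoints, and the 13-to-11 version gap means the schema must avoid features the Azure server does not support.
#!/usr/bin/env python3
# scripts/db_sync_job.py - sketch of a scheduled cross-cloud dump-and-restore job
import subprocess

# Placeholder connection strings; real endpoints come from Terraform outputs.
# The Azure user is dbadmin@<server-name> (URL-encoded as %40), and
# sslmode=require matches ssl_enforcement_enabled on the server.
PRIMARY_URL = "postgresql://dbadmin:<secret>@<rds-endpoint>:5432/companydb"
TARGET_URL = ("postgresql://dbadmin%40company-db-replica:<secret>"
              "@company-db-replica.postgres.database.azure.com:5432/companydb"
              "?sslmode=require")

def sync_database() -> None:
    # Stream a custom-format dump from the primary straight into pg_restore on
    # the target: --clean/--if-exists make repeated runs converge, --no-owner
    # avoids role mismatches between the two managed services.
    dump = subprocess.Popen(
        ["pg_dump", "--format=custom", "--no-owner", PRIMARY_URL],
        stdout=subprocess.PIPE,
    )
    subprocess.run(
        ["pg_restore", "--clean", "--if-exists", "--no-owner",
         "--dbname", TARGET_URL],
        stdin=dump.stdout,
        check=True,
    )
    if dump.wait() != 0:
        raise RuntimeError("pg_dump failed")

if __name__ == "__main__":
    sync_database()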
Data Synchronization Pipeline
Create automated data synchronization across clouds:
#!/usr/bin/env python3
# scripts/data_sync_pipeline.py
import boto3
import asyncio
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs
from typing import Any, Dict, List, Optional
import json
from datetime import datetime
class MultiCloudDataSync:
    def __init__(self, config: Dict[str, Any]):
self.config = config
# Initialize cloud clients
self.s3_client = boto3.client('s3')
self.azure_client = BlobServiceClient(
account_url=f"https://{config['azure']['account_name']}.blob.core.windows.net",
credential=config['azure']['access_key']
)
self.gcp_client = gcs.Client(project=config['gcp']['project_id'])
    async def sync_all_data(self) -> Dict[str, Any]:
"""Synchronize data across all cloud providers"""
sync_results = {
'timestamp': datetime.utcnow().isoformat(),
'synced_objects': 0,
'errors': [],
'providers': {}
}
# Get source data inventory
source_objects = await self._get_source_inventory()
# Sync to each target provider
for target_provider in self.config['sync_targets']:
provider_results = await self._sync_to_provider(
source_objects,
target_provider
)
sync_results['providers'][target_provider] = provider_results
sync_results['synced_objects'] += provider_results.get('synced_count', 0)
sync_results['errors'].extend(provider_results.get('errors', []))
return sync_results
    async def _get_source_inventory(self) -> List[Dict[str, Any]]:
"""Get inventory of objects from source provider"""
source_config = self.config['source']
objects = []
if source_config['provider'] == 'aws':
paginator = self.s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=source_config['bucket']):
for obj in page.get('Contents', []):
objects.append({
'key': obj['Key'],
'size': obj['Size'],
'etag': obj['ETag'].strip('"'),
'last_modified': obj['LastModified'].isoformat(),
'provider': 'aws'
})
return objects
    async def _sync_to_provider(self, source_objects: List[Dict], target_provider: str) -> Dict[str, Any]:
"""Sync objects to target provider"""
results = {
'synced_count': 0,
'skipped_count': 0,
'errors': []
}
target_config = self.config['targets'][target_provider]
# Get existing objects in target
existing_objects = await self._get_target_inventory(target_provider, target_config)
existing_keys = {obj['key']: obj for obj in existing_objects}
for source_obj in source_objects:
try:
# Check if object needs sync
if await self._needs_sync(source_obj, existing_keys.get(source_obj['key'])):
await self._copy_object(source_obj, target_provider, target_config)
results['synced_count'] += 1
else:
results['skipped_count'] += 1
except Exception as e:
results['errors'].append({
'object': source_obj['key'],
'error': str(e)
})
return results
    async def _get_target_inventory(self, provider: str, config: Dict) -> List[Dict[str, Any]]:
"""Get inventory from target provider"""
objects = []
if provider == 'azure':
container_client = self.azure_client.get_container_client(config['container'])
            # BlobServiceClient here is the synchronous client, so list_blobs()
            # returns a regular (non-async) iterator
            for blob in container_client.list_blobs():
objects.append({
'key': blob.name,
'size': blob.size,
'etag': blob.etag.strip('"'),
'last_modified': blob.last_modified.isoformat()
})
elif provider == 'gcp':
bucket = self.gcp_client.bucket(config['bucket'])
for blob in bucket.list_blobs():
objects.append({
'key': blob.name,
'size': blob.size,
'etag': blob.etag.strip('"'),
                    'last_modified': blob.updated.isoformat()
})
return objects
async def _needs_sync(self, source_obj: Dict, target_obj: Optional[Dict]) -> bool:
"""Determine if object needs synchronization"""
if not target_obj:
return True
        # Re-copy when sizes differ, or when the source object was modified after
        # the target copy was written. Provider ETags are deliberately not compared:
        # S3 multipart ETags, Azure version ETags, and GCS hashes are all computed
        # differently, so they would flag every object as changed on every run.
        if source_obj['size'] != target_obj['size']:
            return True
        if source_obj['last_modified'] > target_obj['last_modified']:
            return True
        return False
async def _copy_object(self, source_obj: Dict, target_provider: str, target_config: Dict):
"""Copy object from source to target provider"""
# Download from source
source_config = self.config['source']
if source_config['provider'] == 'aws':
response = self.s3_client.get_object(
Bucket=source_config['bucket'],
Key=source_obj['key']
)
data = response['Body'].read()
# Upload to target
if target_provider == 'azure':
blob_client = self.azure_client.get_blob_client(
container=target_config['container'],
blob=source_obj['key']
)
blob_client.upload_blob(data, overwrite=True)
elif target_provider == 'gcp':
bucket = self.gcp_client.bucket(target_config['bucket'])
blob = bucket.blob(source_obj['key'])
blob.upload_from_string(data)
    def generate_sync_report(self, results: Dict[str, Any]) -> str:
"""Generate human-readable sync report"""
report_lines = [
"Multi-Cloud Data Sync Report",
"=" * 40,
f"Timestamp: {results['timestamp']}",
f"Total Objects Synced: {results['synced_objects']}",
f"Total Errors: {len(results['errors'])}",
""
]
for provider, provider_results in results['providers'].items():
report_lines.extend([
f"{provider.upper()} Results:",
f" Synced: {provider_results['synced_count']}",
f" Skipped: {provider_results['skipped_count']}",
f" Errors: {len(provider_results['errors'])}",
""
])
if results['errors']:
report_lines.extend(["Errors:", ""])
for error in results['errors'][:10]: # Show first 10 errors
report_lines.append(f" {error['object']}: {error['error']}")
return "\n".join(report_lines)
async def main():
import argparse
parser = argparse.ArgumentParser(description='Multi-Cloud Data Sync')
parser.add_argument('--config', required=True, help='Sync configuration file')
parser.add_argument('--dry-run', action='store_true', help='Show what would be synced')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = json.load(f)
sync_manager = MultiCloudDataSync(config)
    if args.dry_run:
        # A dry run must not move data: report the source inventory and stop
        print("DRY RUN - No data will be copied")
        inventory = await sync_manager._get_source_inventory()
        print(f"{len(inventory)} source objects would be evaluated for sync")
        return
    results = await sync_manager.sync_all_data()
    report = sync_manager.generate_sync_report(results)
    print(report)
if __name__ == "__main__":
asyncio.run(main())
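The pipeline is driven entirely by the JSON file passed with --config. The keys below are the ones the script actually reads; the values are placeholders to adapt to your environment:
{
  "source": { "provider": "aws", "bucket": "company-data-primary" },
  "sync_targets": ["azure", "gcp"],
  "targets": {
    "azure": { "container": "data-backup" },
    "gcp": { "bucket": "company-data-archive" }
  },
  "azure": {
    "account_name": "companydatabackup",
    "access_key": "<storage-account-access-key>"
  },
  "gcp": { "project_id": "<gcp-project-id>" }
}
Run it with python3 scripts/data_sync_pipeline.py --config sync-config.json, adding --dry-run to inventory the source without copying anything.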
Disaster Recovery Automation
Implement automated disaster recovery across clouds:
#!/bin/bash
# scripts/disaster-recovery.sh
set -e
DR_CONFIG_FILE=${1:-"dr-config.json"}
RECOVERY_TYPE=${2:-"full"} # full, partial, test
execute_disaster_recovery() {
echo "🚨 Executing disaster recovery: $RECOVERY_TYPE"
# Load DR configuration
if [ ! -f "$DR_CONFIG_FILE" ]; then
echo "❌ DR configuration file not found: $DR_CONFIG_FILE"
exit 1
fi
PRIMARY_PROVIDER=$(jq -r '.primary_provider' "$DR_CONFIG_FILE")
DR_PROVIDER=$(jq -r '.dr_provider' "$DR_CONFIG_FILE")
echo "Primary: $PRIMARY_PROVIDER"
echo "DR Target: $DR_PROVIDER"
# Check primary provider health
if check_provider_health "$PRIMARY_PROVIDER"; then
echo "⚠️ Primary provider is healthy. Are you sure you want to proceed?"
read -p "Continue with DR? (y/N): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
exit 1
fi
fi
# Execute recovery steps
case "$RECOVERY_TYPE" in
"full")
execute_full_recovery
;;
"partial")
execute_partial_recovery
;;
"test")
execute_test_recovery
;;
*)
echo "❌ Unknown recovery type: $RECOVERY_TYPE"
exit 1
;;
esac
}
check_provider_health() {
local provider=$1
case "$provider" in
"aws")
aws sts get-caller-identity >/dev/null 2>&1
;;
"azure")
az account show >/dev/null 2>&1
;;
"gcp")
        # head exits 0 even with no output, so check an active account actually exists
        [ -n "$(gcloud auth list --filter=status:ACTIVE --format='value(account)' 2>/dev/null | head -n 1)" ]
;;
esac
}
execute_full_recovery() {
echo "🔄 Executing full disaster recovery..."
# 1. Activate DR infrastructure
activate_dr_infrastructure
# 2. Restore data from backups
restore_data_from_backups
# 3. Update DNS to point to DR site
update_dns_to_dr
# 4. Validate recovery
validate_recovery
echo "✅ Full disaster recovery completed"
}
activate_dr_infrastructure() {
echo "Activating DR infrastructure..."
DR_TERRAFORM_DIR=$(jq -r '.dr_terraform_dir' "$DR_CONFIG_FILE")
cd "$DR_TERRAFORM_DIR"
# Initialize and apply DR infrastructure
terraform init
terraform plan -var="dr_mode=active"
terraform apply -auto-approve -var="dr_mode=active"
# Wait for infrastructure to be ready
sleep 60
}
restore_data_from_backups() {
echo "Restoring data from backups..."
BACKUP_LOCATIONS=$(jq -r '.backup_locations[]' "$DR_CONFIG_FILE")
for backup_location in $BACKUP_LOCATIONS; do
echo "Restoring from: $backup_location"
# This would call provider-specific restore scripts
case "$DR_PROVIDER" in
"aws")
restore_from_s3_backup "$backup_location"
;;
"azure")
restore_from_azure_backup "$backup_location"
;;
"gcp")
restore_from_gcs_backup "$backup_location"
;;
esac
done
}
update_dns_to_dr() {
echo "Updating DNS to point to DR site..."
DR_ENDPOINT=$(jq -r '.dr_endpoint' "$DR_CONFIG_FILE")
DNS_ZONE=$(jq -r '.dns_zone' "$DR_CONFIG_FILE")
# Update DNS record to point to DR endpoint
aws route53 change-resource-record-sets \
--hosted-zone-id "$DNS_ZONE" \
--change-batch "{
\"Changes\": [{
\"Action\": \"UPSERT\",
\"ResourceRecordSet\": {
\"Name\": \"$(jq -r '.primary_domain' "$DR_CONFIG_FILE")\",
\"Type\": \"A\",
\"TTL\": 60,
\"ResourceRecords\": [{\"Value\": \"$DR_ENDPOINT\"}]
}
}]
}"
}
validate_recovery() {
echo "Validating disaster recovery..."
HEALTH_CHECK_URL=$(jq -r '.health_check_url' "$DR_CONFIG_FILE")
# Wait for application to be healthy
for i in {1..30}; do
if curl -f "$HEALTH_CHECK_URL" >/dev/null 2>&1; then
echo "✅ Application is healthy"
return 0
fi
echo "Waiting for application to be healthy... ($i/30)"
sleep 10
done
echo "❌ Application health check failed"
return 1
}
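# The partial/test recovery paths and the provider-specific restore helpers are
# referenced above but not defined; the placeholders below keep the script
# runnable and mark where environment-specific logic belongs.
execute_partial_recovery() {
    echo "🔄 Executing partial recovery (data restore only, no DNS failover)..."
    restore_data_from_backups
    validate_recovery
    echo "✅ Partial disaster recovery completed"
}

execute_test_recovery() {
    echo "🧪 Executing test recovery against isolated DR infrastructure..."
    activate_dr_infrastructure
    validate_recovery
    echo "✅ Test recovery completed (no DNS changes made)"
}

restore_from_s3_backup()    { echo "Restoring from S3 backup: $1 (placeholder)"; }
restore_from_azure_backup() { echo "Restoring from Azure backup: $1 (placeholder)"; }
restore_from_gcs_backup()   { echo "Restoring from GCS backup: $1 (placeholder)"; }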
# Execute based on parameters
case "${3:-execute}" in
"execute")
execute_disaster_recovery
;;
"test")
echo "🧪 Testing disaster recovery procedures..."
RECOVERY_TYPE="test"
execute_disaster_recovery
;;
*)
echo "Usage: $0 <dr_config_file> <recovery_type> [execute|test]"
echo ""
echo "Recovery types: full, partial, test"
exit 1
;;
esac
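The script reads everything it needs from its configuration file via jq, so a minimal dr-config.json contains the following keys (values here are placeholders):
{
  "primary_provider": "aws",
  "dr_provider": "gcp",
  "dr_terraform_dir": "terraform/dr",
  "backup_locations": ["gs://company-data-archive"],
  "dr_endpoint": "203.0.113.10",
  "dns_zone": "<route53-hosted-zone-id>",
  "primary_domain": "app.example.com",
  "health_check_url": "https://app.example.com/health"
}
Invoke it as ./scripts/disaster-recovery.sh dr-config.json full for a real failover, or with the test recovery type to rehearse the runbook.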
What’s Next
Data and storage strategies provide the foundation for reliable multi-cloud operations, but monitoring and observability across multiple providers requires unified approaches. In the next part, we’ll explore how to implement comprehensive monitoring and observability that gives you visibility into your entire multi-cloud infrastructure from a single pane of glass.