Data Enrichment and Validation

Complex data processing often requires enriching data from multiple sources and validating it asynchronously. Let’s explore patterns for parallel data enrichment and validation.

Parallel Data Enrichment

Enrich data by calling multiple services concurrently:

import asyncio

async def enrich_user_data(user_id: int):
    """Enrich user data from multiple sources"""
    
    async def get_profile(user_id):
        await asyncio.sleep(0.2)  # Simulate API call
        return {"name": f"User {user_id}", "age": 25 + (user_id % 40)}
    
    async def get_preferences(user_id):
        await asyncio.sleep(0.3)  # Simulate API call
        return {"theme": "dark", "notifications": True}
    
    async def get_activity(user_id):
        await asyncio.sleep(0.1)  # Simulate API call
        return {"last_login": "2024-01-01", "login_count": user_id * 10}

These three functions simulate calling different microservices or APIs. In a real application, they might be calls to a user service, a preferences service, and an analytics service.

The key is to fetch all data concurrently rather than sequentially:

    # Fetch all data concurrently
    profile, preferences, activity = await asyncio.gather(
        get_profile(user_id),
        get_preferences(user_id),
        get_activity(user_id),
        return_exceptions=True
    )

Passing return_exceptions=True tells gather() to return a failed call’s exception as that call’s result instead of raising it, so a failure in one service does not discard the results of the others. This makes the enrichment process resilient to partial failures.
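
For example, if the preferences call raised an exception, the preferences variable would hold that exception object rather than a dict. A minimal sketch (using a hypothetical failing coroutine) shows the behavior:

import asyncio

async def ok_call():
    return {"theme": "dark"}

async def failing_call():
    raise RuntimeError("preferences service unavailable")

async def gather_demo():
    results = await asyncio.gather(ok_call(), failing_call(), return_exceptions=True)
    print(results)
    # [{'theme': 'dark'}, RuntimeError('preferences service unavailable')]

asyncio.run(gather_demo())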

Finally, we combine the results safely:

    # Combine results, handling failures gracefully
    enriched_data = {"user_id": user_id}
    
    if isinstance(profile, dict):
        enriched_data.update(profile)
    
    if isinstance(preferences, dict):
        enriched_data["preferences"] = preferences
    
    if isinstance(activity, dict):
        enriched_data["activity"] = activity
    
    return enriched_data

This pattern fetches data from all three sources simultaneously, so the total latency is roughly that of the slowest call (about 0.3s here) rather than the sum of all three (about 0.6s).
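
A single call can be tried on its own; the output below is simply what the simulated services above would produce for user 7:

async def single_enrichment_demo():
    enriched = await enrich_user_data(7)
    print(enriched)
    # {'user_id': 7, 'name': 'User 7', 'age': 32,
    #  'preferences': {'theme': 'dark', 'notifications': True},
    #  'activity': {'last_login': '2024-01-01', 'login_count': 70}}

asyncio.run(single_enrichment_demo())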

Batch Enrichment Processing

Process multiple users efficiently:

async def batch_enrich_users(user_ids, batch_size=20):
    """Enrich multiple users in batches"""
    results = []
    
    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i + batch_size]
        
        print(f"Enriching batch {i//batch_size + 1}: {len(batch)} users")
        
        # Enrich batch concurrently
        batch_tasks = [enrich_user_data(uid) for uid in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        
        results.extend(batch_results)
    
    return results

async def enrichment_demo():
    """Demonstrate parallel data enrichment"""
    user_ids = list(range(1, 51))  # 50 users
    
    start_time = asyncio.get_running_loop().time()
    enriched_users = await batch_enrich_users(user_ids, batch_size=10)
    elapsed = asyncio.get_running_loop().time() - start_time
    
    print(f"Enriched {len(enriched_users)} users in {elapsed:.2f}s")

asyncio.run(enrichment_demo())
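
Note that the gather() call inside batch_enrich_users does not pass return_exceptions=True, so an exception from any single user would abort the whole batch. If that matters, one option (a minimal sketch, not the only approach) is to tolerate per-user failures and keep the successful results:

async def batch_enrich_users_tolerant(user_ids, batch_size=20):
    """Like batch_enrich_users, but a failing user does not abort its batch."""
    results = []
    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i + batch_size]
        batch_results = await asyncio.gather(
            *(enrich_user_data(uid) for uid in batch),
            return_exceptions=True
        )
        # Keep successful enrichments; exceptions could also be logged or retried
        results.extend(r for r in batch_results if not isinstance(r, BaseException))
    return results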

Data Validation Pipeline

Validate and clean data asynchronously:

import re

async def validate_email(email: str) -> bool:
    """Validate email format"""
    await asyncio.sleep(0.01)  # Simulate validation work
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

async def validate_phone(phone: str) -> bool:
    """Validate phone number"""
    await asyncio.sleep(0.01)  # Simulate validation work
    cleaned = re.sub(r'[^\d]', '', phone)
    return 10 <= len(cleaned) <= 15

async def validate_age(age) -> bool:
    """Validate age"""
    await asyncio.sleep(0.005)  # Simulate validation work
    try:
        age_int = int(age)
        return 0 <= age_int <= 150
    except (ValueError, TypeError):
        return False

async def validate_record(record: dict) -> dict:
    """Validate a single record"""
    # Run all validations concurrently
    email_valid, phone_valid, age_valid = await asyncio.gather(
        validate_email(record.get("email", "")),
        validate_phone(record.get("phone", "")),
        validate_age(record.get("age")),
        return_exceptions=True
    )
    
    validation_results = {
        "email_valid": email_valid if isinstance(email_valid, bool) else False,
        "phone_valid": phone_valid if isinstance(phone_valid, bool) else False,
        "age_valid": age_valid if isinstance(age_valid, bool) else False
    }
    
    # Overall validity
    validation_results["is_valid"] = all(validation_results.values())
    
    return {
        **record,
        "validation": validation_results
    }
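
Running validate_record on a single dictionary shows the shape of the result (the sample values here are purely illustrative):

async def single_validation_demo():
    record = {"name": "Ada", "email": "ada@example.com", "phone": "555-010-0123", "age": 36}
    result = await validate_record(record)
    print(result["validation"])
    # {'email_valid': True, 'phone_valid': True, 'age_valid': True, 'is_valid': True}

asyncio.run(single_validation_demo())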

Batch Validation Processing

Process validation efficiently:

async def validation_pipeline(records):
    """Validate multiple records"""
    batch_size = 50
    validated_records = []
    
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        
        # Validate batch concurrently
        batch_tasks = [validate_record(record) for record in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        
        validated_records.extend(batch_results)
        
        print(f"Validated batch {i//batch_size + 1}: {len(batch)} records")
    
    return validated_records

async def validation_demo():
    """Demonstrate data validation pipeline"""
    # Sample data with various quality issues
    sample_records = [
        {"name": "John Doe", "email": "[email protected]", "phone": "555-1234", "age": 30},
        {"name": "Jane Smith", "email": "invalid-email", "phone": "555-5678", "age": 25},
        {"name": "Bob Johnson", "email": "[email protected]", "phone": "not-a-phone", "age": "thirty"},
    ] * 50  # Multiply to simulate larger dataset
    
    start_time = asyncio.get_running_loop().time()
    validated = await validation_pipeline(sample_records)
    elapsed = asyncio.get_running_loop().time() - start_time
    
    # Analyze results
    valid_count = len([r for r in validated if r["validation"]["is_valid"]])
    invalid_count = len(validated) - valid_count
    
    print(f"\nValidation Results:")
    print(f"Total records: {len(validated)}")
    print(f"Valid: {valid_count}")
    print(f"Invalid: {invalid_count}")
    print(f"Processing time: {elapsed:.2f}s")

asyncio.run(validation_demo())

Memory-Efficient Processing

Process large datasets without loading everything into memory:

async def memory_efficient_processor(data_size: int):
    """Process large dataset without loading it all into memory"""
    
    async def process_chunk(chunk_data):
        """Process a chunk of data"""
        processed_rows = []
        
        for row in chunk_data:
            # Simulate processing each row
            await asyncio.sleep(0.001)
            
            processed_row = {
                "id": row.get("id"),
                "processed_value": float(row.get("value", 0)) * 1.1,
                "status": "processed"
            }
            processed_rows.append(processed_row)
        
        return processed_rows
    
    # Process in chunks
    total_processed = 0
    chunk_size = 100
    
    for chunk_start in range(0, data_size, chunk_size):
        chunk_data = [
            {"id": i, "value": i * 0.5}
            for i in range(chunk_start, min(chunk_start + chunk_size, data_size))
        ]
        
        # Process chunk
        processed_chunk = await process_chunk(chunk_data)
        total_processed += len(processed_chunk)
        
        if total_processed % 1000 == 0:
            print(f"Processed {total_processed} items...")
    
    print(f"Processed {total_processed} items total")

asyncio.run(memory_efficient_processor(5000))
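
If the data actually comes from an external source (a file, database cursor, or paginated API), the same chunked approach can be driven by an async generator so that only one chunk is ever held in memory. A minimal sketch, with the data source simulated:

async def stream_chunks(data_size: int, chunk_size: int = 100):
    """Yield chunks one at a time instead of building the full dataset up front."""
    for chunk_start in range(0, data_size, chunk_size):
        # In practice this would read the next page from a file, cursor, or API
        yield [
            {"id": i, "value": i * 0.5}
            for i in range(chunk_start, min(chunk_start + chunk_size, data_size))
        ]

async def streaming_processor(data_size: int):
    total = 0
    async for chunk in stream_chunks(data_size):
        total += len(chunk)  # Replace with real per-chunk processing
    print(f"Streamed {total} items")

asyncio.run(streaming_processor(5000))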

Summary

Advanced data processing patterns enable robust, scalable data handling:

Key Patterns

  • Parallel Enrichment: Fetch additional data from multiple sources concurrently
  • Batch Processing: Process data in manageable chunks
  • Validation Pipelines: Validate data using concurrent validation functions
  • Memory-Efficient Processing: Handle large datasets without memory issues

Performance Benefits

  • Concurrent Operations: Multiple data sources accessed simultaneously
  • Batch Processing: Optimal memory usage and throughput
  • Resilient Processing: Graceful handling of partial failures
  • Scalable Architecture: Handle growing data volumes efficiently

Best Practices

  • Use batching to control memory usage and concurrency
  • Implement proper error handling and retry logic (see the sketch after this list)
  • Monitor processing rates and resource usage
  • Design for partial failures and graceful degradation
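
As one example of the retry point above, an enrichment call can be wrapped in a small retry helper; a minimal sketch, with arbitrary attempt counts and delays:

async def enrich_with_retry(user_id: int, attempts: int = 3, delay: float = 0.5):
    """Retry enrichment a few times before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            return await enrich_user_data(user_id)
        except Exception as exc:
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({exc!r}), retrying in {delay}s")
            await asyncio.sleep(delay)
            delay *= 2  # Exponential backoff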

In Part 12, we’ll explore caching and connection pooling for high-performance applications.