Data Enrichment and Validation
Complex data processing often requires enriching data from multiple sources and validating it asynchronously. Let’s explore patterns for parallel data enrichment and validation.
Parallel Data Enrichment
Enrich data by calling multiple services concurrently:
import asyncio

async def enrich_user_data(user_id: int):
    """Enrich user data from multiple sources"""
    async def get_profile(user_id):
        await asyncio.sleep(0.2)  # Simulate API call
        return {"name": f"User {user_id}", "age": 25 + (user_id % 40)}

    async def get_preferences(user_id):
        await asyncio.sleep(0.3)  # Simulate API call
        return {"theme": "dark", "notifications": True}

    async def get_activity(user_id):
        await asyncio.sleep(0.1)  # Simulate API call
        return {"last_login": "2024-01-01", "login_count": user_id * 10}
These three functions simulate calling different microservices or APIs. In a real application, they might be calls to a user service, a preferences service, and an analytics service.
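For context, a real get_profile might look something like the sketch below. This assumes the third-party aiohttp library and a hypothetical https://api.example.com endpoint; authentication and error handling are omitted:

import aiohttp

async def get_profile(user_id: int) -> dict:
    # Hypothetical endpoint; swap in your real user service URL and auth.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            resp.raise_for_status()
            return await resp.json()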
The key is to fetch all data concurrently rather than sequentially:
    # Fetch all data concurrently
    profile, preferences, activity = await asyncio.gather(
        get_profile(user_id),
        get_preferences(user_id),
        get_activity(user_id),
        return_exceptions=True
    )
Using return_exceptions=True ensures that if one service fails, asyncio.gather places the exception object in the result list instead of raising it, so the other calls can still complete. This makes the enrichment process resilient to partial failures.
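To see the mechanism in isolation, here is a minimal, self-contained sketch (the service names are hypothetical):

import asyncio

async def healthy_service():
    return {"data": "ok"}

async def failing_service():
    raise RuntimeError("service down")

async def demo():
    results = await asyncio.gather(
        healthy_service(), failing_service(), return_exceptions=True
    )
    print(results)  # [{'data': 'ok'}, RuntimeError('service down')]

asyncio.run(demo())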
Finally, we combine the results safely:
    # Combine results, handling failures gracefully
    enriched_data = {"user_id": user_id}
    if isinstance(profile, dict):
        enriched_data.update(profile)
    if isinstance(preferences, dict):
        enriched_data["preferences"] = preferences
    if isinstance(activity, dict):
        enriched_data["activity"] = activity
    return enriched_data
This pattern fetches data from all three sources simultaneously, so total latency is set by the slowest call (about 0.3s here) rather than the sum of all three (about 0.6s).
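To make that latency claim concrete, here is a self-contained timing sketch using the same three simulated delays:

import asyncio
import time

async def timing_comparison():
    async def call(delay):
        await asyncio.sleep(delay)

    start = time.perf_counter()
    for d in (0.2, 0.3, 0.1):  # one after another
        await call(d)
    print(f"sequential: {time.perf_counter() - start:.2f}s")  # ~0.60s

    start = time.perf_counter()
    await asyncio.gather(*(call(d) for d in (0.2, 0.3, 0.1)))  # all at once
    print(f"concurrent: {time.perf_counter() - start:.2f}s")  # ~0.30s

asyncio.run(timing_comparison())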
Batch Enrichment Processing
Process multiple users efficiently:
async def batch_enrich_users(user_ids, batch_size=20):
    """Enrich multiple users in batches"""
    results = []
    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i + batch_size]
        print(f"Enriching batch {i//batch_size + 1}: {len(batch)} users")

        # Enrich batch concurrently
        batch_tasks = [enrich_user_data(uid) for uid in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        results.extend(batch_results)
    return results

async def enrichment_demo():
    """Demonstrate parallel data enrichment"""
    user_ids = list(range(1, 51))  # 50 users

    start_time = asyncio.get_running_loop().time()
    enriched_users = await batch_enrich_users(user_ids, batch_size=10)
    elapsed = asyncio.get_running_loop().time() - start_time

    print(f"Enriched {len(enriched_users)} users in {elapsed:.2f}s")

asyncio.run(enrichment_demo())
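Fixed batches have one drawback: each batch waits for its slowest item before the next batch starts. An alternative worth knowing, sketched below against the same enrich_user_data function, uses asyncio.Semaphore to cap in-flight work while letting a new enrichment start as soon as a slot frees up:

async def enrich_with_semaphore(user_ids, max_concurrent=10):
    """Cap concurrency with a semaphore instead of fixed batches."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_enrich(uid):
        async with semaphore:
            return await enrich_user_data(uid)

    return await asyncio.gather(*(limited_enrich(uid) for uid in user_ids))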
Data Validation Pipeline
Validate and clean data asynchronously:
import re

async def validate_email(email: str) -> bool:
    """Validate email format"""
    await asyncio.sleep(0.01)  # Simulate validation work
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

async def validate_phone(phone: str) -> bool:
    """Validate phone number"""
    await asyncio.sleep(0.01)  # Simulate validation work
    cleaned = re.sub(r'[^\d]', '', phone)
    return 10 <= len(cleaned) <= 15

async def validate_age(age) -> bool:
    """Validate age"""
    await asyncio.sleep(0.005)  # Simulate validation work
    try:
        age_int = int(age)
        return 0 <= age_int <= 150
    except (ValueError, TypeError):
        return False

async def validate_record(record: dict) -> dict:
    """Validate a single record"""
    # Run all validations concurrently
    email_valid, phone_valid, age_valid = await asyncio.gather(
        validate_email(record.get("email", "")),
        validate_phone(record.get("phone", "")),
        validate_age(record.get("age")),
        return_exceptions=True
    )

    validation_results = {
        "email_valid": email_valid if isinstance(email_valid, bool) else False,
        "phone_valid": phone_valid if isinstance(phone_valid, bool) else False,
        "age_valid": age_valid if isinstance(age_valid, bool) else False
    }

    # Overall validity
    validation_results["is_valid"] = all(validation_results.values())

    return {
        **record,
        "validation": validation_results
    }
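For a single record (hypothetical values), the returned shape looks like this:

record = {"name": "Ada", "email": "ada@example.com", "phone": "555-123-4567", "age": 36}
result = asyncio.run(validate_record(record))
print(result["validation"])
# {'email_valid': True, 'phone_valid': True, 'age_valid': True, 'is_valid': True}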
Batch Validation Processing
Process validation efficiently:
async def validation_pipeline(records):
    """Validate multiple records"""
    batch_size = 50
    validated_records = []
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        # Validate batch concurrently
        batch_tasks = [validate_record(record) for record in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        validated_records.extend(batch_results)

        print(f"Validated batch {i//batch_size + 1}: {len(batch)} records")
    return validated_records
async def validation_demo():
    """Demonstrate data validation pipeline"""
    # Sample data with various quality issues; only the first record
    # passes every check (its phone has the required 10 digits)
    sample_records = [
        {"name": "John Doe", "email": "john@example.com", "phone": "555-123-4567", "age": 30},
        {"name": "Jane Smith", "email": "invalid-email", "phone": "555-5678", "age": 25},
        {"name": "Bob Johnson", "email": "bob@example.com", "phone": "not-a-phone", "age": "thirty"},
    ] * 50  # Multiply to simulate larger dataset

    start_time = asyncio.get_running_loop().time()
    validated = await validation_pipeline(sample_records)
    elapsed = asyncio.get_running_loop().time() - start_time

    # Analyze results
    valid_count = len([r for r in validated if r["validation"]["is_valid"]])
    invalid_count = len(validated) - valid_count

    print("\nValidation Results:")
    print(f"Total records: {len(validated)}")
    print(f"Valid: {valid_count}")
    print(f"Invalid: {invalid_count}")
    print(f"Processing time: {elapsed:.2f}s")

asyncio.run(validation_demo())
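The demo only counts valid versus invalid records. A small follow-up step, not part of the demo above, uses the per-field flags to show which checks fail most often:

from collections import Counter

def failure_breakdown(validated):
    """Count how often each individual check fails."""
    counts = Counter()
    for r in validated:
        for key, ok in r["validation"].items():
            if key != "is_valid" and not ok:
                counts[key] += 1
    return counts

# For the sample data above:
# Counter({'phone_valid': 100, 'email_valid': 50, 'age_valid': 50})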
Memory-Efficient Processing
Process large datasets without loading everything into memory:
async def memory_efficient_processor(data_size: int):
    """Process large dataset without loading it all into memory"""
    async def process_chunk(chunk_data):
        """Process a chunk of data"""
        processed_rows = []
        for row in chunk_data:
            # Simulate processing each row
            await asyncio.sleep(0.001)
            processed_row = {
                "id": row.get("id"),
                "processed_value": float(row.get("value", 0)) * 1.1,
                "status": "processed"
            }
            processed_rows.append(processed_row)
        return processed_rows

    # Process in chunks
    total_processed = 0
    chunk_size = 100
    for chunk_start in range(0, data_size, chunk_size):
        chunk_data = [
            {"id": i, "value": i * 0.5}
            for i in range(chunk_start, min(chunk_start + chunk_size, data_size))
        ]

        # Process chunk
        processed_chunk = await process_chunk(chunk_data)
        total_processed += len(processed_chunk)

        if total_processed % 1000 == 0:
            print(f"Processed {total_processed} items...")

    print(f"Processed {total_processed} items total")

asyncio.run(memory_efficient_processor(5000))
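A natural refinement, sketched under the same assumptions, is to produce the chunks with an async generator so the code that creates data and the code that consumes it stay decoupled. Only one chunk is ever materialized at a time:

async def stream_chunks(data_size: int, chunk_size: int = 100):
    """Yield synthetic chunks one at a time instead of building a full list."""
    for start in range(0, data_size, chunk_size):
        yield [
            {"id": i, "value": i * 0.5}
            for i in range(start, min(start + chunk_size, data_size))
        ]

async def streaming_demo():
    total = 0
    async for chunk in stream_chunks(5000):
        for row in chunk:
            await asyncio.sleep(0.001)  # same simulated per-row work as above
            total += 1
    print(f"Streamed and processed {total} items")

asyncio.run(streaming_demo())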
Summary
Advanced data processing patterns enable robust, scalable data handling:
Key Patterns
- Parallel Enrichment: Fetch additional data from multiple sources concurrently
- Batch Processing: Process data in manageable chunks
- Validation Pipelines: Validate data using concurrent validation functions
- Memory-Efficient Processing: Handle large datasets without memory issues
Performance Benefits
- Concurrent Operations: Multiple data sources accessed simultaneously
- Batch Processing: Bounded memory use while sustaining throughput
- Resilient Processing: Graceful handling of partial failures
- Scalable Architecture: Handle growing data volumes efficiently
Best Practices
- Use batching to control memory usage and concurrency
- Implement proper error handling and retry logic (a minimal retry sketch follows this list)
- Monitor processing rates and resource usage
- Design for partial failures and graceful degradation
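The retry-logic bullet deserves a concrete shape. A minimal sketch, assuming the call is wrapped in a zero-argument callable so a fresh coroutine can be created on each attempt:

async def with_retries(make_call, attempts=3, base_delay=0.5):
    """Retry an async call with exponential backoff (hypothetical helper)."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

# Usage: profile = await with_retries(lambda: get_profile(user_id))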
In Part 12, we’ll explore caching and connection pooling for high-performance applications.