Data Processing Pipelines
Async programming excels at data processing tasks that involve I/O operations. Let’s build efficient data processing systems that can handle large volumes of data.
Why Async for Data Processing
Traditional data processing is often I/O bound:
- Reading files: CPU waits for disk operations
- API calls: CPU waits for network responses
- Database queries: CPU waits for query results
With async processing, while one operation waits for I/O, others can continue processing. This dramatically improves throughput.
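To make the difference concrete, here is a minimal, self-contained sketch; the half-second sleeps stand in for arbitrary I/O waits, and the three simulated calls finish in roughly the time of one because asyncio.gather runs them concurrently.

import asyncio
import time

async def fake_io(name: str) -> str:
    # Stand-in for a file read, API call, or database query
    await asyncio.sleep(0.5)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(fake_io("file"), fake_io("api"), fake_io("db"))
    print(results, f"in {time.perf_counter() - start:.2f}s")  # ~0.5s, not ~1.5s

asyncio.run(main())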
Processing Multiple Files Concurrently
Instead of processing files one by one, process them simultaneously:
import asyncio
import aiofiles
from pathlib import Path
import json

async def process_single_file(file_path: Path):
    """Process a single JSON file"""
    try:
        async with aiofiles.open(file_path, 'r') as file:
            content = await file.read()
        data = json.loads(content)
        record_count = len(data) if isinstance(data, list) else 1
        return {
            "file": file_path.name,
            "records": record_count,
            "status": "success"
        }
    except Exception as e:
        return {
            "file": file_path.name,
            "error": str(e),
            "status": "failed"
        }
The key insight here is using aiofiles for non-blocking file operations. Regular file operations would block the event loop, preventing other files from being processed concurrently.
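If aiofiles isn't an option in your environment, a common fallback is to push the blocking read onto a worker thread with asyncio.to_thread (Python 3.9+), which keeps the event loop free. This variant of process_single_file is a sketch of that approach:

async def process_single_file_threaded(file_path: Path):
    """Variant that offloads the blocking read to a thread pool"""
    try:
        # Path.read_text is blocking, so run it in a worker thread
        content = await asyncio.to_thread(file_path.read_text)
        data = json.loads(content)
        record_count = len(data) if isinstance(data, list) else 1
        return {"file": file_path.name, "records": record_count, "status": "success"}
    except Exception as e:
        return {"file": file_path.name, "error": str(e), "status": "failed"}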
Now we need a way to process files in controlled batches to avoid overwhelming the system:
async def process_files_batch(file_paths, batch_size=10):
    """Process files in batches to control memory usage"""
    results = []

    for i in range(0, len(file_paths), batch_size):
        batch = file_paths[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")

        # Process batch concurrently
        batch_tasks = [process_single_file(path) for path in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        results.extend(batch_results)

        # Optional: brief pause between batches
        await asyncio.sleep(0.1)

    return results
Batching prevents memory exhaustion when processing thousands of files. Each batch runs concurrently, but we limit how many files are processed simultaneously.
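A common alternative to fixed batches is to launch every task up front but cap concurrency with an asyncio.Semaphore. The function below is an illustrative sketch, not part of the original pipeline; it keeps at most max_concurrent files in flight while still returning one result per file:

async def process_files_limited(file_paths, max_concurrent=10):
    """Limit concurrency with a semaphore instead of fixed batches"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(path):
        async with semaphore:  # at most max_concurrent run at once
            return await process_single_file(path)

    return await asyncio.gather(*(limited(path) for path in file_paths))

The trade-off is mostly ergonomic: the semaphore gives a steady level of concurrency, while explicit batches make per-batch progress reporting and pauses easier.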
Here’s how to put it all together:
async def file_processing_demo():
    """Demonstrate concurrent file processing"""
    # Simulated paths; point these at real JSON files, otherwise every
    # result is reported as "failed" (which still exercises the pipeline)
    file_paths = [Path(f"data/file_{i}.json") for i in range(50)]

    start_time = asyncio.get_running_loop().time()
    results = await process_files_batch(file_paths, batch_size=10)
    elapsed = asyncio.get_running_loop().time() - start_time

    successful = len([r for r in results if r["status"] == "success"])
    failed = len([r for r in results if r["status"] == "failed"])

    print(f"Processed {len(results)} files in {elapsed:.2f}s")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")

asyncio.run(file_processing_demo())
ETL Pipeline with Async Generators
Build Extract-Transform-Load pipelines using async generators:
import aiohttp

async def extract_api_data(api_urls):
    """Extract data from multiple APIs"""
    async with aiohttp.ClientSession() as session:
        for url in api_urls:
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        data = await response.json()
                        yield data
                    else:
                        print(f"Failed to fetch {url}: {response.status}")
            except Exception as e:
                print(f"Error fetching {url}: {e}")
The extraction phase uses async generators to yield data as it becomes available. This allows the transformation phase to start processing while extraction continues, creating a streaming pipeline.
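The same laziness applies to any chain of async generators. In this toy illustration, each value is transformed the moment it is produced rather than after the whole sequence has been generated:

async def produce():
    for i in range(3):
        await asyncio.sleep(0.1)  # pretend this is a slow fetch
        yield i

async def double(stream):
    async for item in stream:
        yield item * 2  # runs as soon as each item arrives

async def show_chaining():
    async for value in double(produce()):
        print(value)  # prints 0, 2, 4, one at a time

asyncio.run(show_chaining())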
Next, we transform the raw data:
async def transform_data(data_stream):
    """Transform raw data"""
    async for raw_data in data_stream:
        # Clean and transform data
        transformed = {
            "id": raw_data.get("id"),
            "name": raw_data.get("name", "").strip().title(),
            "email": raw_data.get("email", "").lower(),
            "processed_at": asyncio.get_running_loop().time()
        }

        # Skip invalid records
        if transformed["id"] and transformed["email"]:
            yield transformed
The transformation happens record-by-record as data flows through. Invalid records are filtered out immediately rather than being processed further.
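Because transform_data accepts any async iterable, it's easy to exercise in isolation with a couple of hand-written records (the sample data below is made up for illustration):

async def fake_extract():
    """Yield a couple of hand-written records, one of them invalid"""
    for raw in [
        {"id": 1, "name": "  ada lovelace ", "email": "ADA@EXAMPLE.COM"},
        {"name": "no id", "email": "missing@example.com"},  # dropped: no id
    ]:
        yield raw

async def transform_demo():
    async for record in transform_data(fake_extract()):
        print(record)  # only the valid, cleaned record comes through

asyncio.run(transform_demo())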
Finally, we load the data in batches for efficiency:
async def load_to_database(transformed_stream):
    """Load transformed data to database (simulated)"""
    batch = []
    batch_size = 100

    async for record in transformed_stream:
        batch.append(record)

        if len(batch) >= batch_size:
            await insert_batch(batch)
            batch.clear()

    # Insert remaining records
    if batch:
        await insert_batch(batch)

async def insert_batch(records):
    """Insert batch of records to database (simulated)"""
    # Simulate database insert
    await asyncio.sleep(0.1)
    print(f"Inserted batch of {len(records)} records")
Batching reduces database overhead by grouping multiple records into single operations. The pipeline processes data continuously without waiting for all extraction to complete.
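In a real loader, insert_batch would hand the whole batch to the database driver in a single round trip. The following is a hedged sketch only, assuming asyncpg, a reachable Postgres instance, and a users table with id, name, and email columns, none of which exist in this demo:

# Sketch only: requires `pip install asyncpg` and a real `users` table
import asyncpg

async def insert_batch_pg(conn: "asyncpg.Connection", records):
    await conn.executemany(
        "INSERT INTO users (id, name, email) VALUES ($1, $2, $3)",
        [(r["id"], r["name"], r["email"]) for r in records],
    )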
Here’s how to wire the pipeline together; the API endpoint below is a placeholder, so substitute your real URLs:

async def etl_pipeline():
    # Placeholder endpoint list; substitute real API URLs
    api_urls = ["https://api.example.com/users"]
    transformed_data = transform_data(extract_api_data(api_urls))
    # Load to database
    await load_to_database(transformed_data)
    print("ETL pipeline completed successfully")

asyncio.run(etl_pipeline())
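One property of this design worth noting: because the stages are chained generators, the loader pulls records one at a time, so extraction and loading take turns rather than overlapping. If you want the stages to run ahead of each other, an asyncio.Queue can decouple them. The following is a minimal sketch of that idea, reusing the functions defined above; the queue size and sentinel are illustrative choices:

async def run_decoupled(api_urls):
    """Let extraction run ahead of loading via a bounded queue"""
    queue: asyncio.Queue = asyncio.Queue(maxsize=500)

    async def producer():
        async for record in transform_data(extract_api_data(api_urls)):
            await queue.put(record)
        await queue.put(None)  # sentinel: no more records

    async def consumer():
        batch = []
        while (record := await queue.get()) is not None:
            batch.append(record)
            if len(batch) >= 100:
                await insert_batch(batch)
                batch.clear()
        if batch:
            await insert_batch(batch)

    await asyncio.gather(producer(), consumer())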
Real-time Data Processing
Process streaming data in real-time:
import random
from datetime import datetime, timezone

async def data_stream_simulator():
    """Simulate real-time data stream"""
    while True:
        # Generate sample event
        event = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": random.randint(1, 1000),
            "event_type": random.choice(["login", "logout", "purchase", "view"]),
            "value": random.uniform(0, 100)
        }
        yield event
        await asyncio.sleep(0.1)  # 10 events per second
async def event_aggregator(event_stream, window_seconds=5):
    """Aggregate events in time windows"""
    window_data = {}
    last_flush = asyncio.get_running_loop().time()

    async for event in event_stream:
        current_time = asyncio.get_running_loop().time()

        # Add event to current window
        event_type = event["event_type"]
        if event_type not in window_data:
            window_data[event_type] = {"count": 0, "total_value": 0}
        window_data[event_type]["count"] += 1
        window_data[event_type]["total_value"] += event["value"]

        # Flush window if time elapsed
        if current_time - last_flush >= window_seconds:
            if window_data:
                yield {
                    "window_start": last_flush,
                    "window_end": current_time,
                    "aggregates": window_data.copy()
                }
            window_data.clear()
            last_flush = current_time
async def process_aggregates(aggregate_stream):
    """Process aggregated data"""
    async for window in aggregate_stream:
        print(f"Window {window['window_start']:.1f}-{window['window_end']:.1f}:")
        for event_type, stats in window["aggregates"].items():
            avg_value = stats["total_value"] / stats["count"]
            print(f"  {event_type}: {stats['count']} events, avg value: {avg_value:.1f}")
async def real_time_processing():
    """Real-time data processing pipeline"""
    # Create processing pipeline
    events = data_stream_simulator()
    aggregates = event_aggregator(events, window_seconds=3)

    # Process aggregates (this will run indefinitely)
    try:
        await process_aggregates(aggregates)
    except KeyboardInterrupt:
        print("Stopping real-time processing...")
# Run for a limited time in demo
async def demo_real_time():
    task = asyncio.create_task(real_time_processing())

    # Run for 15 seconds
    await asyncio.sleep(15)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Demo completed")

asyncio.run(demo_real_time())
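If you only need the time limit, asyncio.wait_for is a slightly shorter alternative to manual cancellation; it cancels the coroutine for you once the timeout expires:

async def demo_real_time_wait_for():
    try:
        # wait_for cancels real_time_processing() after 15 seconds
        await asyncio.wait_for(real_time_processing(), timeout=15)
    except asyncio.TimeoutError:
        print("Demo completed")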
Summary
Async data processing enables efficient handling of I/O-bound operations:
Key Patterns
- Concurrent File Processing: Process multiple files simultaneously
- ETL Pipelines: Extract, transform, and load data using async generators
- Real-time Processing: Handle streaming data with time-windowed aggregation
- Batch Processing: Control memory usage with batching
Performance Benefits
- Higher Throughput: Process more data in less time
- Better Resource Utilization: CPU stays busy while waiting for I/O
- Scalability: Handle larger datasets with the same resources
- Responsiveness: System remains responsive during processing
Best Practices
- Process data in batches to control memory usage
- Use async generators for streaming data
- Implement proper error handling for partial failures
- Monitor memory usage and processing rates
- Use connection pooling for database operations (see the sketch after this list)
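For the last point, a connection pool lets many concurrent loaders share a fixed set of database connections instead of opening one per task. A hedged sketch using asyncpg (the DSN is a placeholder, and insert_batch_pg refers to the illustrative loader sketched in the ETL section):

import asyncpg

async def run_with_pool():
    # Placeholder DSN; assumes a reachable Postgres instance
    pool = await asyncpg.create_pool("postgresql://localhost/example",
                                     min_size=2, max_size=10)
    try:
        async with pool.acquire() as conn:
            # Hand `conn` to an insert_batch_pg-style loader here
            ...
    finally:
        await pool.close()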
In Part 11, we’ll explore data enrichment and validation patterns for complex processing scenarios.