Data Processing Pipelines

Async programming excels at data processing tasks that involve I/O operations. Let’s build efficient data processing systems that can handle large volumes of data.

Why Async for Data Processing

Traditional data processing is often I/O bound:

  • Reading files: CPU waits for disk operations
  • API calls: CPU waits for network responses
  • Database queries: CPU waits for query results

With async processing, while one operation waits for I/O, others can continue processing. This dramatically improves throughput.
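
To make the difference concrete, here is a minimal sketch that contrasts sequential waits with overlapped ones, using asyncio.sleep as a stand-in for real I/O:

import asyncio
import time

async def fake_io(label: str, delay: float) -> str:
    """Stand-in for a disk read, API call, or database query"""
    await asyncio.sleep(delay)  # the event loop is free during this wait
    return label

async def compare():
    start = time.perf_counter()
    # Sequential: each wait blocks the next operation from starting
    for name in ("file", "api", "db"):
        await fake_io(name, 1.0)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")  # ~3.0s

    start = time.perf_counter()
    # Concurrent: all three waits overlap on the same event loop
    await asyncio.gather(*(fake_io(name, 1.0) for name in ("file", "api", "db")))
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")  # ~1.0s

asyncio.run(compare())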

Processing Multiple Files Concurrently

Instead of processing files one by one, process them simultaneously:

import asyncio
import aiofiles
from pathlib import Path
import json

async def process_single_file(file_path: Path):
    """Process a single JSON file"""
    try:
        async with aiofiles.open(file_path, 'r') as file:
            content = await file.read()
            data = json.loads(content)
        
        record_count = len(data) if isinstance(data, list) else 1
        
        return {
            "file": file_path.name,
            "records": record_count,
            "status": "success"
        }
    
    except Exception as e:
        return {
            "file": file_path.name,
            "error": str(e),
            "status": "failed"
        }

The key insight here is using aiofiles for non-blocking file operations. Regular file operations would block the event loop, preventing other files from being processed concurrently.
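
If aiofiles is not available, a rough alternative is to offload the blocking read to a worker thread with asyncio.to_thread (Python 3.9+). The sketch below returns the same success shape as process_single_file but omits its error handling for brevity:

def read_json_blocking(file_path: Path):
    """Ordinary blocking read -- safe because it runs in a worker thread"""
    with open(file_path, 'r') as file:
        return json.load(file)

async def process_single_file_threaded(file_path: Path):
    """Like process_single_file, but keeps the event loop free by
    pushing the blocking file read onto a thread"""
    data = await asyncio.to_thread(read_json_blocking, file_path)
    record_count = len(data) if isinstance(data, list) else 1
    return {"file": file_path.name, "records": record_count, "status": "success"}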

Now we need a way to process files in controlled batches to avoid overwhelming the system:

async def process_files_batch(file_paths, batch_size=10):
    """Process files in batches to control memory usage"""
    results = []
    
    for i in range(0, len(file_paths), batch_size):
        batch = file_paths[i:i + batch_size]
        
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        
        # Process batch concurrently
        batch_tasks = [process_single_file(path) for path in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        
        results.extend(batch_results)
        
        # Optional: brief pause between batches
        await asyncio.sleep(0.1)
    
    return results

Batching prevents memory exhaustion when processing thousands of files. Each batch runs concurrently, but we limit how many files are processed simultaneously.
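
A common alternative to fixed batches is an asyncio.Semaphore, which caps how many files are in flight at any moment without making fast files wait for the slowest one in their batch. A rough sketch reusing process_single_file:

async def process_files_limited(file_paths, max_concurrent=10):
    """Cap concurrency with a semaphore instead of fixed batches"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_limit(path):
        async with semaphore:
            return await process_single_file(path)

    return await asyncio.gather(*(process_with_limit(path) for path in file_paths))

Note that gather still creates one task per file up front and keeps every result in memory, so for very large file lists the batch approach above remains the safer choice.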

Here’s how to put it all together:

async def file_processing_demo():
    """Demonstrate concurrent file processing"""
    # Simulate file paths
    file_paths = [Path(f"data/file_{i}.json") for i in range(50)]
    
    start_time = asyncio.get_event_loop().time()
    results = await process_files_batch(file_paths, batch_size=10)
    elapsed = asyncio.get_event_loop().time() - start_time
    
    successful = len([r for r in results if r["status"] == "success"])
    failed = len([r for r in results if r["status"] == "failed"])
    
    print(f"Processed {len(results)} files in {elapsed:.2f}s")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")

asyncio.run(file_processing_demo())

ETL Pipeline with Async Generators

Build Extract-Transform-Load pipelines using async generators:

import aiohttp

async def extract_api_data(api_urls):
    """Extract data from multiple APIs"""
    async with aiohttp.ClientSession() as session:
        for url in api_urls:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        data = await response.json()
                        yield data
                    else:
                        print(f"Failed to fetch {url}: {response.status}")
            except Exception as e:
                print(f"Error fetching {url}: {e}")

The extraction phase uses async generators to yield data as it becomes available. This allows the transformation phase to start processing while extraction continues, creating a streaming pipeline.
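
One limitation of extract_api_data is that it fetches the URLs one at a time. If the endpoints are independent, a variant like the sketch below (not part of the original pipeline) can issue all requests immediately and still yield each payload as soon as it arrives, via asyncio.as_completed:

async def extract_api_data_concurrent(api_urls):
    """Start every request up front; yield payloads in completion order"""
    async with aiohttp.ClientSession() as session:

        async def fetch(url):
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()
                return await response.json()

        for completed in asyncio.as_completed([fetch(url) for url in api_urls]):
            try:
                yield await completed
            except Exception as e:
                print(f"Error fetching one of the URLs: {e}")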

Next, we transform the raw data:

async def transform_data(data_stream):
    """Transform raw data"""
    async for raw_data in data_stream:
        # Clean and transform data
        transformed = {
            "id": raw_data.get("id"),
            "name": raw_data.get("name", "").strip().title(),
            "email": raw_data.get("email", "").lower(),
            "processed_at": asyncio.get_event_loop().time()
        }
        
        # Skip invalid records
        if transformed["id"] and transformed["email"]:
            yield transformed

The transformation happens record-by-record as data flows through. Invalid records are filtered out immediately rather than being processed further.
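
To see the filtering in isolation, you can feed transform_data a small in-memory stream (the sample records below are made up for the demo):

async def transform_demo():
    """Run transform_data over two hand-written records"""
    async def sample_stream():
        yield {"id": 1, "name": "  ada lovelace ", "email": "ADA@EXAMPLE.COM"}
        yield {"id": None, "name": "missing id", "email": "skip@example.com"}

    async for record in transform_data(sample_stream()):
        print(record)  # only the first record passes the validity check

asyncio.run(transform_demo())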

Finally, we load the data in batches for efficiency:

async def load_to_database(transformed_stream):
    """Load transformed data to database (simulated)"""
    batch = []
    batch_size = 100
    
    async for record in transformed_stream:
        batch.append(record)
        
        if len(batch) >= batch_size:
            await insert_batch(batch)
            batch.clear()
    
    # Insert remaining records
    if batch:
        await insert_batch(batch)

async def insert_batch(records):
    """Insert batch of records to database (simulated)"""
    # Simulate database insert
    await asyncio.sleep(0.1)
    print(f"Inserted batch of {len(records)} records")

Batching reduces database overhead by grouping multiple records into single operations. The pipeline processes data continuously without waiting for all extraction to complete.
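
In a real system, insert_batch would typically hand the whole batch to the database driver in one call. A rough sketch of what that could look like with asyncpg (the table, columns, and connection string are placeholders):

import asyncpg  # third-party PostgreSQL driver, shown here as one possibility

async def insert_batch_pg(pool, records):
    """One executemany call per batch, over a pooled connection"""
    async with pool.acquire() as connection:
        await connection.executemany(
            "INSERT INTO users (id, name, email) VALUES ($1, $2, $3)",
            [(r["id"], r["name"], r["email"]) for r in records]
        )

async def make_pool():
    # Placeholder connection string
    return await asyncpg.create_pool("postgresql://user:password@localhost/etl_demo")

Pooling the connections means each batch reuses an open connection instead of paying connection setup cost every time.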

Here’s how to wire the pipeline together (the API URLs below are placeholders):

async def etl_pipeline():
    """Wire the extract, transform, and load stages together"""
    api_urls = [f"https://api.example.com/users?page={page}" for page in range(1, 6)]
    
    # Each stage consumes the previous stage's async generator
    transformed_data = transform_data(extract_api_data(api_urls))
    
    # Load to database
    await load_to_database(transformed_data)
    
    print("ETL pipeline completed successfully")

asyncio.run(etl_pipeline())

Real-time Data Processing

Process streaming data in real-time:

import random
from datetime import datetime, timezone

async def data_stream_simulator():
    """Simulate real-time data stream"""
    while True:
        # Generate sample event
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": random.randint(1, 1000),
            "event_type": random.choice(["login", "logout", "purchase", "view"]),
            "value": random.uniform(0, 100)
        }
        
        yield event
        await asyncio.sleep(0.1)  # 10 events per second

async def event_aggregator(event_stream, window_seconds=5):
    """Aggregate events in time windows"""
    window_data = {}
    last_flush = asyncio.get_event_loop().time()
    
    async for event in event_stream:
        current_time = asyncio.get_event_loop().time()
        
        # Add event to current window
        event_type = event["event_type"]
        if event_type not in window_data:
            window_data[event_type] = {"count": 0, "total_value": 0}
        
        window_data[event_type]["count"] += 1
        window_data[event_type]["total_value"] += event["value"]
        
        # Flush window if time elapsed
        if current_time - last_flush >= window_seconds:
            if window_data:
                yield {
                    "window_start": last_flush,
                    "window_end": current_time,
                    "aggregates": window_data.copy()
                }
                
                window_data.clear()
                last_flush = current_time

async def process_aggregates(aggregate_stream):
    """Process aggregated data"""
    async for window in aggregate_stream:
        print(f"Window {window['window_start']:.1f}-{window['window_end']:.1f}:")
        for event_type, stats in window["aggregates"].items():
            avg_value = stats["total_value"] / stats["count"]
            print(f"  {event_type}: {stats['count']} events, avg value: {avg_value:.1f}")

async def real_time_processing():
    """Real-time data processing pipeline"""
    # Create processing pipeline
    events = data_stream_simulator()
    aggregates = event_aggregator(events, window_seconds=3)
    
    # Process aggregates (this will run indefinitely)
    try:
        await process_aggregates(aggregates)
    except KeyboardInterrupt:
        print("Stopping real-time processing...")

# Run for a limited time in demo
async def demo_real_time():
    task = asyncio.create_task(real_time_processing())
    
    # Run for 15 seconds
    await asyncio.sleep(15)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Demo completed")

asyncio.run(demo_real_time())
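
On Python 3.11 and later, the same time-boxing can be expressed with asyncio.timeout instead of a manual create_task/cancel pair; a brief sketch:

async def demo_real_time_with_timeout():
    """Equivalent demo using asyncio.timeout (Python 3.11+)"""
    try:
        async with asyncio.timeout(15):
            await real_time_processing()
    except TimeoutError:
        print("Demo completed")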

Summary

Async data processing enables efficient handling of I/O-bound operations:

Key Patterns

  • Concurrent File Processing: Process multiple files simultaneously
  • ETL Pipelines: Extract, transform, and load data using async generators
  • Real-time Processing: Handle streaming data with time-windowed aggregation
  • Batch Processing: Control memory usage with batching

Performance Benefits

  • Higher Throughput: Process more data in less time
  • Better Resource Utilization: CPU stays busy while waiting for I/O
  • Scalability: Handle larger datasets with the same resources
  • Responsiveness: System remains responsive during processing

Best Practices

  • Process data in batches to control memory usage
  • Use async generators for streaming data
  • Implement proper error handling for partial failures
  • Monitor memory usage and processing rates
  • Use connection pooling for database operations

In Part 11, we’ll explore data enrichment and validation patterns for complex processing scenarios.