Async Generators and Streams

Async generators combine the power of generators with async programming, making them ideal for processing streams of data without loading everything into memory at once.

Understanding Async Generators

An async generator uses async def and yield to produce values asynchronously:

import asyncio

async def number_generator(max_num):
    """Generate numbers asynchronously"""
    for i in range(max_num):
        print(f"Generating {i}")
        await asyncio.sleep(0.1)  # Simulate async work
        yield i

The generator yields values one at a time, with async operations between yields. Whenever the generator awaits, control returns to the event loop, so other code can run while it works.

Consume the async generator with async for:

async def consume_numbers():
    """Consume numbers from async generator"""
    async for number in number_generator(5):
        print(f"Received: {number}")
        await asyncio.sleep(0.05)  # Process the number

asyncio.run(consume_numbers())

Why this is powerful: the generator produces values one at a time and the consumer processes each as it arrives. The consumer does wait for each value, but every await hands control back to the event loop, so other tasks can keep running in the meantime.
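To see this in action, you can run an unrelated task alongside the consumer. The sketch below reuses number_generator and consume_numbers from above; the heartbeat task is purely illustrative:

async def heartbeat():
    """Unrelated task that keeps ticking while the stream is consumed"""
    for _ in range(4):
        await asyncio.sleep(0.2)
        print("tick")

async def stream_with_heartbeat():
    # Both coroutines share the event loop, so the ticks interleave
    # with the "Generating"/"Received" output from the stream
    await asyncio.gather(consume_numbers(), heartbeat())

asyncio.run(stream_with_heartbeat())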

Data Processing Pipeline

Async generators excel at building data processing pipelines:

async def fetch_user_data(user_ids):
    """Simulate fetching user data from an API"""
    for user_id in user_ids:
        await asyncio.sleep(0.1)  # Simulate API call
        yield {
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"
        }

This generator fetches user data one user at a time, yielding each result as it becomes available.

Add a transformation stage to the pipeline:

async def enrich_user_data(user_stream):
    """Add additional data to each user"""
    async for user in user_stream:
        await asyncio.sleep(0.05)  # Simulate enrichment
        
        user["profile_complete"] = len(user["name"]) > 5
        user["domain"] = user["email"].split("@")[1]
        yield user

Each stage processes data as it flows through, without waiting for all data to be available.

Wire the pipeline together:

async def process_users():
    """Complete user processing pipeline"""
    user_ids = [1, 2, 3, 4, 5]
    
    # Create the pipeline
    raw_users = fetch_user_data(user_ids)
    enriched_users = enrich_user_data(raw_users)
    
    # Process the pipeline
    saved_count = 0
    async for user in enriched_users:
        print(f"Saved: {user['name']} ({user['email']})")
        saved_count += 1
    
    print(f"Pipeline complete: {saved_count} users processed")

asyncio.run(process_users())

Pipeline benefits: each stage processes data as soon as it becomes available, and only one item is in flight at any moment, so memory usage stays essentially constant no matter how many users flow through.
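For contrast, a non-streaming version of the same pipeline would materialize each stage before the next one starts, so memory grows with the number of users. A sketch, reusing fetch_user_data from above:

async def process_users_eagerly():
    """Non-streaming version: collects everything before moving on"""
    user_ids = [1, 2, 3, 4, 5]

    # The full result set is held in memory before any enrichment happens
    raw_users = [user async for user in fetch_user_data(user_ids)]

    for user in raw_users:
        user["domain"] = user["email"].split("@")[1]

    print(f"Processed {len(raw_users)} users, all held in memory at once")

asyncio.run(process_users_eagerly())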

Stream Transformation

Transform data streams with various operations:

async def map_stream(source_stream, transform_func):
    """Apply transformation to each item in stream"""
    async for item in source_stream:
        yield transform_func(item)

async def filter_stream(source_stream, predicate):
    """Filter stream items based on predicate"""
    async for item in source_stream:
        if predicate(item):
            yield item

async def take_stream(source_stream, count):
    """Take only the first N items from the stream"""
    if count <= 0:
        return
    taken = 0
    async for item in source_stream:
        yield item
        taken += 1
        if taken >= count:
            break  # Stop without pulling another item from the source

async def number_stream():
    """Generate numbers"""
    for i in range(20):
        await asyncio.sleep(0.05)
        yield i

async def stream_operations():
    """Demonstrate stream transformations"""
    # Create pipeline: numbers -> squares -> evens -> first 5
    numbers = number_stream()
    squares = map_stream(numbers, lambda x: x * x)
    evens = filter_stream(squares, lambda x: x % 2 == 0)
    first_five = take_stream(evens, 5)
    
    results = []
    async for result in first_five:
        results.append(result)
        print(f"Result: {result}")
    
    print(f"Final results: {results}")

asyncio.run(stream_operations())
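One detail worth knowing: when take_stream stops with break, the upstream generators are left suspended until the event loop finalizes them (asyncio.run closes pending async generators at shutdown). If you want deterministic cleanup, contextlib.aclosing (available since Python 3.10) closes a generator as soon as the block exits. A minimal sketch using number_stream from above:

import contextlib

async def take_three():
    """Consume only the first three numbers, then close the stream explicitly"""
    async with contextlib.aclosing(number_stream()) as numbers:
        count = 0
        async for n in numbers:
            print(f"Got {n}")
            count += 1
            if count == 3:
                break  # aclosing() guarantees the generator is closed right here

asyncio.run(take_three())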

Buffered Streams

Control memory usage with buffered async generators:

async def buffered_generator(source_stream, buffer_size=3):
    """Buffer items from source stream"""
    buffer = []
    
    async for item in source_stream:
        buffer.append(item)
        
        if len(buffer) >= buffer_size:
            yield buffer.copy()
            buffer.clear()
    
    # Yield remaining items
    if buffer:
        yield buffer

async def data_source():
    """Generate data items"""
    for i in range(10):
        await asyncio.sleep(0.1)
        yield f"item-{i}"

async def process_buffer(buffer):
    """Process a buffer of items"""
    print(f"Processing buffer of {len(buffer)} items: {buffer}")
    await asyncio.sleep(0.2)  # Simulate batch processing

async def buffered_processing():
    """Process data in buffers"""
    source = data_source()
    buffered = buffered_generator(source, buffer_size=3)
    
    async for buffer in buffered:
        await process_buffer(buffer)

asyncio.run(buffered_processing())
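A common reason to buffer is to fan each batch out concurrently. The sketch below could be swapped in for process_buffer; the save_item helper is hypothetical and just simulates per-item work:

async def save_item(item):
    """Hypothetical per-item work, e.g. a database write"""
    await asyncio.sleep(0.1)
    print(f"Saved {item}")

async def process_buffer_concurrently(buffer):
    """Process every item in the buffer at the same time"""
    await asyncio.gather(*(save_item(item) for item in buffer))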

Error Handling in Streams

Handle errors gracefully in async generators:

async def unreliable_data_source():
    """Data source that occasionally fails"""
    for i in range(10):
        await asyncio.sleep(0.1)
        
        if i == 5:  # Simulate error
            raise Exception(f"Data source error at item {i}")
        
        yield f"data-{i}"

async def resilient_stream(source_stream):
    """Make stream resilient to errors"""
    try:
        async for item in source_stream:
            yield item
    except Exception as e:
        print(f"Stream error: {e}")
        # Could implement retry logic here

async def error_handling_demo():
    """Demonstrate error handling in streams"""
    try:
        source = unreliable_data_source()
        resilient = resilient_stream(source)
        
        async for item in resilient:
            print(f"Processed: {item}")
            
    except Exception as e:
        print(f"Stream processing failed: {e}")

asyncio.run(error_handling_demo())
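The comment in resilient_stream hints at retries. One way to sketch that is to accept a factory that can recreate the source, so a failed stream can be restarted a limited number of times. Note that restarting from scratch re-yields items that were already produced; a real implementation would track progress and resume. The max_retries parameter and the restart behavior here are illustrative assumptions:

async def retrying_stream(source_factory, max_retries=3):
    """Restart the source stream after a failure, up to max_retries times"""
    for attempt in range(max_retries):
        try:
            async for item in source_factory():
                yield item
            return  # Source finished cleanly
        except Exception as e:
            print(f"Stream failed (attempt {attempt + 1}/{max_retries}): {e}")
    print("Giving up after repeated failures")

async def retry_demo():
    async for item in retrying_stream(unreliable_data_source):
        print(f"Processed: {item}")

asyncio.run(retry_demo())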

Summary

Async generators enable powerful streaming patterns:

Key Benefits

  • Memory Efficient: Process data without loading everything into memory
  • Non-blocking: Awaits inside each stage free the event loop for other tasks
  • Composable: Chain generators to build complex pipelines
  • Real-time: Handle continuous data streams

Common Patterns

  • Data processing pipelines
  • Stream transformation operations
  • Buffered processing for memory control
  • Error handling in data streams

Best Practices

  • Use pipelines for data transformation
  • Control memory with buffering
  • Chain operations for complex processing
  • Handle errors gracefully in streams

In Part 7, we’ll explore error handling patterns for robust async applications.