# Async Generators and Streams
Async generators combine the power of generators with async programming, making them perfect for processing streams of data without loading everything into memory at once.
## Understanding Async Generators

An async generator uses `async def` and `yield` to produce values asynchronously:
```python
import asyncio

async def number_generator(max_num):
    """Generate numbers asynchronously"""
    for i in range(max_num):
        print(f"Generating {i}")
        await asyncio.sleep(0.1)  # Simulate async work
        yield i
```
The generator yields values one at a time, with async operations between yields. This allows other code to run while the generator is working.
Consume the async generator with `async for`:
```python
async def consume_numbers():
    """Consume numbers from async generator"""
    async for number in number_generator(5):
        print(f"Received: {number}")
        await asyncio.sleep(0.05)  # Process the number

asyncio.run(consume_numbers())
```
Why this is powerful: The generator produces numbers one at a time, and the consumer processes them as they arrive. Neither blocks the other.
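Because both the generator and the consumer `await` between items, the event loop can interleave other work with the stream. The sketch below is a minimal illustration of that: it runs an unrelated task alongside `consume_numbers` from above (the `heartbeat` coroutine and `demo_concurrency` wrapper are hypothetical names added here, not part of the earlier examples):

```python
async def heartbeat():
    """Unrelated work that keeps running while the stream is processed"""
    for _ in range(10):
        print("  ...heartbeat (other work still running)")
        await asyncio.sleep(0.04)

async def demo_concurrency():
    # Run the stream consumer and the heartbeat concurrently; every await
    # inside the generator and consumer gives the heartbeat a turn.
    await asyncio.gather(consume_numbers(), heartbeat())

asyncio.run(demo_concurrency())
```

The heartbeat messages appear interleaved with the "Generating" and "Received" output, showing that neither side monopolizes the event loop.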
## Data Processing Pipeline
Async generators excel at building data processing pipelines:
```python
async def fetch_user_data(user_ids):
    """Simulate fetching user data from an API"""
    for user_id in user_ids:
        await asyncio.sleep(0.1)  # Simulate API call
        yield {
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com",
        }
```
This generator fetches user data one user at a time, yielding each result as it becomes available.
Add a transformation stage to the pipeline:
```python
async def enrich_user_data(user_stream):
    """Add additional data to each user"""
    async for user in user_stream:
        await asyncio.sleep(0.05)  # Simulate enrichment
        user["profile_complete"] = len(user["name"]) > 5
        user["domain"] = user["email"].split("@")[1]
        yield user
```
Each stage processes data as it flows through, without waiting for all data to be available.
Wire the pipeline together:
```python
async def process_users():
    """Complete user processing pipeline"""
    user_ids = [1, 2, 3, 4, 5]

    # Create the pipeline
    raw_users = fetch_user_data(user_ids)
    enriched_users = enrich_user_data(raw_users)

    # Process the pipeline
    saved_count = 0
    async for user in enriched_users:
        print(f"Saved: {user['name']} ({user['email']})")
        saved_count += 1

    print(f"Pipeline complete: {saved_count} users processed")

asyncio.run(process_users())
```
Pipeline benefits: Each stage processes data as it becomes available. Memory usage stays constant regardless of data size.
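To make the constant-memory point concrete, here is a minimal sketch (the `large_id_stream` and `count_large_stream` helpers are hypothetical, added only for illustration): because each item is produced, consumed, and discarded before the next one exists, the item count can grow arbitrarily without the working set growing with it.

```python
async def large_id_stream(count):
    """Yield ids one at a time instead of building a list up front"""
    for user_id in range(count):
        yield user_id

async def count_large_stream():
    # Only one id is alive at any moment, so memory stays flat even
    # though a million items flow through the loop.
    total = 0
    async for _ in large_id_stream(1_000_000):
        total += 1
    print(f"Processed {total} items without materializing them")

asyncio.run(count_large_stream())
```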
## Stream Transformation
Transform data streams with various operations:
```python
async def map_stream(source_stream, transform_func):
    """Apply transformation to each item in stream"""
    async for item in source_stream:
        yield transform_func(item)

async def filter_stream(source_stream, predicate):
    """Filter stream items based on predicate"""
    async for item in source_stream:
        if predicate(item):
            yield item

async def take_stream(source_stream, count):
    """Take only first N items from stream"""
    taken = 0
    async for item in source_stream:
        if taken >= count:
            break
        yield item
        taken += 1

async def number_stream():
    """Generate numbers"""
    for i in range(20):
        await asyncio.sleep(0.05)
        yield i

async def stream_operations():
    """Demonstrate stream transformations"""
    # Create pipeline: numbers -> squares -> evens -> first 5
    numbers = number_stream()
    squares = map_stream(numbers, lambda x: x * x)
    evens = filter_stream(squares, lambda x: x % 2 == 0)
    first_five = take_stream(evens, 5)

    results = []
    async for result in first_five:
        results.append(result)
        print(f"Result: {result}")

    print(f"Final results: {results}")

asyncio.run(stream_operations())
```
## Buffered Streams
Control memory usage with buffered async generators:
```python
async def buffered_generator(source_stream, buffer_size=3):
    """Buffer items from source stream"""
    buffer = []
    async for item in source_stream:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            yield buffer.copy()
            buffer.clear()
    # Yield remaining items
    if buffer:
        yield buffer

async def data_source():
    """Generate data items"""
    for i in range(10):
        await asyncio.sleep(0.1)
        yield f"item-{i}"

async def process_buffer(buffer):
    """Process a buffer of items"""
    print(f"Processing buffer of {len(buffer)} items: {buffer}")
    await asyncio.sleep(0.2)  # Simulate batch processing

async def buffered_processing():
    """Process data in buffers"""
    source = data_source()
    buffered = buffered_generator(source, buffer_size=3)
    async for buffer in buffered:
        await process_buffer(buffer)

asyncio.run(buffered_processing())
```
## Error Handling in Streams
Handle errors gracefully in async generators:
```python
async def unreliable_data_source():
    """Data source that occasionally fails"""
    for i in range(10):
        await asyncio.sleep(0.1)
        if i == 5:  # Simulate error
            raise Exception(f"Data source error at item {i}")
        yield f"data-{i}"

async def resilient_stream(source_stream):
    """Make stream resilient to errors"""
    try:
        async for item in source_stream:
            yield item
    except Exception as e:
        print(f"Stream error: {e}")
        # Could implement retry logic here

async def error_handling_demo():
    """Demonstrate error handling in streams"""
    try:
        source = unreliable_data_source()
        resilient = resilient_stream(source)
        async for item in resilient:
            print(f"Processed: {item}")
    except Exception as e:
        print(f"Stream processing failed: {e}")

asyncio.run(error_handling_demo())
```
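The comment in `resilient_stream` hints at retry logic. One way to sketch that is to rebuild the source from a factory and resume; the `retrying_stream` helper below is a hypothetical example under the assumption that the source can be recreated from scratch, so items seen before a failure may be yielded again:

```python
async def retrying_stream(source_factory, max_retries=3):
    """Restart the source stream after a failure, up to max_retries times.

    source_factory is a zero-argument callable returning a fresh async
    generator. Items already yielded before a failure will be produced
    again after a restart, so downstream code should tolerate duplicates.
    """
    for attempt in range(max_retries):
        try:
            async for item in source_factory():
                yield item
            return  # Source completed without errors
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
    print("Giving up after repeated failures")

async def retry_demo():
    async for item in retrying_stream(unreliable_data_source, max_retries=2):
        print(f"Processed: {item}")

asyncio.run(retry_demo())
```

Since `unreliable_data_source` fails at the same point every time, this demo simply retries once and then gives up, but the same shape works for transient failures such as dropped network connections.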
## Summary
Async generators enable powerful streaming patterns:
### Key Benefits
- Memory Efficient: Process data without loading everything into memory
- Concurrent: Pipeline stages run concurrently
- Composable: Chain generators to build complex pipelines
- Real-time: Handle continuous data streams
### Common Patterns
- Data processing pipelines
- Stream transformation operations
- Buffered processing for memory control
- Error handling in data streams
### Best Practices
- Use pipelines for data transformation
- Control memory with buffering
- Chain operations for complex processing
- Handle errors gracefully in streams
In Part 7, we’ll explore error handling patterns for robust async applications.