Asynchronous Programming in Python: Building High-Performance Applications
Master Python’s asynchronous programming capabilities to build fast, scalable applications that handle thousands of concurrent operations efficiently.
Why Async Programming Matters
Asynchronous programming can seem like magic when you see a single Python process handle 10,000 concurrent connections. How can one thread outperform traditional multi-threaded servers? The answer lies in efficiency and smart resource management.
Think of a restaurant where waiters take one order, go to the kitchen, wait for the food to cook, bring it back, then serve the next customer. If your meal takes 20 minutes, other customers wait 20 minutes just to place their order.
Now imagine waiters who take your order, submit it to the kitchen, then immediately serve other customers while your food cooks. When your meal is ready, they bring it to you. The same waiter serves many customers simultaneously.
This is exactly how async programming works in Python - and why it can be so powerful.
The Problem with Blocking Code
Traditional Python code executes one line at a time, waiting for each operation to complete:
import requests
import time
def fetch_three_apis():
start = time.time()
requests.get('https://api1.example.com/data') # Wait 1 second
requests.get('https://api2.example.com/data') # Wait 1 second
requests.get('https://api3.example.com/data') # Wait 1 second
print(f"Total time: {time.time() - start:.1f} seconds") # ~3 seconds
The problem: Your program waits 3 seconds total, even though your CPU sits idle waiting for network responses.
The Async Solution
With async programming, you start all three requests simultaneously:
import asyncio
import aiohttp
async def fetch_three_apis_async():
async with aiohttp.ClientSession() as session:
tasks = [
session.get('https://api1.example.com/data'),
session.get('https://api2.example.com/data'),
session.get('https://api3.example.com/data')
]
responses = await asyncio.gather(*tasks)
# Total time: ~1 second (concurrent requests)
The benefit: All three requests run concurrently, completing in roughly the time of the slowest request instead of the sum of all requests.
When Async Programming Helps
Async programming excels when your code spends time waiting for:
- Network requests (APIs, web scraping)
- Database queries
- File operations (reading/writing large files)
- User input or external events
It’s not helpful for CPU-intensive tasks like mathematical calculations, where the CPU is already busy.
Real-World Impact
Consider a web server handling user requests:
- Synchronous server: Handles one request at a time. If each request takes 100ms (including database queries), you can handle 10 requests per second.
- Async server: While one request waits for the database, it processes other requests. The same server can handle 100+ requests per second.
This 10x improvement comes from better resource utilization, not faster hardware.
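You can sanity-check that arithmetic with a toy simulation; the 100ms sleep below stands in for database I/O:

import asyncio
import time

async def handle_request():
    await asyncio.sleep(0.1)  # 100ms of simulated database I/O

async def main():
    start = time.time()
    for _ in range(10):
        await handle_request()  # one at a time, like a synchronous server
    print(f"Sequential: {time.time() - start:.2f}s")  # ~1.0s -> ~10 req/s

    start = time.time()
    await asyncio.gather(*(handle_request() for _ in range(10)))
    print(f"Concurrent: {time.time() - start:.2f}s")  # ~0.1s -> ~100 req/s

asyncio.run(main())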
Common Use Cases
Web Scraping: Instead of scraping websites one by one, async lets you scrape dozens simultaneously:
# Synchronous: 10 websites × 2 seconds each = 20 seconds
# Async: 10 websites concurrently = ~2 seconds
API Integration: Modern applications often call multiple APIs:
# Get user data, preferences, and recent activity simultaneously
user_data, preferences, activity = await asyncio.gather(
fetch_user(user_id),
fetch_preferences(user_id),
fetch_recent_activity(user_id)
)
File Processing: Process multiple files while others are being read:
# Process 100 log files concurrently instead of sequentially
# Reduces processing time from hours to minutes
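Here's a minimal sketch of the idea, assuming aiofiles is installed and the paths are placeholders:

import asyncio
import aiofiles

async def read_file(path):
    async with aiofiles.open(path) as f:
        return await f.read()

async def read_all(paths):
    # Schedule every read at once; the event loop overlaps the disk waits
    return await asyncio.gather(*(read_file(p) for p in paths))

# contents = asyncio.run(read_all(["a.log", "b.log"]))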
Database Operations: Execute multiple queries concurrently:
# Fetch related data in parallel instead of sequential queries
# Reduces page load time from 500ms to 100ms
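A simulated version of that math, where each query stands in for a 100ms database round trip:

import asyncio

async def query(name, delay=0.1):
    await asyncio.sleep(delay)  # stand-in for a real database round trip
    return f"{name} rows"

async def load_page():
    # Five 100ms queries in parallel finish in ~100ms instead of ~500ms
    return await asyncio.gather(*(query(f"q{i}") for i in range(5)))

print(asyncio.run(load_page()))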
Performance Comparison
Here’s what async programming can achieve:
| Operation Type | Synchronous | Async | Improvement |
|---|---|---|---|
| API calls (10 requests) | 10 seconds | 1 second | 10x faster |
| Database queries (5 queries) | 250ms | 50ms | 5x faster |
| File processing (100 files) | 2 hours | 20 minutes | 6x faster |
| Web scraping (50 pages) | 100 seconds | 15 seconds | 7x faster |
Why Now?
Async programming has become essential because:
- Microservices: Applications make dozens of API calls
- Real-time features: Users expect instant responses
- Cloud costs: Better resource utilization saves money
- Mobile apps: Faster responses improve user experience
- IoT devices: Handle thousands of sensor readings simultaneously
Key Concepts Preview
In the following parts, we’ll explore:
- Event Loop: The engine that manages async operations
- Coroutines: Functions that can pause and resume
- Tasks: Concurrent operations that run together
- Await: How to wait for async operations to complete
What You’ll Build
By the end of this guide, you’ll understand how to build:
- Web APIs that handle thousands of concurrent requests
- Data processing pipelines that work on multiple files simultaneously
- Microservices that communicate efficiently
- Production systems with proper monitoring and error handling
Next Steps
In Part 2, we’ll set up your development environment and write your first async program, exploring how the event loop coordinates multiple operations.
Key insight to remember: Async programming isn’t about making individual operations faster - it’s about doing more operations at the same time.
Environment Setup
Setting up your development environment for async programming requires a few specific tools and libraries.
Create a Virtual Environment
Always use a virtual environment for async projects:
python -m venv async_env
source async_env/bin/activate # Windows: async_env\Scripts\activate
Install Essential Packages
Install the core packages you’ll need:
pip install aiohttp aiofiles pytest-asyncio
pip freeze > requirements.txt
Why these packages:
- `aiohttp` - Replaces `requests` for async HTTP calls
- `aiofiles` - Handles file I/O without blocking
- `pytest-asyncio` - Lets you test async functions
Project Structure
Organize your async project for growth:
my_async_project/
├── src/
│ ├── main.py # Application entry point
│ ├── http_client.py # HTTP operations
│ └── config.py # Configuration
├── tests/
│ └── test_main.py # Tests
└── requirements.txt
Basic Configuration
Create a simple configuration system:
# src/config.py
import os
from dataclasses import dataclass
@dataclass
class Config:
timeout: float = 30.0
max_connections: int = 100
debug: bool = os.getenv('DEBUG', 'False').lower() == 'true'
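A quick usage check (hypothetical; run from the src/ directory so the import resolves):

# src/main.py (hypothetical usage)
from config import Config

config = Config()
print(config.timeout, config.max_connections, config.debug)
# Set DEBUG=true in the environment to flip config.debug to True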
Simple HTTP Client
Create a reusable HTTP client:
# src/http_client.py
import aiohttp
class AsyncHttpClient:
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def get(self, url):
async with self.session.get(url) as response:
return await response.json()
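Usage looks like this; the URL is just an example endpoint:

# hypothetical usage, run from the src/ directory
import asyncio
from http_client import AsyncHttpClient

async def main():
    async with AsyncHttpClient() as client:
        data = await client.get("https://httpbin.org/json")
        print(data)

asyncio.run(main())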
Your First Async Program
Here’s a simple program that demonstrates the power of async programming:
# first_async.py
import asyncio
import time
async def make_coffee(name):
print(f"☕ {name}: Starting to make coffee")
await asyncio.sleep(3) # Coffee takes 3 seconds
print(f"☕ {name}: Coffee ready!")
return f"{name}'s coffee"
async def make_toast(name):
print(f"🍞 {name}: Starting to make toast")
await asyncio.sleep(2) # Toast takes 2 seconds
print(f"🍞 {name}: Toast ready!")
return f"{name}'s toast"
async def make_breakfast():
start_time = time.time()
# Make coffee and toast simultaneously
coffee, toast = await asyncio.gather(
make_coffee("Alice"),
make_toast("Alice")
)
elapsed = time.time() - start_time
print(f"🍽️ Breakfast ready in {elapsed:.1f} seconds!")
asyncio.run(make_breakfast())
What happens when you run this:
- Coffee and toast start at the same time
- Toast finishes after 2 seconds
- Coffee finishes after 3 seconds
- Total time: ~3 seconds (not 5 seconds!)
Testing Your Async Code
Write tests to verify your async functions work correctly:
# tests/test_main.py
import time
import asyncio
import pytest
from first_async import make_coffee  # adjust to wherever make_coffee is defined
@pytest.mark.asyncio
async def test_make_coffee():
result = await make_coffee("Test")
assert result == "Test's coffee"
@pytest.mark.asyncio
async def test_concurrent_operations():
start = time.time()
# Run 5 tasks concurrently
tasks = [asyncio.sleep(0.1) for _ in range(5)]
await asyncio.gather(*tasks)
elapsed = time.time() - start
assert elapsed < 0.2 # Should take ~0.1s, not ~0.5s
Run your tests:
pytest tests/
Common Beginner Mistakes
1. Forgetting await
# Wrong - returns a coroutine object
result = async_function()
# Correct - waits for completion
result = await async_function()
2. Using blocking operations
# Wrong - blocks the event loop
import time
time.sleep(1)
# Correct - yields to event loop
await asyncio.sleep(1)
3. Not using async context managers
# Wrong - might not close properly
session = aiohttp.ClientSession()
response = await session.get(url)
# Correct - always closes
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
Next Steps
Now that you have a working async environment, in Part 3 we’ll explore your first async program and see the event loop in action.
Your First Async Program
Let’s write your first async program and see the magic of concurrent execution in action.
Basic Async Function
Start with a simple async function:
import asyncio
async def say_hello(name, delay):
"""Async function that greets someone after a delay"""
print(f"Hello {name}, starting...")
await asyncio.sleep(delay) # Non-blocking sleep
print(f"Hello {name}, finished after {delay}s!")
return f"Greeted {name}"
# Run a single async function
async def main():
result = await say_hello("Alice", 2)
print(f"Result: {result}")
# Execute the async program
asyncio.run(main())
Key points:
- `async def` creates an async function (coroutine)
- `await` pauses execution until the operation completes
- `asyncio.run()` starts the event loop and runs the main coroutine
Concurrent Execution
Now let’s see the real power - running multiple operations simultaneously:
import asyncio
import time
async def fetch_data(source, delay):
"""Simulate fetching data from different sources"""
print(f"Fetching from {source}...")
await asyncio.sleep(delay) # Simulate network delay
print(f"Got data from {source}")
return f"Data from {source}"
async def sequential_example():
"""Sequential execution - slow"""
print("=== Sequential Execution ===")
start_time = time.time()
result1 = await fetch_data("Database", 2)
result2 = await fetch_data("API", 1.5)
result3 = await fetch_data("Cache", 0.5)
total_time = time.time() - start_time
print(f"Sequential total time: {total_time:.1f}s\n")
return [result1, result2, result3]
async def concurrent_example():
"""Concurrent execution - fast"""
print("=== Concurrent Execution ===")
start_time = time.time()
# Start all operations simultaneously
task1 = asyncio.create_task(fetch_data("Database", 2))
task2 = asyncio.create_task(fetch_data("API", 1.5))
task3 = asyncio.create_task(fetch_data("Cache", 0.5))
# Wait for all to complete
results = await asyncio.gather(task1, task2, task3)
total_time = time.time() - start_time
print(f"Concurrent total time: {total_time:.1f}s\n")
return results
async def main():
# Run both examples
await sequential_example()
await concurrent_example()
asyncio.run(main())
Output shows the difference:
- Sequential: ~4 seconds (2 + 1.5 + 0.5)
- Concurrent: ~2 seconds (longest operation determines total time)
Common Patterns
Pattern 1: Fire and Forget
async def background_task(name):
"""Task that runs in background"""
await asyncio.sleep(3)
print(f"Background task {name} completed")
async def fire_and_forget_example():
"""Start tasks without waiting for them"""
# Start background tasks
asyncio.create_task(background_task("Task-1"))
asyncio.create_task(background_task("Task-2"))
# Do other work immediately
print("Main work starting...")
await asyncio.sleep(1)
print("Main work finished")
# Wait a bit to see background tasks complete
await asyncio.sleep(3)
asyncio.run(fire_and_forget_example())
Pattern 2: Timeout Handling
async def slow_operation():
"""Operation that might take too long"""
await asyncio.sleep(5)
return "Slow result"
async def timeout_example():
"""Handle operations with timeout"""
try:
# Wait maximum 2 seconds
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(f"Got result: {result}")
except asyncio.TimeoutError:
print("Operation timed out!")
Pattern 3: Error Handling in Concurrent Operations
async def risky_operation(name, should_fail=False):
"""Operation that might fail"""
await asyncio.sleep(1)
if should_fail:
raise ValueError(f"Operation {name} failed!")
return f"Success from {name}"
async def error_handling_example():
"""Handle errors in concurrent operations"""
tasks = [
asyncio.create_task(risky_operation("Task-1", False)),
asyncio.create_task(risky_operation("Task-2", True)), # This will fail
asyncio.create_task(risky_operation("Task-3", False))
]
# Use return_exceptions=True to get both results and exceptions
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i+1} failed: {result}")
else:
print(f"Task {i+1} succeeded: {result}")
asyncio.run(error_handling_example())
Real-World Example: Web Scraper
Here’s a practical example that demonstrates async power:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch a single URL"""
try:
async with session.get(url) as response:
content = await response.text()
return f"Success {url}: {len(content)} characters"
except Exception as e:
return f"Failed {url}: Error - {str(e)}"
async def scrape_websites():
"""Scrape multiple websites concurrently"""
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
"https://httpbin.org/json"
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
print("Scraping Results:")
for result in results:
print(f" {result}")
print(f"\nTotal time: {total_time:.1f}s")
print(f"Average per URL: {total_time/len(urls):.1f}s")
# Run the scraper
asyncio.run(scrape_websites())
Common Mistakes and Solutions
Mistake 1: Forgetting await
# Wrong - creates coroutine but doesn't execute it
async def wrong_way():
result = fetch_data("API", 1) # Missing await!
print(result) # Prints: <coroutine object>
# Correct - actually executes the coroutine
async def right_way():
result = await fetch_data("API", 1) # With await
print(result) # Prints: "Data from API"
Mistake 2: Using `time.sleep()` instead of `asyncio.sleep()`
# Wrong - blocks the entire event loop
async def blocking_sleep():
time.sleep(2) # Blocks everything!
# Correct - allows other tasks to run
async def non_blocking_sleep():
await asyncio.sleep(2) # Other tasks can run
Mistake 3: Not handling exceptions in concurrent tasks
# Wrong - one failure stops everything
async def fragile_approach():
results = await asyncio.gather(
risky_operation("A"),
risky_operation("B", should_fail=True), # This stops everything
risky_operation("C")
)
# Correct - handle exceptions gracefully
async def robust_approach():
results = await asyncio.gather(
risky_operation("A"),
risky_operation("B", should_fail=True),
risky_operation("C"),
return_exceptions=True # Continue despite failures
)
Debugging Async Code
Enable Debug Mode
# Add this to see detailed async debugging info
import asyncio
asyncio.run(main(), debug=True)
Check for Unawaited Coroutines
Python will warn you about coroutines that weren’t awaited:
RuntimeWarning: coroutine 'fetch_data' was never awaited
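For example, calling the earlier fetch_data without await triggers it:

async def main():
    fetch_data("API", 1)  # coroutine created but never awaited

asyncio.run(main())  # emits: RuntimeWarning: coroutine 'fetch_data' was never awaited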
Use `asyncio.current_task()` for Debugging
async def debug_example():
current = asyncio.current_task()
print(f"Current task: {current.get_name()}")
all_tasks = asyncio.all_tasks()
print(f"Total running tasks: {len(all_tasks)}")
When Async Makes Sense (And When It Doesn’t)
After writing your first async programs, you might wonder when to use these patterns. Async shines when you're waiting for things:
- Network requests (APIs, web scraping)
- Database queries
- File operations (reading/writing large files)
- User input or external events
Skip async for:
- CPU-intensive tasks (mathematical calculations, image processing)
- Simple scripts that do one thing at a time
- Legacy code where integration complexity outweighs benefits
The async functions you'll use most (a compact example follows this list):
- `asyncio.run()` - Start the event loop
- `asyncio.create_task()` - Schedule a coroutine for execution
- `asyncio.gather()` - Wait for multiple operations
- `asyncio.wait_for()` - Add a timeout to operations
- `await` - Pause until an operation completes
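A compact, purely illustrative example that touches all five:

import asyncio

async def work(n):
    await asyncio.sleep(0.1)  # await: pause until the operation completes
    return n

async def main():
    task = asyncio.create_task(work(1))            # schedule concurrently
    results = await asyncio.gather(task, work(2))  # wait for several at once
    third = await asyncio.wait_for(work(3), timeout=1.0)  # bound the wait
    print(results, third)

asyncio.run(main())  # start the event loop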
Next Steps
In Part 4, we’ll dive deeper into the event loop - the engine that makes all this concurrency possible. You’ll learn how it schedules tasks, handles I/O, and coordinates your async operations.
Understanding the Event Loop
The event loop is the heart of async programming. It’s a single-threaded loop that manages all your async operations, deciding when to run each task and when to wait for I/O operations.
What is the Event Loop?
Think of the event loop as a traffic controller at a busy intersection. It coordinates multiple lanes of traffic (your async tasks), ensuring everyone gets their turn without collisions.
import asyncio
async def task_a():
print("Task A: Starting")
await asyncio.sleep(1)
print("Task A: Finished")
async def task_b():
print("Task B: Starting")
await asyncio.sleep(0.5)
print("Task B: Finished")
# The event loop coordinates both tasks
async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
Output:
Task A: Starting
Task B: Starting
Task B: Finished
Task A: Finished
Notice how Task B finishes first even though Task A started first. The event loop switches between tasks when they hit `await` points.
Event Loop Lifecycle
The event loop follows a simple cycle:
- Check for ready tasks - Run any tasks that can continue
- Handle I/O events - Process completed network/file operations
- Schedule callbacks - Queue up tasks that are now ready
- Repeat until no more work remains
async def show_event_loop_work():
loop = asyncio.get_running_loop()
print(f"Loop is running: {loop.is_running()}")
# Schedule a callback
def callback():
print("Callback executed!")
loop.call_later(0.5, callback)
await asyncio.sleep(1) # Let the callback run
asyncio.run(show_event_loop_work())
Creating and Managing Tasks
Tasks are the event loop’s way of tracking coroutines:
async def background_work(name, duration):
print(f"{name}: Starting work")
await asyncio.sleep(duration)
print(f"{name}: Work complete")
return f"{name} result"
async def task_management_demo():
# Create tasks explicitly
task1 = asyncio.create_task(background_work("Worker-1", 2))
task2 = asyncio.create_task(background_work("Worker-2", 1))
print("Tasks created, doing other work...")
await asyncio.sleep(0.5)
# Check task status
print(f"Task1 done: {task1.done()}")
print(f"Task2 done: {task2.done()}")
# Wait for completion
results = await asyncio.gather(task1, task2)
print(f"Results: {results}")
asyncio.run(task_management_demo())
Handling Blocking Operations
The event loop can’t switch tasks during blocking operations. When you encounter CPU-intensive work or synchronous libraries, you need to move that work off the main thread:
import concurrent.futures
import time
def blocking_operation(duration):
"""Simulate CPU-intensive work"""
time.sleep(duration) # This blocks!
return f"Blocked for {duration} seconds"
async def handle_blocking_work():
loop = asyncio.get_running_loop()
# Right way - run in thread pool
with concurrent.futures.ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor,
blocking_operation,
2
)
print(f"Result: {result}")
asyncio.run(handle_blocking_work())
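On Python 3.9+, `asyncio.to_thread()` does the same thing more compactly, running the function in the default thread pool:

async def handle_blocking_work_compact():
    # Runs blocking_operation(2) in a worker thread without blocking the loop
    result = await asyncio.to_thread(blocking_operation, 2)
    print(f"Result: {result}")

asyncio.run(handle_blocking_work_compact())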
Event Loop Best Practices
Don’t Block the Loop
# Bad - blocks the event loop
async def bad_example():
time.sleep(1) # Blocks everything!
return "Bad"
# Good - uses async sleep
async def good_example():
await asyncio.sleep(1) # Allows other tasks to run
return "Good"
Handle Exceptions Properly
async def risky_task():
await asyncio.sleep(0.1)
raise ValueError("Something went wrong!")
async def exception_handling():
# For multiple tasks, use return_exceptions
tasks = [risky_task() for _ in range(3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
Event Loop Debugging and Monitoring
Understanding what your event loop is doing helps debug performance issues:
import asyncio
import time
async def monitor_event_loop():
"""Monitor event loop performance"""
loop = asyncio.get_running_loop()
# Enable debug mode for detailed info
loop.set_debug(True)
# Check current tasks
all_tasks = asyncio.all_tasks(loop)
print(f"Currently running tasks: {len(all_tasks)}")
for task in all_tasks:
print(f" - {task.get_name()}: {task.get_coro()}")
# Monitor loop time
start_time = loop.time()
await asyncio.sleep(0.1)
elapsed = loop.time() - start_time
print(f"Loop time elapsed: {elapsed:.3f}s")
async def slow_task():
"""Task that might slow down the loop"""
print("Slow task starting...")
# Simulate work that yields control frequently
for i in range(5):
await asyncio.sleep(0.1) # Yield control
print(f" Slow task step {i+1}")
print("Slow task finished")
async def debugging_demo():
# Start monitoring
monitor_task = asyncio.create_task(monitor_event_loop())
slow_work = asyncio.create_task(slow_task())
await asyncio.gather(monitor_task, slow_work)
asyncio.run(debugging_demo())
Common Event Loop Pitfalls
Pitfall 1: Forgetting to Create Tasks
# Wrong - coroutines don't run until awaited
async def wrong_way():
background_work("Task-1", 2) # Just creates coroutine object
background_work("Task-2", 1) # Doesn't actually run
await asyncio.sleep(3)
# Right - create tasks to run concurrently
async def right_way():
task1 = asyncio.create_task(background_work("Task-1", 2))
task2 = asyncio.create_task(background_work("Task-2", 1))
await asyncio.sleep(3) # Tasks run in background
Pitfall 2: Blocking the Loop with Synchronous Code
import requests
# Wrong - blocks the entire event loop
async def blocking_http_request():
response = requests.get("https://httpbin.org/delay/2") # Blocks!
return response.json()
# Right - use async HTTP client
import aiohttp
async def non_blocking_http_request():
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/delay/2") as response:
return await response.json()
Pitfall 3: Not Handling Task Cancellation
async def cancellable_task():
"""Task that handles cancellation gracefully"""
try:
for i in range(10):
print(f"Working... step {i}")
await asyncio.sleep(0.5)
except asyncio.CancelledError:
print("Task was cancelled, cleaning up...")
# Perform cleanup here
raise # Re-raise to properly cancel
async def cancellation_demo():
task = asyncio.create_task(cancellable_task())
# Let it run for a bit
await asyncio.sleep(2)
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancellation handled")
asyncio.run(cancellation_demo())
Event Loop Performance Tips
Tip 1: Batch Operations
# Less efficient - many small operations
async def many_small_operations():
results = []
for i in range(100):
result = await small_async_operation(i)
results.append(result)
return results
# More efficient - batch operations
async def batched_operations():
tasks = [small_async_operation(i) for i in range(100)]
return await asyncio.gather(*tasks)
async def small_async_operation(n):
await asyncio.sleep(0.01)
return n * 2
Tip 2: Use Semaphores to Limit Concurrency
async def limited_concurrency_demo():
"""Limit concurrent operations to prevent overwhelming resources"""
semaphore = asyncio.Semaphore(5) # Max 5 concurrent operations
async def limited_operation(n):
async with semaphore:
print(f"Starting operation {n}")
await asyncio.sleep(1)
print(f"Finished operation {n}")
return n
# Start 20 operations, but only 5 run at once
tasks = [limited_operation(i) for i in range(20)]
results = await asyncio.gather(*tasks)
return results
asyncio.run(limited_concurrency_demo())
Summary
The event loop is your async program’s conductor:
Key Concepts
- Single-threaded: One loop manages all async operations
- Cooperative: Tasks voluntarily yield control at `await` points
- Scheduling: Controls when tasks run and in what order
- I/O Management: Handles network and file operations efficiently
Best Practices
- Never block the event loop with synchronous operations
- Use `loop.run_in_executor()` for CPU-intensive work
- Handle exceptions properly in concurrent tasks
- Create tasks with `asyncio.create_task()`
In Part 5, we’ll dive into coroutines - the functions that make the event loop’s coordination possible.
Coroutines Fundamentals
Coroutines are functions that can pause and resume execution. They’re the building blocks of async programming, allowing the event loop to switch between tasks efficiently.
Understanding Coroutines
A coroutine is created with `async def` and can pause execution at `await` points:
import asyncio
async def simple_coroutine():
print("Coroutine started")
await asyncio.sleep(1) # Pause here
print("Coroutine resumed")
return "Done"
# Running a coroutine
result = asyncio.run(simple_coroutine())
print(result) # "Done"
Key insight: When the coroutine hits `await asyncio.sleep(1)`, it pauses and lets other coroutines run. After 1 second, it resumes from exactly where it left off.
Coroutine States
Coroutines have different states during their lifecycle:
import inspect
async def example_coroutine():
await asyncio.sleep(1)
return "completed"
# Create but don't run the coroutine
coro = example_coroutine()
print(f"State: {inspect.getcoroutinestate(coro)}") # CORO_CREATED
# Must run to see other states
asyncio.run(coro)
The states are:
- CORO_CREATED: Just created, not started
- CORO_RUNNING: Currently executing
- CORO_SUSPENDED: Paused at an await point
- CORO_CLOSED: Finished or cancelled
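To observe the other states, inspect the coroutine from the outside while a task drives it. A minimal sketch using `inspect.getcoroutinestate()`:

import asyncio
import inspect

async def observed():
    await asyncio.sleep(0.2)

async def main():
    coro = observed()
    print(inspect.getcoroutinestate(coro))  # CORO_CREATED
    task = asyncio.create_task(coro)
    await asyncio.sleep(0.1)                # let it start and hit its await
    print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED
    await task
    print(inspect.getcoroutinestate(coro))  # CORO_CLOSED

asyncio.run(main())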
Coroutine Communication
Coroutines can pass data through return values:
async def fetch_user(user_id):
await asyncio.sleep(0.5) # Simulate API call
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
async def process_user(user_id):
user = await fetch_user(user_id)
user["processed"] = True
return user
async def main():
# Process multiple users concurrently
user_ids = [1, 2, 3]
tasks = [process_user(uid) for uid in user_ids]
users = await asyncio.gather(*tasks)
for user in users:
print(f"Processed: {user['name']}")
asyncio.run(main())
Using Queues for Communication
Queues enable producer-consumer patterns:
async def producer(queue, items):
"""Produce items and put them in queue"""
for item in items:
await queue.put(item)
print(f"📦 Produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue, name):
"""Consume items from queue"""
while True:
item = await queue.get()
if item is None: # End signal
break
print(f"Processing {name}: {item}")
await asyncio.sleep(0.2) # Simulate processing
async def queue_demo():
queue = asyncio.Queue(maxsize=2) # Bounded queue
items = ["task-1", "task-2", "task-3"]
await asyncio.gather(
producer(queue, items),
consumer(queue, "Worker-A")
)
asyncio.run(queue_demo())
Async Context Managers
Combine coroutines with context managers for resource management:
class AsyncDatabase:
def __init__(self, name):
self.name = name
self.connected = False
async def __aenter__(self):
print(f"🔌 Connecting to {self.name}")
await asyncio.sleep(0.1) # Simulate connection time
self.connected = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"🔌 Disconnecting from {self.name}")
await asyncio.sleep(0.05) # Simulate cleanup time
self.connected = False
async def query(self, sql):
if not self.connected:
raise RuntimeError("Database not connected")
await asyncio.sleep(0.2) # Simulate query time
return f"Results for: {sql}"
async def database_operations():
"""Use database with proper cleanup"""
async with AsyncDatabase("UserDB") as db:
result = await db.query("SELECT * FROM users")
return result
result = asyncio.run(database_operations())
print(f"Query result: {result}")
Error Handling in Coroutines
Proper error handling is crucial in async code:
import random
async def unreliable_task(task_id):
"""Task that might fail"""
await asyncio.sleep(0.5)
if random.random() < 0.3: # 30% chance of failure
raise Exception(f"Task {task_id} failed!")
return f"Task {task_id} succeeded"
async def robust_task_runner():
"""Run multiple tasks with proper error handling"""
tasks = [unreliable_task(i) for i in range(5)]
# Use return_exceptions to handle failures gracefully
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failures.append(f"Task {i}: {result}")
else:
successes.append(result)
print(f"Successes: {len(successes)}")
print(f"Failures: {len(failures)}")
asyncio.run(robust_task_runner())
Summary
Coroutines are the foundation of async programming:
Key Concepts
- Pausable Functions: Can suspend and resume execution
- Cooperative: Yield control voluntarily at `await` points
- Stateful: Maintain state between suspensions
- Composable: Can be combined and chained together
Best Practices
- Use `async with` for resource management
- Handle exceptions with `return_exceptions=True`
- Use queues for inter-coroutine communication
- Always await coroutines properly
Common Patterns
- Producer-consumer with queues
- Resource management with async context managers
- Error handling with gather and return_exceptions
In Part 6, we’ll explore async generators and how they enable powerful streaming data patterns.
Async Generators and Streams
Async generators combine the power of generators with async programming, perfect for processing streams of data without loading everything into memory at once.
Understanding Async Generators
An async generator uses `async def` and `yield` to produce values asynchronously:
import asyncio
async def number_generator(max_num):
"""Generate numbers asynchronously"""
for i in range(max_num):
print(f"Generating {i}")
await asyncio.sleep(0.1) # Simulate async work
yield i
The generator yields values one at a time, with async operations between yields. This allows other code to run while the generator is working.
Consume the async generator with `async for`:
async def consume_numbers():
"""Consume numbers from async generator"""
async for number in number_generator(5):
print(f"Received: {number}")
await asyncio.sleep(0.05) # Process the number
asyncio.run(consume_numbers())
Why this is powerful: The generator produces numbers one at a time, and the consumer processes them as they arrive. Neither blocks the other.
Data Processing Pipeline
Async generators excel at building data processing pipelines:
async def fetch_user_data(user_ids):
"""Simulate fetching user data from an API"""
for user_id in user_ids:
await asyncio.sleep(0.1) # Simulate API call
yield {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
This generator fetches user data one user at a time, yielding each result as it becomes available.
Add a transformation stage to the pipeline:
async def enrich_user_data(user_stream):
"""Add additional data to each user"""
async for user in user_stream:
await asyncio.sleep(0.05) # Simulate enrichment
user["profile_complete"] = len(user["name"]) > 5
user["domain"] = user["email"].split("@")[1]
yield user
Each stage processes data as it flows through, without waiting for all data to be available.
Wire the pipeline together:
async def process_users():
"""Complete user processing pipeline"""
user_ids = [1, 2, 3, 4, 5]
# Create the pipeline
raw_users = fetch_user_data(user_ids)
enriched_users = enrich_user_data(raw_users)
# Process the pipeline
saved_count = 0
async for user in enriched_users:
print(f"Saved: {user['name']} ({user['email']})")
saved_count += 1
print(f"Pipeline complete: {saved_count} users processed")
asyncio.run(process_users())
Pipeline benefits: Each stage processes data as it becomes available. Memory usage stays constant regardless of data size.
Stream Transformation
Transform data streams with various operations:
async def map_stream(source_stream, transform_func):
"""Apply transformation to each item in stream"""
async for item in source_stream:
yield transform_func(item)
async def filter_stream(source_stream, predicate):
"""Filter stream items based on predicate"""
async for item in source_stream:
if predicate(item):
yield item
async def take_stream(source_stream, count):
"""Take only first N items from stream"""
taken = 0
async for item in source_stream:
if taken >= count:
break
yield item
taken += 1
async def number_stream():
"""Generate numbers"""
for i in range(20):
await asyncio.sleep(0.05)
yield i
async def stream_operations():
"""Demonstrate stream transformations"""
# Create pipeline: numbers -> squares -> evens -> first 5
numbers = number_stream()
squares = map_stream(numbers, lambda x: x * x)
evens = filter_stream(squares, lambda x: x % 2 == 0)
first_five = take_stream(evens, 5)
results = []
async for result in first_five:
results.append(result)
print(f"Result: {result}")
print(f"Final results: {results}")
asyncio.run(stream_operations())
Buffered Streams
Control memory usage with buffered async generators:
async def buffered_generator(source_stream, buffer_size=3):
"""Buffer items from source stream"""
buffer = []
async for item in source_stream:
buffer.append(item)
if len(buffer) >= buffer_size:
yield buffer.copy()
buffer.clear()
# Yield remaining items
if buffer:
yield buffer
async def data_source():
"""Generate data items"""
for i in range(10):
await asyncio.sleep(0.1)
yield f"item-{i}"
async def process_buffer(buffer):
"""Process a buffer of items"""
print(f"Processing buffer of {len(buffer)} items: {buffer}")
await asyncio.sleep(0.2) # Simulate batch processing
async def buffered_processing():
"""Process data in buffers"""
source = data_source()
buffered = buffered_generator(source, buffer_size=3)
async for buffer in buffered:
await process_buffer(buffer)
asyncio.run(buffered_processing())
Error Handling in Streams
Handle errors gracefully in async generators:
async def unreliable_data_source():
"""Data source that occasionally fails"""
for i in range(10):
await asyncio.sleep(0.1)
if i == 5: # Simulate error
raise Exception(f"Data source error at item {i}")
yield f"data-{i}"
async def resilient_stream(source_stream):
"""Make stream resilient to errors"""
try:
async for item in source_stream:
yield item
except Exception as e:
print(f"Stream error: {e}")
# Could implement retry logic here
async def error_handling_demo():
"""Demonstrate error handling in streams"""
try:
source = unreliable_data_source()
resilient = resilient_stream(source)
async for item in resilient:
print(f"Processed: {item}")
except Exception as e:
print(f"Stream processing failed: {e}")
asyncio.run(error_handling_demo())
Summary
Async generators enable powerful streaming patterns:
Key Benefits
- Memory Efficient: Process data without loading everything into memory
- Concurrent: Pipeline stages run concurrently
- Composable: Chain generators to build complex pipelines
- Real-time: Handle continuous data streams
Common Patterns
- Data processing pipelines
- Stream transformation operations
- Buffered processing for memory control
- Error handling in data streams
Best Practices
- Use pipelines for data transformation
- Control memory with buffering
- Chain operations for complex processing
- Handle errors gracefully in streams
In Part 7, we’ll explore error handling patterns for robust async applications.
Error Handling Basics
Robust async applications need sophisticated error handling. When multiple coroutines run concurrently, failures can cascade quickly without proper safeguards.
Exception Propagation in Async Code
Exceptions in async code behave differently than in synchronous code: an exception raised inside a coroutine surfaces at the `await` that runs it, not where the coroutine object was created:
import asyncio
import random
async def risky_task(task_id):
"""Task that might fail"""
await asyncio.sleep(0.5)
if random.random() < 0.3: # 30% failure rate
raise ValueError(f"Task {task_id} failed!")
return f"Task {task_id} completed"
async def basic_error_handling():
"""Basic error handling with try/except"""
for i in range(5):
try:
result = await risky_task(i)
print(f"Success: {result}")
except ValueError as e:
print(f"Error: {e}")
asyncio.run(basic_error_handling())
Handling Multiple Task Failures
When running multiple tasks concurrently, use `return_exceptions=True`:
async def concurrent_error_handling():
"""Handle errors in concurrent tasks"""
tasks = [risky_task(i) for i in range(10)]
# Gather all results, including exceptions
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failures.append(f"Task {i}: {result}")
else:
successes.append(result)
print(f"Successful tasks: {len(successes)}")
print(f"Failed tasks: {len(failures)}")
# Continue processing successful results
for success in successes:
print(f"Processing: {success}")
asyncio.run(concurrent_error_handling())
Task Cancellation and Cleanup
Properly cancel tasks and clean up resources:
async def long_running_task(name, duration):
"""Task that can be cancelled"""
try:
print(f"{name}: Starting work for {duration}s")
await asyncio.sleep(duration)
print(f"{name}: Work completed")
return f"{name} result"
except asyncio.CancelledError:
print(f"{name}: Task was cancelled, cleaning up...")
# Perform cleanup here
await asyncio.sleep(0.1) # Simulate cleanup
print(f"{name}: Cleanup complete")
raise # Re-raise to properly cancel
async def cancellation_demo():
"""Demonstrate task cancellation"""
# Start multiple tasks
tasks = [
asyncio.create_task(long_running_task("Task-A", 3)),
asyncio.create_task(long_running_task("Task-B", 5)),
asyncio.create_task(long_running_task("Task-C", 2))
]
try:
# Wait for 2.5 seconds, then cancel remaining tasks
await asyncio.sleep(2.5)
print("🛑 Cancelling remaining tasks...")
for task in tasks:
if not task.done():
task.cancel()
# Wait for all tasks to finish (including cancellation)
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, asyncio.CancelledError):
print(f"Task {i}: Cancelled")
elif isinstance(result, Exception):
print(f"Task {i}: Failed - {result}")
else:
print(f"Task {i}: Completed - {result}")
except KeyboardInterrupt:
print("Received interrupt, cancelling all tasks...")
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
asyncio.run(cancellation_demo())
Timeout Management
Handle timeouts gracefully across your application:
async def timeout_wrapper(coroutine, timeout_seconds, operation_name="Operation"):
"""Wrapper for timeout handling"""
try:
return await asyncio.wait_for(coroutine, timeout=timeout_seconds)
except asyncio.TimeoutError:
print(f"{operation_name} timed out after {timeout_seconds}s")
raise
except Exception as e:
print(f"{operation_name} failed: {e}")
raise
async def slow_operation():
"""Simulate slow operation"""
await asyncio.sleep(3)
return {"users": ["alice", "bob", "charlie"]}
async def timeout_demo():
"""Demonstrate timeout handling"""
operations = [
(slow_operation(), 2.5, "Database Query"),
(slow_operation(), 1.5, "API Call"),
(slow_operation(), 4.0, "File Processing")
]
for coroutine, timeout, name in operations:
try:
result = await timeout_wrapper(coroutine, timeout, name)
print(f"{name}: Success")
except asyncio.TimeoutError:
print(f"{name}: Timed out")
except Exception as e:
print(f"{name}: Failed - {e}")
asyncio.run(timeout_demo())
Retry with Exponential Backoff
Implement sophisticated retry mechanisms:
async def retry_with_backoff(
coroutine_func,
max_retries=3,
base_delay=1,
backoff_factor=2
):
"""Retry with exponential backoff"""
for attempt in range(max_retries + 1):
try:
return await coroutine_func()
except Exception as e:
if attempt == max_retries:
print(f"🚫 All {max_retries + 1} attempts failed")
raise e
# Calculate delay with exponential backoff
delay = base_delay * (backoff_factor ** attempt)
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {delay:.1f}s...")
await asyncio.sleep(delay)
async def flaky_api_call():
"""API call that fails randomly"""
await asyncio.sleep(0.2)
if random.random() < 0.6: # 60% failure rate
raise Exception("API temporarily unavailable")
return {"status": "success", "data": "API response"}
async def retry_demo():
"""Demonstrate retry with backoff"""
try:
result = await retry_with_backoff(
flaky_api_call,
max_retries=4,
base_delay=0.5,
backoff_factor=2
)
print(f"Success: {result}")
except Exception as e:
print(f"💥 Final failure: {e}")
asyncio.run(retry_demo())
Summary
Basic error handling in async applications:
Key Patterns
- Exception Handling: Use `return_exceptions=True` for concurrent tasks
- Task Cancellation: Implement proper cleanup in cancelled tasks
- Timeouts: Set appropriate timeouts for all operations
- Retry Logic: Implement exponential backoff for transient failures
Best Practices
- Always handle exceptions in concurrent tasks
- Implement timeouts for external operations
- Use proper cleanup in cancellation handlers
- Plan retry strategies for unreliable services
Common Patterns
- Timeout wrappers for operations
- Retry mechanisms with backoff
- Graceful task cancellation
- Exception aggregation in concurrent operations
In Part 8, we’ll explore advanced communication patterns including events, queues, and coordination mechanisms.
Advanced Communication Patterns
Async applications need sophisticated ways for coroutines to communicate and coordinate. Let’s explore events, queues, and other coordination mechanisms.
Communication with Events
Use events to coordinate between coroutines:
import asyncio
async def waiter(event, name, work_duration):
"""Wait for event, then do work"""
print(f"{name}: Waiting for event...")
await event.wait()
print(f"{name}: Event received, starting work")
await asyncio.sleep(work_duration)
print(f"{name}: Work completed")
async def event_setter(event, delay):
"""Set event after delay"""
print(f"Preparing to set event in {delay}s...")
await asyncio.sleep(delay)
print("Setting event for all waiters")
event.set()
async def event_demo():
"""Demonstrate event-based communication"""
event = asyncio.Event()
# Start multiple waiters and setter
await asyncio.gather(
waiter(event, "Worker-1", 1),
waiter(event, "Worker-2", 2),
event_setter(event, 2)
)
asyncio.run(event_demo())
Producer-Consumer with Queues
Implement robust producer-consumer patterns:
import random
async def producer(queue, items, producer_name):
"""Produce items with error handling"""
try:
for item in items:
await queue.put(item)
print(f"📦 {producer_name}: Produced {item}")
await asyncio.sleep(0.1)
# Signal completion
await queue.put(None)
print(f"🏁 {producer_name}: Production complete")
except Exception as e:
print(f"{producer_name}: Production failed - {e}")
await queue.put(None) # Still signal completion
async def consumer(queue, consumer_name):
"""Consume items with error handling"""
processed = 0
try:
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=5.0)
except asyncio.TimeoutError:
print(f"{consumer_name}: Timeout waiting for items")
break
if item is None: # End signal
print(f"🏁 {consumer_name}: Received end signal")
break
await asyncio.sleep(0.2) # Simulate processing
processed += 1
print(f"{consumer_name}: Processed {item}")
except Exception as e:
print(f"{consumer_name}: Consumer failed - {e}")
finally:
print(f"{consumer_name}: Processed {processed} items total")
async def producer_consumer_demo():
"""Robust producer-consumer with error handling"""
    queue = asyncio.Queue(maxsize=5)  # Bounded queue
    # Note: the producer sends a single None sentinel, so one consumer exits
    # on the sentinel and the other exits via its 5-second timeout
items = [f"item-{i}" for i in range(10)]
# Start producer and consumers
await asyncio.gather(
producer(queue, items, "Producer-1"),
consumer(queue, "Consumer-A"),
consumer(queue, "Consumer-B"),
return_exceptions=True # Don't let one failure stop others
)
asyncio.run(producer_consumer_demo())
Semaphores for Resource Control
Control access to limited resources:
async def rate_limited_operations(urls, max_concurrent=5):
"""Limit concurrent operations"""
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(url):
async with semaphore:
return await fetch_url(url)
tasks = [fetch_with_limit(url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch_url(url):
await asyncio.sleep(0.5) # Simulate network request
return f"Data from {url}"
async def semaphore_demo():
"""Demonstrate semaphore usage"""
urls = [f"https://api.example.com/data/{i}" for i in range(10)]
print("Fetching URLs with concurrency limit...")
results = await rate_limited_operations(urls, max_concurrent=3)
print(f"Fetched {len(results)} URLs")
asyncio.run(semaphore_demo())
Locks for Shared Resources
Protect shared resources with async locks:
class AsyncCounter:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self, worker_name):
"""Thread-safe increment"""
async with self.lock:
# Critical section
current = self.value
await asyncio.sleep(0.01) # Simulate some work
self.value = current + 1
print(f"{worker_name}: Incremented to {self.value}")
async def worker(counter, worker_name, iterations):
"""Worker that increments counter"""
for i in range(iterations):
await counter.increment(f"{worker_name}-{i}")
await asyncio.sleep(0.05)
async def lock_demo():
"""Demonstrate async lock usage"""
counter = AsyncCounter()
# Start multiple workers
await asyncio.gather(
worker(counter, "Worker-A", 5),
worker(counter, "Worker-B", 5)
)
print(f"Final counter value: {counter.value}")
asyncio.run(lock_demo())
Barriers for Synchronization
Synchronize multiple coroutines at checkpoints (`asyncio.Barrier` requires Python 3.11+):
async def worker_with_barrier(barrier, worker_name, work_time):
"""Worker that synchronizes at barrier"""
# Phase 1: Individual work
print(f"{worker_name}: Starting phase 1")
await asyncio.sleep(work_time)
print(f"{worker_name}: Phase 1 complete")
# Synchronization point
print(f"{worker_name}: Waiting at barrier")
await barrier.wait()
# Phase 2: Coordinated work
print(f"{worker_name}: Starting phase 2")
await asyncio.sleep(0.5)
print(f"{worker_name}: Phase 2 complete")
async def barrier_demo():
"""Demonstrate barrier synchronization"""
# Create barrier for 3 workers
barrier = asyncio.Barrier(3)
# Start workers with different work times
await asyncio.gather(
worker_with_barrier(barrier, "Worker-A", 1.0),
worker_with_barrier(barrier, "Worker-B", 2.0),
worker_with_barrier(barrier, "Worker-C", 1.5)
)
asyncio.run(barrier_demo())
Summary
Advanced communication patterns enable sophisticated coordination:
Key Mechanisms
- Events: Simple signaling between coroutines
- Queues: Producer-consumer patterns with backpressure
- Semaphores: Control access to limited resources
- Locks: Protect shared resources from race conditions
- Barriers: Synchronize multiple coroutines at checkpoints
Best Practices
- Use appropriate synchronization primitives for each scenario
- Handle timeouts in queue operations
- Implement proper error handling in producers and consumers
- Consider backpressure in high-throughput scenarios
Common Patterns
- Rate limiting with semaphores
- Shared resource protection with locks
- Phase synchronization with barriers
- Event-driven coordination
In Part 9, we’ll explore building web APIs that leverage these communication patterns.
Building Web APIs
Web APIs are perfect for async programming because they spend most of their time waiting - for database queries, external API calls, and file operations. Let’s build efficient APIs step by step.
Why Async APIs Matter
Consider a traditional synchronous web server:
- Request 1: Get user data (100ms database query)
- Request 2: Must wait for Request 1 to complete
- Request 3: Must wait for Requests 1 and 2
With async APIs, all three requests can run simultaneously, sharing the same server resources efficiently.
Basic FastAPI Setup
FastAPI makes async web development straightforward:
from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI(title="My Async API")
@app.get("/")
async def root():
return {"message": "Hello Async World!"}
The basic setup is simple - just add `async` to your route handlers to enable async processing.
Add endpoints that demonstrate async benefits:
@app.get("/slow-endpoint")
async def slow_endpoint():
await asyncio.sleep(2) # Simulate slow database query
return {"message": "This took 2 seconds, but didn't block other requests!"}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Simulate async database call
await asyncio.sleep(0.1)
if user_id == 404:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
# Run with: uvicorn main:app --reload
Database Integration
Integrate with async database drivers:
import asyncpg
from fastapi import Depends
# Database connection pool
db_pool = None
@app.on_event("startup")
async def startup():
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/mydb",
min_size=5,
max_size=20
)
@app.on_event("shutdown")
async def shutdown():
if db_pool:
await db_pool.close()
async def get_db():
"""Dependency to get database connection"""
async with db_pool.acquire() as connection:
yield connection
@app.get("/users")
async def list_users(limit: int = 10, db=Depends(get_db)):
"""List users with pagination"""
query = "SELECT id, name, email FROM users LIMIT $1"
rows = await db.fetch(query, limit)
return [
{"id": row["id"], "name": row["name"], "email": row["email"]}
for row in rows
]
@app.post("/users")
async def create_user(user_data: dict, db=Depends(get_db)):
"""Create a new user"""
query = """
INSERT INTO users (name, email)
VALUES ($1, $2)
RETURNING id, name, email
"""
row = await db.fetchrow(query, user_data["name"], user_data["email"])
return {"id": row["id"], "name": row["name"], "email": row["email"]}
External API Integration
Make concurrent calls to external services:
import aiohttp
async def fetch_weather(city: str) -> dict:
"""Fetch weather data from external API"""
async with aiohttp.ClientSession() as session:
url = f"https://api.weather.com/v1/current?city={city}"
try:
async with session.get(url, timeout=5) as response:
if response.status == 200:
return await response.json()
else:
return {"error": "Weather service unavailable"}
except asyncio.TimeoutError:
return {"error": "Weather service timeout"}
async def fetch_news(category: str) -> dict:
"""Fetch news from external API"""
async with aiohttp.ClientSession() as session:
url = f"https://api.news.com/v1/headlines?category={category}"
try:
async with session.get(url, timeout=5) as response:
if response.status == 200:
return await response.json()
else:
return {"articles": []} # Graceful degradation
        except Exception:
return {"articles": []} # Graceful degradation
@app.get("/dashboard/{city}")
async def get_dashboard(city: str):
"""Get dashboard data from multiple sources concurrently"""
# Fetch data from multiple sources simultaneously
weather, news = await asyncio.gather(
fetch_weather(city),
fetch_news("technology"),
return_exceptions=True
)
# Handle partial failures gracefully
dashboard = {"city": city}
if isinstance(weather, dict):
dashboard["weather"] = weather
else:
dashboard["weather"] = {"error": "Weather unavailable"}
if isinstance(news, dict):
dashboard["news"] = news
else:
dashboard["news"] = {"articles": []}
return dashboard
Background Tasks
Handle long-running operations with background tasks:
from fastapi import BackgroundTasks
import logging
logger = logging.getLogger(__name__)
async def send_email(email: str, subject: str):
"""Simulate sending email"""
logger.info(f"Sending email to {email}: {subject}")
await asyncio.sleep(2) # Simulate email sending
logger.info(f"Email sent to {email}")
async def process_image(image_path: str, user_id: int):
"""Simulate image processing"""
logger.info(f"Processing image {image_path} for user {user_id}")
await asyncio.sleep(5) # Simulate processing
logger.info(f"Image processing complete for user {user_id}")
@app.post("/users/{user_id}/upload")
async def upload_image(
user_id: int,
image_data: dict,
background_tasks: BackgroundTasks
):
"""Upload and process image"""
# Save image immediately
image_path = f"/uploads/{user_id}/{image_data['filename']}"
# Process image in background
background_tasks.add_task(process_image, image_path, user_id)
# Send confirmation email in background
background_tasks.add_task(
send_email,
"[email protected]",
"Image Upload Confirmation"
)
return {
"message": "Image uploaded successfully",
"image_path": image_path,
"status": "processing"
}
Rate Limiting
Implement basic rate limiting:
import time
from collections import defaultdict
from fastapi import Request
from fastapi.responses import JSONResponse
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
async def is_allowed(self, client_id: str) -> bool:
"""Check if request is allowed"""
now = time.time()
# Clean old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if now - req_time < self.window_seconds
]
# Check if under limit
if len(self.requests[client_id]) < self.max_requests:
self.requests[client_id].append(now)
return True
return False
# Global rate limiter: 100 requests per minute
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
async def rate_limit_middleware(request: Request, call_next):
"""Rate limiting middleware"""
client_ip = request.client.host
    if not await rate_limiter.is_allowed(client_ip):
        # Raising HTTPException inside middleware bypasses FastAPI's exception
        # handlers, so return the 429 response directly
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."}
        )
response = await call_next(request)
return response
app.middleware("http")(rate_limit_middleware)
Summary
Building production-ready async APIs requires:
Key Components
- FastAPI Framework: Modern, fast async web framework
- Database Integration: Async database drivers and connection pooling
- External APIs: Concurrent HTTP requests with proper error handling
- Background Tasks: Long-running operations without blocking requests
- Rate Limiting: Protection against abuse
Best Practices
- Use connection pooling for databases and HTTP clients
- Implement proper error handling and logging
- Add rate limiting to protect against abuse
- Handle partial failures gracefully
- Use background tasks for long-running operations
Performance Tips
- Pool database connections
- Reuse HTTP sessions
- Implement timeouts for external calls
- Use concurrent requests where possible
- Cache expensive operations (a sketch follows below)
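For that last tip, here's a minimal sketch of caching an expensive async call; the names are hypothetical, and production code would add eviction and TTLs:

import asyncio

_cache = {}
_lock = asyncio.Lock()

async def cached(key, fetch_func):
    """Return a cached value, computing it at most once per hit key."""
    async with _lock:
        if key in _cache:
            return _cache[key]
    # The expensive call runs outside the lock; concurrent misses for the
    # same key may both compute, which is an accepted tradeoff here
    value = await fetch_func(key)
    async with _lock:
        _cache[key] = value
    return value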
In Part 10, we’ll explore data processing pipelines that can handle large-scale data efficiently.
Data Processing Pipelines
Async programming excels at data processing tasks that involve I/O operations. Let’s build efficient data processing systems that can handle large volumes of data.
Why Async for Data Processing
Traditional data processing is often I/O bound:
- Reading files: CPU waits for disk operations
- API calls: CPU waits for network responses
- Database queries: CPU waits for query results
With async processing, while one operation waits for I/O, others can continue processing. This dramatically improves throughput.
Processing Multiple Files Concurrently
Instead of processing files one by one, process them simultaneously:
import asyncio
import aiofiles
from pathlib import Path
import json
async def process_single_file(file_path: Path):
"""Process a single JSON file"""
try:
async with aiofiles.open(file_path, 'r') as file:
content = await file.read()
data = json.loads(content)
record_count = len(data) if isinstance(data, list) else 1
return {
"file": file_path.name,
"records": record_count,
"status": "success"
}
except Exception as e:
return {
"file": file_path.name,
"error": str(e),
"status": "failed"
}
The key insight here is using `aiofiles` for non-blocking file operations. Regular file operations would block the event loop, preventing other files from being processed concurrently.
Now we need a way to process files in controlled batches to avoid overwhelming the system:
async def process_files_batch(file_paths, batch_size=10):
"""Process files in batches to control memory usage"""
results = []
for i in range(0, len(file_paths), batch_size):
batch = file_paths[i:i + batch_size]
print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
# Process batch concurrently
batch_tasks = [process_single_file(path) for path in batch]
batch_results = await asyncio.gather(*batch_tasks)
results.extend(batch_results)
# Optional: brief pause between batches
await asyncio.sleep(0.1)
return results
Batching prevents memory exhaustion when processing thousands of files. Each batch runs concurrently, but we limit how many files are processed simultaneously.
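An alternative to fixed batches is a semaphore that bounds concurrency while still launching every task up front. Here is a minimal sketch, reusing process_single_file from above:
async def process_files_bounded(file_paths, max_concurrent=10):
    """Process all files at once, but never more than max_concurrent at a time"""
    semaphore = asyncio.Semaphore(max_concurrent)
    async def bounded_process(path):
        async with semaphore:  # Wait for a free slot before touching the file
            return await process_single_file(path)
    return await asyncio.gather(*(bounded_process(p) for p in file_paths))
With a semaphore, exactly max_concurrent files are in flight at any moment, so one slow file doesn't stall an entire batch the way it can with fixed batching.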
Here’s how to put it all together:
async def file_processing_demo():
"""Demonstrate concurrent file processing"""
    # Simulated paths; files that don't exist will surface as "failed" results
    file_paths = [Path(f"data/file_{i}.json") for i in range(50)]
start_time = asyncio.get_event_loop().time()
results = await process_files_batch(file_paths, batch_size=10)
elapsed = asyncio.get_event_loop().time() - start_time
successful = len([r for r in results if r["status"] == "success"])
failed = len([r for r in results if r["status"] == "failed"])
print(f"Processed {len(results)} files in {elapsed:.2f}s")
print(f"Successful: {successful}")
print(f"Failed: {failed}")
asyncio.run(file_processing_demo())
ETL Pipeline with Async Generators
Build Extract-Transform-Load pipelines using async generators:
import aiohttp
async def extract_api_data(api_urls):
"""Extract data from multiple APIs"""
async with aiohttp.ClientSession() as session:
for url in api_urls:
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
data = await response.json()
yield data
else:
print(f"Failed to fetch {url}: {response.status}")
except Exception as e:
print(f"Error fetching {url}: {e}")
The extraction phase uses async generators to yield data as it becomes available. This allows the transformation phase to start processing while extraction continues, creating a streaming pipeline.
Next, we transform the raw data:
async def transform_data(data_stream):
"""Transform raw data"""
async for raw_data in data_stream:
# Clean and transform data
transformed = {
"id": raw_data.get("id"),
"name": raw_data.get("name", "").strip().title(),
"email": raw_data.get("email", "").lower(),
"processed_at": asyncio.get_event_loop().time()
}
# Skip invalid records
if transformed["id"] and transformed["email"]:
yield transformed
The transformation happens record-by-record as data flows through. Invalid records are filtered out immediately rather than being processed further.
Finally, we load the data in batches for efficiency:
async def load_to_database(transformed_stream):
"""Load transformed data to database (simulated)"""
batch = []
batch_size = 100
async for record in transformed_stream:
batch.append(record)
if len(batch) >= batch_size:
await insert_batch(batch)
batch.clear()
# Insert remaining records
if batch:
await insert_batch(batch)
async def insert_batch(records):
"""Insert batch of records to database (simulated)"""
# Simulate database insert
await asyncio.sleep(0.1)
print(f"Inserted batch of {len(records)} records")
Batching reduces database overhead by grouping multiple records into single operations. The pipeline processes data continuously without waiting for all extraction to complete.
Here’s how to wire the pipeline together:
async def etl_pipeline():
    """Wire extract, transform, and load into one streaming pipeline"""
    api_urls = [
        "https://api1.example.com/data",
        "https://api2.example.com/data",
        "https://api3.example.com/data"
    ]
    # Each stage consumes the previous stage's async generator
    raw_data = extract_api_data(api_urls)
    transformed_data = transform_data(raw_data)
    # Load to database
    await load_to_database(transformed_data)
    print("ETL pipeline completed successfully")
asyncio.run(etl_pipeline())
Real-time Data Processing
Process streaming data in real-time:
import random
from datetime import datetime
async def data_stream_simulator():
"""Simulate real-time data stream"""
while True:
# Generate sample event
event = {
"timestamp": datetime.utcnow().isoformat(),
"user_id": random.randint(1, 1000),
"event_type": random.choice(["login", "logout", "purchase", "view"]),
"value": random.uniform(0, 100)
}
yield event
await asyncio.sleep(0.1) # 10 events per second
async def event_aggregator(event_stream, window_seconds=5):
"""Aggregate events in time windows"""
window_data = {}
last_flush = asyncio.get_event_loop().time()
async for event in event_stream:
current_time = asyncio.get_event_loop().time()
# Add event to current window
event_type = event["event_type"]
if event_type not in window_data:
window_data[event_type] = {"count": 0, "total_value": 0}
window_data[event_type]["count"] += 1
window_data[event_type]["total_value"] += event["value"]
# Flush window if time elapsed
if current_time - last_flush >= window_seconds:
if window_data:
yield {
"window_start": last_flush,
"window_end": current_time,
"aggregates": window_data.copy()
}
window_data.clear()
last_flush = current_time
async def process_aggregates(aggregate_stream):
"""Process aggregated data"""
async for window in aggregate_stream:
print(f"Window {window['window_start']:.1f}-{window['window_end']:.1f}:")
for event_type, stats in window["aggregates"].items():
avg_value = stats["total_value"] / stats["count"]
print(f" {event_type}: {stats['count']} events, avg value: {avg_value:.1f}")
async def real_time_processing():
"""Real-time data processing pipeline"""
# Create processing pipeline
events = data_stream_simulator()
aggregates = event_aggregator(events, window_seconds=3)
# Process aggregates (this will run indefinitely)
try:
await process_aggregates(aggregates)
except KeyboardInterrupt:
print("Stopping real-time processing...")
# Run for a limited time in demo
async def demo_real_time():
task = asyncio.create_task(real_time_processing())
# Run for 15 seconds
await asyncio.sleep(15)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Demo completed")
asyncio.run(demo_real_time())
Summary
Async data processing enables efficient handling of I/O-bound operations:
Key Patterns
- Concurrent File Processing: Process multiple files simultaneously
- ETL Pipelines: Extract, transform, and load data using async generators
- Real-time Processing: Handle streaming data with time-windowed aggregation
- Batch Processing: Control memory usage with batching
Performance Benefits
- Higher Throughput: Process more data in less time
- Better Resource Utilization: CPU stays busy while waiting for I/O
- Scalability: Handle larger datasets with the same resources
- Responsiveness: System remains responsive during processing
Best Practices
- Process data in batches to control memory usage
- Use async generators for streaming data
- Implement proper error handling for partial failures
- Monitor memory usage and processing rates
- Use connection pooling for database operations
In Part 11, we’ll explore data enrichment and validation patterns for complex processing scenarios.
Data Enrichment and Validation
Complex data processing often requires enriching data from multiple sources and validating it asynchronously. Let’s explore patterns for parallel data enrichment and validation.
Parallel Data Enrichment
Enrich data by calling multiple services concurrently:
import asyncio
async def enrich_user_data(user_id: int):
"""Enrich user data from multiple sources"""
async def get_profile(user_id):
await asyncio.sleep(0.2) # Simulate API call
return {"name": f"User {user_id}", "age": 25 + (user_id % 40)}
async def get_preferences(user_id):
await asyncio.sleep(0.3) # Simulate API call
return {"theme": "dark", "notifications": True}
async def get_activity(user_id):
await asyncio.sleep(0.1) # Simulate API call
return {"last_login": "2024-01-01", "login_count": user_id * 10}
These three functions simulate calling different microservices or APIs. In a real application, these might be calls to user service, preferences service, and analytics service.
The key is to fetch all data concurrently rather than sequentially:
# Fetch all data concurrently
profile, preferences, activity = await asyncio.gather(
get_profile(user_id),
get_preferences(user_id),
get_activity(user_id),
return_exceptions=True
)
Using return_exceptions=True ensures that if one service fails, the others can still succeed. This makes the enrichment process resilient to partial failures.
Finally, we combine the results safely:
# Combine results, handling failures gracefully
enriched_data = {"user_id": user_id}
if isinstance(profile, dict):
enriched_data.update(profile)
if isinstance(preferences, dict):
enriched_data["preferences"] = preferences
if isinstance(activity, dict):
enriched_data["activity"] = activity
return enriched_data
This pattern fetches data from multiple sources simultaneously, significantly reducing total processing time.
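A quick timing check makes the gain concrete. With the simulated delays above (0.2s, 0.3s, and 0.1s), the concurrent version should finish in roughly 0.3 seconds, the slowest call, instead of the 0.6-second sum:
async def single_enrichment_demo():
    start = asyncio.get_event_loop().time()
    enriched = await enrich_user_data(42)
    elapsed = asyncio.get_event_loop().time() - start
    print(f"Enriched user in {elapsed:.2f}s")  # ~0.3s, not ~0.6s
asyncio.run(single_enrichment_demo())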
Batch Enrichment Processing
Process multiple users efficiently:
async def batch_enrich_users(user_ids, batch_size=20):
"""Enrich multiple users in batches"""
results = []
for i in range(0, len(user_ids), batch_size):
batch = user_ids[i:i + batch_size]
print(f"Enriching batch {i//batch_size + 1}: {len(batch)} users")
# Enrich batch concurrently
batch_tasks = [enrich_user_data(uid) for uid in batch]
batch_results = await asyncio.gather(*batch_tasks)
results.extend(batch_results)
return results
async def enrichment_demo():
"""Demonstrate parallel data enrichment"""
user_ids = list(range(1, 51)) # 50 users
start_time = asyncio.get_event_loop().time()
enriched_users = await batch_enrich_users(user_ids, batch_size=10)
elapsed = asyncio.get_event_loop().time() - start_time
print(f"Enriched {len(enriched_users)} users in {elapsed:.2f}s")
asyncio.run(enrichment_demo())
Data Validation Pipeline
Validate and clean data asynchronously:
import re
async def validate_email(email: str) -> bool:
"""Validate email format"""
await asyncio.sleep(0.01) # Simulate validation work
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
async def validate_phone(phone: str) -> bool:
"""Validate phone number"""
await asyncio.sleep(0.01) # Simulate validation work
cleaned = re.sub(r'[^\d]', '', phone)
return 10 <= len(cleaned) <= 15
async def validate_age(age) -> bool:
"""Validate age"""
await asyncio.sleep(0.005) # Simulate validation work
try:
age_int = int(age)
return 0 <= age_int <= 150
except (ValueError, TypeError):
return False
async def validate_record(record: dict) -> dict:
"""Validate a single record"""
# Run all validations concurrently
email_valid, phone_valid, age_valid = await asyncio.gather(
validate_email(record.get("email", "")),
validate_phone(record.get("phone", "")),
validate_age(record.get("age")),
return_exceptions=True
)
validation_results = {
"email_valid": email_valid if isinstance(email_valid, bool) else False,
"phone_valid": phone_valid if isinstance(phone_valid, bool) else False,
"age_valid": age_valid if isinstance(age_valid, bool) else False
}
# Overall validity
validation_results["is_valid"] = all(validation_results.values())
return {
**record,
"validation": validation_results
}
Batch Validation Processing
Process validation efficiently:
async def validation_pipeline(records):
"""Validate multiple records"""
batch_size = 50
validated_records = []
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
# Validate batch concurrently
batch_tasks = [validate_record(record) for record in batch]
batch_results = await asyncio.gather(*batch_tasks)
validated_records.extend(batch_results)
print(f"Validated batch {i//batch_size + 1}: {len(batch)} records")
return validated_records
async def validation_demo():
"""Demonstrate data validation pipeline"""
# Sample data with various quality issues
sample_records = [
{"name": "John Doe", "email": "[email protected]", "phone": "555-1234", "age": 30},
{"name": "Jane Smith", "email": "invalid-email", "phone": "555-5678", "age": 25},
{"name": "Bob Johnson", "email": "[email protected]", "phone": "not-a-phone", "age": "thirty"},
] * 50 # Multiply to simulate larger dataset
start_time = asyncio.get_event_loop().time()
validated = await validation_pipeline(sample_records)
elapsed = asyncio.get_event_loop().time() - start_time
# Analyze results
valid_count = len([r for r in validated if r["validation"]["is_valid"]])
invalid_count = len(validated) - valid_count
print(f"\nValidation Results:")
print(f"Total records: {len(validated)}")
print(f"Valid: {valid_count}")
print(f"Invalid: {invalid_count}")
print(f"Processing time: {elapsed:.2f}s")
asyncio.run(validation_demo())
Memory-Efficient Processing
Process large datasets without loading everything into memory:
async def memory_efficient_processor(data_size: int):
"""Process large dataset without loading it all into memory"""
async def process_chunk(chunk_data):
"""Process a chunk of data"""
processed_rows = []
for row in chunk_data:
# Simulate processing each row
await asyncio.sleep(0.001)
processed_row = {
"id": row.get("id"),
"processed_value": float(row.get("value", 0)) * 1.1,
"status": "processed"
}
processed_rows.append(processed_row)
return processed_rows
# Process in chunks
total_processed = 0
chunk_size = 100
for chunk_start in range(0, data_size, chunk_size):
chunk_data = [
{"id": i, "value": i * 0.5}
for i in range(chunk_start, min(chunk_start + chunk_size, data_size))
]
# Process chunk
processed_chunk = await process_chunk(chunk_data)
total_processed += len(processed_chunk)
if total_processed % 1000 == 0:
print(f"Processed {total_processed} items...")
print(f"Processed {total_processed} items total")
asyncio.run(memory_efficient_processor(5000))
Summary
Advanced data processing patterns enable robust, scalable data handling:
Key Patterns
- Parallel Enrichment: Fetch additional data from multiple sources concurrently
- Batch Processing: Process data in manageable chunks
- Validation Pipelines: Validate data using concurrent validation functions
- Memory-Efficient Processing: Handle large datasets without memory issues
Performance Benefits
- Concurrent Operations: Multiple data sources accessed simultaneously
- Batch Processing: Optimal memory usage and throughput
- Resilient Processing: Graceful handling of partial failures
- Scalable Architecture: Handle growing data volumes efficiently
Best Practices
- Use batching to control memory usage and concurrency
- Implement proper error handling and retry logic
- Monitor processing rates and resource usage
- Design for partial failures and graceful degradation
In Part 12, we’ll explore caching and connection pooling for high-performance applications.
Caching and Connection Pooling
Advanced async applications need sophisticated resource management. Let’s explore caching patterns and connection pooling for optimal performance.
Basic Async Cache
Start with a simple cache implementation:
import asyncio
import time
from typing import Any, Optional, Dict
class AsyncCache:
def __init__(self, default_ttl: int = 300):
self.cache: Dict[str, Dict[str, Any]] = {}
self.default_ttl = default_ttl
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
if key not in self.cache:
return None
entry = self.cache[key]
# Check if expired
if time.time() > entry['expires']:
del self.cache[key]
return None
return entry['value']
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Set value in cache"""
ttl = ttl or self.default_ttl
self.cache[key] = {
'value': value,
'expires': time.time() + ttl
}
async def delete(self, key: str) -> bool:
"""Delete key from cache"""
if key in self.cache:
del self.cache[key]
return True
return False
async def clear(self) -> None:
"""Clear all cache entries"""
self.cache.clear()
# Usage
cache = AsyncCache(default_ttl=600)
async def get_user_data(user_id: str):
# Try cache first
cached_data = await cache.get(f"user:{user_id}")
if cached_data:
return cached_data
# Fetch from database
user_data = await fetch_user_from_db(user_id)
# Cache the result
await cache.set(f"user:{user_id}", user_data, ttl=300)
return user_data
Redis-based Async Cache
Use Redis for distributed caching:
import aioredis
import json
import asyncio
class RedisCache:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis = None
async def connect(self):
"""Connect to Redis"""
self.redis = aioredis.from_url(self.redis_url)
async def get(self, key: str):
"""Get value from Redis cache"""
if not self.redis:
await self.connect()
value = await self.redis.get(key)
if value:
return json.loads(value)
return None
async def set(self, key: str, value, ttl: int = 300):
"""Set value in Redis cache"""
if not self.redis:
await self.connect()
serialized_value = json.dumps(value)
await self.redis.setex(key, ttl, serialized_value)
async def close(self):
"""Close Redis connection"""
if self.redis:
await self.redis.close()
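Usage mirrors the in-memory cache. This sketch assumes a Redis server is reachable at redis://localhost:6379:
async def redis_cache_demo():
    cache = RedisCache("redis://localhost:6379")
    await cache.set("user:1", {"name": "Alice"}, ttl=60)  # Stored as JSON
    value = await cache.get("user:1")
    print(f"Cached value: {value}")
    await cache.close()
asyncio.run(redis_cache_demo())
Because values round-trip through JSON, only JSON-serializable data can be cached this way.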
Cache Decorators
Create reusable cache decorators:
import functools
import hashlib
def async_cache(ttl: int = 300):
"""Decorator for caching async function results"""
def decorator(func):
cache = AsyncCache(default_ttl=ttl)
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Create cache key from function name and arguments
key_data = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
cache_key = hashlib.md5(key_data.encode()).hexdigest()
# Try cache first
cached_result = await cache.get(cache_key)
if cached_result is not None:
return cached_result
# Execute function
result = await func(*args, **kwargs)
# Cache the result
await cache.set(cache_key, result, ttl=ttl)
return result
return wrapper
return decorator
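With the decorator in place, caching a slow function takes one line. A small demo, where the second call should return almost immediately from cache:
@async_cache(ttl=60)
async def expensive_lookup(item_id: int):
    await asyncio.sleep(1)  # Simulate a slow database query
    return {"id": item_id, "value": item_id * 2}
async def decorator_demo():
    await expensive_lookup(7)  # ~1 second: executes and caches
    await expensive_lookup(7)  # near-instant: served from cache
asyncio.run(decorator_demo())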
Connection Pooling
Implement efficient database connection pooling:
import asyncpg
import asyncio
from contextlib import asynccontextmanager
class DatabasePool:
def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
self.database_url = database_url
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def initialize(self):
"""Initialize the connection pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=self.min_size,
max_size=self.max_size
)
@asynccontextmanager
async def acquire(self):
"""Acquire a connection from the pool"""
if not self.pool:
await self.initialize()
async with self.pool.acquire() as connection:
yield connection
async def fetch(self, query: str, *args):
"""Fetch results using the pool"""
async with self.acquire() as conn:
return await conn.fetch(query, *args)
async def close(self):
"""Close the connection pool"""
if self.pool:
await self.pool.close()
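Usage is straightforward; the connection string below is a placeholder for your own database:
async def pool_demo():
    pool = DatabasePool("postgresql://user:pass@localhost/mydb")  # placeholder DSN
    rows = await pool.fetch("SELECT * FROM users WHERE id = $1", 123)
    print(f"Fetched {len(rows)} rows")
    await pool.close()
asyncio.run(pool_demo())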
Best Practices
Key caching and connection pooling principles:
Caching Strategy:
- Use appropriate TTL values based on data freshness requirements
- Implement cache invalidation for critical data updates
- Use distributed caching (Redis) for multi-instance applications
Connection Pooling:
- Set appropriate pool sizes based on expected load
- Monitor connection usage and adjust pool sizes
- Implement proper connection cleanup and error handling
Performance Optimization:
- Cache expensive computations and database queries
- Reuse connections across multiple requests
- Monitor cache hit rates and connection pool utilization
Summary
Caching and connection pooling essentials:
- Implement async caching for expensive operations
- Use Redis for distributed caching across multiple instances
- Create reusable cache decorators for common patterns
- Implement database connection pooling for better performance
- Monitor and tune cache and pool configurations
Proper caching and connection pooling significantly improve async application performance and resource utilization.
In Part 13, we’ll explore circuit breakers and resilience patterns.
Circuit Breakers and Resilience
When external services fail, they often fail spectacularly. Without protection, one failing API can bring down your entire async application as requests pile up waiting for timeouts. Circuit breakers act like electrical fuses - they trip when things go wrong, protecting your system from cascading failures.
Basic Circuit Breaker
Build a simple but effective circuit breaker:
import asyncio
import time
from enum import Enum
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking requests
HALF_OPEN = "half_open" # Testing if service recovered
A circuit breaker has three states: closed (normal), open (blocking requests), and half-open (testing recovery). The state transitions based on success/failure patterns.
Here’s the core circuit breaker logic:
class CircuitBreaker:
def __init__(self,
failure_threshold: int = 5,
timeout: float = 60.0,
expected_exception: type = Exception):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
The circuit breaker tracks failures and automatically opens when the threshold is exceeded. After a timeout period, it enters half-open state to test if the service has recovered.
The main execution method handles state transitions:
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
When the circuit is open, it either blocks the request or allows one test request if enough time has passed.
The helper methods manage state transitions:
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset"""
return (time.time() - self.last_failure_time) >= self.timeout
def _on_success(self):
"""Handle successful call"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30.0)
async def unreliable_service():
"""Simulate an unreliable external service"""
import random
if random.random() < 0.7: # 70% failure rate
raise Exception("Service unavailable")
return "Success"
Advanced Circuit Breaker with Metrics
Add monitoring and metrics:
import asyncio
import time
from dataclasses import dataclass
@dataclass
class CircuitBreakerMetrics:
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
class AdvancedCircuitBreaker:
def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.metrics = CircuitBreakerMetrics()
async def call(self, func, *args, **kwargs):
"""Execute function with advanced circuit breaker protection"""
self.metrics.total_requests += 1
if self.state == CircuitState.OPEN:
if (time.time() - self.last_failure_time) >= self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self.metrics.successful_requests += 1
self.failure_count = 0
self.state = CircuitState.CLOSED
return result
except Exception as e:
self.metrics.failed_requests += 1
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise e
def get_metrics(self):
"""Get circuit breaker metrics"""
return {
"state": self.state.value,
"total_requests": self.metrics.total_requests,
"success_rate": (
self.metrics.successful_requests / self.metrics.total_requests
if self.metrics.total_requests > 0 else 0
)
}
Retry with Exponential Backoff
Combine circuit breakers with retry logic:
import asyncio
import random
class RetryableCircuitBreaker:
def __init__(self,
max_retries: int = 3,
base_delay: float = 1.0,
failure_threshold: int = 5):
self.max_retries = max_retries
self.base_delay = base_delay
self.circuit_breaker = AdvancedCircuitBreaker(failure_threshold=failure_threshold)
async def call_with_retry(self, func, *args, **kwargs):
"""Execute function with retry and circuit breaker protection"""
for attempt in range(self.max_retries + 1):
try:
return await self.circuit_breaker.call(func, *args, **kwargs)
except Exception as e:
if attempt == self.max_retries:
raise e
# Exponential backoff with jitter
delay = self.base_delay * (2 ** attempt)
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)
# Usage
retryable_breaker = RetryableCircuitBreaker(max_retries=3, base_delay=1.0)
async def flaky_service():
if random.random() < 0.5:
raise Exception("Temporary failure")
return "Success"
Making Circuit Breakers Work in Real Systems
Lessons learned from implementing resilience patterns:
Circuit Breaker Design:
- Set appropriate failure thresholds based on service characteristics
- Use different timeouts for different services
- Monitor circuit breaker state and metrics
- Implement graceful degradation when circuits are open
Retry Strategy:
- Use exponential backoff with jitter
- Set maximum retry limits
- Don’t retry on certain error types (4xx HTTP errors)
- Combine with circuit breakers for better protection
Resource Isolation:
- Use bulkheads to isolate different resource types (see the sketch below)
- Set appropriate concurrency limits
- Monitor resource utilization
- Implement separate thread pools for different operations
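The bulkhead mentioned above can be as simple as one semaphore per resource type, so a flood of slow database calls can't starve HTTP work. A minimal sketch, with made-up limits:
class Bulkhead:
    """Isolate resource types behind independent concurrency limits"""
    def __init__(self):
        self.limits = {
            "database": asyncio.Semaphore(10),  # example limits; tune per service
            "http": asyncio.Semaphore(25),
        }
    async def run(self, resource: str, func, *args, **kwargs):
        async with self.limits[resource]:  # Block only callers of this resource
            return await func(*args, **kwargs)
If all database slots are taken, only database callers wait; HTTP callers proceed unaffected.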
Summary
Resilience pattern essentials:
- Implement circuit breakers to prevent cascading failures
- Use retry with exponential backoff for transient failures
- Apply bulkhead pattern to isolate resources
- Set appropriate timeouts for all operations
- Monitor metrics and adjust thresholds based on behavior
- Combine patterns for comprehensive resilience
These patterns ensure your async applications remain stable and responsive even when dependencies fail.
In Part 14, we’ll explore state machines and observer patterns.
State Machines and Observer Patterns
Complex async applications need sophisticated coordination mechanisms. Let’s build state machines and observer patterns step by step.
Basic State Machine Structure
Start with a simple state machine foundation:
from enum import Enum
import asyncio
class OrderState(Enum):
PENDING = "PENDING"
PROCESSING = "PROCESSING"
SHIPPED = "SHIPPED"
DELIVERED = "DELIVERED"
class AsyncStateMachine:
def __init__(self, initial_state):
self.current_state = initial_state
self.transitions = {}
self.lock = asyncio.Lock()
def add_transition(self, from_state, event, to_state, handler=None):
"""Register a state transition"""
self.transitions[(from_state, event)] = (to_state, handler)
This creates the basic structure for managing states and transitions safely.
Implementing State Transitions
Add the transition logic with proper error handling:
async def trigger(self, event: str, **kwargs):
"""Trigger a state transition"""
async with self.lock:
key = (self.current_state, event)
if key not in self.transitions:
raise ValueError(f"No transition from {self.current_state} on '{event}'")
new_state, handler = self.transitions[key]
# Execute transition handler if exists
if handler:
await handler(self.current_state, new_state, **kwargs)
# Update state
old_state = self.current_state
self.current_state = new_state
print(f"State transition: {old_state.value} -> {new_state.value}")
return new_state
The lock ensures thread-safe state transitions in concurrent environments.
Order Processing Example
Create a practical order processing state machine:
class OrderProcessor:
def __init__(self, order_id: str):
self.order_id = order_id
self.state_machine = AsyncStateMachine(OrderState.PENDING)
self._setup_transitions()
def _setup_transitions(self):
"""Configure valid state transitions"""
self.state_machine.add_transition(
OrderState.PENDING, "process", OrderState.PROCESSING,
self._handle_process
)
self.state_machine.add_transition(
OrderState.PROCESSING, "ship", OrderState.SHIPPED,
self._handle_ship
)
self.state_machine.add_transition(
OrderState.SHIPPED, "deliver", OrderState.DELIVERED,
self._handle_deliver
)
This defines the valid flow: pending → processing → shipped → delivered.
Transition Handlers
Implement the actual business logic for each transition:
async def _handle_process(self, from_state, to_state, **kwargs):
"""Handle order processing"""
print(f"📦 Processing order {self.order_id}")
await asyncio.sleep(1) # Simulate processing time
async def _handle_ship(self, from_state, to_state, **kwargs):
"""Handle order shipping"""
tracking = kwargs.get("tracking_number", "TRK123")
print(f"🚚 Shipping order {self.order_id} (Tracking: {tracking})")
await asyncio.sleep(0.5)
async def _handle_deliver(self, from_state, to_state, **kwargs):
"""Handle order delivery"""
print(f"📬 Delivering order {self.order_id}")
await asyncio.sleep(0.3)
# Public methods for triggering transitions
async def process_order(self):
await self.state_machine.trigger("process")
async def ship_order(self, tracking_number=None):
await self.state_machine.trigger("ship", tracking_number=tracking_number)
async def deliver_order(self):
await self.state_machine.trigger("deliver")
Each handler performs the actual work and can accept additional parameters.
Basic Observer Pattern
Create a simple event emitter for the observer pattern:
from typing import List, Callable
class AsyncEventEmitter:
def __init__(self):
self.listeners = {} # event_name -> list of functions
def on(self, event_name: str, listener: Callable):
"""Register event listener"""
if event_name not in self.listeners:
self.listeners[event_name] = []
self.listeners[event_name].append(listener)
async def emit(self, event_name: str, *args, **kwargs):
"""Emit event to all listeners"""
if event_name not in self.listeners:
return
# Run all listeners concurrently
tasks = [
listener(*args, **kwargs)
for listener in self.listeners[event_name]
]
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
This allows multiple components to react to events without tight coupling.
User Service with Events
Implement a service that emits events for important actions:
class UserService:
def __init__(self):
self.events = AsyncEventEmitter()
async def create_user(self, user_data: dict):
"""Create user and emit events"""
user = {"id": 123, **user_data}
# Emit event for other components to handle
await self.events.emit("user_created", user)
return user
async def update_user(self, user_id: int, updates: dict):
"""Update user and emit events"""
user = {"id": user_id, **updates}
await self.events.emit("user_updated", user)
return user
Services emit events without knowing who will handle them.
Event Listeners
Create focused event handlers for different concerns:
async def send_welcome_email(user: dict):
"""Send welcome email when user is created"""
print(f"📧 Sending welcome email to {user.get('email')}")
await asyncio.sleep(0.5) # Simulate email sending
print(f"Email sent to {user.get('email')}")
async def create_user_profile(user: dict):
"""Create user profile when user is created"""
print(f"👤 Creating profile for user {user['id']}")
await asyncio.sleep(0.3) # Simulate profile creation
print(f"Profile created for user {user['id']}")
async def log_user_activity(user: dict):
"""Log user activity"""
print(f"📝 Logging activity for user {user['id']}")
await asyncio.sleep(0.1) # Simulate logging
print(f"Activity logged")
Each listener handles one specific concern and can run independently.
Putting It All Together
Wire up the complete system:
async def demo_state_machine():
"""Demonstrate state machine"""
order = OrderProcessor("ORD-12345")
print(f"Initial state: {order.state_machine.current_state.value}")
await order.process_order()
await order.ship_order(tracking_number="TRK789")
await order.deliver_order()
print(f"Final state: {order.state_machine.current_state.value}")
async def demo_observer_pattern():
"""Demonstrate observer pattern"""
user_service = UserService()
# Register event listeners
user_service.events.on("user_created", send_welcome_email)
user_service.events.on("user_created", create_user_profile)
user_service.events.on("user_created", log_user_activity)
# Create user - triggers all listeners
user = await user_service.create_user({
"name": "John Doe",
"email": "[email protected]"
})
print(f"User created: {user}")
# Run demonstrations
asyncio.run(demo_state_machine())
asyncio.run(demo_observer_pattern())
This shows how both patterns work together in a complete system.
Summary
State machines and observer patterns provide powerful coordination:
State Machines
- Clear State Management: Explicit transitions and validation
- Business Logic: Handlers contain the actual work
- Thread Safety: Locks prevent race conditions
- Auditability: Track state changes easily
Observer Pattern
- Loose Coupling: Components don’t know about each other
- Extensibility: Easy to add new event handlers
- Concurrent Execution: All listeners run simultaneously
- Event-Driven: React to changes as they happen
Best Practices
- Keep transition handlers focused and simple
- Use events for cross-cutting concerns
- Implement proper error handling in listeners
- Design states and events to match business processes
These patterns enable building complex, maintainable async applications with clear separation of concerns.
Performance Profiling and Monitoring
Optimizing async applications requires understanding bottlenecks and monitoring performance. Let’s explore profiling techniques and real-time monitoring.
Basic Async Profiling
Profile async functions to identify bottlenecks:
import asyncio
import time
import cProfile
from functools import wraps
def async_profile(func):
"""Decorator to profile async functions"""
@wraps(func)
async def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
start_time = time.time()
result = await func(*args, **kwargs)
end_time = time.time()
pr.disable()
print(f"Function {func.__name__} took {end_time - start_time:.3f} seconds")
# Show top time consumers
import pstats
stats = pstats.Stats(pr)
stats.sort_stats('cumulative')
stats.print_stats(5) # Top 5 functions
return result
return wrapper
This decorator profiles both the async function execution time and the CPU-intensive operations within it. The profiler shows which functions consume the most time.
Use the profiler on your async functions:
@async_profile
async def slow_function():
"""Function to profile"""
await asyncio.sleep(0.1)
total = sum(i * i for i in range(10000)) # CPU work
await asyncio.sleep(0.05)
return total
asyncio.run(slow_function())
Event Loop Monitoring
Monitor event loop health in real-time:
from collections import deque
class EventLoopMonitor:
def __init__(self, sample_interval=1.0):
self.sample_interval = sample_interval
self.metrics = {
'blocked_time': deque(maxlen=60),
'task_count': deque(maxlen=60)
}
self.monitoring = False
The monitor tracks event loop responsiveness and task counts over time. Using deques with maxlen keeps memory usage bounded.
Start monitoring and measure loop responsiveness:
async def start_monitoring(self):
"""Start monitoring the event loop"""
self.monitoring = True
asyncio.create_task(self._monitor_loop())
async def _monitor_loop(self):
"""Monitor loop performance"""
while self.monitoring:
start_time = time.time()
# Measure loop responsiveness
await asyncio.sleep(0) # Yield to other tasks
end_time = time.time()
blocked_time = end_time - start_time
# Collect metrics
self.metrics['blocked_time'].append(blocked_time)
# Count tasks
all_tasks = asyncio.all_tasks()
self.metrics['task_count'].append(len(all_tasks))
await asyncio.sleep(self.sample_interval)
The key insight: await asyncio.sleep(0) should return almost immediately in a healthy event loop. If it takes longer, the loop is blocked by CPU-intensive work.
Generate performance statistics:
def get_stats(self):
"""Get performance statistics"""
if not self.metrics['blocked_time']:
return {}
blocked_times = list(self.metrics['blocked_time'])
task_counts = list(self.metrics['task_count'])
return {
'avg_blocked_time': sum(blocked_times) / len(blocked_times),
'max_blocked_time': max(blocked_times),
'avg_task_count': sum(task_counts) / len(task_counts),
'max_task_count': max(task_counts)
}
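To exercise the monitor, this rough demo interleaves deliberately blocking CPU work with the monitoring task; exact numbers will vary by machine:
async def blocking_work():
    """CPU chunks that block the loop between yields"""
    for _ in range(5):
        sum(i * i for i in range(3_000_000))  # hundreds of ms of CPU time
        await asyncio.sleep(0)  # Yield so the monitor can take a measurement
async def monitor_demo():
    monitor = EventLoopMonitor(sample_interval=0.2)
    await monitor.start_monitoring()
    await asyncio.create_task(blocking_work())
    monitor.monitoring = False
    print(monitor.get_stats())
asyncio.run(monitor_demo())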
Application Metrics Collection
Collect comprehensive application metrics:
import psutil
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class Metrics:
timestamp: float
cpu_percent: float
memory_mb: float
active_tasks: int
requests_per_second: float
The Metrics dataclass provides a clean structure for performance data. Using dataclasses makes the code more readable and type-safe.
Set up the metrics collector:
class AsyncMetricsCollector:
def __init__(self):
self.request_times = deque(maxlen=1000)
self.task_counts = defaultdict(int)
self.process = psutil.Process()
def record_request(self, duration: float):
"""Record request completion time"""
self.request_times.append((time.time(), duration))
def record_task_completion(self, success: bool = True):
"""Record task completion"""
if success:
self.task_counts['completed'] += 1
else:
self.task_counts['failed'] += 1
The collector tracks request timing and task completion rates. Using deque with maxlen prevents memory growth over time.
Generate comprehensive metrics:
def get_current_metrics(self) -> Metrics:
"""Get current system and application metrics"""
now = time.time()
# System metrics
cpu_percent = self.process.cpu_percent()
memory_mb = self.process.memory_info().rss / 1024 / 1024
# Task metrics
all_tasks = asyncio.all_tasks()
active_tasks = len([t for t in all_tasks if not t.done()])
# Request metrics
recent_requests = [
(ts, duration) for ts, duration in self.request_times
if now - ts < 60 # Last minute
]
requests_per_second = len(recent_requests) / 60 if recent_requests else 0
return Metrics(
timestamp=now,
cpu_percent=cpu_percent,
memory_mb=memory_mb,
active_tasks=active_tasks,
requests_per_second=requests_per_second
)
This method combines system metrics (CPU, memory) with application metrics (tasks, requests) for a complete performance picture.
Create a monitoring decorator:
# Monitoring decorator
def monitor_async_function(metrics_collector: AsyncMetricsCollector):
"""Decorator to monitor async function performance"""
def decorator(func):
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
metrics_collector.record_task_completion(success=True)
return result
except Exception as e:
metrics_collector.record_task_completion(success=False)
raise
finally:
duration = time.time() - start_time
metrics_collector.record_request(duration)
return wrapper
return decorator
Performance Alerting
Implement alerting for performance issues:
from dataclasses import dataclass
@dataclass
class Alert:
level: str # "warning", "critical"
message: str
metric_name: str
current_value: float
threshold: float
The Alert dataclass structures alert information for consistent handling across different alert channels.
Set up the alerting system:
class PerformanceAlerter:
def __init__(self):
self.thresholds = {}
self.alert_handlers = []
def add_threshold(self, metric_name: str, warning: float, critical: float):
"""Add performance threshold"""
self.thresholds[metric_name] = {
'warning': warning,
'critical': critical
}
def add_alert_handler(self, handler):
"""Add alert handler function"""
self.alert_handlers.append(handler)
The alerter supports multiple thresholds and handlers, making it flexible for different notification channels (email, Slack, logs).
Check metrics against thresholds:
async def check_metrics(self, metrics: Metrics):
"""Check metrics against thresholds"""
alerts = []
# Check CPU usage
if 'cpu_percent' in self.thresholds:
alerts.extend(self._check_threshold('cpu_percent', metrics.cpu_percent))
# Check memory usage
if 'memory_mb' in self.thresholds:
alerts.extend(self._check_threshold('memory_mb', metrics.memory_mb))
# Send alerts
for alert in alerts:
await self._send_alert(alert)
The system checks each configured metric and generates alerts when thresholds are exceeded.
Implement threshold checking logic:
def _check_threshold(self, metric_name: str, current_value: float):
"""Check if metric exceeds thresholds"""
alerts = []
thresholds = self.thresholds[metric_name]
if current_value >= thresholds['critical']:
alerts.append(Alert(
level="critical",
message=f"{metric_name} is critically high: {current_value:.2f}",
metric_name=metric_name,
current_value=current_value,
threshold=thresholds['critical']
))
elif current_value >= thresholds['warning']:
alerts.append(Alert(
level="warning",
message=f"{metric_name} above warning: {current_value:.2f}",
metric_name=metric_name,
current_value=current_value,
threshold=thresholds['warning']
))
return alerts
Critical alerts take precedence over warnings. This prevents alert spam when metrics are extremely high.
Handle alert delivery:
async def _send_alert(self, alert: Alert):
"""Send alert to all handlers"""
for handler in self.alert_handlers:
try:
await handler(alert)
except Exception as e:
print(f"Alert handler failed: {e}")
# Alert handlers
async def log_alert(alert: Alert):
"""Log alert to console"""
level_icon = "🚨" if alert.level == "critical" else "⚠️"
print(f"{level_icon} {alert.level.upper()}: {alert.message}")
Complete Monitoring Demo
Put everything together:
async def monitoring_demo():
"""Demonstrate complete monitoring system"""
collector = AsyncMetricsCollector()
alerter = PerformanceAlerter()
# Configure thresholds
alerter.add_threshold('cpu_percent', warning=70.0, critical=90.0)
alerter.add_threshold('memory_mb', warning=500.0, critical=1000.0)
# Add alert handlers
alerter.add_alert_handler(log_alert)
Set up the monitoring infrastructure with appropriate thresholds for your application’s normal operating parameters.
Create a monitored task for demonstration:
@monitor_async_function(collector)
async def sample_task(task_id: int):
"""Sample task with monitoring"""
await asyncio.sleep(0.1)
# Simulate occasional failures
import random
if random.random() < 0.1:
raise Exception(f"Task {task_id} failed")
return f"Task {task_id} completed"
The decorator automatically tracks execution time and success/failure rates for any async function.
Run the monitoring system:
# Run monitored tasks
for i in range(20):
try:
await sample_task(i)
except Exception:
pass
# Check metrics periodically
if i % 5 == 0:
metrics = collector.get_current_metrics()
await alerter.check_metrics(metrics)
print(f"Metrics - CPU: {metrics.cpu_percent:.1f}%, "
f"Memory: {metrics.memory_mb:.1f}MB, "
f"Tasks: {metrics.active_tasks}")
asyncio.run(monitoring_demo())
Summary
Performance profiling and monitoring are essential for async applications:
Key Components
- Profiling: Identify bottlenecks with cProfile and custom decorators
- Event Loop Monitoring: Track loop health and responsiveness
- Metrics Collection: Gather system and application metrics
- Alerting: Notify on performance issues
Best Practices
- Profile regularly to identify performance regressions
- Monitor event loop responsiveness
- Set appropriate thresholds for alerts
- Collect both system and application metrics
- Use sampling to avoid monitoring overhead
In Part 16, we’ll explore memory optimization and I/O optimization techniques.
Memory and I/O Optimization
Efficient memory and I/O management are crucial for high-performance async applications. Let’s explore key optimization techniques.
Memory Optimization
Manage memory efficiently in async applications:
import asyncio
import gc
import weakref
from typing import AsyncGenerator
class MemoryEfficientProcessor:
def __init__(self):
self._cache = weakref.WeakValueDictionary()
self._processed_count = 0
Weak references allow objects to be garbage collected even when cached, preventing memory leaks in long-running applications.
Process data in batches to control memory usage:
async def process_data_stream(self, data_source: AsyncGenerator):
"""Process data stream with memory optimization"""
batch = []
batch_size = 1000
async for item in data_source:
batch.append(item)
if len(batch) >= batch_size:
await self._process_batch(batch)
batch.clear() # Clear batch to free memory
            # Periodic garbage collection (skip the initial count of zero)
            if self._processed_count and self._processed_count % 10000 == 0:
                gc.collect()
# Process remaining items
if batch:
await self._process_batch(batch)
Batching prevents memory from growing unbounded when processing large datasets. Explicit garbage collection helps in memory-constrained environments.
Handle batch processing with weak reference caching:
async def _process_batch(self, batch):
"""Process a batch of items"""
# Simulate processing
await asyncio.sleep(0.01)
self._processed_count += len(batch)
# Use weak references for caching
for item in batch:
if hasattr(item, 'id'):
self._cache[item.id] = item
# Usage
processor = MemoryEfficientProcessor()
async def data_generator():
"""Generate data efficiently"""
for i in range(100000):
yield {"id": i, "data": f"item_{i}"}
async def main():
await processor.process_data_stream(data_generator())
I/O Optimization
Optimize file and network I/O operations:
import asyncio
import aiofiles
import aiohttp
class IOOptimizer:
def __init__(self, max_concurrent_requests=10):
self.semaphore = asyncio.Semaphore(max_concurrent_requests)
self.session = None
async def get_session(self):
"""Get or create HTTP session"""
if not self.session:
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
keepalive_timeout=30
)
self.session = aiohttp.ClientSession(connector=connector)
return self.session
async def fetch_with_optimization(self, url: str):
"""Fetch URL with connection pooling and rate limiting"""
async with self.semaphore:
session = await self.get_session()
async with session.get(url) as response:
return await response.text()
async def read_file_efficiently(self, filename: str, chunk_size: int = 8192):
"""Read large files efficiently"""
async with aiofiles.open(filename, 'rb') as file:
while chunk := await file.read(chunk_size):
yield chunk
async def close(self):
"""Clean up resources"""
if self.session:
await self.session.close()
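A short usage sketch, with placeholder URLs, fetching many pages through the shared session and semaphore:
async def io_demo():
    optimizer = IOOptimizer(max_concurrent_requests=5)
    urls = [f"https://example.com/page/{i}" for i in range(20)]  # placeholder URLs
    pages = await asyncio.gather(
        *(optimizer.fetch_with_optimization(url) for url in urls),
        return_exceptions=True
    )
    print(f"Fetched {sum(1 for p in pages if isinstance(p, str))} pages")
    await optimizer.close()
asyncio.run(io_demo())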
Connection Pooling
Implement efficient connection pooling:
import asyncio
import asyncpg
class DatabasePool:
def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
self.database_url = database_url
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def initialize(self):
"""Initialize connection pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=60
)
async def execute_query(self, query: str, *args):
"""Execute query using connection pool"""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def close(self):
"""Close connection pool"""
if self.pool:
await self.pool.close()
Buffering and Batching
Optimize data processing with buffering:
import asyncio
from collections import deque
class BufferedProcessor:
def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = deque()
self._running = False
async def start(self):
"""Start the buffered processor"""
self._running = True
asyncio.create_task(self._periodic_flush())
async def add_item(self, item):
"""Add item to buffer"""
self.buffer.append(item)
if len(self.buffer) >= self.batch_size:
await self._flush_buffer()
async def _periodic_flush(self):
"""Periodically flush buffer"""
while self._running:
await asyncio.sleep(self.flush_interval)
if self.buffer:
await self._flush_buffer()
async def _flush_buffer(self):
"""Flush current buffer"""
if not self.buffer:
return
items = list(self.buffer)
self.buffer.clear()
# Process batch
print(f"Processed batch of {len(items)} items")
async def stop(self):
"""Stop processor and flush remaining items"""
self._running = False
await self._flush_buffer()
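Usage is simple: start the processor, feed it items, and stop to flush whatever remains in the buffer:
async def buffer_demo():
    processor = BufferedProcessor(batch_size=50, flush_interval=0.5)
    await processor.start()
    for i in range(230):
        await processor.add_item({"id": i})
    await processor.stop()  # Flushes any remaining items
asyncio.run(buffer_demo())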
Best Practices
Key optimization principles:
Memory Management:
- Use weak references for caches
- Clear collections explicitly
- Implement periodic garbage collection
- Monitor memory usage continuously
I/O Optimization:
- Reuse connections and sessions
- Implement connection pooling
- Use appropriate buffer sizes
- Batch operations when possible
Resource Management:
- Set proper limits and timeouts
- Clean up resources in finally blocks
- Use context managers for resource handling
- Monitor resource usage
Summary
Memory and I/O optimization techniques:
- Implement efficient memory management with weak references and garbage collection
- Use connection pooling for database and HTTP operations
- Buffer and batch operations for better throughput
- Monitor performance continuously
- Clean up resources properly
Proper optimization ensures your async applications perform efficiently under load.
In Part 17, we’ll explore CPU-bound task optimization.
CPU-Bound Task Optimization
Async programming shines with I/O-bound tasks, but CPU-intensive work presents a different challenge. The event loop can’t help when your CPU is genuinely busy crunching numbers or processing data. Here’s how to handle CPU-bound operations without blocking your async application.
Process Pools for CPU-Intensive Work
When you need serious computational power:
import asyncio
import concurrent.futures
import multiprocessing
def cpu_intensive_task(data: list, multiplier: int = 2):
"""CPU-intensive task that should run in separate process"""
result = []
for item in data:
# Simulate CPU-intensive work
value = sum(i * multiplier for i in range(item * 1000))
result.append(value)
return result
async def optimize_cpu_bound_work():
"""Optimize CPU-bound work using process pools"""
# Prepare data chunks
data_chunks = [
list(range(10, 20)),
list(range(20, 30)),
list(range(30, 40)),
list(range(40, 50))
]
# Use process pool for CPU-bound work
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
loop = asyncio.get_event_loop()
# Submit tasks to process pool
tasks = [
loop.run_in_executor(executor, cpu_intensive_task, chunk)
for chunk in data_chunks
]
# Wait for all tasks to complete
results = await asyncio.gather(*tasks)
return results
# Usage
async def main():
results = await optimize_cpu_bound_work()
print(f"Processed {len(results)} chunks")
if __name__ == "__main__":
asyncio.run(main())
Thread Pools for Mixed Workloads
Use thread pools when you have mixed I/O and CPU work:
import asyncio
import concurrent.futures
import aiohttp
def process_api_response(response_data: dict):
"""CPU-intensive processing of API response"""
# Simulate complex data processing
processed = {}
for key, value in response_data.items():
if isinstance(value, (int, float)):
processed[key] = value ** 2
elif isinstance(value, str):
processed[key] = len(value)
else:
processed[key] = str(value)
return processed
async def fetch_and_process_data(url: str):
"""Fetch data and process it using thread pool"""
    # I/O-bound: fetch data asynchronously
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
# CPU-bound: process data in thread pool
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
processed_data = await loop.run_in_executor(
executor, process_api_response, data
)
return processed_data
async def process_multiple_apis():
"""Process multiple APIs concurrently"""
urls = [
"https://api1.example.com/data",
"https://api2.example.com/data",
"https://api3.example.com/data"
]
tasks = [fetch_and_process_data(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
Async Generators for Large Datasets
Process large datasets efficiently:
import asyncio
import aiofiles
import concurrent.futures
async def process_large_file(filename: str):
"""Process large file line by line"""
batch = []
batch_size = 1000
async with aiofiles.open(filename, 'r') as file:
async for line in file:
processed_line = line.strip().upper()
batch.append(processed_line)
if len(batch) >= batch_size:
# Process batch in thread pool
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor, lambda: "\n".join(batch)
)
yield result
batch = []
# Process remaining items
if batch:
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor, lambda: "\n".join(batch)
)
yield result
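Consuming the generator streams the file without ever holding it fully in memory. The filename here is a placeholder:
async def consume_file():
    total_chunks = 0
    async for chunk in process_large_file("server.log"):  # placeholder path
        total_chunks += 1
    print(f"Processed {total_chunks} chunks")
asyncio.run(consume_file())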
Choosing the Right Approach
Select the appropriate optimization strategy:
Process Pool: True CPU-bound tasks, can utilize multiple CPU cores
Thread Pool: Mixed I/O and CPU work, lower overhead than processes
Async Generators: Large datasets, memory-efficient streaming processing
What Actually Works in Production
From experience optimizing CPU-bound async applications:
1. Profile First
- Identify actual bottlenecks before optimizing
- Use profiling tools to measure performance
- Focus on the most time-consuming operations
2. Choose the Right Tool
- Process pools for true CPU-bound work
- Thread pools for mixed I/O and CPU work
- Async generators for large datasets
3. Batch Processing
- Process data in batches to reduce overhead
- Balance batch size with memory usage
- Use appropriate batch sizes for your use case
4. Resource Management
- Limit the number of worker processes/threads
- Monitor system resources during processing
- Clean up resources properly
Summary
Optimizing CPU-bound tasks in async applications:
- Use process pools for true CPU-intensive work
- Use thread pools for mixed I/O and CPU workloads
- Implement async generators for large dataset processing
- Monitor performance and resource usage
- Choose the right approach based on your specific use case
Proper CPU-bound optimization ensures your async applications can handle computationally intensive work efficiently.
In Part 18, we’ll explore WebSockets and real-time features.
WebSockets and Real-time Features
WebSockets enable real-time, bidirectional communication between clients and servers. Let’s explore implementing WebSocket servers and clients with async Python.
Basic WebSocket Server
Create a WebSocket server with FastAPI:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
import json
app = FastAPI()
First, we need a connection manager to handle multiple WebSocket connections:
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
The connection manager maintains a list of active WebSocket connections. When clients connect, we accept the connection and add it to our list.
For messaging, we need both personal and broadcast capabilities:
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    async def broadcast(self, message: str):
        dead_connections = []
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                # Collect dead connections; removing while iterating would skip entries
                dead_connections.append(connection)
        for connection in dead_connections:
            self.active_connections.remove(connection)
manager = ConnectionManager()
Broadcasting handles connection failures gracefully by removing dead connections. This prevents the server from trying to send to closed connections.
Now we can create the WebSocket endpoint:
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
# Receive message from client
data = await websocket.receive_text()
message = json.loads(data)
# Process message based on type
if message["type"] == "broadcast":
await manager.broadcast(f"Client {client_id}: {message['content']}")
elif message["type"] == "echo":
await manager.send_personal_message(
f"Echo: {message['content']}", websocket
)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client {client_id} disconnected")
WebSocket Client
Create a WebSocket client:
import asyncio
import websockets
import json
class WebSocketClient:
def __init__(self, uri: str):
self.uri = uri
self.websocket = None
async def connect(self):
self.websocket = await websockets.connect(self.uri)
async def send_message(self, message_type: str, content: str):
if self.websocket:
message = json.dumps({"type": message_type, "content": content})
await self.websocket.send(message)
async def listen_for_messages(self):
if not self.websocket:
return
try:
async for message in self.websocket:
print(f"Received: {message}")
except websockets.exceptions.ConnectionClosed:
print("Connection closed")
async def close(self):
if self.websocket:
await self.websocket.close()
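A minimal client session, assuming the server above is running locally on port 8000:
async def client_demo():
    client = WebSocketClient("ws://localhost:8000/ws/1")
    await client.connect()
    # Listen in the background while sending messages
    listener = asyncio.create_task(client.listen_for_messages())
    await client.send_message("echo", "hello")
    await client.send_message("broadcast", "hi everyone")
    await asyncio.sleep(1)  # Give the server time to respond
    await client.close()
    await listener
asyncio.run(client_demo())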
Real-time Chat Application
Build a simple chat system:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
from datetime import datetime
class ChatRoom:
def __init__(self):
self.connections = {}
self.message_history = []
async def add_user(self, websocket: WebSocket, username: str):
await websocket.accept()
self.connections[username] = websocket
# Send recent messages to new user
for message in self.message_history[-10:]:
await websocket.send_text(json.dumps(message))
def remove_user(self, username: str):
if username in self.connections:
del self.connections[username]
async def broadcast_message(self, message: dict):
"""Broadcast message to all connected users"""
self.message_history.append(message)
message_json = json.dumps(message)
dead_connections = []
for username, websocket in self.connections.items():
try:
await websocket.send_text(message_json)
except Exception:
dead_connections.append(username)
# Clean up dead connections
for username in dead_connections:
self.remove_user(username)
chat_room = ChatRoom()
@app.websocket("/chat/{username}")
async def chat_endpoint(websocket: WebSocket, username: str):
await chat_room.add_user(websocket, username)
try:
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
message = {
"username": username,
"content": message_data["content"],
"timestamp": datetime.now().isoformat()
}
await chat_room.broadcast_message(message)
except WebSocketDisconnect:
chat_room.remove_user(username)
Real-time Data Streaming
Stream live data to clients:
import asyncio
import json
import random
from datetime import datetime
class DataStreamer:
def __init__(self):
self.subscribers = set()
def subscribe(self, websocket):
self.subscribers.add(websocket)
def unsubscribe(self, websocket):
self.subscribers.discard(websocket)
async def start_streaming(self):
"""Start streaming data to all subscribers"""
while True:
# Generate sample data
data = {
"timestamp": datetime.now().isoformat(),
"temperature": round(random.uniform(20, 30), 2),
"humidity": round(random.uniform(40, 80), 2)
}
# Send to all subscribers
dead_connections = set()
for websocket in self.subscribers:
try:
await websocket.send_text(json.dumps(data))
except Exception:
dead_connections.add(websocket)
# Remove dead connections
self.subscribers -= dead_connections
await asyncio.sleep(1)
streamer = DataStreamer()
@app.websocket("/stream")
async def stream_endpoint(websocket: WebSocket):
await websocket.accept()
streamer.subscribe(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
streamer.unsubscribe(websocket)
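One wiring detail the snippet leaves out: nothing ever starts the streaming loop. A minimal sketch, assuming the FastAPI app object from earlier, launches it as a background task on startup:
@app.on_event("startup")
async def start_streamer():
    # Run the broadcast loop for the lifetime of the application
    asyncio.create_task(streamer.start_streaming())
Newer FastAPI versions prefer the lifespan context manager covered in Part 20, but the principle is the same: the streaming loop must run as its own task.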
Best Practices
Key WebSocket implementation principles:
Connection Management:
- Track active connections properly
- Clean up dead connections automatically
- Handle disconnections gracefully
Message Handling:
- Use JSON for structured messages
- Implement message types for different actions
- Validate incoming messages
Error Handling:
- Implement reconnection logic on the client side (see the sketch below)
- Wrap WebSocket operations in try/except blocks
- Log connection events for debugging
Performance:
- Limit message history size
- Remove inactive connections promptly
- Use connection pooling for multiple clients
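Client-side reconnection deserves a concrete shape. A minimal sketch with exponential backoff, reusing the WebSocketClient from earlier (the retry limits are illustrative):
async def connect_with_retry(client: WebSocketClient, max_attempts: int = 5) -> bool:
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            await client.connect()
            return True
        except (OSError, asyncio.TimeoutError) as e:
            print(f"Attempt {attempt} failed: {e}; retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # Exponential backoff, capped at 30 seconds
    return False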
Summary
WebSocket implementation essentials:
- Use FastAPI or similar frameworks for WebSocket servers
- Implement proper connection management
- Handle disconnections and errors gracefully
- Use structured messages with JSON
- Implement reconnection logic for robust clients
- Clean up resources properly
WebSockets enable powerful real-time features in async applications, from chat systems to live data streaming.
In Part 19, we’ll explore health checks and monitoring.
Health Checks and Monitoring
Health checks and monitoring are essential for maintaining reliable async applications in production. Let’s explore implementing comprehensive health monitoring.
Basic Health Checks
Implement fundamental health check endpoints:
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import asyncio
import time
import psutil
from typing import Dict, Any
app = FastAPI()
class HealthChecker:
def __init__(self):
self.start_time = time.time()
self.checks = {}
def register_check(self, name: str, check_func):
"""Register a health check function"""
self.checks[name] = check_func
async def run_checks(self) -> Dict[str, Any]:
"""Run all registered health checks"""
results = {
"status": "healthy",
"timestamp": time.time(),
"uptime": time.time() - self.start_time,
"checks": {}
}
overall_healthy = True
for name, check_func in self.checks.items():
    try:
        check_result = await check_func()
        # Checks return detail dicts, and any non-empty dict is truthy,
        # so look for an explicit status flag instead
        if isinstance(check_result, dict):
            is_healthy = any(
                check_result.get(key)
                for key in ("healthy", "connected", "available")
            )
        else:
            is_healthy = bool(check_result)
        results["checks"][name] = {
            "status": "healthy" if is_healthy else "unhealthy",
            "details": check_result
        }
        if not is_healthy:
            overall_healthy = False
except Exception as e:
results["checks"][name] = {
"status": "error",
"error": str(e)
}
overall_healthy = False
results["status"] = "healthy" if overall_healthy else "unhealthy"
return results
health_checker = HealthChecker()
@app.get("/health")
async def health_check():
"""Main health check endpoint"""
results = await health_checker.run_checks()
status_code = 200 if results["status"] == "healthy" else 503
return JSONResponse(content=results, status_code=status_code)
@app.get("/health/live")
async def liveness_check():
"""Kubernetes liveness probe"""
return {"status": "alive", "timestamp": time.time()}
@app.get("/health/ready")
async def readiness_check():
"""Kubernetes readiness probe"""
results = await health_checker.run_checks()
if results["status"] == "healthy":
return {"status": "ready", "timestamp": time.time()}
else:
raise HTTPException(status_code=503, detail="Service not ready")
Database Health Checks
Monitor database connectivity:
import asyncpg
import aioredis
async def check_database():
"""Check PostgreSQL database connectivity"""
try:
conn = await asyncpg.connect("postgresql://user:pass@localhost/db")
result = await conn.fetchval("SELECT 1")
await conn.close()
return {"connected": True, "query_result": result}
except Exception as e:
return {"connected": False, "error": str(e)}
async def check_redis():
"""Check Redis connectivity"""
try:
redis = aioredis.from_url("redis://localhost:6379")
pong = await redis.ping()
await redis.close()
return {"connected": True, "ping": pong}
except Exception as e:
return {"connected": False, "error": str(e)}
# Register database checks
health_checker.register_check("database", check_database)
health_checker.register_check("redis", check_redis)
External Service Health Checks
Monitor external dependencies:
import aiohttp
import asyncio
async def check_external_api():
"""Check external API availability"""
try:
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get("https://api.example.com/health") as response:
return {
"available": response.status == 200,
"status_code": response.status
}
except asyncio.TimeoutError:
return {"available": False, "error": "timeout"}
except Exception as e:
return {"available": False, "error": str(e)}
async def check_message_queue():
"""Check message queue connectivity"""
try:
# Simulate queue check
await asyncio.sleep(0.1)
return {
"connected": True,
"queue_size": 42
}
except Exception as e:
return {"connected": False, "error": str(e)}
# Register external service checks
health_checker.register_check("external_api", check_external_api)
health_checker.register_check("message_queue", check_message_queue)
System Resource Monitoring
Monitor system resources:
import psutil
import asyncio
async def check_system_resources():
"""Check system resource usage"""
try:
# interval=1 would block the event loop for a full second; interval=None
# returns immediately, using the time elapsed since the last call
cpu_percent = psutil.cpu_percent(interval=None)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"disk_percent": (disk.used / disk.total) * 100,
"healthy": (
cpu_percent < 80 and
memory.percent < 85 and
(disk.used / disk.total) * 100 < 90
)
}
except Exception as e:
return {"healthy": False, "error": str(e)}
async def check_application_metrics():
"""Check application-specific metrics"""
try:
active_tasks = len(asyncio.all_tasks())
return {
"active_tasks": active_tasks,
"healthy": active_tasks < 100
}
except Exception as e:
return {"healthy": False, "error": str(e)}
# Register system checks
health_checker.register_check("system_resources", check_system_resources)
health_checker.register_check("application_metrics", check_application_metrics)
Health Monitoring That Actually Helps
Hard-won insights from monitoring async applications in production:
Health Check Design:
- Implement both liveness and readiness probes
- Keep checks lightweight and fast
- Test actual dependencies, not just connectivity
- Return appropriate HTTP status codes
Monitoring Strategy:
- Monitor both technical and business metrics
- Set up alerting for critical failures
- Track performance trends over time
- Use structured logging for better analysis
Production Considerations:
- Set reasonable timeouts for health checks
- Implement circuit breakers for external dependencies
- Cache health check results when appropriate (see the sketch below)
- Monitor the health check endpoints themselves
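Caching deserves a concrete example, since expensive checks (database round-trips, external API calls) shouldn’t run on every probe. A minimal sketch, assuming the health_checker instance from earlier and an illustrative 10-second TTL:
import time

_cached_results = None
_cached_at = 0.0
CACHE_TTL = 10  # seconds; tune to your probe interval

async def cached_health_check():
    global _cached_results, _cached_at
    now = time.time()
    if _cached_results is None or now - _cached_at > CACHE_TTL:
        _cached_results = await health_checker.run_checks()
        _cached_at = now
    return _cached_results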
Summary
Health monitoring essentials:
- Implement comprehensive health check endpoints for all dependencies
- Monitor system resources and application metrics
- Use structured logging and integrate with monitoring systems
- Set up appropriate alerting for critical failures
- Design checks to be fast, reliable, and informative
Proper health monitoring ensures early detection of issues and maintains application reliability in production.
In Part 20, we’ll explore graceful shutdown and resource cleanup.
Graceful Shutdown and Resource Cleanup
Proper shutdown handling ensures your async applications clean up resources gracefully and don’t lose data. Let’s explore shutdown patterns and cleanup strategies.
Signal Handling
Handle shutdown signals properly:
import asyncio
import signal
import logging
from typing import Set
class GracefulShutdown:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.running_tasks: Set[asyncio.Task] = set()
self.cleanup_callbacks = []
The shutdown handler tracks running tasks and cleanup callbacks. The event signals when shutdown should begin.
Set up signal handlers to catch termination signals:
def setup_signal_handlers(self):
"""Setup signal handlers for graceful shutdown"""
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
logging.info(f"Received signal {signum}, initiating graceful shutdown...")
self.shutdown_event.set()
async def wait_for_shutdown(self):
"""Wait for shutdown signal"""
await self.shutdown_event.wait()
SIGTERM comes from process managers like systemd, while SIGINT comes from Ctrl+C. Both trigger the same graceful shutdown process.
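On Unix, asyncio offers a cleaner alternative: loop.add_signal_handler registers the callback directly with the event loop. A sketch of an equivalent method, assuming it is called from inside a running loop (not supported on Windows):
def setup_signal_handlers_asyncio(self):
    """Alternative: register handlers on the running loop (Unix only)"""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, self.shutdown_event.set)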
Add cleanup callbacks and manage tasks:
def add_cleanup_callback(self, callback):
"""Add cleanup callback to be called during shutdown"""
self.cleanup_callbacks.append(callback)
async def cleanup(self):
"""Perform cleanup operations"""
logging.info("Starting cleanup process...")
# Cancel running tasks
if self.running_tasks:
logging.info(f"Cancelling {len(self.running_tasks)} running tasks...")
for task in self.running_tasks:
task.cancel()
# Wait for tasks to complete cancellation
await asyncio.gather(*self.running_tasks, return_exceptions=True)
# Run cleanup callbacks
for callback in self.cleanup_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback()
else:
callback()
except Exception as e:
logging.error(f"Error in cleanup callback: {e}")
logging.info("Cleanup completed")
# Global shutdown handler
shutdown_handler = GracefulShutdown()
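Putting the pieces together, a minimal main loop might look like this (the worker coroutine is a placeholder):
async def main():
    shutdown_handler.setup_signal_handlers()

    async def worker():
        while not shutdown_handler.shutdown_event.is_set():
            await asyncio.sleep(1)  # Placeholder for real work

    task = asyncio.create_task(worker())
    shutdown_handler.running_tasks.add(task)

    await shutdown_handler.wait_for_shutdown()
    await shutdown_handler.cleanup()

asyncio.run(main())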
FastAPI Graceful Shutdown
Implement graceful shutdown in FastAPI applications:
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import logging
# Application state
app_state = {
"database_pool": None,
"redis_connection": None,
"background_tasks": set()
}
FastAPI’s lifespan context manager handles startup and shutdown events. We track resources in app_state for proper cleanup.
The lifespan function manages the application lifecycle:
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logging.info("Starting application...")
# Initialize resources
app_state["database_pool"] = await create_database_pool()
app_state["redis_connection"] = await create_redis_connection()
# Start background tasks
task = asyncio.create_task(background_worker())
app_state["background_tasks"].add(task)
yield
# Shutdown
logging.info("Shutting down application...")
# Cancel background tasks
for task in app_state["background_tasks"]:
task.cancel()
await asyncio.gather(*app_state["background_tasks"], return_exceptions=True)
# Close connections
if app_state["database_pool"]:
await app_state["database_pool"].close()
if app_state["redis_connection"]:
await app_state["redis_connection"].close()
app = FastAPI(lifespan=lifespan)
Everything before yield runs at startup; everything after runs at shutdown. This ensures proper resource management throughout the application lifecycle.
Background tasks need to handle cancellation gracefully:
async def background_worker():
"""Background task that needs graceful shutdown"""
try:
while True:
await asyncio.sleep(1)
logging.info("Background worker running...")
except asyncio.CancelledError:
logging.info("Background worker cancelled, cleaning up...")
raise
Database Connection Cleanup
Handle database connections properly:
import asyncio
import asyncpg
from contextlib import asynccontextmanager
class DatabaseManager:
def __init__(self, database_url: str):
self.database_url = database_url
self.pool = None
async def initialize(self):
"""Initialize database pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=5,
max_size=20
)
Connection pools are essential for async applications. They manage multiple database connections efficiently and handle connection lifecycle automatically.
Provide safe connection access with automatic cleanup:
@asynccontextmanager
async def get_connection(self):
"""Get database connection with automatic cleanup"""
connection = await self.pool.acquire()
try:
yield connection
finally:
await self.pool.release(connection)
The context manager ensures connections are always returned to the pool, even if exceptions occur during database operations.
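Usage then reads like any other context manager (the query is illustrative):
async def fetch_user_count(db: DatabaseManager) -> int:
    async with db.get_connection() as conn:
        # The connection is released back to the pool automatically
        return await conn.fetchval("SELECT COUNT(*) FROM users")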
Handle shutdown cleanup:
async def close_all_connections(self):
"""Close all database connections"""
if self.pool:
await self.pool.close()
logging.info("Database pool closed")
# Usage
db_manager = DatabaseManager("postgresql://user:pass@localhost/db")
shutdown_handler.add_cleanup_callback(db_manager.close_all_connections)
File and Resource Cleanup
Clean up files and other resources:
import asyncio
import aiofiles
import tempfile
import os
class ResourceManager:
def __init__(self):
self.temp_files = set()
self.open_files = set()
Track temporary files and open file handles to ensure they’re cleaned up during shutdown.
Create and track temporary files:
async def create_temp_file(self, suffix=".tmp"):
"""Create temporary file with automatic cleanup"""
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
temp_file.close()
self.temp_files.add(temp_file.name)
return temp_file.name
async def open_file(self, filename: str, mode: str = 'r'):
"""Open file with tracking for cleanup"""
file_handle = await aiofiles.open(filename, mode)
self.open_files.add(file_handle)
return file_handle
By tracking file operations, we can ensure proper cleanup even if the application terminates unexpectedly.
Perform comprehensive resource cleanup:
async def cleanup_resources(self):
"""Clean up all managed resources"""
# Close open files
for file_handle in list(self.open_files):
try:
await file_handle.close()
except Exception as e:
logging.error(f"Error closing file: {e}")
# Remove temporary files
for temp_file in self.temp_files:
try:
if os.path.exists(temp_file):
os.unlink(temp_file)
except Exception as e:
logging.error(f"Error removing temp file: {e}")
# Global resource manager
resource_manager = ResourceManager()
shutdown_handler.add_cleanup_callback(resource_manager.cleanup_resources)
Best Practices
Key principles for graceful shutdown:
Signal Handling:
- Handle SIGTERM and SIGINT signals
- Set shutdown flags instead of immediate exit
- Log shutdown initiation
Task Management:
- Track all running tasks
- Cancel tasks gracefully
- Wait for task completion with a timeout (see the sketch below)
Resource Cleanup:
- Close database connections
- Clean up temporary files
- Release network connections
- Save application state if needed
Error Handling:
- Handle cleanup errors gracefully
- Log cleanup progress
- Don’t let cleanup errors prevent shutdown
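A cancelled task can still take time in its cleanup handlers, so the timeout point above matters. A minimal sketch that bounds how long shutdown waits (the 5-second budget is illustrative):
import asyncio
import logging

async def cancel_with_timeout(tasks: set, timeout: float = 5.0):
    """Cancel tasks but bound how long shutdown waits for them"""
    if not tasks:
        return
    for task in tasks:
        task.cancel()
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    if pending:
        logging.warning("%d tasks did not finish within %.1fs", len(pending), timeout)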
Summary
Graceful shutdown essentials:
- Implement proper signal handling for shutdown events
- Track and cancel running tasks during shutdown
- Clean up resources (databases, files, connections) properly
- Use FastAPI lifespan events for web applications
- Log shutdown progress for debugging
- Handle cleanup errors without blocking shutdown
Proper shutdown handling ensures data integrity and resource cleanup in production async applications.
In Part 21, we’ll explore containerization and deployment strategies.
Containerization and Deployment
Moving async applications to production involves more than just “docker build && docker run”. Async apps have specific needs around resource limits, signal handling, and graceful shutdowns that can make or break your deployment.
Docker Configuration
Build containers that work well with async applications:
# Multi-stage Dockerfile for async Python application
FROM python:3.11-slim as builder
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1
The builder stage handles dependency installation. We use environment variables to optimize Python for containers - no buffering, no .pyc files, and no pip cache.
Install system dependencies and create a virtual environment:
# Install dependencies
RUN apt-get update && apt-get install -y build-essential \
&& rm -rf /var/lib/apt/lists/*
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
Using a virtual environment in containers might seem redundant, but it provides better isolation and makes the final image cleaner.
The production stage creates a minimal runtime environment:
# Production stage
FROM python:3.11-slim
ENV PYTHONUNBUFFERED=1 \
PATH="/opt/venv/bin:$PATH"
# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
Multi-stage builds keep the final image small by excluding build tools and intermediate files.
Security and application setup:
# Create non-root user
RUN useradd --create-home --shell /bin/bash app
USER app
WORKDIR /home/app
# Copy application code
COPY --chown=app:app . .
# Expose port
EXPOSE 8000
Running as a non-root user is crucial for security. The --chown flag ensures the app user owns the files.
Health checks and startup:
# Health check (note: curl is not installed in slim images by default;
# add it in the Dockerfile or use a Python-based check instead)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Application Configuration
Configure your async application for containerized environments:
import os
import time
import asyncio
import signal
from contextlib import asynccontextmanager
from fastapi import FastAPI
class AppConfig:
def __init__(self):
self.host = os.getenv("HOST", "0.0.0.0")
self.port = int(os.getenv("PORT", 8000))
self.workers = int(os.getenv("WORKERS", 1))
self.max_connections = int(os.getenv("MAX_CONNECTIONS", 1000))
self.keepalive_timeout = int(os.getenv("KEEPALIVE_TIMEOUT", 5))
config = AppConfig()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("Starting async application...")
# Initialize resources
await initialize_database_pool()
await initialize_redis_connection()
yield
# Shutdown
print("Shutting down async application...")
await cleanup_database_pool()
await cleanup_redis_connection()
app = FastAPI(lifespan=lifespan)
@app.get("/health")
async def health_check():
# loop.time() is a monotonic clock, not a wall-clock timestamp
return {"status": "healthy", "timestamp": time.time()}
async def initialize_database_pool():
# Initialize database connection pool
pass
async def cleanup_database_pool():
# Clean up database connections
pass
async def initialize_redis_connection():
# Initialize Redis connection
pass
async def cleanup_redis_connection():
# Clean up Redis connection
pass
Kubernetes Deployment
Deploy with Kubernetes for scalability:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: async-app
labels:
app: async-app
spec:
replicas: 3
selector:
matchLabels:
app: async-app
template:
metadata:
labels:
app: async-app
spec:
containers:
- name: async-app
image: async-app:latest
ports:
- containerPort: 8000
env:
- name: MAX_CONNECTIONS
value: "1000"
- name: WORKERS
value: "1"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: async-app-service
spec:
selector:
app: async-app
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
Environment Variables
Configure your application through environment variables:
import os
class Settings:
# Database
DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql://localhost/mydb")
DATABASE_POOL_SIZE: int = int(os.getenv("DATABASE_POOL_SIZE", "10"))
# Redis
REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379")
REDIS_MAX_CONNECTIONS: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "10"))
# Application
DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
# Performance
MAX_WORKERS: int = int(os.getenv("MAX_WORKERS", "4"))
REQUEST_TIMEOUT: int = int(os.getenv("REQUEST_TIMEOUT", "30"))
settings = Settings()
Production Checklist
Before deploying to production:
Security:
- Run as non-root user
- Use secrets management for sensitive data
- Enable HTTPS/TLS
- Implement rate limiting
Performance:
- Set appropriate resource limits
- Configure connection pooling
- Enable compression
Monitoring:
- Health check endpoints
- Structured logging
- Metrics collection
- Error tracking
Reliability:
- Graceful shutdown handling
- Circuit breakers for external services
- Retry mechanisms with backoff
Summary
Key deployment considerations for async applications:
- Use multi-stage Docker builds for smaller images
- Configure proper resource limits and health checks
- Implement graceful shutdown handling
- Use structured logging for better observability
- Set up monitoring and alerting
- Follow security best practices
Proper containerization and deployment ensure your async applications run reliably in production environments.
In Part 22, we’ll explore security best practices for async applications.
Security Best Practices
Async applications handle many concurrent connections, making security both more critical and more complex. A single vulnerability can affect hundreds of simultaneous users. Here are the security patterns that matter most for async Python applications.
Input Validation with Pydantic
Never trust incoming data, especially in concurrent environments:
from pydantic import BaseModel, validator, EmailStr
import re
class UserInput(BaseModel):
username: str
email: EmailStr
age: int
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_]{3,20}$', v):
raise ValueError('Username must be 3-20 characters, alphanumeric only')
return v
@validator('age')
def validate_age(cls, v):
if not 13 <= v <= 120:
raise ValueError('Age must be between 13 and 120')
return v
Pydantic validates data structure and types automatically. Custom validators add business logic validation. The regex pattern prevents injection attacks through usernames.
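A quick look at what a validation failure produces in practice (a sketch, with deliberately invalid values):
from pydantic import ValidationError

try:
    UserInput(username="x", email="not-an-email", age=5)
except ValidationError as e:
    # All three fields fail; e.errors() lists every violation
    print(e.errors())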
Async Input Validation
Add async validation for database checks:
from fastapi import FastAPI, HTTPException
import asyncio

app = FastAPI()
async def is_username_taken(username: str) -> bool:
"""Check if username exists in database"""
# Simulate async database check
await asyncio.sleep(0.1)
return username.lower() in ['admin', 'root', 'test']
Async validators can check against databases or external services without blocking the event loop.
Use async validation in endpoints:
@app.post("/users")
async def create_user(user_data: UserInput):
"""Create user with async validation"""
if await is_username_taken(user_data.username):
raise HTTPException(status_code=400, detail="Username taken")
# Process user creation
return {"message": "User created successfully"}
Password Hashing
Implement secure password hashing with bcrypt:
import bcrypt
import asyncio
class PasswordManager:
async def hash_password(self, password: str) -> str:
"""Hash password asynchronously"""
loop = asyncio.get_running_loop()
salt = bcrypt.gensalt()
def hash_sync():
return bcrypt.hashpw(password.encode('utf-8'), salt)
hashed = await loop.run_in_executor(None, hash_sync)
return hashed.decode('utf-8')
async def verify_password(self, password: str, hashed: str) -> bool:
"""Verify password asynchronously"""
loop = asyncio.get_running_loop()
def verify_sync():
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
return await loop.run_in_executor(None, verify_sync)
Using thread pools prevents the CPU-intensive hashing from blocking the event loop.
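Usage is straightforward (a quick sketch):
async def demo():
    pm = PasswordManager()
    hashed = await pm.hash_password("s3cret!")
    assert await pm.verify_password("s3cret!", hashed)
    assert not await pm.verify_password("wrong-password", hashed)

asyncio.run(demo())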
JWT Token Management
Create and verify JWT tokens securely:
import jwt
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException
from fastapi.security import HTTPBearer
class TokenManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.algorithm = "HS256"
def create_token(self, user_id: str, permissions: list = None) -> str:
"""Create JWT access token"""
expire = datetime.now(timezone.utc) + timedelta(minutes=30)  # utcnow() is deprecated in Python 3.12
payload = {
"sub": user_id,
"exp": expire,
"permissions": permissions or []
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
async def verify_token(self, token: str) -> dict:
"""Verify JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
Keep token creation simple and focused on essential claims only.
Authentication Dependency
Create a reusable authentication dependency:
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer
security = HTTPBearer()
token_manager = TokenManager("your-secret-key")
async def get_current_user(credentials = Depends(security)):
"""Get authenticated user from token"""
token = credentials.credentials
payload = await token_manager.verify_token(token)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=401, detail="Invalid token")
return {"user_id": user_id, "permissions": payload.get("permissions", [])}
@app.get("/protected")
async def protected_endpoint(current_user = Depends(get_current_user)):
"""Protected endpoint example"""
return {"message": f"Hello user {current_user['user_id']}"}
This dependency can be reused across all protected endpoints.
Rate Limiting
Implement simple rate limiting:
import time
from collections import defaultdict, deque
class RateLimiter:
def __init__(self):
self.clients = defaultdict(deque)
async def is_allowed(self, client_id: str, max_requests: int = 100) -> bool:
"""Check if request is under rate limit"""
now = time.time()
client_requests = self.clients[client_id]
# Remove old requests (older than 60 seconds)
while client_requests and client_requests[0] < now - 60:
client_requests.popleft()
# Check limit
if len(client_requests) < max_requests:
client_requests.append(now)
return True
return False
rate_limiter = RateLimiter()
from fastapi.responses import JSONResponse

async def rate_limit_middleware(request, call_next):
    """Rate limiting middleware"""
    client_ip = request.client.host
    if not await rate_limiter.is_allowed(client_ip):
        # An HTTPException raised here would bypass FastAPI's exception
        # handlers, so return the 429 response directly
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
    return await call_next(request)

app.middleware("http")(rate_limit_middleware)
This provides basic protection against abuse without complex dependencies.
Secure Database Queries
Always use parameterized queries:
import asyncpg
async def get_user_safely(user_id: str):
"""Safe database query with parameters"""
query = "SELECT id, username, email FROM users WHERE id = $1"
async with db_pool.acquire() as conn:
row = await conn.fetchrow(query, user_id)
return dict(row) if row else None
async def search_users_safely(search_term: str):
"""Safe search with input validation"""
if not search_term or len(search_term) > 100:
return []
query = """
SELECT id, username, email
FROM users
WHERE username ILIKE $1
LIMIT 50
"""
async with db_pool.acquire() as conn:
rows = await conn.fetch(query, f"%{search_term}%")
return [dict(row) for row in rows]
Never concatenate user input directly into SQL queries.
Security Headers
Add essential security headers:
async def security_headers_middleware(request, call_next):
"""Add security headers to responses"""
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
return response
app.middleware("http")(security_headers_middleware)
These headers provide basic protection against common web vulnerabilities. (X-XSS-Protection is a legacy header that modern browsers ignore, though it remains harmless for older clients.)
Summary
Security in async applications requires:
Essential Practices
- Input Validation: Validate all user inputs with Pydantic
- Password Security: Use bcrypt with async thread pools
- Token Management: Implement secure JWT handling
- Rate Limiting: Protect against abuse and DDoS
- Database Security: Always use parameterized queries
- Security Headers: Add protective HTTP headers
Key Principles
- Never trust user input
- Use async-safe security libraries
- Implement defense in depth
- Monitor and log security events
- Keep dependencies updated
Each security measure builds upon the others to create a robust defense system for your async applications.
In Part 23, we’ll explore testing async applications.
Testing Async Applications
Testing async code feels different from regular Python testing. You’re dealing with timing issues, concurrent operations, and the challenge of mocking async dependencies. The good news? Once you understand the patterns, async testing becomes straightforward.
Basic Async Testing
Start with pytest-asyncio for async test support:
import pytest
import asyncio
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_simple_async_function():
async def fetch_data():
await asyncio.sleep(0.1)
return {"status": "success"}
result = await fetch_data()
assert result["status"] == "success"
The @pytest.mark.asyncio decorator tells pytest to run this test in an async context. Without it, you’d get errors about coroutines not being awaited.
Testing concurrent operations requires careful setup:
@pytest.mark.asyncio
async def test_concurrent_operations():
async def worker(worker_id, delay):
await asyncio.sleep(delay)
return f"worker-{worker_id}"
tasks = [worker(1, 0.1), worker(2, 0.05), worker(3, 0.15)]
results = await asyncio.gather(*tasks)
assert len(results) == 3
assert all("worker-" in result for result in results)
This test verifies that multiple async operations run concurrently and all complete successfully.
Mocking Async Dependencies
Mock external async services effectively:
class UserService:
def __init__(self, api_client):
self.api_client = api_client
async def get_user_profile(self, user_id: str):
try:
user_data = await self.api_client.fetch_user_data(user_id)
return {
"id": user_data["id"],
"name": user_data["name"],
"profile_complete": bool(user_data.get("name"))
}
except Exception as e:
return {"error": str(e)}
This service depends on an external API client. In tests, we need to mock this dependency to avoid making real API calls.
Here’s how to mock the async dependency:
@pytest.mark.asyncio
async def test_user_service_success():
# Mock the API client
mock_api_client = AsyncMock()
mock_api_client.fetch_user_data.return_value = {
"id": "123",
"name": "John Doe"
}
service = UserService(mock_api_client)
profile = await service.get_user_profile("123")
assert profile["id"] == "123"
assert profile["profile_complete"] is True
mock_api_client.fetch_user_data.assert_called_once_with("123")
@pytest.mark.asyncio
async def test_user_service_error_handling():
mock_api_client = AsyncMock()
mock_api_client.fetch_user_data.side_effect = Exception("API timeout")
service = UserService(mock_api_client)
profile = await service.get_user_profile("123")
assert "error" in profile
assert "API timeout" in profile["error"]
Testing FastAPI Endpoints
Test async web endpoints with httpx:
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
app = FastAPI()
@app.get("/users/{user_id}")
async def get_user(user_id: str):
await asyncio.sleep(0.1) # Simulate database query
return {"id": user_id, "name": f"User {user_id}"}
@pytest.mark.asyncio
async def test_get_user_endpoint():
    # Recent httpx versions removed the app= shortcut; use ASGITransport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/users/123")
assert response.status_code == 200
data = response.json()
assert data["id"] == "123"
assert data["name"] == "User 123"
Testing Concurrent Behavior
Test race conditions and concurrent operations:
class AsyncCounter:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def safe_increment(self):
async with self.lock:
current = self.value
await asyncio.sleep(0.001)
self.value = current + 1
async def unsafe_increment(self):
current = self.value
await asyncio.sleep(0.001)
self.value = current + 1
@pytest.mark.asyncio
async def test_concurrent_safe_increment():
counter = AsyncCounter()
# Run 100 concurrent increments
tasks = [counter.safe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
assert counter.value == 100
@pytest.mark.asyncio
async def test_concurrent_unsafe_increment():
counter = AsyncCounter()
tasks = [counter.unsafe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
# Will likely be less than 100 due to race conditions
assert counter.value < 100
Performance Testing
Test performance characteristics:
import time
@pytest.mark.asyncio
async def test_response_time():
async def api_operation():
await asyncio.sleep(0.1)
return "success"
start_time = time.time()
result = await api_operation()
response_time = time.time() - start_time
assert result == "success"
assert response_time < 0.2 # Should complete within 200ms
@pytest.mark.asyncio
async def test_throughput():
async def process_request(request_id):
await asyncio.sleep(0.05)
return f"processed_{request_id}"
start_time = time.time()
tasks = [process_request(i) for i in range(100)]
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
throughput = len(results) / total_time
assert throughput > 500 # Should handle 500+ requests per second
Integration Testing
Test complete async workflows:
@pytest.mark.asyncio
async def test_user_registration_workflow():
# Mock dependencies
mock_database = AsyncMock()
mock_email_service = AsyncMock()
mock_database.create_user.return_value = {"id": "user_123"}
mock_database.get_user.return_value = {"id": "user_123", "name": "John"}
async def register_user(user_data):
user = await mock_database.create_user(user_data)
await mock_email_service.send_welcome_email(user["id"])
return await mock_database.get_user(user["id"])
result = await register_user({"name": "John", "email": "[email protected]"})
assert result["id"] == "user_123"
mock_database.create_user.assert_called_once()
mock_email_service.send_welcome_email.assert_called_once_with("user_123")
Common Testing Pitfalls
Avoid these async testing mistakes:
1. Forgetting @pytest.mark.asyncio
# Wrong - will fail
async def test_async_function():
result = await some_async_function()
# Correct
@pytest.mark.asyncio
async def test_async_function():
result = await some_async_function()
2. Using regular Mock instead of AsyncMock
# Wrong
mock_service = Mock()
# Correct
mock_service = AsyncMock()
3. Not testing error scenarios Always test both success and failure cases.
Summary
Essential async testing strategies:
- Use @pytest.mark.asyncio for async test functions
- Mock async dependencies with AsyncMock
- Test concurrent behavior and race conditions
- Measure performance characteristics
- Test complete workflows end-to-end
- Always test both success and error scenarios
Key tools: pytest-asyncio, httpx.AsyncClient, AsyncMock, and aioresponses.
In Part 24, we’ll conclude with best practices and future considerations.
Best Practices and Future Considerations
As we conclude this comprehensive guide to async programming in Python, let’s consolidate the key best practices and explore future developments.
Core Best Practices Summary
Design Principles
1. Async All the Way Down
# Good: Async throughout the call stack
async def handle_request():
user_data = await fetch_user_from_db()
enriched_data = await enrich_user_data(user_data)
return await format_response(enriched_data)
# Avoid: Mixing sync and async unnecessarily
async def mixed_approach(): # Not recommended
user_data = fetch_user_sync() # Blocks event loop
return await process_data(user_data)
2. Proper Error Handling
# Always handle exceptions in concurrent operations
async def robust_operations():
tasks = [risky_operation(i) for i in range(10)]
# Use return_exceptions=True to handle partial failures
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
return {"successful": len(successful), "failed": len(failed)}
3. Resource Management
# Always clean up resources properly
class AsyncResourceManager:
async def __aenter__(self):
self.resource = await acquire_resource()
return self.resource
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.resource.close()
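Usage mirrors synchronous context managers, just with async with (do_work is a placeholder method):
async def handle_request():
    async with AsyncResourceManager() as resource:
        # __aexit__ closes the resource even if this block raises
        await resource.do_work()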
Performance Guidelines
Connection Pooling and Rate Limiting
# Reuse connections and control concurrency
import aiohttp
import asyncio
async def efficient_operations():
# Connection pooling
connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
# Rate limiting
semaphore = asyncio.Semaphore(10)
async with aiohttp.ClientSession(connector=connector) as session:
async def limited_request(url):
async with semaphore:
return await session.get(url)
tasks = [limited_request(url) for url in urls]
return await asyncio.gather(*tasks)
Common Pitfalls to Avoid
1. Blocking the Event Loop
# Wrong: Blocking operations
async def bad_example():
time.sleep(1) # Blocks entire event loop
# Correct: Use async alternatives
async def good_example():
await asyncio.sleep(1) # Non-blocking
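When a blocking call is unavoidable (a legacy library, a CPU-heavy hash), push it to a thread instead of stalling the loop; asyncio.to_thread (Python 3.9+) does this in one line:
import asyncio
import time

async def wrapped_legacy_call():
    # time.sleep stands in for any blocking library call
    await asyncio.to_thread(time.sleep, 1)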
2. Not Handling Task Cancellation
# Correct: Handle cancellation gracefully
async def good_task():
try:
while True:
await do_work()
except asyncio.CancelledError:
await cleanup()
raise
3. Creating Too Many Tasks
# Correct: Limit concurrency
async def good_concurrent():
semaphore = asyncio.Semaphore(50)
async def limited_process(item):
async with semaphore:
return await process_item(item)
tasks = [limited_process(item) for item in huge_list]
await asyncio.gather(*tasks)
Future of Async Python
Python 3.12+ Features
- Improved error messages for async code
- Better performance optimizations
- Enhanced debugging support
Emerging Patterns
- Structured concurrency
- Better integration with type hints
- Improved async context managers
Key Tools and Libraries
- FastAPI: Web API development
- asyncpg: PostgreSQL driver
- aioredis: Redis integration
- httpx: Modern HTTP client
- pytest-asyncio: Testing framework
Migration Strategies
From Sync to Async
- Start with I/O-bound operations
- Migrate one component at a time
- Use thread pools for legacy sync code
- Test thoroughly at each step
Performance Optimization
- Profile before optimizing
- Focus on I/O bottlenecks first
- Implement connection pooling
- Add monitoring and metrics
Final Recommendations
For New Projects:
- Start with async from the beginning
- Use modern libraries (FastAPI, httpx, asyncpg)
- Implement proper error handling and logging
- Set up monitoring from day one
For Existing Projects:
- Identify I/O bottlenecks first
- Migrate incrementally
- Use compatibility layers when needed
- Measure performance improvements
For Production:
- Implement comprehensive monitoring
- Use connection pooling and rate limiting
- Set up proper health checks
- Plan for graceful shutdowns
Summary
You’ve covered a lot of ground - from basic coroutines to production deployment. Async programming isn’t just about making things faster; it’s about building applications that can handle real-world complexity gracefully.
The biggest lesson? Start simple. Don’t async everything on day one. Pick one I/O-bound bottleneck, apply async patterns, measure the improvement, then expand from there. I’ve seen too many projects get overwhelmed trying to make everything async at once.
A few things that took me years to learn:
- Error handling matters more in async code - one unhandled exception can kill your entire event loop
- Connection pooling isn’t optional - it’s the difference between 100 and 10,000 concurrent users
- Monitoring is your lifeline - async bugs are often timing-related and hard to reproduce locally
- Testing async code is different - embrace pytest-asyncio and mock your external dependencies
The Python async ecosystem keeps improving. FastAPI made async web development mainstream. Libraries like httpx and asyncpg provide excellent async alternatives to traditional tools. The community is building better patterns and tools every year.
Most importantly: async programming is a tool, not a goal. Use it when it solves real problems - slow APIs, database bottlenecks, or handling many concurrent users. Don’t use it just because it’s trendy.
You now have the knowledge to build async applications that scale. Start with something small, apply these patterns, and watch your applications handle load that would have crushed their synchronous counterparts.