CPU-Bound Task Optimization
Async programming shines with I/O-bound tasks, but CPU-intensive work presents a different challenge. The event loop can’t help when your CPU is genuinely busy crunching numbers or processing data. Here’s how to handle CPU-bound operations without blocking your async application.
Process Pools for CPU-Intensive Work
When you need serious computational power:
```python
import asyncio
import concurrent.futures

def cpu_intensive_task(data: list, multiplier: int = 2):
    """CPU-intensive task that should run in a separate process."""
    result = []
    for item in data:
        # Simulate CPU-intensive work
        value = sum(i * multiplier for i in range(item * 1000))
        result.append(value)
    return result

async def optimize_cpu_bound_work():
    """Optimize CPU-bound work using a process pool."""
    # Prepare data chunks
    data_chunks = [
        list(range(10, 20)),
        list(range(20, 30)),
        list(range(30, 40)),
        list(range(40, 50)),
    ]

    # Use a process pool for CPU-bound work
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        loop = asyncio.get_running_loop()
        # Submit tasks to the process pool
        tasks = [
            loop.run_in_executor(executor, cpu_intensive_task, chunk)
            for chunk in data_chunks
        ]
        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks)
    return results

# Usage
async def main():
    results = await optimize_cpu_bound_work()
    print(f"Processed {len(results)} chunks")

if __name__ == "__main__":
    asyncio.run(main())
```

Two details matter here: functions submitted to a process pool must be picklable, so define them at module level, and the `if __name__ == "__main__"` guard is required on platforms that spawn worker processes (Windows, macOS). Inside a coroutine, prefer `asyncio.get_running_loop()` over the deprecated `asyncio.get_event_loop()`.
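The example above hard-codes four chunks. In practice you can split arbitrary input with a small helper; `make_chunks` below is an illustrative name, not a standard API:

```python
import math

def make_chunks(data: list, n_chunks: int) -> list:
    """Split data into at most n_chunks roughly equal slices."""
    size = max(1, math.ceil(len(data) / n_chunks))
    return [data[i:i + size] for i in range(0, len(data), size)]

# make_chunks(list(range(10)), 4) -> [[0,1,2], [3,4,5], [6,7,8], [9]]
```

Matching the chunk count to the pool's `max_workers` keeps every worker busy without over-splitting the data.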
Thread Pools for Mixed Workloads
Use thread pools when you have mixed I/O and CPU work:
```python
import asyncio
import concurrent.futures

import aiohttp

def process_api_response(response_data: dict):
    """CPU-intensive processing of an API response."""
    # Simulate complex data processing
    processed = {}
    for key, value in response_data.items():
        if isinstance(value, (int, float)):
            processed[key] = value ** 2
        elif isinstance(value, str):
            processed[key] = len(value)
        else:
            processed[key] = str(value)
    return processed

async def fetch_and_process_data(url: str):
    """Fetch data asynchronously, then process it in a thread pool."""
    # I/O-bound: fetch data asynchronously
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()

    # CPU-bound: process data in a thread pool
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        processed_data = await loop.run_in_executor(
            executor, process_api_response, data
        )
    return processed_data

async def process_multiple_apis():
    """Process multiple APIs concurrently."""
    urls = [
        "https://api1.example.com/data",
        "https://api2.example.com/data",
        "https://api3.example.com/data",
    ]
    tasks = [fetch_and_process_data(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
```

Keep in mind that thread pools don't sidestep the GIL: pure-Python CPU work won't run faster in a thread, but offloading it keeps the event loop responsive while it runs.
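Since Python 3.9, `asyncio.to_thread` offloads a sync function to the default thread pool without managing an executor by hand. A minimal sketch, where `square_all` is an illustrative stand-in for a processing function like `process_api_response`:

```python
import asyncio

def square_all(values):
    # Illustrative stand-in for a CPU-heavy transform
    return [v * v for v in values]

async def main():
    # Python 3.9+: run a sync function in the default thread pool
    return await asyncio.to_thread(square_all, [1, 2, 3])

if __name__ == "__main__":
    print(asyncio.run(main()))  # [1, 4, 9]
```

This is the simplest option when the default executor's sizing is acceptable; reach for an explicit `ThreadPoolExecutor` when you need to cap `max_workers` yourself.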
Async Generators for Large Datasets
Process large datasets efficiently:
```python
import asyncio
import concurrent.futures

import aiofiles

async def process_large_file(filename: str):
    """Process a large file line by line, yielding joined batches."""
    batch = []
    batch_size = 1000
    loop = asyncio.get_running_loop()

    # Reuse one executor for the whole file instead of creating one per batch
    with concurrent.futures.ThreadPoolExecutor() as executor:
        async with aiofiles.open(filename, 'r') as file:
            async for line in file:
                processed_line = line.strip().upper()
                batch.append(processed_line)

                if len(batch) >= batch_size:
                    # Join the batch in the thread pool
                    result = await loop.run_in_executor(
                        executor, "\n".join, batch
                    )
                    yield result
                    batch = []

        # Process remaining items
        if batch:
            result = await loop.run_in_executor(executor, "\n".join, batch)
            yield result
```
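The same batching pattern can be shown with the standard library alone. This `batched` generator is a file-free sketch of the structure above, yielding control back to the event loop between batches:

```python
import asyncio

async def batched(items, batch_size):
    """Yield fixed-size batches, ceding control between them."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
            await asyncio.sleep(0)  # let other tasks run
    if batch:
        yield batch  # remaining partial batch

async def collect():
    return [b async for b in batched(range(7), 3)]

if __name__ == "__main__":
    print(asyncio.run(collect()))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Consumers iterate with `async for`, so only one batch is in memory at a time regardless of the total dataset size.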
Choosing the Right Approach
Select the appropriate optimization strategy:
- Process pool: true CPU-bound tasks that can use multiple CPU cores
- Thread pool: mixed I/O and CPU work, with lower overhead than processes
- Async generators: large datasets, for memory-efficient streaming processing
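The first two choices can be folded into one small dispatcher. `run_offloaded` below is a hypothetical helper, not a standard API; note that the process-pool branch still requires a picklable, module-level function:

```python
import asyncio
import concurrent.futures

async def run_offloaded(func, *args, cpu_bound: bool = False):
    """Run func(*args) in a process pool (cpu_bound=True) or a thread pool."""
    loop = asyncio.get_running_loop()
    pool_cls = (concurrent.futures.ProcessPoolExecutor if cpu_bound
                else concurrent.futures.ThreadPoolExecutor)
    with pool_cls() as executor:
        return await loop.run_in_executor(executor, func, *args)

if __name__ == "__main__":
    # Thread-pool path: any callable works
    print(asyncio.run(run_offloaded(sum, [1, 2, 3])))  # 6
```

For long-running services you would normally create the executor once and reuse it, rather than paying pool startup cost per call as this sketch does.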
What Actually Works in Production
From experience optimizing CPU-bound async applications:
1. Profile First
- Identify actual bottlenecks before optimizing
- Use profiling tools to measure performance
- Focus on the most time-consuming operations
2. Choose the Right Tool
- Process pools for true CPU-bound work
- Thread pools for mixed I/O and CPU work
- Async generators for large datasets
3. Batch Processing
- Process data in batches to amortize per-task executor overhead
- Balance batch size against memory usage
- Measure throughput at a few batch sizes instead of guessing
4. Resource Management
- Limit the number of worker processes/threads
- Monitor system resources during processing
- Clean up resources properly
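For point 4, a common way to bound how much work is in flight is an `asyncio.Semaphore`. A minimal sketch, where the limit of 3 is arbitrary and `asyncio.sleep` stands in for real offloaded work:

```python
import asyncio

async def limited(sem, coro):
    # Acquire the semaphore before awaiting the wrapped work
    async with sem:
        return await coro

async def main():
    sem = asyncio.Semaphore(3)  # at most 3 tasks in flight at once
    tasks = [limited(sem, asyncio.sleep(0, result=i)) for i in range(10)]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 1, 2, ..., 9]
```

The same wrapper works around `run_in_executor` calls, preventing a burst of requests from saturating the pool or exhausting memory.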
Summary
Optimizing CPU-bound tasks in async applications:
- Use process pools for true CPU-intensive work
- Use thread pools for mixed I/O and CPU workloads
- Implement async generators for large dataset processing
- Monitor performance and resource usage
- Choose the right approach based on your specific use case
Proper CPU-bound optimization ensures your async applications can handle computationally intensive work efficiently.
In Part 18, we’ll explore WebSockets and real-time features.