Memory and I/O Optimization
Efficient memory and I/O management are crucial for high-performance async applications. Let’s explore key optimization techniques.
Memory Optimization
Manage memory efficiently in async applications:
import asyncio
import gc
import weakref
from typing import AsyncGenerator
class MemoryEfficientProcessor:
    """Stream processor that works in batches and keeps a weak-reference
    cache, so cached items never prevent garbage collection."""

    def __init__(self) -> None:
        # Running total of items handled across all batches.
        self._processed_count = 0
        # Values are weakly referenced: an entry disappears as soon as the
        # item is no longer used elsewhere, so the cache cannot leak memory.
        self._cache = weakref.WeakValueDictionary()
Weak references allow objects to be garbage collected even when cached, preventing memory leaks in long-running applications.
Process data in batches to control memory usage:
async def process_data_stream(self, data_source: AsyncGenerator, batch_size: int = 1000):
    """Process a data stream with bounded memory usage.

    Items are accumulated into batches of ``batch_size`` (generalized from
    the previous hard-coded 1000; default unchanged) and flushed through
    ``self._process_batch``. The batch list is cleared after each flush so
    memory stays bounded regardless of stream length.
    """
    batch = []
    async for item in data_source:
        batch.append(item)
        if len(batch) >= batch_size:
            await self._process_batch(batch)
            batch.clear()  # Clear batch to free memory
            # Periodic garbage collection for memory-constrained environments.
            if self._processed_count % 10000 == 0:
                gc.collect()
    # Process remaining items (final partial batch).
    if batch:
        await self._process_batch(batch)
Batching prevents memory from growing unbounded when processing large datasets. Explicit garbage collection helps in memory-constrained environments.
Handle batch processing with weak reference caching:
async def _process_batch(self, batch):
    """Record a batch as processed and weakly cache items that expose an id."""
    # Simulate processing work.
    await asyncio.sleep(0.01)
    self._processed_count += len(batch)
    # Cache via weak references; only items with an `id` attribute qualify.
    cacheable = (entry for entry in batch if hasattr(entry, 'id'))
    for entry in cacheable:
        self._cache[entry.id] = entry
# Usage
# Module-level processor instance shared by main() below.
processor = MemoryEfficientProcessor()
async def data_generator():
    """Lazily yield 100,000 dict records without materializing them all."""
    for index in range(100000):
        yield {"id": index, "data": f"item_{index}"}
async def main():
    """Entry point: stream the generated records through the shared processor."""
    stream = data_generator()
    await processor.process_data_stream(stream)
I/O Optimization
Optimize file and network I/O operations:
import asyncio
import aiofiles
import aiohttp
class IOOptimizer:
    """HTTP/file I/O helper with connection pooling and request rate limiting."""

    def __init__(self, max_concurrent_requests=10):
        # Cap in-flight requests so one caller cannot exhaust sockets.
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        self.session = None
        # Guards lazy session creation: without it, concurrent first calls
        # could each create (and leak) a separate ClientSession.
        self._session_lock = asyncio.Lock()

    async def get_session(self):
        """Get or create the shared HTTP session (double-checked under a lock)."""
        if self.session is None:
            async with self._session_lock:
                if self.session is None:
                    connector = aiohttp.TCPConnector(
                        limit=100,            # total pooled connections
                        limit_per_host=30,    # per-host connection cap
                        keepalive_timeout=30  # seconds to keep idle sockets
                    )
                    self.session = aiohttp.ClientSession(connector=connector)
        return self.session

    async def fetch_with_optimization(self, url: str):
        """Fetch URL text with connection pooling and rate limiting."""
        async with self.semaphore:
            session = await self.get_session()
            async with session.get(url) as response:
                return await response.text()

    async def read_file_efficiently(self, filename: str, chunk_size: int = 8192):
        """Yield a file's contents in fixed-size binary chunks."""
        async with aiofiles.open(filename, 'rb') as file:
            while chunk := await file.read(chunk_size):
                yield chunk

    async def close(self):
        """Close the shared session; safe to call repeatedly.

        Resets ``self.session`` so a later get_session() builds a fresh one
        instead of returning a closed session.
        """
        if self.session:
            await self.session.close()
            self.session = None
Connection Pooling
Implement efficient connection pooling:
import asyncio
import asyncpg
from contextlib import asynccontextmanager
class DatabasePool:
    """Thin wrapper around an asyncpg connection pool."""

    def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # created lazily by initialize()

    async def initialize(self):
        """Create the connection pool; must be awaited before any query."""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60  # seconds allowed per command
        )

    async def execute_query(self, query: str, *args):
        """Execute a query on a pooled connection and return the fetched rows.

        Raises:
            RuntimeError: if initialize() has not been awaited yet
                (instead of an opaque AttributeError on None).
        """
        if self.pool is None:
            raise RuntimeError(
                "DatabasePool.initialize() must be awaited before executing queries"
            )
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def close(self):
        """Close the pool; safe when never initialized or already closed."""
        if self.pool:
            await self.pool.close()
            self.pool = None
Buffering and Batching
Optimize data processing with buffering:
import asyncio
from collections import deque
class BufferedProcessor:
    """Buffers items and flushes them in batches, either when the buffer
    reaches ``batch_size`` or every ``flush_interval`` seconds."""

    def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = deque()
        self._running = False
        # Strong reference to the background task: the bare result of
        # create_task() may be garbage-collected mid-flight, and stop()
        # needs a handle to cancel it.
        self._flush_task = None

    async def start(self):
        """Start the periodic background flusher."""
        self._running = True
        self._flush_task = asyncio.create_task(self._periodic_flush())

    async def add_item(self, item):
        """Add an item; flushes immediately once batch_size is reached."""
        self.buffer.append(item)
        if len(self.buffer) >= self.batch_size:
            await self._flush_buffer()

    async def _periodic_flush(self):
        # Time-based flush so a slow trickle of items does not sit forever.
        while self._running:
            await asyncio.sleep(self.flush_interval)
            if self.buffer:
                await self._flush_buffer()

    async def _flush_buffer(self):
        """Drain the buffer and process its contents as one batch."""
        if not self.buffer:
            return
        items = list(self.buffer)
        self.buffer.clear()
        # Process batch
        print(f"Processed batch of {len(items)} items")

    async def stop(self):
        """Stop the processor: cancel the background flusher, then flush
        any remaining items. Previously the periodic task was left running
        after stop(); now it is cancelled and awaited."""
        self._running = False
        if self._flush_task is not None:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
            self._flush_task = None
        await self._flush_buffer()
Best Practices
Key optimization principles:
Memory Management:
- Use weak references for caches
- Clear collections explicitly
- Implement periodic garbage collection
- Monitor memory usage continuously
I/O Optimization:
- Reuse connections and sessions
- Implement connection pooling
- Use appropriate buffer sizes
- Batch operations when possible
Resource Management:
- Set proper limits and timeouts
- Clean up resources in finally blocks
- Use context managers for resource handling
- Monitor resource usage
Summary
Memory and I/O optimization techniques:
- Implement efficient memory management with weak references and garbage collection
- Use connection pooling for database and HTTP operations
- Buffer and batch operations for better throughput
- Monitor performance continuously
- Clean up resources properly
Proper optimization ensures your async applications perform efficiently under load.
In Part 17, we’ll explore CPU-bound task optimization.