Memory and I/O Optimization

Efficient memory and I/O management is crucial for high-performance async applications. Let’s explore key optimization techniques.

Memory Optimization

Manage memory efficiently in async applications:

import asyncio
import gc
import weakref
from typing import AsyncGenerator

class MemoryEfficientProcessor:
    """Stream processor that keeps memory bounded via batching and weak caching."""

    def __init__(self):
        # Running total of items handled across all batches.
        self._processed_count = 0
        # Weak-valued cache: entries disappear automatically once nothing
        # else references them, so the cache cannot cause a memory leak.
        self._cache = weakref.WeakValueDictionary()

Weak references allow objects to be garbage collected even when cached, preventing memory leaks in long-running applications.

Process data in batches to control memory usage:

    async def process_data_stream(
        self,
        data_source: AsyncGenerator,
        batch_size: int = 1000,
        gc_interval: int = 10000,
    ):
        """Consume *data_source* and process it in fixed-size batches.

        Args:
            data_source: Async generator yielding items to process.
            batch_size: Number of items accumulated before a batch is
                dispatched to ``_process_batch`` (default 1000, matching
                the previous hard-coded value).
            gc_interval: Run an explicit ``gc.collect()`` after roughly
                this many items have been processed.
        """
        batch = []
        # Track items processed since the last explicit collection locally.
        # The previous check (`self._processed_count % 10000 == 0`) silently
        # never fired when the batch size did not divide 10000 evenly.
        items_since_gc = 0

        async for item in data_source:
            batch.append(item)

            if len(batch) >= batch_size:
                await self._process_batch(batch)
                items_since_gc += len(batch)
                batch.clear()  # Clear batch to free memory

                # Periodic garbage collection for memory-constrained setups.
                if items_since_gc >= gc_interval:
                    gc.collect()
                    items_since_gc = 0

        # Process the final, possibly short, batch.
        if batch:
            await self._process_batch(batch)

Batching prevents memory from growing unbounded when processing large datasets. Explicit garbage collection helps in memory-constrained environments.

Handle batch processing with weak reference caching:

    async def _process_batch(self, batch):
        """Process a batch of items"""
        # Simulate processing
        await asyncio.sleep(0.01)
        self._processed_count += len(batch)
        
        # Use weak references for caching
        for item in batch:
            if hasattr(item, 'id'):
                self._cache[item.id] = item

# Usage example: stream 100k generated records through the processor.
processor = MemoryEfficientProcessor()

async def data_generator():
    """Lazily yield synthetic records one at a time."""
    for index in range(100000):
        yield {"id": index, "data": f"item_{index}"}

async def main():
    await processor.process_data_stream(data_generator())

I/O Optimization

Optimize file and network I/O operations:

import asyncio
import aiofiles
import aiohttp

class IOOptimizer:
    """Optimizes network and file I/O with pooling and rate limiting.

    Maintains one shared ``aiohttp`` session (connection pooling) and caps
    concurrent requests with a semaphore.
    """

    def __init__(self, max_concurrent_requests=10):
        # Semaphore bounds the number of in-flight HTTP requests.
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        self.session = None  # Created lazily on first use.

    async def get_session(self):
        """Return the shared HTTP session, creating it on first use.

        Also recreates the session if it has been closed (e.g. after
        ``close()``) — previously a closed session would be reused,
        making every subsequent fetch fail.
        """
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,             # total sockets across all hosts
                limit_per_host=30,
                keepalive_timeout=30   # seconds to keep idle sockets open
            )
            self.session = aiohttp.ClientSession(connector=connector)
        return self.session

    async def fetch_with_optimization(self, url: str):
        """Fetch *url* with connection pooling and semaphore rate limiting."""
        async with self.semaphore:
            session = await self.get_session()
            async with session.get(url) as response:
                return await response.text()

    async def read_file_efficiently(self, filename: str, chunk_size: int = 8192):
        """Yield *filename* in ``chunk_size``-byte chunks without loading it whole."""
        async with aiofiles.open(filename, 'rb') as file:
            while chunk := await file.read(chunk_size):
                yield chunk

    async def close(self):
        """Close the HTTP session and reset it so it can be recreated."""
        if self.session:
            await self.session.close()
            self.session = None  # Fix: allow get_session() to build a fresh one.

Connection Pooling

Implement efficient connection pooling:

import asyncio
import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    """Thin wrapper around an asyncpg connection pool.

    Call ``initialize()`` once before running queries; ``close()`` releases
    the pool and allows re-initialization.
    """

    def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # Set by initialize().

    async def initialize(self):
        """Create the connection pool (must be called before queries)."""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )

    async def execute_query(self, query: str, *args):
        """Execute *query* on a pooled connection and return fetched rows.

        Raises:
            RuntimeError: If the pool was never initialized (previously this
                surfaced as an opaque ``AttributeError`` on ``None``).
        """
        if self.pool is None:
            raise RuntimeError("DatabasePool not initialized; call initialize() first")
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def close(self):
        """Close the connection pool and reset it for possible re-initialization."""
        if self.pool:
            await self.pool.close()
            self.pool = None

Buffering and Batching

Optimize data processing with buffering:

import asyncio
from collections import deque

class BufferedProcessor:
    """Buffers items and flushes them in batches.

    A flush happens either when the buffer reaches ``batch_size`` or every
    ``flush_interval`` seconds via a background task.
    """

    def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = deque()
        self._running = False
        # Hold a reference to the background task: asyncio only keeps weak
        # references to tasks, so an unreferenced task may be garbage
        # collected mid-flight. Also needed so stop() can cancel it.
        self._flush_task = None

    async def start(self):
        """Start the background periodic-flush task (idempotent)."""
        if self._running:
            return
        self._running = True
        self._flush_task = asyncio.create_task(self._periodic_flush())

    async def add_item(self, item):
        """Append *item*; flush immediately once the batch size is reached."""
        self.buffer.append(item)

        if len(self.buffer) >= self.batch_size:
            await self._flush_buffer()

    async def _periodic_flush(self):
        """Flush any buffered items every ``flush_interval`` seconds."""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            if self.buffer:
                await self._flush_buffer()

    async def _flush_buffer(self):
        """Drain the buffer and process its contents as one batch."""
        if not self.buffer:
            return

        items = list(self.buffer)
        self.buffer.clear()

        # Process batch
        print(f"Processed batch of {len(items)} items")

    async def stop(self):
        """Stop the background task promptly and flush remaining items.

        Previously the task was never cancelled, so it kept running for up
        to ``flush_interval`` seconds after stop() returned.
        """
        self._running = False
        if self._flush_task is not None:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
            self._flush_task = None
        await self._flush_buffer()

Best Practices

Key optimization principles:

Memory Management:

  • Use weak references for caches
  • Clear collections explicitly
  • Implement periodic garbage collection
  • Monitor memory usage continuously

I/O Optimization:

  • Reuse connections and sessions
  • Implement connection pooling
  • Use appropriate buffer sizes
  • Batch operations when possible

Resource Management:

  • Set proper limits and timeouts
  • Clean up resources in finally blocks
  • Use context managers for resource handling
  • Monitor resource usage

Summary

Memory and I/O optimization techniques:

  • Implement efficient memory management with weak references and garbage collection
  • Use connection pooling for database and HTTP operations
  • Buffer and batch operations for better throughput
  • Monitor performance continuously
  • Clean up resources properly

Proper optimization ensures your async applications perform efficiently under load.

In Part 17, we’ll explore CPU-bound task optimization.