Caching and Connection Pooling

Advanced async applications need sophisticated resource management. Let’s explore caching patterns and connection pooling for optimal performance.

Basic Async Cache

Start with a simple cache implementation:

import asyncio
import time
from typing import Any, Optional, Dict

class AsyncCache:
    """In-memory key/value cache with per-entry expiry.

    Entries are stored in ``self.cache`` as
    ``{'value': <object>, 'expires': <time.time() deadline>}``. Expired
    entries are removed lazily, when ``get`` encounters them.

    The methods are coroutines so the class can be swapped for an
    I/O-backed cache, but none of them awaits anything.
    """

    def __init__(self, default_ttl: int = 300):
        """Create an empty cache.

        Args:
            default_ttl: Lifetime in seconds used when ``set`` is called
                without an explicit ``ttl``.
        """
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``.

        Returns:
            The stored value, or ``None`` when the key is absent or its
            entry has expired. A stored ``None`` is therefore
            indistinguishable from a miss.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None

        # Inclusive comparison: an entry stored with ttl=0 is already
        # expired, regardless of clock resolution.
        if time.time() >= entry['expires']:
            self.cache.pop(key, None)
            return None

        return entry['value']

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key``.

        Args:
            key: Cache key.
            value: Object to store.
            ttl: Lifetime in seconds. ``None`` selects ``default_ttl``;
                an explicit ``0`` is honoured and expires immediately.
        """
        # `ttl or default` would silently turn an explicit 0 into the default.
        if ttl is None:
            ttl = self.default_ttl

        self.cache[key] = {
            'value': value,
            'expires': time.time() + ttl,
        }

    async def delete(self, key: str) -> bool:
        """Remove ``key`` from the cache.

        Returns:
            ``True`` if an entry was removed, ``False`` if the key was absent.
        """
        try:
            del self.cache[key]
        except KeyError:
            return False
        return True

    async def clear(self) -> None:
        """Remove every entry from the cache."""
        self.cache.clear()

# Usage
cache = AsyncCache(default_ttl=600)


async def get_user_data(user_id: str):
    """Return data for ``user_id``, consulting the cache before the database.

    On a miss the record is loaded with ``fetch_user_from_db`` (not defined
    in this snippet; assumed to be a coroutine provided by the application)
    and cached for 300 seconds.
    """
    cache_key = f"user:{user_id}"

    # Compare against None rather than relying on truthiness: a cached but
    # falsy record (empty dict, empty list, 0) is still a hit.
    cached_data = await cache.get(cache_key)
    if cached_data is not None:
        return cached_data

    # Fetch from database
    user_data = await fetch_user_from_db(user_id)

    # Cache the result
    await cache.set(cache_key, user_data, ttl=300)

    return user_data

Redis-based Async Cache

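Use Redis for distributed caching, so that every application instance shares the same cache. (This example uses the aioredis package, whose API has since been merged into redis-py as redis.asyncio.)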

import aioredis
import json
import asyncio

class RedisCache:
    """JSON-serialising cache backed by Redis through ``aioredis``.

    The client is created lazily: ``get`` and ``set`` call ``connect`` on
    first use if no client has been set up yet.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Remember the Redis URL; no client is created here."""
        self.redis = None
        self.redis_url = redis_url

    async def connect(self):
        """Create the Redis client from ``redis_url`` and store it on ``self.redis``."""
        client = aioredis.from_url(self.redis_url)
        self.redis = client

    async def _ensure_connected(self):
        """Call ``connect`` when no client is available yet."""
        if self.redis:
            return
        await self.connect()

    async def get(self, key: str):
        """Return the JSON-decoded value stored under ``key``, or ``None``.

        ``None`` is returned when Redis yields a falsy reply for the key.
        """
        await self._ensure_connected()

        raw = await self.redis.get(key)
        return json.loads(raw) if raw else None

    async def set(self, key: str, value, ttl: int = 300):
        """JSON-encode ``value`` and store it under ``key`` via ``setex`` with ``ttl``."""
        await self._ensure_connected()

        payload = json.dumps(value)
        await self.redis.setex(key, ttl, payload)

    async def close(self):
        """Close the Redis client if one was created."""
        if not self.redis:
            return
        await self.redis.close()

Cache Decorators

Create reusable cache decorators:

import functools
import hashlib

def async_cache(ttl: int = 300):
    """Decorator factory that caches the results of an async function.

    Each decorated function gets its own ``AsyncCache``. The cache key is
    derived from the function name and the string forms of the positional
    and (sorted) keyword arguments, so arguments need a stable ``str()``.

    Results are stored boxed in a 1-tuple so that a legitimate ``None``
    result is cached too, instead of being mistaken for a miss and
    recomputed on every call.

    Concurrent calls with the same arguments are not coalesced: the wrapped
    function is awaited between lookup and store, so each caller that
    misses runs it.

    Args:
        ttl: Lifetime of each cached result in seconds.
    """

    def decorator(func):
        cache = AsyncCache(default_ttl=ttl)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Create cache key from function name and arguments.
            # MD5 only compacts the key here; it is not used for security.
            key_data = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()

            # Try cache first; a hit is always a 1-tuple, a miss is None.
            boxed = await cache.get(cache_key)
            if boxed is not None:
                return boxed[0]

            # Execute function
            result = await func(*args, **kwargs)

            # Cache the result, boxed so that None survives the round trip.
            await cache.set(cache_key, (result,), ttl=ttl)

            return result

        return wrapper
    return decorator

Connection Pooling

Implement efficient database connection pooling:

import asyncpg
import asyncio
from contextlib import asynccontextmanager

class DatabasePool:
    """Lazily initialised wrapper around an ``asyncpg`` connection pool.

    The pool is created on first use, or explicitly through ``initialize``.
    Initialisation is serialised so that coroutines racing on first use
    share one pool instead of each creating, and leaking, their own.
    """

    def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
        """Store the pool configuration; no connection is opened here.

        Args:
            database_url: Connection string passed to ``asyncpg.create_pool``.
            min_size: Passed through as the pool's ``min_size``.
            max_size: Passed through as the pool's ``max_size``.
        """
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None
        # Created on first use inside a running event loop; see initialize().
        self._init_lock = None

    async def _create_pool(self):
        """Build a new asyncpg pool from the stored configuration."""
        return await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size
        )

    async def initialize(self):
        """Initialize the connection pool if it does not exist yet.

        Safe to call repeatedly and from concurrent coroutines: only the
        first caller creates the pool, the others wait and then reuse it.
        """
        # Instantiate the lock lazily so it belongs to the loop that is
        # actually running. There is no await between the check and the
        # assignment, so this step cannot race.
        if self._init_lock is None:
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            # Re-check under the lock: another coroutine may have finished
            # creating the pool while this one was waiting.
            if self.pool is None:
                self.pool = await self._create_pool()

    @asynccontextmanager
    async def acquire(self):
        """Acquire a connection from the pool, initialising it on first use.

        Yields:
            A connection that is handed back to the pool when the
            ``async with`` block exits.
        """
        if self.pool is None:
            await self.initialize()

        async with self.pool.acquire() as connection:
            yield connection

    async def fetch(self, query: str, *args):
        """Run ``query`` with ``args`` on a pooled connection and return its result."""
        async with self.acquire() as conn:
            return await conn.fetch(query, *args)

    async def close(self):
        """Close the connection pool if it was created."""
        if self.pool is not None:
            await self.pool.close()

Best Practices

Key caching and connection pooling principles:

Caching Strategy:

  • Use appropriate TTL values based on data freshness requirements
  • Implement cache invalidation for critical data updates
  • Use distributed caching (Redis) for multi-instance applications

Connection Pooling:

  • Set appropriate pool sizes based on expected load
  • Monitor connection usage and adjust pool sizes
  • Implement proper connection cleanup and error handling

Performance Optimization:

  • Cache expensive computations and database queries
  • Reuse connections across multiple requests
  • Monitor cache hit rates and connection pool utilization

Summary

Caching and connection pooling essentials:

  • Implement async caching for expensive operations
  • Use Redis for distributed caching across multiple instances
  • Create reusable cache decorators for common patterns
  • Implement database connection pooling for better performance
  • Monitor and tune cache and pool configurations

Proper caching and connection pooling significantly improve async application performance and resource utilization.

In Part 13, we’ll explore circuit breakers and resilience patterns.