Caching and Connection Pooling
Advanced async applications need careful resource management. Let’s explore caching patterns and connection pooling, two techniques that cut latency and reduce load on databases and other backing services.
Basic Async Cache
Start with a simple in-memory cache that expires entries after a per-key TTL:
import asyncio
import time
from typing import Any, Optional, Dict

class AsyncCache:
    def __init__(self, default_ttl: int = 300):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        if key not in self.cache:
            return None
        entry = self.cache[key]
        # Check if expired
        if time.time() > entry['expires']:
            del self.cache[key]
            return None
        return entry['value']

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache"""
        ttl = ttl or self.default_ttl
        self.cache[key] = {
            'value': value,
            'expires': time.time() + ttl
        }

    async def delete(self, key: str) -> bool:
        """Delete key from cache"""
        if key in self.cache:
            del self.cache[key]
            return True
        return False

    async def clear(self) -> None:
        """Clear all cache entries"""
        self.cache.clear()
# Usage
cache = AsyncCache(default_ttl=600)

async def get_user_data(user_id: str):
    # Try cache first; compare against None explicitly so that
    # falsy cached values (0, "", empty dict) still count as hits
    cached_data = await cache.get(f"user:{user_id}")
    if cached_data is not None:
        return cached_data

    # Fetch from database (fetch_user_from_db is the application's
    # own data-access function)
    user_data = await fetch_user_from_db(user_id)

    # Cache the result
    await cache.set(f"user:{user_id}", user_data, ttl=300)
    return user_data
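To see the cache in action, here is a minimal runnable sketch; the stubbed fetch_user_from_db below is illustrative, standing in for a real database call:

async def fetch_user_from_db(user_id: str) -> dict:
    # Stand-in for a real database query (illustrative only)
    await asyncio.sleep(0.1)  # simulate database latency
    return {"id": user_id, "name": f"user-{user_id}"}

async def main():
    first = await get_user_data("42")   # miss: falls through to the stub
    second = await get_user_data("42")  # hit: served from memory
    assert first == second

asyncio.run(main())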
Redis-based Async Cache
Use Redis for distributed caching:
import aioredis  # aioredis 2.x; since merged into redis-py as redis.asyncio with the same API
import json
import asyncio

class RedisCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis = None

    async def connect(self):
        """Connect to Redis"""
        self.redis = aioredis.from_url(self.redis_url)

    async def get(self, key: str):
        """Get value from Redis cache"""
        if not self.redis:
            await self.connect()
        value = await self.redis.get(key)
        # Compare against None: Redis returns None for missing keys,
        # while stored falsy values come back as truthy bytes
        if value is not None:
            return json.loads(value)
        return None

    async def set(self, key: str, value, ttl: int = 300):
        """Set value in Redis cache"""
        if not self.redis:
            await self.connect()
        serialized_value = json.dumps(value)
        await self.redis.setex(key, ttl, serialized_value)

    async def close(self):
        """Close Redis connection"""
        if self.redis:
            await self.redis.close()
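A quick usage sketch, assuming a Redis server is reachable at the default URL; note that values must be JSON-serializable because of the json round-trip above:

async def demo_redis_cache():
    cache = RedisCache("redis://localhost:6379")
    await cache.set("greeting", {"text": "hello"}, ttl=60)
    print(await cache.get("greeting"))  # {'text': 'hello'}
    await cache.close()

asyncio.run(demo_redis_cache())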
Cache Decorators
Create reusable cache decorators:
import functools
import hashlib

def async_cache(ttl: int = 300):
    """Decorator for caching async function results"""
    def decorator(func):
        cache = AsyncCache(default_ttl=ttl)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Create cache key from function name and arguments
            key_data = f"{func.__name__}:{str(args)}:{str(sorted(kwargs.items()))}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()

            # Try cache first
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function
            result = await func(*args, **kwargs)

            # Cache the result
            await cache.set(cache_key, result, ttl=ttl)
            return result
        return wrapper
    return decorator
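Applied to a slow function, the decorator makes repeat calls with the same arguments near-instant; expensive_report below is a hypothetical example:

@async_cache(ttl=60)
async def expensive_report(region: str, limit: int = 10):
    await asyncio.sleep(1)  # simulate a slow query or computation
    return {"region": region, "limit": limit}

async def demo_decorator():
    await expensive_report("eu", limit=5)  # slow: executes the function
    await expensive_report("eu", limit=5)  # fast: served from the cache

asyncio.run(demo_decorator())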
Connection Pooling
Implement efficient database connection pooling, shown here with asyncpg for PostgreSQL:
import asyncpg
import asyncio
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self, database_url: str, min_size: int = 10, max_size: int = 20):
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def initialize(self):
        """Initialize the connection pool"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size
        )

    @asynccontextmanager
    async def acquire(self):
        """Acquire a connection from the pool"""
        if not self.pool:
            await self.initialize()
        async with self.pool.acquire() as connection:
            yield connection

    async def fetch(self, query: str, *args):
        """Fetch results using the pool"""
        async with self.acquire() as conn:
            return await conn.fetch(query, *args)

    async def close(self):
        """Close the connection pool"""
        if self.pool:
            await self.pool.close()
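A usage sketch, assuming a reachable PostgreSQL instance; the DSN and query are illustrative:

async def demo_pool():
    db = DatabasePool("postgresql://user:password@localhost/mydb")
    rows = await db.fetch("SELECT id, name FROM users WHERE active = $1", True)
    for row in rows:
        print(row["id"], row["name"])
    await db.close()

asyncio.run(demo_pool())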
Best Practices
Key caching and connection pooling principles:
Caching Strategy:
- Use appropriate TTL values based on data freshness requirements
- Implement cache invalidation for critical data updates (see the invalidation sketch after this list)
- Use distributed caching (Redis) for multi-instance applications
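Invalidation on write can be as simple as deleting the key whenever the source data changes; a minimal sketch against the AsyncCache usage above, where update_user_in_db is a placeholder for your real write path:

async def update_user(user_id: str, fields: dict):
    # Write through to the database (update_user_in_db is a placeholder),
    # then drop the stale cache entry so the next read repopulates it
    await update_user_in_db(user_id, fields)
    await cache.delete(f"user:{user_id}")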
Connection Pooling:
- Set appropriate pool sizes based on expected load
- Monitor connection usage and adjust pool sizes (see the monitoring sketch after this list)
- Implement proper connection cleanup and error handling
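asyncpg pools expose their current size, which makes a simple periodic monitor straightforward; a sketch built on the DatabasePool above:

async def monitor_pool(db: DatabasePool, interval: float = 30.0):
    # Log utilization on a fixed interval; get_size() reports open
    # connections and get_idle_size() the idle subset
    while True:
        if db.pool:
            size = db.pool.get_size()
            idle = db.pool.get_idle_size()
            print(f"pool: {size} open, {size - idle} in use, {idle} idle")
        await asyncio.sleep(interval)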
Performance Optimization:
- Cache expensive computations and database queries
- Reuse connections across multiple requests
- Monitor cache hit rates and connection pool utilization (see the hit-rate sketch below)
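Hit-rate tracking can be layered onto the AsyncCache from earlier with two counters; a minimal sketch (note that a cached None would be counted as a miss, so avoid caching None itself):

class InstrumentedCache(AsyncCache):
    def __init__(self, default_ttl: int = 300):
        super().__init__(default_ttl)
        self.hits = 0
        self.misses = 0

    async def get(self, key: str):
        value = await super().get(key)
        # None means the key was absent or expired
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0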
Summary
Caching and connection pooling essentials:
- Implement async caching for expensive operations
- Use Redis for distributed caching across multiple instances
- Create reusable cache decorators for common patterns
- Implement database connection pooling for better performance
- Monitor and tune cache and pool configurations
Proper caching and connection pooling significantly improve async application performance and resource utilization.
In Part 13, we’ll explore circuit breakers and resilience patterns.