Health Checks and Monitoring
Health checks and monitoring are essential for maintaining reliable async applications in production. Let’s explore implementing comprehensive health monitoring.
Basic Health Checks
Implement fundamental health check endpoints:
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import asyncio
import time
import psutil
from typing import Dict, Any
# FastAPI application that the health endpoints are registered on.
app = FastAPI()
class HealthChecker:
    """Registry that runs named async health checks and aggregates the results.

    A check is a zero-argument callable returning an awaitable. It may
    resolve to a plain truthy/falsy value, or to a dict that carries a
    ``healthy``, ``connected`` or ``available`` flag next to its details.
    """

    # Keys a dict-returning check uses to report its own verdict.
    _STATUS_KEYS = ("healthy", "connected", "available")

    def __init__(self):
        # Reference point for the "uptime" field of every report.
        self.start_time = time.time()
        # Maps check name -> check callable, in registration order.
        self.checks = {}

    def register_check(self, name: str, check_func):
        """Register a health check function under ``name``.

        Registering the same name again replaces the earlier check.
        """
        self.checks[name] = check_func

    @classmethod
    def _is_healthy(cls, check_result) -> bool:
        """Return whether a check result counts as healthy.

        A non-empty dict is always truthy, so for dict results the status
        flags decide: every flag that is present must be truthy. Dicts
        without any flag, and non-dict results, fall back to truthiness.
        """
        if isinstance(check_result, dict):
            flags = [
                check_result[key]
                for key in cls._STATUS_KEYS
                if key in check_result
            ]
            if flags:
                return all(flags)
        return bool(check_result)

    async def run_checks(self) -> Dict[str, Any]:
        """Run all registered health checks, one after another.

        Returns:
            A report with ``status`` ("healthy" or "unhealthy"),
            ``timestamp``, ``uptime`` and a ``checks`` mapping. Each entry
            holds ``status`` plus ``details`` (the raw check result), or
            ``status: "error"`` plus ``error`` when the check raised.
        """
        now = time.time()
        results = {
            "status": "healthy",
            "timestamp": now,
            "uptime": now - self.start_time,
            "checks": {},
        }
        overall_healthy = True
        for name, check_func in self.checks.items():
            try:
                check_result = await check_func()
            except Exception as e:
                # A crashing check must not take down the whole report.
                results["checks"][name] = {"status": "error", "error": str(e)}
                overall_healthy = False
                continue
            healthy = self._is_healthy(check_result)
            results["checks"][name] = {
                "status": "healthy" if healthy else "unhealthy",
                "details": check_result,
            }
            if not healthy:
                overall_healthy = False
        results["status"] = "healthy" if overall_healthy else "unhealthy"
        return results
# Module-level checker shared by the health endpoints and check registrations.
health_checker = HealthChecker()
@app.get("/health")
async def health_check():
    """Run every registered check and return the aggregate report.

    Responds with HTTP 200 when the overall status is "healthy" and with
    HTTP 503 otherwise; the body is the full report from the checker.
    """
    report = await health_checker.run_checks()
    if report["status"] == "healthy":
        http_status = 200
    else:
        http_status = 503
    return JSONResponse(status_code=http_status, content=report)
@app.get("/health/live")
async def liveness_check():
    """Liveness probe (Kubernetes): report that the process is responding.

    Runs none of the registered dependency checks.
    """
    now = time.time()
    return dict(status="alive", timestamp=now)
@app.get("/health/ready")
async def readiness_check():
    """Readiness probe (Kubernetes): ready only when all checks pass.

    Raises:
        HTTPException: 503 when the aggregate status is not "healthy".
    """
    report = await health_checker.run_checks()
    # Guard clause: reject first, so the success path reads straight through.
    if report["status"] != "healthy":
        raise HTTPException(status_code=503, detail="Service not ready")
    return {"status": "ready", "timestamp": time.time()}
Database Health Checks
Monitor database connectivity:
import asyncpg
import aioredis
async def check_database(dsn: str = "postgresql://user:pass@localhost/db"):
    """Check PostgreSQL connectivity by running ``SELECT 1``.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``. It has a
            default so the function can still be registered as a
            zero-argument check.

    Returns:
        ``{"connected": True, "query_result": ...}`` on success, otherwise
        ``{"connected": False, "error": ...}``. Failures are reported in
        the result instead of being raised.
    """
    try:
        conn = await asyncpg.connect(dsn)
        try:
            result = await conn.fetchval("SELECT 1")
        finally:
            # Close even when the probe query fails, so a struggling
            # database does not leak one connection per health check.
            await conn.close()
        return {"connected": True, "query_result": result}
    except Exception as e:
        return {"connected": False, "error": str(e)}
async def check_redis(url: str = "redis://localhost:6379"):
    """Check Redis connectivity by sending ``PING``.

    Args:
        url: Redis URL passed to ``aioredis.from_url``. It has a default so
            the function can still be registered as a zero-argument check.

    Returns:
        ``{"connected": True, "ping": ...}`` on success, otherwise
        ``{"connected": False, "error": ...}``. Failures are reported in
        the result instead of being raised.
    """
    try:
        redis = aioredis.from_url(url)
        try:
            pong = await redis.ping()
        finally:
            # Release the client even when the ping fails, so repeated
            # failing checks do not accumulate open clients.
            await redis.close()
        return {"connected": True, "ping": pong}
    except Exception as e:
        return {"connected": False, "error": str(e)}
# Wire the datastore checks into the shared checker.
health_checker.register_check(name="database", check_func=check_database)
health_checker.register_check(name="redis", check_func=check_redis)
External Service Health Checks
Monitor external dependencies:
import aiohttp
import asyncio
async def check_external_api():
    """Probe the external API's health URL with a 5-second total timeout.

    Returns:
        ``{"available": <status is 200>, "status_code": <status>}`` when a
        response arrives, ``{"available": False, "error": "timeout"}`` on
        timeout, and ``{"available": False, "error": <message>}`` for any
        other failure.
    """
    try:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=5)
        ) as http:
            async with http.get("https://api.example.com/health") as resp:
                status = resp.status
    except asyncio.TimeoutError:
        return {"available": False, "error": "timeout"}
    except Exception as exc:
        return {"available": False, "error": str(exc)}
    return {"available": status == 200, "status_code": status}
async def check_message_queue():
    """Report message queue connectivity (simulated).

    The queue is not contacted: a short sleep stands in for the round
    trip and the reported queue size is a fixed placeholder.
    """
    try:
        # Simulated round trip to the queue.
        await asyncio.sleep(0.1)
    except Exception as exc:
        return {"connected": False, "error": str(exc)}
    return dict(connected=True, queue_size=42)
# Wire the external dependency checks into the shared checker.
health_checker.register_check(name="external_api", check_func=check_external_api)
health_checker.register_check(name="message_queue", check_func=check_message_queue)
System Resource Monitoring
Monitor system resources:
import psutil
import asyncio
async def check_system_resources():
    """Check CPU, memory and root-disk usage against fixed thresholds.

    Returns:
        A dict with ``cpu_percent``, ``memory_percent``, ``disk_percent``
        and ``healthy``. ``healthy`` is True only when CPU is below 80%,
        memory below 85% and disk below 90%. On failure returns
        ``{"healthy": False, "error": ...}``.
    """
    try:
        loop = asyncio.get_running_loop()
        # cpu_percent(interval=1) blocks for the whole sampling window.
        # Sample it in the default executor so the event loop keeps
        # serving other requests during that second.
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        # Computed once; used for both the reported value and the threshold.
        disk_percent = (disk.used / disk.total) * 100
        return {
            "cpu_percent": cpu_percent,
            "memory_percent": memory.percent,
            "disk_percent": disk_percent,
            "healthy": (
                cpu_percent < 80 and
                memory.percent < 85 and
                disk_percent < 90
            )
        }
    except Exception as e:
        return {"healthy": False, "error": str(e)}
async def check_application_metrics():
    """Report how many asyncio tasks exist in the running event loop.

    Returns:
        ``{"active_tasks": <count>, "healthy": <count is below 100>}``, or
        ``{"healthy": False, "error": <message>}`` if counting fails.
    """
    try:
        task_count = len(asyncio.all_tasks())
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}
    return {"active_tasks": task_count, "healthy": task_count < 100}
# Wire the host and application metric checks into the shared checker.
health_checker.register_check(name="system_resources", check_func=check_system_resources)
health_checker.register_check(name="application_metrics", check_func=check_application_metrics)
Health Monitoring That Actually Helps
Hard-won insights from monitoring async applications in production:
Health Check Design:
- Implement both liveness and readiness probes
- Keep checks lightweight and fast
- Test actual dependencies, not just connectivity
- Return appropriate HTTP status codes
Monitoring Strategy:
- Monitor both technical and business metrics
- Set up alerting for critical failures
- Track performance trends over time
- Use structured logging for better analysis
Production Considerations:
- Set reasonable timeouts for health checks
- Implement circuit breakers for external dependencies
- Cache health check results when appropriate
- Monitor the health check endpoints themselves
Summary
Health monitoring essentials:
- Implement comprehensive health check endpoints for all dependencies
- Monitor system resources and application metrics
- Use structured logging and integrate with monitoring systems
- Set up appropriate alerting for critical failures
- Design checks to be fast, reliable, and informative
Proper health monitoring ensures early detection of issues and maintains application reliability in production.
In Part 20, we’ll explore graceful shutdown and resource cleanup.