Performance Profiling and Monitoring
Optimizing async applications requires understanding bottlenecks and monitoring performance. Let’s explore profiling techniques and real-time monitoring.
Basic Async Profiling
Profile async functions to identify bottlenecks:
import asyncio
import time
import cProfile
import pstats
from functools import wraps

def async_profile(func):
    """Decorator to profile async functions"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        pr.disable()

        print(f"Function {func.__name__} took {end_time - start_time:.3f} seconds")

        # Show the top time consumers
        stats = pstats.Stats(pr)
        stats.sort_stats('cumulative')
        stats.print_stats(5)  # Top 5 functions

        return result
    return wrapper
This decorator reports the wall-clock time of the whole coroutine and profiles the CPU-intensive work inside it; the pstats output shows which functions consume the most time. Note that cProfile records everything running on the thread while it is enabled, so work from other tasks scheduled during an await also shows up in the report.
Use the profiler on your async functions:
@async_profile
async def slow_function():
    """Function to profile"""
    await asyncio.sleep(0.1)
    total = sum(i * i for i in range(10000))  # CPU work
    await asyncio.sleep(0.05)
    return total

asyncio.run(slow_function())
Event Loop Monitoring
Monitor event loop health in real-time:
from collections import deque

class EventLoopMonitor:
    def __init__(self, sample_interval=1.0):
        self.sample_interval = sample_interval
        self.metrics = {
            'blocked_time': deque(maxlen=60),
            'task_count': deque(maxlen=60)
        }
        self.monitoring = False
        self._monitor_task = None
The monitor tracks event loop responsiveness and task counts over time. Using deques with maxlen keeps memory usage bounded.
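As a quick aside, here is what that bound looks like in practice; once a deque with maxlen is full, each append silently discards the oldest entry:

from collections import deque

samples = deque(maxlen=3)
for value in range(5):
    samples.append(value)

print(list(samples))  # [2, 3, 4] -- the two oldest samples were dropped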
Start monitoring and measure loop responsiveness:
    async def start_monitoring(self):
        """Start monitoring the event loop"""
        self.monitoring = True
        # Keep a reference so the monitor task isn't garbage-collected
        self._monitor_task = asyncio.create_task(self._monitor_loop())

    async def _monitor_loop(self):
        """Monitor loop performance"""
        while self.monitoring:
            start_time = time.time()
            # Measure loop responsiveness
            await asyncio.sleep(0)  # Yield to other tasks
            end_time = time.time()
            blocked_time = end_time - start_time

            # Collect metrics
            self.metrics['blocked_time'].append(blocked_time)

            # Count tasks
            all_tasks = asyncio.all_tasks()
            self.metrics['task_count'].append(len(all_tasks))

            await asyncio.sleep(self.sample_interval)
The key insight: await asyncio.sleep(0) should return almost immediately in a healthy event loop. If it takes noticeably longer, the loop is being held up by CPU-intensive or blocking work elsewhere.
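To see this effect outside the monitor, here is a small self-contained experiment (the function names are illustrative, not part of the monitor): yielding to a healthy loop takes microseconds, while yielding to a loop that has a CPU-bound task queued takes as long as that task runs:

import asyncio
import time

async def measure_yield():
    """Time how long a single yield to the event loop takes."""
    start = time.perf_counter()
    await asyncio.sleep(0)
    return time.perf_counter() - start

async def cpu_hog():
    """A task that blocks the loop with pure CPU work."""
    sum(i * i for i in range(2_000_000))

async def demo_blocked_loop():
    print(f"healthy loop: {await measure_yield() * 1000:.3f} ms")

    # Schedule the hog; it runs as soon as we yield, delaying our resumption
    hog = asyncio.create_task(cpu_hog())
    print(f"blocked loop: {await measure_yield() * 1000:.3f} ms")
    await hog

asyncio.run(demo_blocked_loop())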
Generate performance statistics:
    def get_stats(self):
        """Get performance statistics"""
        if not self.metrics['blocked_time']:
            return {}

        blocked_times = list(self.metrics['blocked_time'])
        task_counts = list(self.metrics['task_count'])

        return {
            'avg_blocked_time': sum(blocked_times) / len(blocked_times),
            'max_blocked_time': max(blocked_times),
            'avg_task_count': sum(task_counts) / len(task_counts),
            'max_task_count': max(task_counts)
        }
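Here is a minimal usage sketch (the demo function and workload are invented for illustration): start the monitor, run a mixed workload, then stop it and read the collected statistics:

async def loop_monitor_demo():
    monitor = EventLoopMonitor(sample_interval=0.5)
    await monitor.start_monitoring()

    # A mixed workload: cooperative sleeps plus bursts of CPU work
    for _ in range(5):
        await asyncio.sleep(0.2)
        sum(i * i for i in range(200_000))

    monitor.monitoring = False  # stop the sampling loop
    print(monitor.get_stats())

asyncio.run(loop_monitor_demo())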
Application Metrics Collection
Collect comprehensive application metrics:
import psutil
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class Metrics:
    timestamp: float
    cpu_percent: float
    memory_mb: float
    active_tasks: int
    requests_per_second: float
The Metrics dataclass provides a clean structure for performance data; each field carries an explicit type annotation, which makes the code more readable and easier to check with static tooling.
Set up the metrics collector:
class AsyncMetricsCollector:
    def __init__(self):
        self.request_times = deque(maxlen=1000)
        self.task_counts = defaultdict(int)
        self.process = psutil.Process()

    def record_request(self, duration: float):
        """Record request completion time"""
        self.request_times.append((time.time(), duration))

    def record_task_completion(self, success: bool = True):
        """Record task completion"""
        if success:
            self.task_counts['completed'] += 1
        else:
            self.task_counts['failed'] += 1
The collector tracks request timing and task completion rates. Using deque with maxlen prevents memory growth over time.
Generate comprehensive metrics:
    def get_current_metrics(self) -> Metrics:
        """Get current system and application metrics"""
        now = time.time()

        # System metrics
        cpu_percent = self.process.cpu_percent()
        memory_mb = self.process.memory_info().rss / 1024 / 1024

        # Task metrics
        all_tasks = asyncio.all_tasks()
        active_tasks = len([t for t in all_tasks if not t.done()])

        # Request metrics (last minute)
        recent_requests = [
            (ts, duration) for ts, duration in self.request_times
            if now - ts < 60
        ]
        requests_per_second = len(recent_requests) / 60 if recent_requests else 0

        return Metrics(
            timestamp=now,
            cpu_percent=cpu_percent,
            memory_mb=memory_mb,
            active_tasks=active_tasks,
            requests_per_second=requests_per_second
        )
This method combines system metrics (CPU, memory) with application metrics (tasks, requests) for a complete performance picture.
Create a monitoring decorator:
# Monitoring decorator
def monitor_async_function(metrics_collector: AsyncMetricsCollector):
    """Decorator to monitor async function performance"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                metrics_collector.record_task_completion(success=True)
                return result
            except Exception:
                metrics_collector.record_task_completion(success=False)
                raise
            finally:
                duration = time.time() - start_time
                metrics_collector.record_request(duration)
        return wrapper
    return decorator
Performance Alerting
Implement alerting for performance issues:
from dataclasses import dataclass

@dataclass
class Alert:
    level: str  # "warning", "critical"
    message: str
    metric_name: str
    current_value: float
    threshold: float
The Alert dataclass structures alert information for consistent handling across different alert channels.
Set up the alerting system:
class PerformanceAlerter:
    def __init__(self):
        self.thresholds = {}
        self.alert_handlers = []

    def add_threshold(self, metric_name: str, warning: float, critical: float):
        """Add performance threshold"""
        self.thresholds[metric_name] = {
            'warning': warning,
            'critical': critical
        }

    def add_alert_handler(self, handler):
        """Add alert handler function"""
        self.alert_handlers.append(handler)
The alerter supports multiple thresholds and handlers, making it flexible for different notification channels (email, Slack, logs).
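For example, a handler that forwards alerts to a chat or webhook endpoint might look like this sketch; it assumes aiohttp is installed, and WEBHOOK_URL is a placeholder you would replace with a real incoming-webhook URL:

import aiohttp

WEBHOOK_URL = "https://example.com/webhook"  # placeholder, not a real endpoint

async def webhook_alert(alert: Alert):
    """Post the alert as a JSON payload to a webhook endpoint."""
    payload = {"text": f"[{alert.level.upper()}] {alert.message}"}
    async with aiohttp.ClientSession() as session:
        async with session.post(WEBHOOK_URL, json=payload) as response:
            response.raise_for_status()

# Registered the same way as any other handler:
# alerter.add_alert_handler(webhook_alert)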
Check metrics against thresholds:
    async def check_metrics(self, metrics: Metrics):
        """Check metrics against thresholds"""
        alerts = []

        # Check CPU usage
        if 'cpu_percent' in self.thresholds:
            alerts.extend(self._check_threshold('cpu_percent', metrics.cpu_percent))

        # Check memory usage
        if 'memory_mb' in self.thresholds:
            alerts.extend(self._check_threshold('memory_mb', metrics.memory_mb))

        # Send alerts
        for alert in alerts:
            await self._send_alert(alert)
The system checks each configured metric and generates alerts when thresholds are exceeded.
Implement threshold checking logic:
    def _check_threshold(self, metric_name: str, current_value: float):
        """Check if metric exceeds thresholds"""
        alerts = []
        thresholds = self.thresholds[metric_name]

        if current_value >= thresholds['critical']:
            alerts.append(Alert(
                level="critical",
                message=f"{metric_name} is critically high: {current_value:.2f}",
                metric_name=metric_name,
                current_value=current_value,
                threshold=thresholds['critical']
            ))
        elif current_value >= thresholds['warning']:
            alerts.append(Alert(
                level="warning",
                message=f"{metric_name} above warning: {current_value:.2f}",
                metric_name=metric_name,
                current_value=current_value,
                threshold=thresholds['warning']
            ))

        return alerts
Because the check uses elif, a critical alert suppresses the corresponding warning: an extremely high value produces a single critical alert instead of two notifications, which keeps alert noise down.
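A quick worked example, calling the internal check directly: with warning=70 and critical=90 on cpu_percent, a reading of 75 produces one warning, 95 produces one critical alert (and no warning), and 50 produces nothing:

alerter = PerformanceAlerter()
alerter.add_threshold('cpu_percent', warning=70.0, critical=90.0)

print(alerter._check_threshold('cpu_percent', 75.0))  # one "warning" Alert
print(alerter._check_threshold('cpu_percent', 95.0))  # one "critical" Alert
print(alerter._check_threshold('cpu_percent', 50.0))  # []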
Handle alert delivery:
    async def _send_alert(self, alert: Alert):
        """Send alert to all handlers"""
        for handler in self.alert_handlers:
            try:
                await handler(alert)
            except Exception as e:
                print(f"Alert handler failed: {e}")

# Alert handlers
async def log_alert(alert: Alert):
    """Log alert to console"""
    level_icon = "🚨" if alert.level == "critical" else "⚠️"
    print(f"{level_icon} {alert.level.upper()}: {alert.message}")
Complete Monitoring Demo
Put everything together:
async def monitoring_demo():
    """Demonstrate complete monitoring system"""
    collector = AsyncMetricsCollector()
    alerter = PerformanceAlerter()

    # Configure thresholds
    alerter.add_threshold('cpu_percent', warning=70.0, critical=90.0)
    alerter.add_threshold('memory_mb', warning=500.0, critical=1000.0)

    # Add alert handlers
    alerter.add_alert_handler(log_alert)
Set up the monitoring infrastructure with appropriate thresholds for your application’s normal operating parameters.
Create a monitored task for demonstration:
    # (continues inside monitoring_demo, where collector is in scope)
    @monitor_async_function(collector)
    async def sample_task(task_id: int):
        """Sample task with monitoring"""
        await asyncio.sleep(0.1)

        # Simulate occasional failures
        import random
        if random.random() < 0.1:
            raise Exception(f"Task {task_id} failed")

        return f"Task {task_id} completed"
The decorator automatically tracks execution time and success/failure rates for any async function.
Run the monitoring system:
    # Run monitored tasks (still inside monitoring_demo)
    for i in range(20):
        try:
            await sample_task(i)
        except Exception:
            pass

        # Check metrics periodically
        if i % 5 == 0:
            metrics = collector.get_current_metrics()
            await alerter.check_metrics(metrics)

            print(f"Metrics - CPU: {metrics.cpu_percent:.1f}%, "
                  f"Memory: {metrics.memory_mb:.1f}MB, "
                  f"Tasks: {metrics.active_tasks}")

asyncio.run(monitoring_demo())
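In a long-running service you would typically run the metric check as its own background task rather than interleaving it with the workload. A sketch of that pattern (the function name and interval are assumptions, not part of the classes above):

async def metrics_reporter(collector: AsyncMetricsCollector,
                           alerter: PerformanceAlerter,
                           interval: float = 5.0):
    """Periodically collect metrics and run them through the alerter."""
    while True:
        metrics = collector.get_current_metrics()
        await alerter.check_metrics(metrics)
        await asyncio.sleep(interval)

# Started once at application startup and kept referenced, e.g.:
# reporter_task = asyncio.create_task(metrics_reporter(collector, alerter))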
Summary
Performance profiling and monitoring are essential for async applications:
Key Components
- Profiling: Identify bottlenecks with cProfile and custom decorators
- Event Loop Monitoring: Track loop health and responsiveness
- Metrics Collection: Gather system and application metrics
- Alerting: Notify on performance issues
Best Practices
- Profile regularly to identify performance regressions
- Monitor event loop responsiveness
- Set appropriate thresholds for alerts
- Collect both system and application metrics
- Use sampling to avoid monitoring overhead (see the sketch below)
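For the sampling point above, one simple approach is to record only a fraction of requests and scale the derived rates back up; this helper is a sketch with an assumed sample rate, not part of the collector API:

import random

SAMPLE_RATE = 0.1  # record roughly 1 in 10 requests

def maybe_record_request(collector: AsyncMetricsCollector, duration: float):
    """Record a sampled subset of requests to keep monitoring overhead low."""
    if random.random() < SAMPLE_RATE:
        collector.record_request(duration)
    # When reporting, multiply request counts/rates by 1 / SAMPLE_RATE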
In Part 16, we’ll explore memory optimization and I/O optimization techniques.