Performance Profiling and Monitoring

Optimizing async applications requires understanding bottlenecks and monitoring performance. Let’s explore profiling techniques and real-time monitoring.

Basic Async Profiling

Profile async functions to identify bottlenecks:

import asyncio
import cProfile
import pstats
import time
from functools import wraps

def async_profile(func):
    """Decorator to profile async functions"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        
        pr.disable()
        
        print(f"Function {func.__name__} took {end_time - start_time:.3f} seconds")
        
        # Show top time consumers
        stats = pstats.Stats(pr)
        stats.sort_stats('cumulative')
        stats.print_stats(5)  # Top 5 functions
        
        return result
    return wrapper

This decorator measures the coroutine's wall-clock execution time and captures a cProfile report of everything that runs on the thread while it executes, sorted by cumulative time to highlight the biggest consumers. Keep in mind that cProfile instruments the whole thread, so work the event loop does for other tasks during the awaits shows up in the report too.

Use the profiler on your async functions:

@async_profile
async def slow_function():
    """Function to profile"""
    await asyncio.sleep(0.1)
    total = sum(i * i for i in range(10000))  # CPU work
    await asyncio.sleep(0.05)
    return total

asyncio.run(slow_function())

Event Loop Monitoring

Monitor event loop health in real-time:

from collections import deque

class EventLoopMonitor:
    def __init__(self, sample_interval=1.0):
        self.sample_interval = sample_interval
        self.metrics = {
            'blocked_time': deque(maxlen=60),
            'task_count': deque(maxlen=60)
        }
        self.monitoring = False

The monitor tracks event loop responsiveness and task counts over time. Using deques with maxlen keeps memory usage bounded.

Start monitoring and measure loop responsiveness:

    async def start_monitoring(self):
        """Start monitoring the event loop"""
        self.monitoring = True
        # Keep a reference so the task isn't garbage-collected mid-run
        self._monitor_task = asyncio.create_task(self._monitor_loop())
    
    async def _monitor_loop(self):
        """Monitor loop performance"""
        while self.monitoring:
            start_time = time.time()
            
            # Measure loop responsiveness
            await asyncio.sleep(0)  # Yield to other tasks
            
            end_time = time.time()
            blocked_time = end_time - start_time
            
            # Collect metrics
            self.metrics['blocked_time'].append(blocked_time)
            
            # Count tasks
            all_tasks = asyncio.all_tasks()
            self.metrics['task_count'].append(len(all_tasks))
            
            await asyncio.sleep(self.sample_interval)

The key insight: await asyncio.sleep(0) should return almost immediately in a healthy event loop. If it takes longer, the loop is blocked by CPU-intensive work.
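
asyncio also ships a complementary built-in check: debug mode logs any callback or task step that blocks the loop for longer than loop.slow_callback_duration (0.1 seconds by default). A minimal sketch:

import asyncio
import logging

logging.basicConfig(level=logging.WARNING)

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # warn on anything blocking the loop > 50 ms

    sum(i * i for i in range(5_000_000))  # CPU-bound work that blocks the loop
    await asyncio.sleep(0)

asyncio.run(main(), debug=True)  # or set PYTHONASYNCIODEBUG=1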

Generate performance statistics:

    def get_stats(self):
        """Get performance statistics"""
        if not self.metrics['blocked_time']:
            return {}
        
        blocked_times = list(self.metrics['blocked_time'])
        task_counts = list(self.metrics['task_count'])
        
        return {
            'avg_blocked_time': sum(blocked_times) / len(blocked_times),
            'max_blocked_time': max(blocked_times),
            'avg_task_count': sum(task_counts) / len(task_counts),
            'max_task_count': max(task_counts)
        }
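
A quick usage sketch (the workload below is only illustrative): start the monitor, run a mix of cooperative and CPU-bound work, then read back the stats.

async def loop_monitor_demo():
    monitor = EventLoopMonitor(sample_interval=0.25)
    await monitor.start_monitoring()

    for _ in range(5):
        await asyncio.sleep(0.2)             # cooperative work
        sum(i * i for i in range(500_000))   # CPU work that briefly blocks the loop

    monitor.monitoring = False  # the class has no stop helper, so clear the flag directly
    print(monitor.get_stats())

asyncio.run(loop_monitor_demo())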

Application Metrics Collection

Collect comprehensive application metrics:

import psutil
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class Metrics:
    timestamp: float
    cpu_percent: float
    memory_mb: float
    active_tasks: int
    requests_per_second: float

The Metrics dataclass provides a clean structure for performance data. The explicit field types and auto-generated constructor make the code easier to read than passing bare dicts around.

Set up the metrics collector:

class AsyncMetricsCollector:
    def __init__(self):
        self.request_times = deque(maxlen=1000)
        self.task_counts = defaultdict(int)
        self.process = psutil.Process()
        
    def record_request(self, duration: float):
        """Record request completion time"""
        self.request_times.append((time.time(), duration))
        
    def record_task_completion(self, success: bool = True):
        """Record task completion"""
        if success:
            self.task_counts['completed'] += 1
        else:
            self.task_counts['failed'] += 1

The collector tracks request timing and task completion rates. Using deque with maxlen prevents memory growth over time.
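
Recording a data point is then a one-line call wherever a request finishes or a task completes, for example:

collector = AsyncMetricsCollector()

collector.record_request(0.123)                  # a request that took 123 ms
collector.record_task_completion()               # success (the default)
collector.record_task_completion(success=False)  # failure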

Generate comprehensive metrics:

    def get_current_metrics(self) -> Metrics:
        """Get current system and application metrics"""
        now = time.time()
        
        # System metrics
        cpu_percent = self.process.cpu_percent()
        memory_mb = self.process.memory_info().rss / 1024 / 1024
        
        # Task metrics
        all_tasks = asyncio.all_tasks()
        active_tasks = len([t for t in all_tasks if not t.done()])
        
        # Request metrics
        recent_requests = [
            (ts, duration) for ts, duration in self.request_times
            if now - ts < 60  # Last minute
        ]
        
        requests_per_second = len(recent_requests) / 60 if recent_requests else 0
        
        return Metrics(
            timestamp=now,
            cpu_percent=cpu_percent,
            memory_mb=memory_mb,
            active_tasks=active_tasks,
            requests_per_second=requests_per_second
        )

This method combines system metrics (CPU, memory) with application metrics (tasks, requests) for a complete performance picture.
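
To keep a live view, a small background task can poll get_current_metrics on an interval. Here is a minimal sketch; swap the print for whatever exporter or dashboard you actually use.

async def report_metrics(collector: AsyncMetricsCollector, interval: float = 5.0):
    """Periodically print a metrics snapshot"""
    while True:
        m = collector.get_current_metrics()
        print(f"cpu={m.cpu_percent:.1f}% mem={m.memory_mb:.1f}MB "
              f"tasks={m.active_tasks} rps={m.requests_per_second:.2f}")
        await asyncio.sleep(interval)

# Launch it alongside your application, keeping a reference to the task:
# reporter_task = asyncio.create_task(report_metrics(collector))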

Create a monitoring decorator:

# Monitoring decorator
def monitor_async_function(metrics_collector: AsyncMetricsCollector):
    """Decorator to monitor async function performance"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                result = await func(*args, **kwargs)
                metrics_collector.record_task_completion(success=True)
                return result
            except Exception:
                metrics_collector.record_task_completion(success=False)
                raise
            finally:
                duration = time.time() - start_time
                metrics_collector.record_request(duration)
        
        return wrapper
    return decorator

Performance Alerting

Implement alerting for performance issues:

from dataclasses import dataclass

@dataclass
class Alert:
    level: str  # "warning", "critical"
    message: str
    metric_name: str
    current_value: float
    threshold: float

The Alert dataclass structures alert information for consistent handling across different alert channels.

Set up the alerting system:

class PerformanceAlerter:
    def __init__(self):
        self.thresholds = {}
        self.alert_handlers = []
    
    def add_threshold(self, metric_name: str, warning: float, critical: float):
        """Add performance threshold"""
        self.thresholds[metric_name] = {
            'warning': warning,
            'critical': critical
        }
    
    def add_alert_handler(self, handler):
        """Add alert handler function"""
        self.alert_handlers.append(handler)

The alerter supports multiple thresholds and handlers, making it flexible for different notification channels (email, Slack, logs).

Check metrics against thresholds:

    async def check_metrics(self, metrics: Metrics):
        """Check metrics against thresholds"""
        alerts = []
        
        # Check CPU usage
        if 'cpu_percent' in self.thresholds:
            alerts.extend(self._check_threshold('cpu_percent', metrics.cpu_percent))
        
        # Check memory usage
        if 'memory_mb' in self.thresholds:
            alerts.extend(self._check_threshold('memory_mb', metrics.memory_mb))
        
        # Send alerts
        for alert in alerts:
            await self._send_alert(alert)

The system checks each configured metric and generates alerts when thresholds are exceeded.

Implement threshold checking logic:

    def _check_threshold(self, metric_name: str, current_value: float):
        """Check if metric exceeds thresholds"""
        alerts = []
        thresholds = self.thresholds[metric_name]
        
        if current_value >= thresholds['critical']:
            alerts.append(Alert(
                level="critical",
                message=f"{metric_name} is critically high: {current_value:.2f}",
                metric_name=metric_name,
                current_value=current_value,
                threshold=thresholds['critical']
            ))
        elif current_value >= thresholds['warning']:
            alerts.append(Alert(
                level="warning",
                message=f"{metric_name} above warning: {current_value:.2f}",
                metric_name=metric_name,
                current_value=current_value,
                threshold=thresholds['warning']
            ))
        
        return alerts

Because the checks use elif, a critically high value raises a single critical alert rather than both a critical and a warning, which keeps very high readings from generating duplicate alerts.

Handle alert delivery:

    async def _send_alert(self, alert: Alert):
        """Send alert to all handlers"""
        for handler in self.alert_handlers:
            try:
                await handler(alert)
            except Exception as e:
                print(f"Alert handler failed: {e}")

# Alert handlers
async def log_alert(alert: Alert):
    """Log alert to console"""
    level_icon = "🚨" if alert.level == "critical" else "⚠️"
    print(f"{level_icon} {alert.level.upper()}: {alert.message}")

Complete Monitoring Demo

Put everything together:

async def monitoring_demo():
    """Demonstrate complete monitoring system"""
    collector = AsyncMetricsCollector()
    alerter = PerformanceAlerter()
    
    # Configure thresholds
    alerter.add_threshold('cpu_percent', warning=70.0, critical=90.0)
    alerter.add_threshold('memory_mb', warning=500.0, critical=1000.0)
    
    # Add alert handlers
    alerter.add_alert_handler(log_alert)

Set up the monitoring infrastructure with appropriate thresholds for your application’s normal operating parameters.

Create a monitored task for demonstration:

    @monitor_async_function(collector)
    async def sample_task(task_id: int):
        """Sample task with monitoring"""
        await asyncio.sleep(0.1)
        
        # Simulate occasional failures
        import random
        if random.random() < 0.1:
            raise Exception(f"Task {task_id} failed")
        
        return f"Task {task_id} completed"

The decorator automatically tracks execution time and success/failure rates for any async function.

Run the monitoring system:

    # Run monitored tasks
    for i in range(20):
        try:
            await sample_task(i)
        except Exception:
            pass
        
        # Check metrics periodically
        if i % 5 == 0:
            metrics = collector.get_current_metrics()
            await alerter.check_metrics(metrics)
            print(f"Metrics - CPU: {metrics.cpu_percent:.1f}%, "
                  f"Memory: {metrics.memory_mb:.1f}MB, "
                  f"Tasks: {metrics.active_tasks}")

asyncio.run(monitoring_demo())

Summary

Performance profiling and monitoring are essential for async applications:

Key Components

  • Profiling: Identify bottlenecks with cProfile and custom decorators
  • Event Loop Monitoring: Track loop health and responsiveness
  • Metrics Collection: Gather system and application metrics
  • Alerting: Notify on performance issues

Best Practices

  • Profile regularly to identify performance regressions
  • Monitor event loop responsiveness
  • Set appropriate thresholds for alerts
  • Collect both system and application metrics
  • Use sampling to avoid monitoring overhead

In Part 16, we’ll explore memory optimization and I/O optimization techniques.