Error Handling Basics

Robust async applications need sophisticated error handling. When multiple coroutines run concurrently, failures can cascade quickly without proper safeguards.

Exception Propagation in Async Code

An exception raised inside a coroutine propagates to the code that awaits it, so a directly awaited call can be wrapped in an ordinary try/except:

import asyncio
import random

async def risky_task(task_id):
    """Simulate a unit of work that fails roughly 30% of the time.

    Pauses for half a second, then either raises ``ValueError`` or returns
    a completion message that contains ``task_id``.
    """
    await asyncio.sleep(0.5)

    # A draw below 0.3 counts as a failure (30% failure rate).
    succeeded = random.random() >= 0.3
    if not succeeded:
        raise ValueError(f"Task {task_id} failed!")
    return f"Task {task_id} completed"

async def basic_error_handling():
    """Run five risky tasks one after another and report each outcome.

    Every call is awaited inside its own ``try`` block, so a ``ValueError``
    from one task is printed and the loop moves on to the next task.
    """
    for task_number in range(5):
        try:
            outcome = await risky_task(task_number)
        except ValueError as exc:
            print(f"Error: {exc}")
        else:
            print(f"Success: {outcome}")

# Run the sequential example: asyncio.run() drives the coroutine to completion.
asyncio.run(basic_error_handling())

Handling Multiple Task Failures

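When running multiple tasks concurrently with asyncio.gather(), pass return_exceptions=True so that exceptions are returned in the results list instead of being raised as soon as the first task fails: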

async def concurrent_error_handling():
    """Run ten risky tasks concurrently and split successes from failures.

    Prints how many tasks succeeded and how many failed, then prints one
    "Processing" line for each successful result.
    """
    # With return_exceptions=True, gather() returns raised exceptions as
    # ordinary entries of the result list instead of raising them.
    outcomes = await asyncio.gather(
        *(risky_task(n) for n in range(10)),
        return_exceptions=True,
    )

    failures = [
        f"Task {idx}: {outcome}"
        for idx, outcome in enumerate(outcomes)
        if isinstance(outcome, Exception)
    ]
    successes = [
        outcome for outcome in outcomes if not isinstance(outcome, Exception)
    ]

    print(f"Successful tasks: {len(successes)}")
    print(f"Failed tasks: {len(failures)}")

    # Only the successful results are processed further.
    for item in successes:
        print(f"Processing: {item}")

# Run the concurrent example: asyncio.run() drives the coroutine to completion.
asyncio.run(concurrent_error_handling())

Task Cancellation and Cleanup

Properly cancel tasks and clean up resources:

async def long_running_task(name, duration):
    """Sleep for ``duration`` seconds and return ``"<name> result"``.

    If the task is cancelled while sleeping, it prints a notice, spends
    0.1s on simulated cleanup, and then re-raises ``CancelledError`` so the
    task still ends up in the cancelled state.
    """
    print(f"{name}: Starting work for {duration}s")
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        print(f"{name}: Task was cancelled, cleaning up...")
        # Stand-in for real cleanup work.
        await asyncio.sleep(0.1)
        print(f"{name}: Cleanup complete")
        # Propagate the cancellation rather than swallowing it.
        raise
    else:
        print(f"{name}: Work completed")
        return f"{name} result"

async def cancellation_demo():
    """Start three tasks, cancel the unfinished ones after 2.5s, and report.

    Task-A (3s), Task-B (5s) and Task-C (2s) are started together. After
    2.5 seconds every task that is not done yet is cancelled, all tasks are
    awaited, and one line per task says whether it was cancelled, failed,
    or completed.

    Raises:
        asyncio.CancelledError: If this coroutine itself is cancelled. The
            child tasks are cancelled and awaited first, then the
            cancellation is re-raised.
    """
    tasks = [
        asyncio.create_task(long_running_task("Task-A", 3)),
        asyncio.create_task(long_running_task("Task-B", 5)),
        asyncio.create_task(long_running_task("Task-C", 2)),
    ]

    try:
        # Let the tasks run for 2.5 seconds before cancelling the stragglers.
        await asyncio.sleep(2.5)

        print("🛑 Cancelling remaining tasks...")
        for task in tasks:
            if not task.done():
                task.cancel()

        # return_exceptions=True returns each CancelledError as a result
        # instead of raising it, so every task's outcome can be inspected.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for i, result in enumerate(results):
            if isinstance(result, asyncio.CancelledError):
                print(f"Task {i}: Cancelled")
            elif isinstance(result, Exception):
                print(f"Task {i}: Failed - {result}")
            else:
                print(f"Task {i}: Completed - {result}")

    except (asyncio.CancelledError, KeyboardInterrupt):
        # Under asyncio.run() an interrupt normally reaches this coroutine as
        # a cancellation of the main task, not as KeyboardInterrupt, so both
        # are handled here.
        print("Received interrupt, cancelling all tasks...")
        for task in tasks:
            task.cancel()
        # Wait for the children to finish their own cleanup.
        await asyncio.gather(*tasks, return_exceptions=True)
        # Never swallow the cancellation/interrupt.
        raise

# Run the cancellation example: asyncio.run() drives the coroutine to completion.
asyncio.run(cancellation_demo())

Timeout Management

Handle timeouts gracefully across your application:

async def timeout_wrapper(coroutine, timeout_seconds, operation_name="Operation"):
    """Await ``coroutine`` with a time limit, logging any failure.

    Returns whatever the coroutine returns. On failure a message is printed
    (a dedicated one for timeouts, a generic one otherwise) and the
    original exception is re-raised unchanged.
    """
    try:
        outcome = await asyncio.wait_for(coroutine, timeout=timeout_seconds)
    except Exception as exc:
        if isinstance(exc, asyncio.TimeoutError):
            print(f"{operation_name} timed out after {timeout_seconds}s")
        else:
            print(f"{operation_name} failed: {exc}")
        raise
    return outcome

async def slow_operation():
    """Pretend to work for three seconds, then return a fixed user list."""
    await asyncio.sleep(3)
    payload = {"users": ["alice", "bob", "charlie"]}
    return payload

async def timeout_demo():
    """Run the slow operation under three different time limits.

    Each entry is awaited in turn through ``timeout_wrapper``; for every
    entry one line is printed saying whether it succeeded, timed out, or
    failed for another reason.
    """
    # Store the coroutine *function* rather than a coroutine object. Each
    # coroutine is then created right before it is awaited, so none is left
    # un-awaited (a "never awaited" RuntimeWarning) if the loop exits early,
    # for example because this coroutine is cancelled.
    operations = [
        (slow_operation, 2.5, "Database Query"),
        (slow_operation, 1.5, "API Call"),
        (slow_operation, 4.0, "File Processing"),
    ]

    for operation, timeout, name in operations:
        try:
            await timeout_wrapper(operation(), timeout, name)
        except asyncio.TimeoutError:
            print(f"{name}: Timed out")
        except Exception as e:
            print(f"{name}: Failed - {e}")
        else:
            print(f"{name}: Success")

# Run the timeout example: asyncio.run() drives the coroutine to completion.
asyncio.run(timeout_demo())

Retry with Exponential Backoff

Retry transient failures, waiting longer after each failed attempt:

async def retry_with_backoff(
    coroutine_func,
    max_retries=3,
    base_delay=1,
    backoff_factor=2
):
    """Call ``coroutine_func`` until it succeeds, backing off between tries.

    Args:
        coroutine_func: Zero-argument callable whose result is awaited; it
            is called again for every attempt.
        max_retries: Number of retries after the first attempt, so up to
            ``max_retries + 1`` attempts are made in total.
        base_delay: Delay in seconds before the first retry.
        backoff_factor: Multiplier applied to the delay after each failed
            attempt (delay = ``base_delay * backoff_factor ** attempt``).

    Returns:
        The result of the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is negative.
        Exception: The exception from the final attempt once all attempts
            have failed.
    """
    # A negative count would skip the loop entirely and silently return
    # None without ever calling coroutine_func.
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await coroutine_func()

        except Exception as e:
            if attempt == max_retries:
                print(f"🚫 All {max_retries + 1} attempts failed")
                # Bare raise re-raises the active exception as-is.
                raise

            # The delay grows geometrically with the attempt number.
            delay = base_delay * (backoff_factor ** attempt)

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {delay:.1f}s...")

            await asyncio.sleep(delay)

async def flaky_api_call():
    """Simulate an API request that fails roughly 60% of the time.

    Waits 0.2 seconds, then either raises ``Exception`` or returns a small
    success payload.
    """
    await asyncio.sleep(0.2)

    # A draw below 0.6 counts as a failure (60% failure rate).
    call_succeeds = random.random() >= 0.6
    if call_succeeds:
        return {"status": "success", "data": "API response"}
    raise Exception("API temporarily unavailable")

async def retry_demo():
    """Call the flaky API through the retry helper and print the outcome.

    Uses up to four retries, starting at a 0.5s delay that doubles after
    each failed attempt.
    """
    try:
        response = await retry_with_backoff(
            flaky_api_call, max_retries=4, base_delay=0.5, backoff_factor=2
        )
    except Exception as err:
        print(f"💥 Final failure: {err}")
    else:
        print(f"Success: {response}")

# Run the retry example: asyncio.run() drives the coroutine to completion.
asyncio.run(retry_demo())

Summary

This part covered the basics of error handling in async applications:

Key Patterns

  • Exception Handling: Pass return_exceptions=True to asyncio.gather() for concurrent tasks
  • Task Cancellation: Implement proper cleanup in cancelled tasks
  • Timeouts: Set appropriate timeouts for all operations
  • Retry Logic: Implement exponential backoff for transient failures

Best Practices

  • Always handle exceptions in concurrent tasks
  • Implement timeouts for external operations
  • Use proper cleanup in cancellation handlers
  • Plan retry strategies for unreliable services

Common Patterns

  • Timeout wrappers for operations
  • Retry mechanisms with backoff
  • Graceful task cancellation
  • Exception aggregation in concurrent operations

In Part 8, we’ll explore advanced communication patterns including events, queues, and coordination mechanisms.