Error Handling Basics
Robust async applications need sophisticated error handling. When multiple coroutines run concurrently, failures can cascade quickly without proper safeguards.
Exception Propagation in Async Code
Exceptions in async code behave differently than in synchronous code:
import asyncio
import random
async def risky_task(task_id, delay=0.5, failure_rate=0.3):
    """Simulate a unit of work that may fail.

    Args:
        task_id: Identifier used in the result and error messages.
        delay: Seconds to sleep before resolving (default 0.5).
        failure_rate: Probability in [0, 1] of raising (default 0.3).

    Returns:
        A success message string for this task.

    Raises:
        ValueError: With probability ``failure_rate``.
    """
    await asyncio.sleep(delay)
    # random.random() is uniform on [0, 1), so failure_rate=0 never
    # fails and failure_rate=1 always fails.
    if random.random() < failure_rate:
        raise ValueError(f"Task {task_id} failed!")
    return f"Task {task_id} completed"
async def basic_error_handling():
    """Run five risky tasks sequentially, reporting each outcome.

    Demonstrates the simplest pattern: ``await`` inside try/except,
    catching the specific exception type the task is known to raise.
    """
    for i in range(5):
        try:
            result = await risky_task(i)
            print(f"Success: {result}")
        except ValueError as e:
            # Only ValueError is expected from risky_task; anything
            # else should propagate so real bugs are not hidden.
            print(f"Error: {e}")


if __name__ == "__main__":
    # Guard the entry point so importing this module does not run the demo.
    asyncio.run(basic_error_handling())
Handling Multiple Task Failures
When running multiple tasks concurrently, use `return_exceptions=True`:
async def concurrent_error_handling():
    """Run ten risky tasks concurrently and partition the outcomes.

    ``return_exceptions=True`` makes ``asyncio.gather`` deliver raised
    exceptions as result values instead of aborting the whole batch on
    the first failure.
    """
    tasks = [risky_task(i) for i in range(10)]
    # Gather all results, including exceptions.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = []
    failures = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            failures.append(f"Task {i}: {result}")
        else:
            successes.append(result)
    print(f"Successful tasks: {len(successes)}")
    print(f"Failed tasks: {len(failures)}")
    # Continue processing successful results.
    for success in successes:
        print(f"Processing: {success}")


if __name__ == "__main__":
    # Guard the entry point so importing this module does not run the demo.
    asyncio.run(concurrent_error_handling())
Task Cancellation and Cleanup
Properly cancel tasks and clean up resources:
async def long_running_task(name, duration):
    """Do ``duration`` seconds of cancellable work.

    On cancellation, performs a short simulated cleanup before
    re-raising CancelledError so the task still ends up cancelled.
    """
    async def _cleanup():
        # Cleanup is awaited before the cancellation is allowed to
        # propagate to the awaiter.
        print(f"{name}: Task was cancelled, cleaning up...")
        await asyncio.sleep(0.1)  # Simulate cleanup
        print(f"{name}: Cleanup complete")

    print(f"{name}: Starting work for {duration}s")
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        await _cleanup()
        raise  # Re-raise to properly cancel
    print(f"{name}: Work completed")
    return f"{name} result"
async def cancellation_demo():
    """Start three tasks, then cancel whatever is unfinished after 2.5s.

    Shows the full cancellation lifecycle: ``create_task``, ``cancel()``,
    then ``gather(..., return_exceptions=True)`` to wait for every task
    (including cancelled ones) to finish its cleanup.
    """
    # Start multiple tasks.
    tasks = [
        asyncio.create_task(long_running_task("Task-A", 3)),
        asyncio.create_task(long_running_task("Task-B", 5)),
        asyncio.create_task(long_running_task("Task-C", 2))
    ]
    try:
        # Wait for 2.5 seconds, then cancel remaining tasks.
        await asyncio.sleep(2.5)
        print("🛑 Cancelling remaining tasks...")
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for all tasks to finish, including cancellation cleanup.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            # CancelledError subclasses BaseException (not Exception)
            # since Python 3.8, so check it before the generic case.
            if isinstance(result, asyncio.CancelledError):
                print(f"Task {i}: Cancelled")
            elif isinstance(result, Exception):
                print(f"Task {i}: Failed - {result}")
            else:
                print(f"Task {i}: Completed - {result}")
    except KeyboardInterrupt:
        # Best-effort shutdown path: cancel everything and wait.
        print("Received interrupt, cancelling all tasks...")
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    # Guard the entry point so importing this module does not run the demo.
    asyncio.run(cancellation_demo())
Timeout Management
Handle timeouts gracefully across your application:
async def timeout_wrapper(coroutine, timeout_seconds, operation_name="Operation"):
    """Await ``coroutine``, enforcing a deadline of ``timeout_seconds``.

    Logs both timeouts and failures before re-raising, so callers can
    still distinguish the outcome by exception type.
    """
    try:
        result = await asyncio.wait_for(coroutine, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"{operation_name} timed out after {timeout_seconds}s")
        raise
    except Exception as e:
        print(f"{operation_name} failed: {e}")
        raise
    return result
async def slow_operation(delay=3.0):
    """Simulate a slow operation.

    Args:
        delay: Seconds to sleep before returning (default 3.0, kept
            as the default so existing callers behave the same).

    Returns:
        A small dict standing in for a real query result.
    """
    await asyncio.sleep(delay)
    return {"users": ["alice", "bob", "charlie"]}
async def timeout_demo():
    """Run three slow operations under different per-operation deadlines.

    Each tuple pairs a fresh coroutine with its own timeout and a label,
    showing per-operation timeout policy rather than one global limit.
    """
    operations = [
        (slow_operation(), 2.5, "Database Query"),
        (slow_operation(), 1.5, "API Call"),
        (slow_operation(), 4.0, "File Processing")
    ]
    for coroutine, timeout, name in operations:
        try:
            # Result is unused here; the demo only reports the outcome.
            await timeout_wrapper(coroutine, timeout, name)
            print(f"{name}: Success")
        except asyncio.TimeoutError:
            print(f"{name}: Timed out")
        except Exception as e:
            print(f"{name}: Failed - {e}")


if __name__ == "__main__":
    # Guard the entry point so importing this module does not run the demo.
    asyncio.run(timeout_demo())
Retry with Exponential Backoff
Implement sophisticated retry mechanisms:
async def retry_with_backoff(
    coroutine_func,
    max_retries=3,
    base_delay=1,
    backoff_factor=2
):
    """Call ``coroutine_func`` until it succeeds or retries run out.

    Args:
        coroutine_func: Zero-argument callable returning an awaitable.
        max_retries: Number of retries after the initial attempt.
        base_delay: Delay before the first retry, in seconds.
        backoff_factor: Multiplier applied to the delay each attempt.

    Returns:
        Whatever ``coroutine_func`` returns on its first success.

    Raises:
        Exception: The final failure, once all attempts are exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return await coroutine_func()
        except Exception as e:
            if attempt == max_retries:
                print(f"🚫 All {max_retries + 1} attempts failed")
                raise  # bare raise keeps the original traceback intact
            # Exponential backoff: base_delay, base_delay*factor, ...
            delay = base_delay * (backoff_factor ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)
async def flaky_api_call():
    """Simulate an unreliable remote endpoint that fails ~60% of calls."""
    await asyncio.sleep(0.2)
    # random.random() < 0.6 models the 60% failure rate.
    succeeded = random.random() >= 0.6
    if not succeeded:
        raise Exception("API temporarily unavailable")
    return {"status": "success", "data": "API response"}
async def retry_demo():
    """Demonstrate retry with exponential backoff against a flaky endpoint.

    With max_retries=4 and a 60% per-call failure rate, at least one
    of the five attempts succeeds ~99% of the time.
    """
    try:
        result = await retry_with_backoff(
            flaky_api_call,
            max_retries=4,
            base_delay=0.5,
            backoff_factor=2
        )
        print(f"Success: {result}")
    except Exception as e:
        print(f"💥 Final failure: {e}")


if __name__ == "__main__":
    # Guard the entry point so importing this module does not run the demo.
    asyncio.run(retry_demo())
Summary
Basic error handling in async applications:
Key Patterns
- Exception Handling: Use `return_exceptions=True` for concurrent tasks
- Task Cancellation: Implement proper cleanup in cancelled tasks
- Timeouts: Set appropriate timeouts for all operations
- Retry Logic: Implement exponential backoff for transient failures
Best Practices
- Always handle exceptions in concurrent tasks
- Implement timeouts for external operations
- Use proper cleanup in cancellation handlers
- Plan retry strategies for unreliable services
Common Patterns
- Timeout wrappers for operations
- Retry mechanisms with backoff
- Graceful task cancellation
- Exception aggregation in concurrent operations
In Part 8, we’ll explore advanced communication patterns including events, queues, and coordination mechanisms.