Coroutines Fundamentals
Coroutines are functions that can pause and resume execution. They’re the building blocks of async programming, allowing the event loop to switch between tasks efficiently.
Understanding Coroutines
A coroutine is created with async def
and can pause execution at await
points:
import asyncio
async def simple_coroutine():
print("Coroutine started")
await asyncio.sleep(1) # Pause here
print("Coroutine resumed")
return "Done"
# Running a coroutine
result = asyncio.run(simple_coroutine())
print(result) # "Done"
Key insight: When the coroutine hits await asyncio.sleep(1)
, it pauses and lets other coroutines run. After 1 second, it resumes from exactly where it left off.
Coroutine States
Coroutines have different states during their lifecycle:
import inspect
async def example_coroutine():
await asyncio.sleep(1)
return "completed"
# Create but don't run the coroutine
coro = example_coroutine()
print(f"State: {inspect.getcoroutinestate(coro)}") # CORO_CREATED
# Must run to see other states
asyncio.run(coro)
The states are:
- CORO_CREATED: Just created, not started
- CORO_RUNNING: Currently executing
- CORO_SUSPENDED: Paused at an await point
- CORO_CLOSED: Finished or cancelled
Coroutine Communication
Coroutines can pass data through return values:
async def fetch_user(user_id):
await asyncio.sleep(0.5) # Simulate API call
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
async def process_user(user_id):
user = await fetch_user(user_id)
user["processed"] = True
return user
async def main():
# Process multiple users concurrently
user_ids = [1, 2, 3]
tasks = [process_user(uid) for uid in user_ids]
users = await asyncio.gather(*tasks)
for user in users:
print(f"Processed: {user['name']}")
asyncio.run(main())
Using Queues for Communication
Queues enable producer-consumer patterns:
async def producer(queue, items):
"""Produce items and put them in queue"""
for item in items:
await queue.put(item)
print(f"📦 Produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue, name):
"""Consume items from queue"""
while True:
item = await queue.get()
if item is None: # End signal
break
print(f"Processing {name}: {item}")
await asyncio.sleep(0.2) # Simulate processing
async def queue_demo():
queue = asyncio.Queue(maxsize=2) # Bounded queue
items = ["task-1", "task-2", "task-3"]
await asyncio.gather(
producer(queue, items),
consumer(queue, "Worker-A")
)
asyncio.run(queue_demo())
Async Context Managers
Combine coroutines with context managers for resource management:
class AsyncDatabase:
def __init__(self, name):
self.name = name
self.connected = False
async def __aenter__(self):
print(f"🔌 Connecting to {self.name}")
await asyncio.sleep(0.1) # Simulate connection time
self.connected = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"🔌 Disconnecting from {self.name}")
await asyncio.sleep(0.05) # Simulate cleanup time
self.connected = False
async def query(self, sql):
if not self.connected:
raise RuntimeError("Database not connected")
await asyncio.sleep(0.2) # Simulate query time
return f"Results for: {sql}"
async def database_operations():
"""Use database with proper cleanup"""
async with AsyncDatabase("UserDB") as db:
result = await db.query("SELECT * FROM users")
return result
result = asyncio.run(database_operations())
print(f"Query result: {result}")
Error Handling in Coroutines
Proper error handling is crucial in async code:
import random
async def unreliable_task(task_id):
"""Task that might fail"""
await asyncio.sleep(0.5)
if random.random() < 0.3: # 30% chance of failure
raise Exception(f"Task {task_id} failed!")
return f"Task {task_id} succeeded"
async def robust_task_runner():
"""Run multiple tasks with proper error handling"""
tasks = [unreliable_task(i) for i in range(5)]
# Use return_exceptions to handle failures gracefully
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failures.append(f"Task {i}: {result}")
else:
successes.append(result)
print(f"Successes: {len(successes)}")
print(f"Failures: {len(failures)}")
asyncio.run(robust_task_runner())
Summary
Coroutines are the foundation of async programming:
Key Concepts
- Pausable Functions: Can suspend and resume execution
- Cooperative: Yield control voluntarily at
await
points - Stateful: Maintain state between suspensions
- Composable: Can be combined and chained together
Best Practices
- Use
async with
for resource management - Handle exceptions with
return_exceptions=True
- Use queues for inter-coroutine communication
- Always await coroutines properly
Common Patterns
- Producer-consumer with queues
- Resource management with async context managers
- Error handling with gather and return_exceptions
In Part 6, we’ll explore async generators and how they enable powerful streaming data patterns.