Coroutine Fundamentals

Coroutines are functions that can pause and resume execution. They’re the building blocks of async programming, allowing the event loop to switch between tasks efficiently.

Understanding Coroutines

A coroutine function is defined with async def. Calling it returns a coroutine object, which can pause execution at await points:

import asyncio

async def simple_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)  # Pause here
    print("Coroutine resumed")
    return "Done"

# Running a coroutine
result = asyncio.run(simple_coroutine())
print(result)  # "Done"

Key insight: When the coroutine reaches await asyncio.sleep(1), it suspends and hands control back to the event loop, which is then free to run other coroutines. After about 1 second, it resumes from exactly where it left off.
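To make the hand-off visible, here is a minimal sketch with two coroutines sharing one event loop. The worker function and the events list are illustrative names, not part of the example above; the delays are arbitrary.

```python
import asyncio

events = []  # records the order in which steps actually run

async def worker(name, delay):
    events.append(f"{name} started")
    await asyncio.sleep(delay)  # suspend here; the event loop runs the other worker
    events.append(f"{name} resumed")

async def main():
    # Both workers run concurrently on the same event loop
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))

asyncio.run(main())
print(events)
# ['A started', 'B started', 'B resumed', 'A resumed']
```

Worker B starts before A has finished, which is only possible because A gave up control at its await.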

Coroutine States

Coroutines have different states during their lifecycle:

import asyncio
import inspect

async def example_coroutine():
    await asyncio.sleep(1)
    return "completed"

# Create but don't run the coroutine
coro = example_coroutine()
print(f"State: {inspect.getcoroutinestate(coro)}")  # CORO_CREATED

# Run it to completion; a finished coroutine reports CORO_CLOSED
asyncio.run(coro)
print(f"State: {inspect.getcoroutinestate(coro)}")  # CORO_CLOSED

The states are:

  • CORO_CREATED: Just created, not started
  • CORO_RUNNING: Currently executing
  • CORO_SUSPENDED: Paused at an await point
  • CORO_CLOSED: Finished, whether by returning, raising an exception, or being closed
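The intermediate states are easier to see when a coroutine is driven by hand instead of by the event loop. The following is a rough sketch for illustration only: Pause is a hypothetical awaitable written for this example (asyncio.sleep needs a running loop, so it cannot be used here), and real code should not call send() directly.

```python
import inspect

class Pause:
    """Hypothetical awaitable that suspends the awaiting coroutine once."""
    def __await__(self):
        yield  # hand control back to whoever is driving the coroutine

observed = []

async def example():
    # While this body executes, the coroutine object reports CORO_RUNNING
    observed.append(inspect.getcoroutinestate(coro))
    await Pause()
    return "completed"

coro = example()
observed.append(inspect.getcoroutinestate(coro))  # CORO_CREATED

coro.send(None)  # run the body up to the first suspension point
observed.append(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED

try:
    coro.send(None)  # resume; the coroutine runs to its return statement
except StopIteration as stop:
    result = stop.value  # the return value travels in StopIteration
observed.append(inspect.getcoroutinestate(coro))  # CORO_CLOSED

print(observed, result)
```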

Coroutine Communication

A coroutine passes data back to its caller through its return value, which becomes the result of the await expression:

import asyncio

async def fetch_user(user_id):
    await asyncio.sleep(0.5)  # Simulate API call
    return {
        "id": user_id,
        "name": f"User {user_id}",
        "email": f"user{user_id}@example.com"
    }

async def process_user(user_id):
    user = await fetch_user(user_id)
    user["processed"] = True
    return user

async def main():
    # Process multiple users concurrently; gather schedules each coroutine as a task
    user_ids = [1, 2, 3]
    coros = [process_user(uid) for uid in user_ids]
    users = await asyncio.gather(*coros)
    
    for user in users:
        print(f"Processed: {user['name']}")

asyncio.run(main())
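One detail worth knowing when collecting return values this way: asyncio.gather returns results in the order the awaitables were passed in, not the order they finished. A small sketch, where fetch_after is a hypothetical helper and the delays are arbitrary:

```python
import asyncio

async def fetch_after(value, delay):
    # Hypothetical helper: returns `value` after `delay` seconds
    await asyncio.sleep(delay)
    return value

async def main():
    # "slow" finishes last but still comes first in the result list
    return await asyncio.gather(
        fetch_after("slow", 0.2),
        fetch_after("fast", 0.05),
    )

results = asyncio.run(main())
print(results)  # ['slow', 'fast']
```

This is why the loop over users above can rely on users[i] belonging to user_ids[i].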

Using Queues for Communication

Queues enable producer-consumer patterns:

import asyncio

async def producer(queue, items):
    """Produce items and put them in queue"""
    for item in items:
        await queue.put(item)
        print(f"📦 Produced: {item}")
        await asyncio.sleep(0.1)
    
    await queue.put(None)  # Signal completion

async def consumer(queue, name):
    """Consume items from queue"""
    while True:
        item = await queue.get()
        
        if item is None:  # End signal
            break
        
        print(f"{name} processing: {item}")
        await asyncio.sleep(0.2)  # Simulate processing

async def queue_demo():
    queue = asyncio.Queue(maxsize=2)  # Bounded: put() waits while the queue is full
    
    items = ["task-1", "task-2", "task-3"]
    
    await asyncio.gather(
        producer(queue, items),
        consumer(queue, "Worker-A")
    )

asyncio.run(queue_demo())
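A single None sentinel stops only one consumer. With several consumers, one common alternative is to let the queue track completion through task_done() and join(), then cancel the idle workers. The sketch below assumes two workers and five items; the names worker and processed are illustrative.

```python
import asyncio

async def worker(queue, name, processed):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing
            processed.append((name, item))
        finally:
            queue.task_done()  # mark this item as handled, even on error

async def main():
    queue = asyncio.Queue()
    processed = []
    workers = [
        asyncio.create_task(worker(queue, f"Worker-{i}", processed))
        for i in range(2)
    ]

    for n in range(1, 6):
        await queue.put(f"task-{n}")

    await queue.join()  # wait until every item has been marked done

    # The workers are now blocked in queue.get(); cancel them to shut down
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
print(f"Handled {len(processed)} items")
```

Which worker handles which item depends on scheduling, so the sketch only relies on every item being handled exactly once.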

Async Context Managers

An async context manager defines __aenter__ and __aexit__ as coroutines, so setup and cleanup can themselves await. Use it with async with for resource management:

import asyncio

class AsyncDatabase:
    def __init__(self, name):
        self.name = name
        self.connected = False
    
    async def __aenter__(self):
        print(f"🔌 Connecting to {self.name}")
        await asyncio.sleep(0.1)  # Simulate connection time
        self.connected = True
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"🔌 Disconnecting from {self.name}")
        await asyncio.sleep(0.05)  # Simulate cleanup time
        self.connected = False
    
    async def query(self, sql):
        if not self.connected:
            raise RuntimeError("Database not connected")
        
        await asyncio.sleep(0.2)  # Simulate query time
        return f"Results for: {sql}"

async def database_operations():
    """Use database with proper cleanup"""
    async with AsyncDatabase("UserDB") as db:
        result = await db.query("SELECT * FROM users")
        return result

result = asyncio.run(database_operations())
print(f"Query result: {result}")
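The point of async with is that cleanup runs even when the body raises. The sketch below shows this using contextlib.asynccontextmanager, a generator-based alternative to writing a class with __aenter__ and __aexit__. The connection function and the log list are illustrative names, not part of the AsyncDatabase example.

```python
import asyncio
from contextlib import asynccontextmanager

log = []

@asynccontextmanager
async def connection(name):
    log.append(f"connect {name}")
    await asyncio.sleep(0.01)  # simulate connection time
    try:
        yield name  # the body of the async with block runs here
    finally:
        await asyncio.sleep(0.01)  # simulate cleanup time
        log.append(f"disconnect {name}")

async def failing_query():
    async with connection("UserDB"):
        raise ValueError("bad query")

try:
    asyncio.run(failing_query())
except ValueError as exc:
    log.append(f"error: {exc}")

print(log)
# ['connect UserDB', 'disconnect UserDB', 'error: bad query']
```

The disconnect entry appears before the error reaches the caller, which is the guarantee the class-based version provides through __aexit__.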

Error Handling in Coroutines

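An exception raised inside a coroutine propagates to whoever awaits it. With asyncio.gather, the first exception propagates to the caller by default; passing return_exceptions=True returns exceptions in the result list alongside successful values: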

import asyncio
import random

async def unreliable_task(task_id):
    """Task that might fail"""
    await asyncio.sleep(0.5)
    
    if random.random() < 0.3:  # 30% chance of failure
        raise Exception(f"Task {task_id} failed!")
    
    return f"Task {task_id} succeeded"

async def robust_task_runner():
    """Run multiple tasks with proper error handling"""
    tasks = [unreliable_task(i) for i in range(5)]
    
    # Use return_exceptions to handle failures gracefully
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successes = []
    failures = []
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            failures.append(f"Task {i}: {result}")
        else:
            successes.append(result)
    
    print(f"Successes: {len(successes)}")
    print(f"Failures: {len(failures)}")
    for failure in failures:
        print(f"  {failure}")

asyncio.run(robust_task_runner())
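For contrast, here is a deterministic sketch of the two other places an exception can surface: at a plain await, and at a gather call without return_exceptions. The functions fail_fast and succeed are hypothetical stand-ins for unreliable_task with the randomness removed.

```python
import asyncio

async def fail_fast(task_id):
    await asyncio.sleep(0.01)
    raise ValueError(f"Task {task_id} failed")

async def succeed(task_id):
    await asyncio.sleep(0.01)
    return f"Task {task_id} succeeded"

async def main():
    outcomes = []

    # A single await: the exception surfaces at the await expression
    try:
        await fail_fast(1)
    except ValueError as exc:
        outcomes.append(f"caught: {exc}")

    # gather without return_exceptions: the first exception propagates
    # to the caller and the successful result is not returned
    try:
        await asyncio.gather(succeed(2), fail_fast(3))
    except ValueError as exc:
        outcomes.append(f"gather raised: {exc}")

    return outcomes

outcomes = asyncio.run(main())
print(outcomes)
# ['caught: Task 1 failed', 'gather raised: Task 3 failed']
```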

Summary

Coroutines are the foundation of async programming:

Key Concepts

  • Pausable Functions: Can suspend and resume execution
  • Cooperative: Yield control voluntarily at await points
  • Stateful: Maintain state between suspensions
  • Composable: Can be combined and chained together

Best Practices

  • Use async with for resource management
  • Pass return_exceptions=True to asyncio.gather when one failure should not discard the other results
  • Use queues for inter-coroutine communication
  • Always await coroutines or schedule them as tasks; calling a coroutine function without awaiting the result never runs its body
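The last point is a frequent source of bugs, so here is a short sketch of what a missing await looks like. The compute function is a made-up example.

```python
import asyncio

async def compute():
    return 42

async def main():
    coro = compute()  # no await: this is a coroutine object, the body has not run
    is_coroutine = asyncio.iscoroutine(coro)
    value = await coro  # awaiting runs the body and produces the return value
    return is_coroutine, value

is_coroutine, value = asyncio.run(main())
print(is_coroutine, value)  # True 42
```

If the coroutine object were simply discarded instead of awaited, Python would emit a RuntimeWarning saying the coroutine was never awaited.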

Common Patterns

  • Producer-consumer with queues
  • Resource management with async context managers
  • Error handling with gather and return_exceptions

In Part 6, we’ll explore async generators and how they enable powerful streaming data patterns.