Advanced Communication Patterns

Async applications need sophisticated ways for coroutines to communicate and coordinate. Let’s explore events, queues, and other coordination mechanisms.

Communication with Events

Use events to coordinate between coroutines:

import asyncio

async def waiter(event, name, work_duration):
    """Wait for event, then do work"""
    print(f"{name}: Waiting for event...")
    await event.wait()
    
    print(f"{name}: Event received, starting work")
    await asyncio.sleep(work_duration)
    print(f"{name}: Work completed")

async def event_setter(event, delay):
    """Set event after delay"""
    print(f"Preparing to set event in {delay}s...")
    await asyncio.sleep(delay)
    
    print("Setting event for all waiters")
    event.set()

async def event_demo():
    """Demonstrate event-based communication"""
    event = asyncio.Event()
    
    # Start multiple waiters and setter
    await asyncio.gather(
        waiter(event, "Worker-1", 1),
        waiter(event, "Worker-2", 2),
        event_setter(event, 2)
    )

asyncio.run(event_demo())

Producer-Consumer with Queues

Implement robust producer-consumer patterns:

import random

async def producer(queue, items, producer_name):
    """Produce items with error handling"""
    try:
        for item in items:
            await queue.put(item)
            print(f"📦 {producer_name}: Produced {item}")
            await asyncio.sleep(0.1)
        
        # Signal completion
        await queue.put(None)
        print(f"🏁 {producer_name}: Production complete")
        
    except Exception as e:
        print(f"{producer_name}: Production failed - {e}")
        await queue.put(None)  # Still signal completion

async def consumer(queue, consumer_name):
    """Consume items with error handling"""
    processed = 0
    
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                print(f"{consumer_name}: Timeout waiting for items")
                break
            
            if item is None:  # End signal
                print(f"🏁 {consumer_name}: Received end signal")
                break
            
            await asyncio.sleep(0.2)  # Simulate processing
            processed += 1
            print(f"{consumer_name}: Processed {item}")
    
    except Exception as e:
        print(f"{consumer_name}: Consumer failed - {e}")
    
    finally:
        print(f"{consumer_name}: Processed {processed} items total")

async def producer_consumer_demo():
    """Robust producer-consumer with error handling"""
    queue = asyncio.Queue(maxsize=5)  # Bounded queue
    
    items = [f"item-{i}" for i in range(10)]
    
    # Start producer and consumers
    await asyncio.gather(
        producer(queue, items, "Producer-1"),
        consumer(queue, "Consumer-A"),
        consumer(queue, "Consumer-B"),
        return_exceptions=True  # Don't let one failure stop others
    )

asyncio.run(producer_consumer_demo())

Semaphores for Resource Control

Control access to limited resources:

async def rate_limited_operations(urls, max_concurrent=5):
    """Limit concurrent operations"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_limit(url):
        async with semaphore:
            return await fetch_url(url)
    
    tasks = [fetch_with_limit(url) for url in urls]
    return await asyncio.gather(*tasks)

async def fetch_url(url):
    await asyncio.sleep(0.5)  # Simulate network request
    return f"Data from {url}"

async def semaphore_demo():
    """Demonstrate semaphore usage"""
    urls = [f"https://api.example.com/data/{i}" for i in range(10)]
    
    print("Fetching URLs with concurrency limit...")
    results = await rate_limited_operations(urls, max_concurrent=3)
    
    print(f"Fetched {len(results)} URLs")

asyncio.run(semaphore_demo())

Locks for Shared Resources

Protect shared resources with async locks:

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
    
    async def increment(self, worker_name):
        """Thread-safe increment"""
        async with self.lock:
            # Critical section
            current = self.value
            await asyncio.sleep(0.01)  # Simulate some work
            self.value = current + 1
            print(f"{worker_name}: Incremented to {self.value}")

async def worker(counter, worker_name, iterations):
    """Worker that increments counter"""
    for i in range(iterations):
        await counter.increment(f"{worker_name}-{i}")
        await asyncio.sleep(0.05)

async def lock_demo():
    """Demonstrate async lock usage"""
    counter = AsyncCounter()
    
    # Start multiple workers
    await asyncio.gather(
        worker(counter, "Worker-A", 5),
        worker(counter, "Worker-B", 5)
    )
    
    print(f"Final counter value: {counter.value}")

asyncio.run(lock_demo())

Barriers for Synchronization

Synchronize multiple coroutines at checkpoints:

async def worker_with_barrier(barrier, worker_name, work_time):
    """Worker that synchronizes at barrier"""
    
    # Phase 1: Individual work
    print(f"{worker_name}: Starting phase 1")
    await asyncio.sleep(work_time)
    print(f"{worker_name}: Phase 1 complete")
    
    # Synchronization point
    print(f"{worker_name}: Waiting at barrier")
    await barrier.wait()
    
    # Phase 2: Coordinated work
    print(f"{worker_name}: Starting phase 2")
    await asyncio.sleep(0.5)
    print(f"{worker_name}: Phase 2 complete")

async def barrier_demo():
    """Demonstrate barrier synchronization"""
    # Create barrier for 3 workers
    barrier = asyncio.Barrier(3)
    
    # Start workers with different work times
    await asyncio.gather(
        worker_with_barrier(barrier, "Worker-A", 1.0),
        worker_with_barrier(barrier, "Worker-B", 2.0),
        worker_with_barrier(barrier, "Worker-C", 1.5)
    )

asyncio.run(barrier_demo())

Summary

Advanced communication patterns enable sophisticated coordination:

Key Mechanisms

  • Events: Simple signaling between coroutines
  • Queues: Producer-consumer patterns with backpressure
  • Semaphores: Control access to limited resources
  • Locks: Protect shared resources from race conditions
  • Barriers: Synchronize multiple coroutines at checkpoints

Best Practices

  • Use appropriate synchronization primitives for each scenario
  • Handle timeouts in queue operations
  • Implement proper error handling in producers and consumers
  • Consider backpressure in high-throughput scenarios

Common Patterns

  • Rate limiting with semaphores
  • Shared resource protection with locks
  • Phase synchronization with barriers
  • Event-driven coordination

In Part 9, we’ll explore building web APIs that leverage these communication patterns.