Advanced Communication Patterns
Async applications need sophisticated ways for coroutines to communicate and coordinate. Let’s explore events, queues, and other coordination mechanisms.
Communication with Events
Use events to coordinate between coroutines:
import asyncio
async def waiter(event: asyncio.Event, name: str, work_duration: float) -> None:
    """Wait for ``event`` to be set, then simulate a unit of work.

    Args:
        event: Shared event this coroutine waits on before starting.
        name: Label prefixed to every progress message.
        work_duration: Seconds to sleep once the event fires, standing in
            for real work.
    """
    print(f"{name}: Waiting for event...")
    # Suspends only this coroutine; other coroutines keep running meanwhile.
    await event.wait()
    print(f"{name}: Event received, starting work")
    await asyncio.sleep(work_duration)
    print(f"{name}: Work completed")
async def event_setter(event: asyncio.Event, delay: float) -> None:
    """Set ``event`` after ``delay`` seconds, releasing every waiter at once.

    Args:
        event: Event to set.
        delay: Seconds to sleep before setting the event.
    """
    print(f"Preparing to set event in {delay}s...")
    await asyncio.sleep(delay)
    print("Setting event for all waiters")
    # set() is a plain (non-awaitable) call; it wakes all coroutines blocked
    # in event.wait().
    event.set()
async def event_demo() -> None:
    """Demonstrate event-based communication.

    Two waiters block on one shared event while a setter fires it after
    2 seconds; each waiter then sleeps for its own work duration (1 s and
    2 s respectively).
    """
    event = asyncio.Event()

    # Run the waiters and the setter concurrently; gather() returns once
    # all three coroutines have finished.
    await asyncio.gather(
        waiter(event, "Worker-1", 1),
        waiter(event, "Worker-2", 2),
        event_setter(event, 2),
    )
# Run the event demo to completion.
asyncio.run(event_demo())
Producer-Consumer with Queues
Implement robust producer-consumer patterns:
import random
async def producer(queue: asyncio.Queue, items, producer_name: str) -> None:
    """Put every item on ``queue``, then enqueue a ``None`` end marker.

    Args:
        queue: Queue the items are put on. ``put`` waits while a bounded
            queue is full, which throttles the producer.
        items: Iterable of items to enqueue, in order.
        producer_name: Label used in progress messages.

    Any ``Exception`` raised while producing is reported with ``print`` and
    not re-raised; the ``None`` end marker is still enqueued in that case.
    """
    try:
        for item in items:
            await queue.put(item)
            print(f"📦 {producer_name}: Produced {item}")
            await asyncio.sleep(0.1)
        # Signal completion
        await queue.put(None)
        print(f"🏁 {producer_name}: Production complete")
    except Exception as e:
        print(f"{producer_name}: Production failed - {e}")
        await queue.put(None)  # Still signal completion
async def consumer(queue: asyncio.Queue, consumer_name: str) -> None:
    """Consume items from ``queue`` until a ``None`` end marker or a timeout.

    Args:
        queue: Queue to read from. ``None`` is reserved as the end marker.
        consumer_name: Label used in progress messages.

    The loop stops when ``None`` is received or when no item arrives within
    5 seconds. On receiving ``None`` the marker is put back on the queue so
    that other consumers sharing the queue also see it; a single marker from
    the producer therefore stops every consumer, and one ``None`` is left on
    the queue afterwards. Any ``Exception`` raised in the loop is reported
    with ``print`` and not re-raised. The processed-item total is always
    printed on exit.
    """
    processed = 0
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                print(f"{consumer_name}: Timeout waiting for items")
                break

            if item is None:  # End signal
                print(f"🏁 {consumer_name}: Received end signal")
                # Re-broadcast the marker: without this, the first consumer
                # to read it would swallow it and the others would sit idle
                # until their timeout expires.
                try:
                    queue.put_nowait(None)
                except asyncio.QueueFull:
                    # Never block on shutdown; remaining consumers fall back
                    # to the timeout above.
                    pass
                break

            await asyncio.sleep(0.2)  # Simulate processing
            processed += 1
            print(f"{consumer_name}: Processed {item}")
    except Exception as e:
        print(f"{consumer_name}: Consumer failed - {e}")
    finally:
        print(f"{consumer_name}: Processed {processed} items total")
async def producer_consumer_demo() -> None:
    """Run one producer and two consumers over a bounded queue.

    The queue holds at most 5 items, so the producer is throttled whenever
    the consumers fall behind. The producer enqueues a single ``None`` end
    marker; how quickly both consumers stop depends on how ``consumer``
    handles that marker.
    """
    queue = asyncio.Queue(maxsize=5)  # Bounded queue
    items = [f"item-{i}" for i in range(10)]

    # return_exceptions=True keeps one failing task from cancelling the
    # gather for the others; failures come back as values instead.
    outcomes = await asyncio.gather(
        producer(queue, items, "Producer-1"),
        consumer(queue, "Consumer-A"),
        consumer(queue, "Consumer-B"),
        return_exceptions=True,
    )

    # Report captured failures rather than discarding them silently.
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            print(f"Task failed: {outcome!r}")
# Run the producer-consumer demo to completion.
asyncio.run(producer_consumer_demo())
Semaphores for Resource Control
Control access to limited resources:
async def rate_limited_operations(urls, max_concurrent: int = 5):
    """Fetch every URL while capping how many fetches run at once.

    Args:
        urls: Iterable of URLs passed one by one to ``fetch_url``.
        max_concurrent: Maximum number of fetches in flight at any moment;
            must be at least 1.

    Returns:
        List of ``fetch_url`` results in the same order as ``urls``.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1. A semaphore with
            zero permits would make every fetch wait forever.
    """
    if max_concurrent < 1:
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url):
        # Each fetch holds one permit for its whole duration.
        async with semaphore:
            return await fetch_url(url)

    tasks = [fetch_with_limit(url) for url in urls]
    return await asyncio.gather(*tasks)
async def fetch_url(url: str) -> str:
    """Simulate fetching ``url`` and return a placeholder payload.

    Args:
        url: Address to "fetch"; no network access actually happens.

    Returns:
        The string ``"Data from <url>"`` after a 0.5 second delay.
    """
    await asyncio.sleep(0.5)  # Simulate network request
    return f"Data from {url}"
async def semaphore_demo() -> None:
    """Demonstrate semaphore usage.

    Builds 10 example URLs, fetches them through
    ``rate_limited_operations`` with at most 3 running concurrently, and
    prints how many results came back.
    """
    urls = [f"https://api.example.com/data/{i}" for i in range(10)]

    print("Fetching URLs with concurrency limit...")
    results = await rate_limited_operations(urls, max_concurrent=3)
    print(f"Fetched {len(results)} URLs")
# Run the semaphore demo to completion.
asyncio.run(semaphore_demo())
Locks for Shared Resources
Protect shared resources with async locks:
class AsyncCounter:
    """Counter whose increments are serialised with an ``asyncio.Lock``.

    The lock protects against interleaving between coroutines on the same
    event loop. It is not a thread lock, so it does not make the counter
    safe to use from multiple OS threads.
    """

    def __init__(self):
        # Current count; modified only while holding ``self.lock``.
        self.value = 0
        # Guards the read-modify-write sequence in ``increment``.
        self.lock = asyncio.Lock()

    async def increment(self, worker_name):
        """Increment the counter by one (coroutine-safe, not thread-safe).

        Args:
            worker_name: Label printed alongside the new value.
        """
        async with self.lock:
            # Critical section: the await between the read and the write
            # would let another coroutine overwrite the update if the lock
            # were not held.
            current = self.value
            await asyncio.sleep(0.01)  # Simulate some work
            self.value = current + 1
            print(f"{worker_name}: Incremented to {self.value}")
async def worker(counter, worker_name: str, iterations: int) -> None:
    """Increment ``counter`` ``iterations`` times, pausing between calls.

    Args:
        counter: Object exposing an awaitable ``increment(label)`` method.
        worker_name: Base label; each call is tagged ``"<worker_name>-<i>"``.
        iterations: Number of increments to perform.
    """
    for i in range(iterations):
        await counter.increment(f"{worker_name}-{i}")
        # Pause so other workers get a turn between increments.
        await asyncio.sleep(0.05)
async def lock_demo() -> None:
    """Demonstrate async lock usage.

    Two workers each perform 5 increments on one shared ``AsyncCounter``,
    then the final value is printed (expected to be 10 when no update is
    lost).
    """
    counter = AsyncCounter()

    # Run both workers concurrently against the same counter.
    await asyncio.gather(
        worker(counter, "Worker-A", 5),
        worker(counter, "Worker-B", 5),
    )

    print(f"Final counter value: {counter.value}")
# Run the lock demo to completion.
asyncio.run(lock_demo())
Barriers for Synchronization
Synchronize multiple coroutines at checkpoints (note: asyncio.Barrier requires Python 3.11 or newer):
async def worker_with_barrier(
    barrier: "asyncio.Barrier", worker_name: str, work_time: float
) -> None:
    """Run two phases of simulated work separated by a barrier.

    Args:
        barrier: Object with an awaitable ``wait()``; phase 2 starts only
            after ``wait()`` returns.
        worker_name: Label used in progress messages.
        work_time: Seconds of simulated phase-1 work.
    """
    # Phase 1: Individual work
    print(f"{worker_name}: Starting phase 1")
    await asyncio.sleep(work_time)
    print(f"{worker_name}: Phase 1 complete")

    # Synchronization point
    print(f"{worker_name}: Waiting at barrier")
    await barrier.wait()

    # Phase 2: Coordinated work
    print(f"{worker_name}: Starting phase 2")
    await asyncio.sleep(0.5)
    print(f"{worker_name}: Phase 2 complete")
async def barrier_demo() -> None:
    """Demonstrate barrier synchronization.

    Three workers finish phase 1 at different times (1.0 s, 2.0 s, 1.5 s)
    and all wait at a shared barrier before starting phase 2.
    """
    # (name, phase-1 duration in seconds) for each worker.
    schedule = [
        ("Worker-A", 1.0),
        ("Worker-B", 2.0),
        ("Worker-C", 1.5),
    ]

    # Size the barrier from the worker list: if the party count and the
    # number of workers ever disagreed, the waiters would never be released.
    barrier = asyncio.Barrier(len(schedule))

    # Start workers with different work times
    await asyncio.gather(
        *(worker_with_barrier(barrier, name, secs) for name, secs in schedule)
    )
# Run the barrier demo to completion.
asyncio.run(barrier_demo())
Summary
Advanced communication patterns enable sophisticated coordination:
Key Mechanisms
- Events: Simple signaling between coroutines
- Queues: Producer-consumer patterns with backpressure
- Semaphores: Control access to limited resources
- Locks: Protect shared resources from race conditions
- Barriers: Synchronize multiple coroutines at checkpoints
Best Practices
- Use appropriate synchronization primitives for each scenario
- Handle timeouts in queue operations
- Implement proper error handling in producers and consumers
- Consider backpressure in high-throughput scenarios
Common Patterns
- Rate limiting with semaphores
- Shared resource protection with locks
- Phase synchronization with barriers
- Event-driven coordination
In Part 9, we’ll explore building web APIs that leverage these communication patterns.