State Machines and Observer Patterns
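Async applications with many interacting components need explicit coordination. This section builds two such mechanisms step by step: a state machine that validates every transition, and an observer (event emitter) that fans events out to registered listeners.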
Basic State Machine Structure
Start with a simple state machine foundation:
from enum import Enum
import asyncio


class OrderState(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"


class AsyncStateMachine:
    def __init__(self, initial_state):
        self.current_state = initial_state
        self.transitions = {}  # (from_state, event) -> (to_state, handler)
        self.lock = asyncio.Lock()

    def add_transition(self, from_state, event, to_state, handler=None):
        """Register a state transition"""
        self.transitions[(from_state, event)] = (to_state, handler)
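This creates the basic structure: the transitions dictionary maps a (state, event) pair to its target state and an optional handler, and the lock is there to serialize transitions.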
Implementing State Transitions
Add the transition logic with proper error handling:
    # Method of AsyncStateMachine
    async def trigger(self, event: str, **kwargs):
        """Trigger a state transition"""
        async with self.lock:
            key = (self.current_state, event)
            if key not in self.transitions:
                raise ValueError(f"No transition from {self.current_state} on '{event}'")

            new_state, handler = self.transitions[key]

            # Execute transition handler if one was registered
            if handler:
                await handler(self.current_state, new_state, **kwargs)

            # Update state only after the handler has completed
            old_state = self.current_state
            self.current_state = new_state
            print(f"State transition: {old_state.value} -> {new_state.value}")
            return new_state
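The lock serializes transitions among coroutines running on the same event loop, so two concurrent trigger calls cannot interleave. Note that asyncio.Lock is not thread-safe, and it is not reentrant: a handler that calls trigger on the same machine would wait on the lock forever.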
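To see what the lock buys, here is a minimal sketch in which two tasks fire the same event at once. The names Light, TinyMachine, and race are hypothetical and exist only for this illustration; the machine is a cut-down version of the one above, with a short sleep standing in for handler work.

```python
import asyncio
from enum import Enum


class Light(Enum):
    RED = "RED"
    GREEN = "GREEN"


class TinyMachine:
    """Cut-down state machine (hypothetical) with a single RED -> GREEN transition."""

    def __init__(self, initial):
        self.state = initial
        self.transitions = {(Light.RED, "go"): Light.GREEN}
        self.lock = asyncio.Lock()

    async def trigger(self, event):
        async with self.lock:
            key = (self.state, event)
            if key not in self.transitions:
                raise ValueError(f"No transition from {self.state} on '{event}'")
            await asyncio.sleep(0.01)  # simulated handler work while holding the lock
            self.state = self.transitions[key]
            return self.state


async def race():
    machine = TinyMachine(Light.RED)
    # Both calls start while the state is still RED.
    results = await asyncio.gather(
        machine.trigger("go"), machine.trigger("go"), return_exceptions=True
    )
    return machine.state, results


final_state, results = asyncio.run(race())
print(final_state, results)
```

Because the second call cannot validate until the first has finished and updated the state, it sees GREEN and is rejected with ValueError. Without the lock, both calls would pass validation while the state was still RED and the handler work would run twice.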
Order Processing Example
Create a practical order processing state machine:
class OrderProcessor:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.state_machine = AsyncStateMachine(OrderState.PENDING)
        self._setup_transitions()

    def _setup_transitions(self):
        """Configure valid state transitions"""
        self.state_machine.add_transition(
            OrderState.PENDING, "process", OrderState.PROCESSING,
            self._handle_process
        )
        self.state_machine.add_transition(
            OrderState.PROCESSING, "ship", OrderState.SHIPPED,
            self._handle_ship
        )
        self.state_machine.add_transition(
            OrderState.SHIPPED, "deliver", OrderState.DELIVERED,
            self._handle_deliver
        )
This defines the valid flow: pending → processing → shipped → delivered.
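The flow can also be read straight off the transition table. The sketch below restates the same three transitions without handlers and adds a hypothetical helper, allowed_events, that is not part of the classes above; it lists which events are valid from a given state.

```python
from enum import Enum


class OrderState(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"


# Same (state, event) -> next state mapping as the registered transitions,
# with the handlers left out.
TRANSITIONS = {
    (OrderState.PENDING, "process"): OrderState.PROCESSING,
    (OrderState.PROCESSING, "ship"): OrderState.SHIPPED,
    (OrderState.SHIPPED, "deliver"): OrderState.DELIVERED,
}


def allowed_events(state):
    """Hypothetical helper: list the events that are valid from `state`."""
    return [event for (src, event) in TRANSITIONS if src is state]


for state in OrderState:
    print(f"{state.value}: {allowed_events(state) or 'terminal'}")
```

A state with no outgoing events, DELIVERED here, is terminal. Any event not listed for the current state is exactly the case in which trigger raises ValueError.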
Transition Handlers
Implement the actual business logic for each transition:
    # Methods of OrderProcessor
    async def _handle_process(self, from_state, to_state, **kwargs):
        """Handle order processing"""
        print(f"📦 Processing order {self.order_id}")
        await asyncio.sleep(1)  # Simulate processing time

    async def _handle_ship(self, from_state, to_state, **kwargs):
        """Handle order shipping"""
        # ship_order always passes tracking_number (possibly None), so fall
        # back with `or` rather than relying on the dict.get default.
        tracking = kwargs.get("tracking_number") or "TRK123"
        print(f"🚚 Shipping order {self.order_id} (Tracking: {tracking})")
        await asyncio.sleep(0.5)

    async def _handle_deliver(self, from_state, to_state, **kwargs):
        """Handle order delivery"""
        print(f"📬 Delivering order {self.order_id}")
        await asyncio.sleep(0.3)

    # Public methods for triggering transitions
    async def process_order(self):
        await self.state_machine.trigger("process")

    async def ship_order(self, tracking_number=None):
        await self.state_machine.trigger("ship", tracking_number=tracking_number)

    async def deliver_order(self):
        await self.state_machine.trigger("deliver")
Each handler performs the actual work and can accept additional parameters.
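One detail worth checking when forwarding optional keyword arguments this way: ship_order always passes tracking_number, even when it is None, and dict.get only falls back to its default when the key is absent. The function describe_shipment below is a hypothetical stand-in for a handler that compares the two fallback styles.

```python
def describe_shipment(**kwargs):
    # The default is used only if "tracking_number" is missing from kwargs.
    with_get_default = kwargs.get("tracking_number", "TRK123")
    # `or` also falls back when the key is present but None (or empty).
    with_or_fallback = kwargs.get("tracking_number") or "TRK123"
    return with_get_default, with_or_fallback


print(describe_shipment())                          # key absent
print(describe_shipment(tracking_number=None))      # key present, value None
print(describe_shipment(tracking_number="TRK789"))  # key present, real value
```

In the second call the first style yields None while the second yields the fallback, which is the situation a handler is in whenever ship_order is called without a tracking number.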
Basic Observer Pattern
Create a simple event emitter for the observer pattern:
import asyncio
from typing import Callable


class AsyncEventEmitter:
    def __init__(self):
        self.listeners = {}  # event_name -> list of coroutine functions

    def on(self, event_name: str, listener: Callable):
        """Register event listener"""
        if event_name not in self.listeners:
            self.listeners[event_name] = []
        self.listeners[event_name].append(listener)

    async def emit(self, event_name: str, *args, **kwargs):
        """Emit event to all listeners"""
        if event_name not in self.listeners:
            return

        # Run all listeners concurrently
        tasks = [
            listener(*args, **kwargs)
            for listener in self.listeners[event_name]
        ]
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
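This allows multiple components to react to events without tight coupling. Listeners must be coroutine functions. Because gather is called with return_exceptions=True, a failing listener does not stop the others, but its exception is returned rather than raised, and emit discards it.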
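If silent failures are a concern, the results of gather can be inspected instead of dropped. The following is a minimal sketch, not a change to the emitter above: emit_and_report, ok_listener, and broken_listener are hypothetical names used only for illustration.

```python
import asyncio


async def emit_and_report(listeners, *args, **kwargs):
    """Run listeners concurrently; return (listener name, exception) pairs for failures."""
    results = await asyncio.gather(
        *(listener(*args, **kwargs) for listener in listeners),
        return_exceptions=True,
    )
    # gather returns results in the same order as its inputs,
    # so each result can be paired with its listener.
    failures = [
        (listener.__name__, result)
        for listener, result in zip(listeners, results)
        if isinstance(result, Exception)
    ]
    for name, exc in failures:
        print(f"Listener {name} failed: {exc!r}")
    return failures


async def ok_listener(payload):
    await asyncio.sleep(0)


async def broken_listener(payload):
    raise RuntimeError("mail server unavailable")


failures = asyncio.run(emit_and_report([ok_listener, broken_listener], {"id": 1}))
```

Whether to log, collect, or re-raise the failures is a design choice; the sketch only makes them visible to the caller.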
User Service with Events
Implement a service that emits events for important actions:
class UserService:
    def __init__(self):
        self.events = AsyncEventEmitter()

    async def create_user(self, user_data: dict):
        """Create user and emit events"""
        user = {"id": 123, **user_data}

        # Emit event for other components to handle
        await self.events.emit("user_created", user)
        return user

    async def update_user(self, user_id: int, updates: dict):
        """Update user and emit events"""
        user = {"id": user_id, **updates}
        await self.events.emit("user_updated", user)
        return user
Services emit events without knowing who will handle them.
Event Listeners
Create focused event handlers for different concerns:
async def send_welcome_email(user: dict):
    """Send welcome email when user is created"""
    print(f"📧 Sending welcome email to {user.get('email')}")
    await asyncio.sleep(0.5)  # Simulate email sending
    print(f"Email sent to {user.get('email')}")


async def create_user_profile(user: dict):
    """Create user profile when user is created"""
    print(f"👤 Creating profile for user {user['id']}")
    await asyncio.sleep(0.3)  # Simulate profile creation
    print(f"Profile created for user {user['id']}")


async def log_user_activity(user: dict):
    """Log user activity"""
    print(f"📝 Logging activity for user {user['id']}")
    await asyncio.sleep(0.1)  # Simulate logging
    print("Activity logged")
Each listener handles one specific concern and can run independently.
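Running independently has a visible consequence: when listeners are awaited together with asyncio.gather, they finish in order of how long they take, not in the order they were registered. The sketch below uses a hypothetical generic listener with shortened delays to show this.

```python
import asyncio

finished = []  # names appended in completion order


async def listener(name, delay):
    await asyncio.sleep(delay)  # simulated work
    finished.append(name)


async def main():
    # Started in this order: email, profile, log (slowest first).
    await asyncio.gather(
        listener("email", 0.15),
        listener("profile", 0.10),
        listener("log", 0.05),
    )


asyncio.run(main())
print(finished)
```

The shortest listener completes first even though it was started last, so listeners should not rely on one another's side effects having already happened.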
Putting It All Together
Wire up the complete system:
async def demo_state_machine():
    """Demonstrate state machine"""
    order = OrderProcessor("ORD-12345")
    print(f"Initial state: {order.state_machine.current_state.value}")

    await order.process_order()
    await order.ship_order(tracking_number="TRK789")
    await order.deliver_order()

    print(f"Final state: {order.state_machine.current_state.value}")


async def demo_observer_pattern():
    """Demonstrate observer pattern"""
    user_service = UserService()

    # Register event listeners
    user_service.events.on("user_created", send_welcome_email)
    user_service.events.on("user_created", create_user_profile)
    user_service.events.on("user_created", log_user_activity)

    # Create user - triggers all listeners
    user = await user_service.create_user({
        "name": "John Doe",
        "email": "[email protected]"
    })
    print(f"User created: {user}")


# Run demonstrations
asyncio.run(demo_state_machine())
asyncio.run(demo_observer_pattern())
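The two demos run one after the other and are independent: the first walks an order through every state, and the second fans a single user_created event out to three listeners.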
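One way the two patterns could be combined is to have the state machine notify listeners after each transition. This is a sketch under assumptions, not part of the classes above: ObservableMachine, on_change, and record_change are hypothetical names, and handlers are omitted to keep it short.

```python
import asyncio
from enum import Enum


class OrderState(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"


class ObservableMachine:
    """Hypothetical combination: a state machine that notifies listeners on change."""

    def __init__(self, initial_state):
        self.current_state = initial_state
        self.transitions = {}  # (from_state, event) -> to_state
        self.listeners = []    # coroutine functions taking (old_state, new_state)
        self.lock = asyncio.Lock()

    def add_transition(self, from_state, event, to_state):
        self.transitions[(from_state, event)] = to_state

    def on_change(self, listener):
        self.listeners.append(listener)

    async def trigger(self, event):
        async with self.lock:
            key = (self.current_state, event)
            if key not in self.transitions:
                raise ValueError(f"No transition from {self.current_state} on '{event}'")
            old_state = self.current_state
            new_state = self.transitions[key]
            self.current_state = new_state
        # Notify after releasing the lock so a listener may trigger
        # further transitions without deadlocking.
        await asyncio.gather(
            *(listener(old_state, new_state) for listener in self.listeners),
            return_exceptions=True,
        )
        return new_state


audit_log = []


async def record_change(old_state, new_state):
    audit_log.append((old_state.value, new_state.value))


async def main():
    machine = ObservableMachine(OrderState.PENDING)
    machine.add_transition(OrderState.PENDING, "process", OrderState.PROCESSING)
    machine.on_change(record_change)
    await machine.trigger("process")


asyncio.run(main())
print(audit_log)
```

Here the listener doubles as a simple audit trail. Notifying outside the lock is a deliberate choice in this sketch; it trades strict ordering of notifications for freedom from deadlock.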
Summary
State machines and observer patterns provide powerful coordination:
State Machines
- Clear State Management: Explicit transitions and validation
- Business Logic: Handlers contain the actual work
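- Concurrency Safety: An asyncio.Lock serializes transitions between coroutines on one event loop (it does not protect against access from other threads)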
- Auditability: Track state changes easily
Observer Pattern
- Loose Coupling: Components don’t know about each other
- Extensibility: Easy to add new event handlers
- Concurrent Execution: Listeners run concurrently on the event loop, interleaving at their await points
- Event-Driven: React to changes as they happen
Best Practices
- Keep transition handlers focused and simple
- Use events for cross-cutting concerns
- Implement proper error handling in listeners
- Design states and events to match business processes
These patterns enable building complex, maintainable async applications with clear separation of concerns.