State Machines and Observer Patterns

Complex async applications need sophisticated coordination mechanisms. Let’s build state machines and observer patterns step by step.

Basic State Machine Structure

Start with a simple state machine foundation:

from enum import Enum
import asyncio

class OrderState(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"

class AsyncStateMachine:
    def __init__(self, initial_state):
        self.current_state = initial_state
        self.transitions = {}
        self.lock = asyncio.Lock()
    
    def add_transition(self, from_state, event, to_state, handler=None):
        """Register a state transition"""
        self.transitions[(from_state, event)] = (to_state, handler)

This creates the basic structure for managing states and transitions safely.

Implementing State Transitions

Add the transition logic with proper error handling:

async def trigger(self, event: str, **kwargs):
    """Trigger a state transition"""
    async with self.lock:
        key = (self.current_state, event)
        
        if key not in self.transitions:
            raise ValueError(f"No transition from {self.current_state} on '{event}'")
        
        new_state, handler = self.transitions[key]
        
        # Execute transition handler if exists
        if handler:
            await handler(self.current_state, new_state, **kwargs)
        
        # Update state
        old_state = self.current_state
        self.current_state = new_state
        
        print(f"State transition: {old_state.value} -> {new_state.value}")
        return new_state

The lock ensures thread-safe state transitions in concurrent environments.

Order Processing Example

Create a practical order processing state machine:

class OrderProcessor:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.state_machine = AsyncStateMachine(OrderState.PENDING)
        self._setup_transitions()
    
    def _setup_transitions(self):
        """Configure valid state transitions"""
        self.state_machine.add_transition(
            OrderState.PENDING, "process", OrderState.PROCESSING,
            self._handle_process
        )
        self.state_machine.add_transition(
            OrderState.PROCESSING, "ship", OrderState.SHIPPED,
            self._handle_ship
        )
        self.state_machine.add_transition(
            OrderState.SHIPPED, "deliver", OrderState.DELIVERED,
            self._handle_deliver
        )

This defines the valid flow: pending → processing → shipped → delivered.

Transition Handlers

Implement the actual business logic for each transition:

async def _handle_process(self, from_state, to_state, **kwargs):
    """Handle order processing"""
    print(f"📦 Processing order {self.order_id}")
    await asyncio.sleep(1)  # Simulate processing time

async def _handle_ship(self, from_state, to_state, **kwargs):
    """Handle order shipping"""
    tracking = kwargs.get("tracking_number", "TRK123")
    print(f"🚚 Shipping order {self.order_id} (Tracking: {tracking})")
    await asyncio.sleep(0.5)

async def _handle_deliver(self, from_state, to_state, **kwargs):
    """Handle order delivery"""
    print(f"📬 Delivering order {self.order_id}")
    await asyncio.sleep(0.3)

# Public methods for triggering transitions
async def process_order(self):
    await self.state_machine.trigger("process")

async def ship_order(self, tracking_number=None):
    await self.state_machine.trigger("ship", tracking_number=tracking_number)

async def deliver_order(self):
    await self.state_machine.trigger("deliver")

Each handler performs the actual work and can accept additional parameters.

Basic Observer Pattern

Create a simple event emitter for the observer pattern:

from typing import List, Callable

class AsyncEventEmitter:
    def __init__(self):
        self.listeners = {}  # event_name -> list of functions
    
    def on(self, event_name: str, listener: Callable):
        """Register event listener"""
        if event_name not in self.listeners:
            self.listeners[event_name] = []
        self.listeners[event_name].append(listener)
    
    async def emit(self, event_name: str, *args, **kwargs):
        """Emit event to all listeners"""
        if event_name not in self.listeners:
            return
        
        # Run all listeners concurrently
        tasks = [
            listener(*args, **kwargs) 
            for listener in self.listeners[event_name]
        ]
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

This allows multiple components to react to events without tight coupling.

User Service with Events

Implement a service that emits events for important actions:

class UserService:
    def __init__(self):
        self.events = AsyncEventEmitter()
    
    async def create_user(self, user_data: dict):
        """Create user and emit events"""
        user = {"id": 123, **user_data}
        
        # Emit event for other components to handle
        await self.events.emit("user_created", user)
        
        return user
    
    async def update_user(self, user_id: int, updates: dict):
        """Update user and emit events"""
        user = {"id": user_id, **updates}
        
        await self.events.emit("user_updated", user)
        
        return user

Services emit events without knowing who will handle them.

Event Listeners

Create focused event handlers for different concerns:

async def send_welcome_email(user: dict):
    """Send welcome email when user is created"""
    print(f"📧 Sending welcome email to {user.get('email')}")
    await asyncio.sleep(0.5)  # Simulate email sending
    print(f"Email sent to {user.get('email')}")

async def create_user_profile(user: dict):
    """Create user profile when user is created"""
    print(f"👤 Creating profile for user {user['id']}")
    await asyncio.sleep(0.3)  # Simulate profile creation
    print(f"Profile created for user {user['id']}")

async def log_user_activity(user: dict):
    """Log user activity"""
    print(f"📝 Logging activity for user {user['id']}")
    await asyncio.sleep(0.1)  # Simulate logging
    print(f"Activity logged")

Each listener handles one specific concern and can run independently.

Putting It All Together

Wire up the complete system:

async def demo_state_machine():
    """Demonstrate state machine"""
    order = OrderProcessor("ORD-12345")
    
    print(f"Initial state: {order.state_machine.current_state.value}")
    
    await order.process_order()
    await order.ship_order(tracking_number="TRK789")
    await order.deliver_order()
    
    print(f"Final state: {order.state_machine.current_state.value}")

async def demo_observer_pattern():
    """Demonstrate observer pattern"""
    user_service = UserService()
    
    # Register event listeners
    user_service.events.on("user_created", send_welcome_email)
    user_service.events.on("user_created", create_user_profile)
    user_service.events.on("user_created", log_user_activity)
    
    # Create user - triggers all listeners
    user = await user_service.create_user({
        "name": "John Doe",
        "email": "[email protected]"
    })
    
    print(f"User created: {user}")

# Run demonstrations
asyncio.run(demo_state_machine())
asyncio.run(demo_observer_pattern())

This shows how both patterns work together in a complete system.

Summary

State machines and observer patterns provide powerful coordination:

State Machines

  • Clear State Management: Explicit transitions and validation
  • Business Logic: Handlers contain the actual work
  • Thread Safety: Locks prevent race conditions
  • Auditability: Track state changes easily

Observer Pattern

  • Loose Coupling: Components don’t know about each other
  • Extensibility: Easy to add new event handlers
  • Concurrent Execution: All listeners run simultaneously
  • Event-Driven: React to changes as they happen

Best Practices

  • Keep transition handlers focused and simple
  • Use events for cross-cutting concerns
  • Implement proper error handling in listeners
  • Design states and events to match business processes

These patterns enable building complex, maintainable async applications with clear separation of concerns.