Implementation Example

public class Order {
    private String orderId;
    private OrderStatus status;
    private List<OrderItem> items;
    private List<Event> changes = new ArrayList<>();
    
    public void apply(OrderCreatedEvent event) {
        this.orderId = event.getOrderId();
        this.status = OrderStatus.CREATED;
        this.items = new ArrayList<>(event.getItems());
        changes.add(event);
    }
    
    public void apply(OrderPaidEvent event) {
        this.status = OrderStatus.PAID;
        changes.add(event);
    }
    
    public void apply(OrderShippedEvent event) {
        this.status = OrderStatus.SHIPPED;
        changes.add(event);
    }
    
    public void apply(OrderItemAddedEvent event) {
        this.items.add(event.getItem());
        changes.add(event);
    }
    
    public List<Event> getUncommittedChanges() {
        return new ArrayList<>(changes);
    }
    
    public void markChangesAsCommitted() {
        changes.clear();
    }
    
    // Rebuild state by replaying events
    public static Order recreateFrom(List<Event> events) {
        Order order = new Order();
        for (Event event : events) {
            if (event instanceof OrderCreatedEvent) {
                order.apply((OrderCreatedEvent) event);
            } else if (event instanceof OrderPaidEvent) {
                order.apply((OrderPaidEvent) event);
            } else if (event instanceof OrderShippedEvent) {
                order.apply((OrderShippedEvent) event);
            } else if (event instanceof OrderItemAddedEvent) {
                order.apply((OrderItemAddedEvent) event);
            }
        }
        return order;
    }
}

Event store implementation:

public class EventStore {
    private final Map<String, List<Event>> eventStreams = new HashMap<>();
    
    public void saveEvents(String streamId, List<Event> events, int expectedVersion) {
        List<Event> eventStream = eventStreams.getOrDefault(streamId, new ArrayList<>());
        
        // Optimistic concurrency check
        if (eventStream.size() != expectedVersion) {
            throw new ConcurrencyException();
        }
        
        // Append events to the stream
        eventStream.addAll(events);
        eventStreams.put(streamId, eventStream);
        
        // Publish events to the event bus
        for (Event event : events) {
            eventBus.publish(event);
        }
    }
    
    public List<Event> getEventsForStream(String streamId) {
        return eventStreams.getOrDefault(streamId, Collections.emptyList());
    }
}

When to Use

Event sourcing is valuable when:

  • You need a complete audit trail of changes
  • You want to rebuild past states for analysis or debugging
  • You need to implement temporal queries (what was the state at time X?)
  • You’re working with complex domains where state transitions are important

Challenges

  • Handling schema evolution of events
  • Managing the size of event streams
  • Implementing efficient querying of current state

3. Command Query Responsibility Segregation (CQRS)

CQRS separates read and write operations into different models, allowing each to be optimized independently. It’s often used alongside event sourcing.

Implementation Example

// Command model (write side)
public class OrderCommandService {
    private final EventStore eventStore;
    
    public void createOrder(CreateOrderCommand command) {
        // Create a new order event
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            command.getCustomerId(),
            command.getItems()
        );
        
        // Save to event store
        eventStore.saveEvents(event.getOrderId(), Collections.singletonList(event), 0);
    }
    
    public void addItemToOrder(AddOrderItemCommand command) {
        // Load the order from event store
        List<Event> events = eventStore.getEventsForStream(command.getOrderId());
        Order order = Order.recreateFrom(events);
        
        // Create new event
        OrderItemAddedEvent event = new OrderItemAddedEvent(
            command.getOrderId(),
            command.getItem()
        );
        
        // Apply and save
        order.apply(event);
        eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), events.size());
        order.markChangesAsCommitted();
    }
}

// Query model (read side)
public class OrderQueryService {
    private final OrderReadRepository repository;
    
    public OrderQueryService(EventBus eventBus) {
        // Subscribe to events to update read models
        eventBus.subscribe(OrderCreatedEvent.class, this::handleOrderCreated);
        eventBus.subscribe(OrderItemAddedEvent.class, this::handleOrderItemAdded);
    }
    
    private void handleOrderCreated(OrderCreatedEvent event) {
        OrderReadModel readModel = new OrderReadModel();
        readModel.setOrderId(event.getOrderId());
        readModel.setCustomerId(event.getCustomerId());
        readModel.setItems(event.getItems());
        readModel.setStatus("CREATED");
        readModel.setTotalAmount(calculateTotal(event.getItems()));
        
        repository.save(readModel);
    }
    
    private void handleOrderItemAdded(OrderItemAddedEvent event) {
        OrderReadModel readModel = repository.findById(event.getOrderId());
        readModel.getItems().add(event.getItem());
        readModel.setTotalAmount(calculateTotal(readModel.getItems()));
        
        repository.save(readModel);
    }
    
    public OrderReadModel getOrder(String orderId) {
        return repository.findById(orderId);
    }
    
    public List<OrderReadModel> getOrdersByCustomer(String customerId) {
        return repository.findByCustomerId(customerId);
    }
}

When to Use

CQRS is beneficial when:

  • Read and write workloads have significantly different requirements
  • You need specialized data models for different types of queries
  • You’re implementing event sourcing
  • You have high-performance requirements for reads

Challenges

  • Managing eventual consistency between read and write models
  • Increased complexity in the system architecture
  • Synchronizing multiple read models

4. Saga Pattern

The Saga pattern manages transactions and data consistency across multiple services in a distributed system by choreographing a sequence of local transactions.

Implementation Example: Choreography-based Saga

// Order Service
public class OrderService {
    private final EventBus eventBus;
    
    public void createOrder(CreateOrderRequest request) {
        // Create order in local database
        Order order = new Order(request);
        orderRepository.save(order);
        
        // Publish event
        eventBus.publish(new OrderCreatedEvent(order));
    }
    
    // Handle compensation
    @EventHandler
    public void on(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.getOrderId());
        order.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(order);
    }
}

// Payment Service
public class PaymentService {
    private final EventBus eventBus;
    
    @EventHandler
    public void on(OrderCreatedEvent event) {
        try {
            // Process payment
            Payment payment = paymentProcessor.process(
                event.getCustomerId(), 
                event.getTotalAmount()
            );
            
            // Publish success event
            eventBus.publish(new PaymentCompletedEvent(
                event.getOrderId(), 
                payment.getId()
            ));
        } catch (PaymentException e) {
            // Publish failure event to trigger compensation
            eventBus.publish(new PaymentFailedEvent(
                event.getOrderId(), 
                e.getMessage()
            ));
        }
    }
}

// Inventory Service
public class InventoryService {
    private final EventBus eventBus;
    
    @EventHandler
    public void on(PaymentCompletedEvent event) {
        try {
            // Reserve inventory
            Order order = orderRepository.findById(event.getOrderId());
            for (OrderItem item : order.getItems()) {
                inventoryRepository.reserveItem(item.getProductId(), item.getQuantity());
            }
            
            // Publish success event
            eventBus.publish(new InventoryReservedEvent(event.getOrderId()));
        } catch (InsufficientInventoryException e) {
            // Publish failure event to trigger compensation
            eventBus.publish(new InventoryReservationFailedEvent(
                event.getOrderId(), 
                e.getMessage()
            ));
        }
    }
    
    // Handle compensation
    @EventHandler
    public void on(PaymentFailedEvent event) {
        // No action needed - inventory wasn't reserved yet
    }
}