Implementation Example
public class Order {
private String orderId;
private OrderStatus status;
private List<OrderItem> items;
private List<Event> changes = new ArrayList<>();
public void apply(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.status = OrderStatus.CREATED;
this.items = new ArrayList<>(event.getItems());
changes.add(event);
}
public void apply(OrderPaidEvent event) {
this.status = OrderStatus.PAID;
changes.add(event);
}
public void apply(OrderShippedEvent event) {
this.status = OrderStatus.SHIPPED;
changes.add(event);
}
public void apply(OrderItemAddedEvent event) {
this.items.add(event.getItem());
changes.add(event);
}
public List<Event> getUncommittedChanges() {
return new ArrayList<>(changes);
}
public void markChangesAsCommitted() {
changes.clear();
}
// Rebuild state by replaying events
public static Order recreateFrom(List<Event> events) {
Order order = new Order();
for (Event event : events) {
if (event instanceof OrderCreatedEvent) {
order.apply((OrderCreatedEvent) event);
} else if (event instanceof OrderPaidEvent) {
order.apply((OrderPaidEvent) event);
} else if (event instanceof OrderShippedEvent) {
order.apply((OrderShippedEvent) event);
} else if (event instanceof OrderItemAddedEvent) {
order.apply((OrderItemAddedEvent) event);
}
}
return order;
}
}
Event store implementation:
public class EventStore {
private final Map<String, List<Event>> eventStreams = new HashMap<>();
public void saveEvents(String streamId, List<Event> events, int expectedVersion) {
List<Event> eventStream = eventStreams.getOrDefault(streamId, new ArrayList<>());
// Optimistic concurrency check
if (eventStream.size() != expectedVersion) {
throw new ConcurrencyException();
}
// Append events to the stream
eventStream.addAll(events);
eventStreams.put(streamId, eventStream);
// Publish events to the event bus
for (Event event : events) {
eventBus.publish(event);
}
}
public List<Event> getEventsForStream(String streamId) {
return eventStreams.getOrDefault(streamId, Collections.emptyList());
}
}
When to Use
Event sourcing is valuable when:
- You need a complete audit trail of changes
- You want to rebuild past states for analysis or debugging
- You need to implement temporal queries (what was the state at time X?)
- You’re working with complex domains where state transitions are important
Challenges
- Handling schema evolution of events
- Managing the size of event streams
- Implementing efficient querying of current state
3. Command Query Responsibility Segregation (CQRS)
CQRS separates read and write operations into different models, allowing each to be optimized independently. It’s often used alongside event sourcing.
Implementation Example
// Command model (write side)
public class OrderCommandService {
private final EventStore eventStore;
public void createOrder(CreateOrderCommand command) {
// Create a new order event
OrderCreatedEvent event = new OrderCreatedEvent(
UUID.randomUUID().toString(),
command.getCustomerId(),
command.getItems()
);
// Save to event store
eventStore.saveEvents(event.getOrderId(), Collections.singletonList(event), 0);
}
public void addItemToOrder(AddOrderItemCommand command) {
// Load the order from event store
List<Event> events = eventStore.getEventsForStream(command.getOrderId());
Order order = Order.recreateFrom(events);
// Create new event
OrderItemAddedEvent event = new OrderItemAddedEvent(
command.getOrderId(),
command.getItem()
);
// Apply and save
order.apply(event);
eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), events.size());
order.markChangesAsCommitted();
}
}
// Query model (read side)
public class OrderQueryService {
private final OrderReadRepository repository;
public OrderQueryService(EventBus eventBus) {
// Subscribe to events to update read models
eventBus.subscribe(OrderCreatedEvent.class, this::handleOrderCreated);
eventBus.subscribe(OrderItemAddedEvent.class, this::handleOrderItemAdded);
}
private void handleOrderCreated(OrderCreatedEvent event) {
OrderReadModel readModel = new OrderReadModel();
readModel.setOrderId(event.getOrderId());
readModel.setCustomerId(event.getCustomerId());
readModel.setItems(event.getItems());
readModel.setStatus("CREATED");
readModel.setTotalAmount(calculateTotal(event.getItems()));
repository.save(readModel);
}
private void handleOrderItemAdded(OrderItemAddedEvent event) {
OrderReadModel readModel = repository.findById(event.getOrderId());
readModel.getItems().add(event.getItem());
readModel.setTotalAmount(calculateTotal(readModel.getItems()));
repository.save(readModel);
}
public OrderReadModel getOrder(String orderId) {
return repository.findById(orderId);
}
public List<OrderReadModel> getOrdersByCustomer(String customerId) {
return repository.findByCustomerId(customerId);
}
}
When to Use
CQRS is beneficial when:
- Read and write workloads have significantly different requirements
- You need specialized data models for different types of queries
- You’re implementing event sourcing
- You have high-performance requirements for reads
Challenges
- Managing eventual consistency between read and write models
- Increased complexity in the system architecture
- Synchronizing multiple read models
4. Saga Pattern
The Saga pattern manages transactions and data consistency across multiple services in a distributed system by choreographing a sequence of local transactions.
Implementation Example: Choreography-based Saga
// Order Service
public class OrderService {
private final EventBus eventBus;
public void createOrder(CreateOrderRequest request) {
// Create order in local database
Order order = new Order(request);
orderRepository.save(order);
// Publish event
eventBus.publish(new OrderCreatedEvent(order));
}
// Handle compensation
@EventHandler
public void on(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
}
}
// Payment Service
public class PaymentService {
private final EventBus eventBus;
@EventHandler
public void on(OrderCreatedEvent event) {
try {
// Process payment
Payment payment = paymentProcessor.process(
event.getCustomerId(),
event.getTotalAmount()
);
// Publish success event
eventBus.publish(new PaymentCompletedEvent(
event.getOrderId(),
payment.getId()
));
} catch (PaymentException e) {
// Publish failure event to trigger compensation
eventBus.publish(new PaymentFailedEvent(
event.getOrderId(),
e.getMessage()
));
}
}
}
// Inventory Service
public class InventoryService {
private final EventBus eventBus;
@EventHandler
public void on(PaymentCompletedEvent event) {
try {
// Reserve inventory
Order order = orderRepository.findById(event.getOrderId());
for (OrderItem item : order.getItems()) {
inventoryRepository.reserveItem(item.getProductId(), item.getQuantity());
}
// Publish success event
eventBus.publish(new InventoryReservedEvent(event.getOrderId()));
} catch (InsufficientInventoryException e) {
// Publish failure event to trigger compensation
eventBus.publish(new InventoryReservationFailedEvent(
event.getOrderId(),
e.getMessage()
));
}
}
// Handle compensation
@EventHandler
public void on(PaymentFailedEvent event) {
// No action needed - inventory wasn't reserved yet
}
}