CQRS Pattern Integration

Command Query Responsibility Segregation (CQRS) is a pattern that separates the read and write operations of a data store. When combined with event sourcing, it creates a powerful architecture for building scalable, responsive systems.

CQRS Principles

In CQRS:

  1. Commands modify state but don’t return data
  2. Queries return data but don’t modify state
  3. Write models are optimized for consistency and validation
  4. Read models (projections) are optimized for specific query needs

This separation allows each side to be optimized independently:

  • Write models can focus on business rules and consistency
  • Read models can be denormalized for performance
  • Different storage technologies can be used for each side

Implementing Command Handlers

Command handlers are responsible for validating commands, applying business rules, and generating events:

package eventsourcing

import (
	"context"
	"errors"
	"fmt"
)

// BaseAggregate provides common functionality for aggregates: identity,
// type name, version tracking, and a buffer of uncommitted events.
type BaseAggregate struct {
	// id is the aggregate identifier returned by ID.
	id      string
	// typ is the aggregate type name returned by Type.
	typ     string
	// version is the version of the last event passed to Apply.
	version int
	// changes holds events queued by AddChange until ClearChanges is called.
	changes []Event
}

// ID reports the identifier stored on the aggregate.
func (a *BaseAggregate) ID() string { return a.id }

// Type reports the aggregate's type name.
func (a *BaseAggregate) Type() string { return a.typ }

// Version reports the version recorded by the most recent call to Apply, or
// zero if no event has been applied.
func (a *BaseAggregate) Version() int { return a.version }

// Changes exposes the events queued by AddChange that have not been cleared.
// The returned slice is the aggregate's own slice, not a copy.
func (a *BaseAggregate) Changes() []Event { return a.changes }

// ClearChanges discards every uncommitted event by resetting the buffer to nil.
func (a *BaseAggregate) ClearChanges() { a.changes = nil }

// AddChange builds a new event of the given type from data and queues it as an
// uncommitted change.
//
// The event is created through CreateEvent with the aggregate's ID and type.
// Its version is the aggregate's current version plus the number of changes
// already queued plus one, so several changes produced by one command receive
// consecutive versions instead of all sharing version+1.
//
// It returns the error from CreateEvent, in which case nothing is queued.
func (a *BaseAggregate) AddChange(eventType string, data interface{}) error {
	// Queued changes are not applied to the aggregate, so a.version still
	// refers to the last applied event; account for the pending ones here.
	nextVersion := a.version + len(a.changes) + 1

	event, err := CreateEvent(a.id, a.typ, nextVersion, eventType, data)
	if err != nil {
		return err
	}

	a.changes = append(a.changes, event)
	return nil
}

// Apply records the event's version as the aggregate's current version. It
// does not inspect the event type or payload.
func (a *BaseAggregate) Apply(event Event) { a.version = event.Version }

// GenericCommandHandler provides a base implementation for command handlers.
// It holds the event store used to load and save events and a registry of
// aggregate factories keyed by aggregate type.
type GenericCommandHandler struct {
	// eventStore is used by HandleCommand to load and save aggregate events.
	eventStore EventStore
	// aggregates maps an aggregate type name to a factory that builds an
	// instance for a given ID.
	aggregates map[string]func(id string) Aggregate
}

// NewGenericCommandHandler builds a command handler backed by eventStore with
// an empty aggregate registry. Register aggregate types with RegisterAggregate
// before handling commands.
func NewGenericCommandHandler(eventStore EventStore) *GenericCommandHandler {
	handler := new(GenericCommandHandler)
	handler.eventStore = eventStore
	handler.aggregates = map[string]func(id string) Aggregate{}
	return handler
}

// RegisterAggregate associates aggregateType with factory so that
// HandleCommand can build instances of that type. Registering the same type
// again replaces the earlier factory.
func (h *GenericCommandHandler) RegisterAggregate(aggregateType string, factory func(id string) Aggregate) {
	registry := h.aggregates
	registry[aggregateType] = factory
}

// HandleCommand processes a command and returns the events it produced.
//
// It looks up the factory registered for command.AggregateType, builds an
// aggregate for command.AggregateID, replays the events loaded from the event
// store, runs the command through processCommand, and saves the aggregate's
// uncommitted changes. aggregate.Version() is passed to SaveEvents
// (presumably as the expected version for a concurrency check — confirm
// against the EventStore contract).
//
// It returns (nil, nil) when the command produced no changes. An
// ErrAggregateNotFound from GetEvents is tolerated so a command can target an
// aggregate that has no stored events yet. An aggregate that does not provide
// both Changes and ClearChanges is rejected with an error before any events
// are loaded, rather than panicking on a failed type assertion later.
//
// NOTE(review): h.processCommand is bound to
// (*GenericCommandHandler).processCommand. A type that embeds
// *GenericCommandHandler and declares its own processCommand does not
// override this call, so as written every command ends with the base
// method's error.
func (h *GenericCommandHandler) HandleCommand(ctx context.Context, command Command) ([]Event, error) {
	// Get the aggregate factory
	factory, exists := h.aggregates[command.AggregateType]
	if !exists {
		return nil, fmt.Errorf("unknown aggregate type: %s", command.AggregateType)
	}

	// Create the aggregate instance
	aggregate := factory(command.AggregateID)

	// Check change-tracking support once, up front. Doing this after
	// SaveEvents would risk failing with the events already persisted.
	tracker, ok := aggregate.(interface {
		Changes() []Event
		ClearChanges()
	})
	if !ok {
		return nil, fmt.Errorf("aggregate type %s does not track changes", command.AggregateType)
	}

	// Load the aggregate's events
	events, err := h.eventStore.GetEvents(ctx, command.AggregateID)
	if err != nil && !errors.Is(err, ErrAggregateNotFound) {
		return nil, fmt.Errorf("failed to load aggregate events: %w", err)
	}

	// Apply existing events to rebuild the aggregate's state
	if err := aggregate.ApplyEvents(events); err != nil {
		return nil, fmt.Errorf("failed to apply events to aggregate: %w", err)
	}

	// Process the command (this is implemented by concrete command handlers)
	if err := h.processCommand(ctx, aggregate, command); err != nil {
		return nil, err
	}

	// Get the changes produced by the command
	changes := tracker.Changes()
	if len(changes) == 0 {
		return nil, nil // No changes to save
	}

	// Save the new events
	if err := h.eventStore.SaveEvents(ctx, aggregate.ID(), aggregate.Version(), changes); err != nil {
		return nil, fmt.Errorf("failed to save events: %w", err)
	}

	// Clear the changes from the aggregate; the local slice stays valid
	// because ClearChanges drops the reference instead of truncating it.
	tracker.ClearChanges()

	return changes, nil
}

// processCommand is the base command hook. It does not touch the aggregate and
// always returns an error stating that a concrete handler must supply the
// implementation.
func (h *GenericCommandHandler) processCommand(ctx context.Context, aggregate Aggregate, command Command) error {
	const msg = "processCommand must be implemented by concrete command handlers"
	return errors.New(msg)
}

Implementing a Product Aggregate

Let’s implement a concrete aggregate for our e-commerce example:

package ecommerce

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/yourdomain/eventsourcing"
)

// ProductAggregate represents a product in our e-commerce system. It embeds
// eventsourcing.BaseAggregate for identity, versioning, and change tracking,
// and keeps the product state that ApplyEvent rebuilds from events.
type ProductAggregate struct {
	eventsourcing.BaseAggregate
	
	// Product state
	// name is set by ProductCreated; an empty name is treated as "product
	// does not exist" by the command methods.
	name        string
	// description is set by ProductCreated.
	description string
	// price is set by ProductCreated and ProductPriceChanged.
	price       float64
	// stockLevel is set by ProductCreated and ProductStockAdjusted.
	stockLevel  int
	// isActive is toggled by ProductCreated, ProductDeactivated, and
	// ProductReactivated.
	isActive    bool
}

// NewProductAggregate builds an empty product aggregate with the given ID and
// the aggregate type "product". Its signature matches the factory type
// accepted by RegisterAggregate.
//
// NOTE(review): id and typ are unexported fields of
// eventsourcing.BaseAggregate, so assigning them from this package does not
// compile. The eventsourcing package needs an exported constructor or setter
// for this to build.
func NewProductAggregate(id string) eventsourcing.Aggregate {
	p := new(ProductAggregate)
	p.id, p.typ = id, "product"
	return p
}

// ApplyEvent applies a single event to the aggregate, updating the product
// state according to event.EventType and then recording the event's version.
//
// Handled event types are ProductCreated, ProductPriceChanged,
// ProductStockAdjusted, ProductDeactivated, and ProductReactivated. It returns
// an error if the payload cannot be decoded or the event type is unknown; in
// both cases the aggregate's version is left unchanged, so a failed event
// never advances it.
func (p *ProductAggregate) ApplyEvent(event eventsourcing.Event) error {
	switch event.EventType {
	case "ProductCreated":
		var data ProductCreated
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return fmt.Errorf("decoding %s event: %w", event.EventType, err)
		}
		p.name = data.Name
		p.description = data.Description
		p.price = data.Price
		p.stockLevel = data.StockLevel
		p.isActive = true

	case "ProductPriceChanged":
		var data ProductPriceChanged
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return fmt.Errorf("decoding %s event: %w", event.EventType, err)
		}
		p.price = data.NewPrice

	case "ProductStockAdjusted":
		var data ProductStockAdjusted
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return fmt.Errorf("decoding %s event: %w", event.EventType, err)
		}
		p.stockLevel = data.NewStockLevel

	case "ProductDeactivated":
		p.isActive = false

	case "ProductReactivated":
		p.isActive = true

	default:
		return fmt.Errorf("unknown event type: %s", event.EventType)
	}

	// Record the version last, once the event is known to have been applied.
	p.BaseAggregate.Apply(event)
	return nil
}

// ApplyEvents replays events in slice order through ApplyEvent. It stops at
// the first failure and returns that error; events before it stay applied.
func (p *ProductAggregate) ApplyEvents(events []eventsourcing.Event) error {
	for i := range events {
		if err := p.ApplyEvent(events[i]); err != nil {
			return err
		}
	}
	return nil
}

// CreateProduct validates the inputs and queues a ProductCreated event.
//
// It returns an error when name is empty, price is not positive, stockLevel
// is negative, or the aggregate already has a name (meaning a product was
// already created). The aggregate's own fields are not modified here; only an
// uncommitted change is added via AddChange.
func (p *ProductAggregate) CreateProduct(name, description string, price float64, stockLevel int) error {
	// Validate inputs
	if name == "" {
		return errors.New("product name cannot be empty")
	}
	if price <= 0 {
		return errors.New("product price must be positive")
	}
	if stockLevel < 0 {
		return errors.New("stock level cannot be negative")
	}

	// Check if product already exists
	if p.name != "" {
		return errors.New("product already exists")
	}

	// Create the event. The ID is read through the exported accessor because
	// BaseAggregate's id field is not visible from this package.
	return p.AddChange("ProductCreated", ProductCreated{
		ProductID:   p.ID(),
		Name:        name,
		Description: description,
		Price:       price,
		StockLevel:  stockLevel,
	})
}

// ChangePrice validates newPrice and queues a ProductPriceChanged event that
// carries the old price, the new price, and the current UTC time.
//
// It returns an error when newPrice is not positive or the product does not
// exist (empty name). When newPrice equals the current price it returns nil
// without queuing an event.
func (p *ProductAggregate) ChangePrice(newPrice float64) error {
	// Validate inputs
	if newPrice <= 0 {
		return errors.New("product price must be positive")
	}

	// Check if product exists
	if p.name == "" {
		return errors.New("product does not exist")
	}

	// Check if price is actually changing
	if p.price == newPrice {
		return nil // No change needed
	}

	// Create the event. The ID is read through the exported accessor because
	// BaseAggregate's id field is not visible from this package.
	return p.AddChange("ProductPriceChanged", ProductPriceChanged{
		ProductID:  p.ID(),
		OldPrice:   p.price,
		NewPrice:   newPrice,
		ChangeDate: time.Now().UTC(),
	})
}

// AdjustStock queues a ProductStockAdjusted event that moves the stock level
// by adjustment (which may be negative) and records adjustmentType.
//
// It returns an error when the product does not exist (empty name) or when
// the resulting stock level would be negative.
func (p *ProductAggregate) AdjustStock(adjustment int, adjustmentType string) error {
	// Check if product exists
	if p.name == "" {
		return errors.New("product does not exist")
	}

	// Calculate new stock level
	newStockLevel := p.stockLevel + adjustment

	// Validate new stock level
	if newStockLevel < 0 {
		return errors.New("stock level cannot be negative")
	}

	// Create the event. The ID is read through the exported accessor because
	// BaseAggregate's id field is not visible from this package.
	return p.AddChange("ProductStockAdjusted", ProductStockAdjusted{
		ProductID:      p.ID(),
		PreviousStock:  p.stockLevel,
		NewStockLevel:  newStockLevel,
		AdjustmentType: adjustmentType,
	})
}

Product Command Handler

Now let’s implement a command handler for our product aggregate:

package ecommerce

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/yourdomain/eventsourcing"
)

// CreateProductCommand is the JSON payload decoded for the "CreateProduct"
// command type. processCommand passes Name, Description, Price, and
// StockLevel to ProductAggregate.CreateProduct; ProductID is decoded but not
// read there.
type CreateProductCommand struct {
	ProductID   string  `json:"product_id"`
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Price       float64 `json:"price"`
	StockLevel  int     `json:"stock_level"`
}

// ChangePriceCommand is the JSON payload decoded for the "ChangePrice" command
// type. NewPrice is passed to ProductAggregate.ChangePrice.
type ChangePriceCommand struct {
	ProductID string  `json:"product_id"`
	NewPrice  float64 `json:"new_price"`
}

// AdjustStockCommand is the JSON payload decoded for the "AdjustStock" command
// type. Adjustment and AdjustmentType are passed to
// ProductAggregate.AdjustStock.
type AdjustStockCommand struct {
	ProductID      string `json:"product_id"`
	Adjustment     int    `json:"adjustment"`
	AdjustmentType string `json:"adjustment_type"`
}

// ProductCommandHandler handles commands for the product aggregate. It embeds
// *eventsourcing.GenericCommandHandler, so that type's exported methods are
// promoted onto ProductCommandHandler.
type ProductCommandHandler struct {
	*eventsourcing.GenericCommandHandler
}

// NewProductCommandHandler builds a product command handler on top of a
// generic handler backed by eventStore, with the "product" aggregate type
// already registered to NewProductAggregate.
func NewProductCommandHandler(eventStore eventsourcing.EventStore) *ProductCommandHandler {
	base := eventsourcing.NewGenericCommandHandler(eventStore)
	base.RegisterAggregate("product", NewProductAggregate)
	return &ProductCommandHandler{GenericCommandHandler: base}
}

// processCommand decodes command.Data according to command.CommandType and
// invokes the matching method on the product aggregate.
//
// Supported command types are "CreateProduct", "ChangePrice", and
// "AdjustStock". It returns an error when aggregate is not a
// *ProductAggregate, when the payload cannot be decoded, when the command
// type is unknown, or when the aggregate method itself rejects the command.
//
// NOTE(review): the HandleCommand method promoted from
// *eventsourcing.GenericCommandHandler calls its own processCommand, not this
// one, so this method is only reached when called directly.
func (h *ProductCommandHandler) processCommand(ctx context.Context, aggregate eventsourcing.Aggregate, command eventsourcing.Command) error {
	product, isProduct := aggregate.(*ProductAggregate)
	if !isProduct {
		return errors.New("invalid aggregate type")
	}

	// decode unmarshals the command payload into the given command struct.
	decode := func(target interface{}) error {
		return json.Unmarshal(command.Data, target)
	}

	switch command.CommandType {
	case "CreateProduct":
		var create CreateProductCommand
		if err := decode(&create); err != nil {
			return err
		}
		return product.CreateProduct(create.Name, create.Description, create.Price, create.StockLevel)

	case "ChangePrice":
		var change ChangePriceCommand
		if err := decode(&change); err != nil {
			return err
		}
		return product.ChangePrice(change.NewPrice)

	case "AdjustStock":
		var adjust AdjustStockCommand
		if err := decode(&adjust); err != nil {
			return err
		}
		return product.AdjustStock(adjust.Adjustment, adjust.AdjustmentType)
	}

	return fmt.Errorf("unknown command type: %s", command.CommandType)
}