CQRS Pattern Integration
Command Query Responsibility Segregation (CQRS) is a pattern that separates the read and write operations of a data store. When combined with event sourcing, it creates a powerful architecture for building scalable, responsive systems.
CQRS Principles
In CQRS:
- Commands modify state but don’t return data
- Queries return data but don’t modify state
- Write models are optimized for consistency and validation
- Read models (projections) are optimized for specific query needs
This separation allows each side to be optimized independently:
- Write models can focus on business rules and consistency
- Read models can be denormalized for performance
- Different storage technologies can be used for each side
Implementing Command Handlers
Command handlers are responsible for validating commands, applying business rules, and generating events:
package eventsourcing
import (
"context"
"errors"
"fmt"
)
// BaseAggregate provides common functionality for aggregates
type BaseAggregate struct {
id string
typ string
version int
changes []Event
}
// ID returns the aggregate's identifier
func (a *BaseAggregate) ID() string {
return a.id
}
// Type returns the aggregate's type
func (a *BaseAggregate) Type() string {
return a.typ
}
// Version returns the aggregate's current version
func (a *BaseAggregate) Version() int {
return a.version
}
// Changes returns the uncommitted changes
func (a *BaseAggregate) Changes() []Event {
return a.changes
}
// ClearChanges clears the uncommitted changes
func (a *BaseAggregate) ClearChanges() {
a.changes = nil
}
// AddChange adds a new change event
func (a *BaseAggregate) AddChange(eventType string, data interface{}) error {
event, err := CreateEvent(a.id, a.typ, a.version+1, eventType, data)
if err != nil {
return err
}
a.changes = append(a.changes, event)
return nil
}
// Apply applies an event to the aggregate
func (a *BaseAggregate) Apply(event Event) {
a.version = event.Version
}
// GenericCommandHandler provides a base implementation for command handlers
type GenericCommandHandler struct {
eventStore EventStore
aggregates map[string]func(id string) Aggregate
}
// NewGenericCommandHandler creates a new command handler
func NewGenericCommandHandler(eventStore EventStore) *GenericCommandHandler {
return &GenericCommandHandler{
eventStore: eventStore,
aggregates: make(map[string]func(id string) Aggregate),
}
}
// RegisterAggregate registers an aggregate type with the command handler
func (h *GenericCommandHandler) RegisterAggregate(aggregateType string, factory func(id string) Aggregate) {
h.aggregates[aggregateType] = factory
}
// HandleCommand processes a command and generates events
func (h *GenericCommandHandler) HandleCommand(ctx context.Context, command Command) ([]Event, error) {
// Get the aggregate factory
factory, exists := h.aggregates[command.AggregateType]
if !exists {
return nil, fmt.Errorf("unknown aggregate type: %s", command.AggregateType)
}
// Create the aggregate instance
aggregate := factory(command.AggregateID)
// Load the aggregate's events
events, err := h.eventStore.GetEvents(ctx, command.AggregateID)
if err != nil && !errors.Is(err, ErrAggregateNotFound) {
return nil, fmt.Errorf("failed to load aggregate events: %w", err)
}
// Apply existing events to rebuild the aggregate's state
if err := aggregate.ApplyEvents(events); err != nil {
return nil, fmt.Errorf("failed to apply events to aggregate: %w", err)
}
// Process the command (this is implemented by concrete command handlers)
if err := h.processCommand(ctx, aggregate, command); err != nil {
return nil, err
}
// Get the changes produced by the command
changes := aggregate.(interface{ Changes() []Event }).Changes()
if len(changes) == 0 {
return nil, nil // No changes to save
}
// Save the new events
if err := h.eventStore.SaveEvents(ctx, aggregate.ID(), aggregate.Version(), changes); err != nil {
return nil, fmt.Errorf("failed to save events: %w", err)
}
// Clear the changes from the aggregate
aggregate.(interface{ ClearChanges() }).ClearChanges()
return changes, nil
}
// processCommand is implemented by concrete command handlers
func (h *GenericCommandHandler) processCommand(ctx context.Context, aggregate Aggregate, command Command) error {
return errors.New("processCommand must be implemented by concrete command handlers")
}
Implementing a Product Aggregate
Let’s implement a concrete aggregate for our e-commerce example:
package ecommerce
import (
"encoding/json"
"errors"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductAggregate represents a product in our e-commerce system
type ProductAggregate struct {
eventsourcing.BaseAggregate
// Product state
name string
description string
price float64
stockLevel int
isActive bool
}
// NewProductAggregate creates a new product aggregate
func NewProductAggregate(id string) eventsourcing.Aggregate {
product := &ProductAggregate{}
product.id = id
product.typ = "product"
return product
}
// ApplyEvent applies a single event to the aggregate
func (p *ProductAggregate) ApplyEvent(event eventsourcing.Event) error {
// Apply the event to the base aggregate
p.BaseAggregate.Apply(event)
// Apply event-specific logic
switch event.EventType {
case "ProductCreated":
var data ProductCreated
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.name = data.Name
p.description = data.Description
p.price = data.Price
p.stockLevel = data.StockLevel
p.isActive = true
case "ProductPriceChanged":
var data ProductPriceChanged
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.price = data.NewPrice
case "ProductStockAdjusted":
var data ProductStockAdjusted
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.stockLevel = data.NewStockLevel
case "ProductDeactivated":
p.isActive = false
case "ProductReactivated":
p.isActive = true
default:
return fmt.Errorf("unknown event type: %s", event.EventType)
}
return nil
}
// ApplyEvents applies multiple events to the aggregate
func (p *ProductAggregate) ApplyEvents(events []eventsourcing.Event) error {
for _, event := range events {
if err := p.ApplyEvent(event); err != nil {
return err
}
}
return nil
}
// CreateProduct creates a new product
func (p *ProductAggregate) CreateProduct(name, description string, price float64, stockLevel int) error {
// Validate inputs
if name == "" {
return errors.New("product name cannot be empty")
}
if price <= 0 {
return errors.New("product price must be positive")
}
if stockLevel < 0 {
return errors.New("stock level cannot be negative")
}
// Check if product already exists
if p.name != "" {
return errors.New("product already exists")
}
// Create the event
return p.AddChange("ProductCreated", ProductCreated{
ProductID: p.id,
Name: name,
Description: description,
Price: price,
StockLevel: stockLevel,
})
}
// ChangePrice changes the product's price
func (p *ProductAggregate) ChangePrice(newPrice float64) error {
// Validate inputs
if newPrice <= 0 {
return errors.New("product price must be positive")
}
// Check if product exists
if p.name == "" {
return errors.New("product does not exist")
}
// Check if price is actually changing
if p.price == newPrice {
return nil // No change needed
}
// Create the event
return p.AddChange("ProductPriceChanged", ProductPriceChanged{
ProductID: p.id,
OldPrice: p.price,
NewPrice: newPrice,
ChangeDate: time.Now().UTC(),
})
}
// AdjustStock adjusts the product's stock level
func (p *ProductAggregate) AdjustStock(adjustment int, adjustmentType string) error {
// Check if product exists
if p.name == "" {
return errors.New("product does not exist")
}
// Calculate new stock level
newStockLevel := p.stockLevel + adjustment
// Validate new stock level
if newStockLevel < 0 {
return errors.New("stock level cannot be negative")
}
// Create the event
return p.AddChange("ProductStockAdjusted", ProductStockAdjusted{
ProductID: p.id,
PreviousStock: p.stockLevel,
NewStockLevel: newStockLevel,
AdjustmentType: adjustmentType,
})
}
Product Command Handler
Now let’s implement a command handler for our product aggregate:
package ecommerce
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductCommands
type CreateProductCommand struct {
ProductID string `json:"product_id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
StockLevel int `json:"stock_level"`
}
type ChangePriceCommand struct {
ProductID string `json:"product_id"`
NewPrice float64 `json:"new_price"`
}
type AdjustStockCommand struct {
ProductID string `json:"product_id"`
Adjustment int `json:"adjustment"`
AdjustmentType string `json:"adjustment_type"`
}
// ProductCommandHandler handles commands for the product aggregate
type ProductCommandHandler struct {
*eventsourcing.GenericCommandHandler
}
// NewProductCommandHandler creates a new product command handler
func NewProductCommandHandler(eventStore eventsourcing.EventStore) *ProductCommandHandler {
handler := &ProductCommandHandler{
GenericCommandHandler: eventsourcing.NewGenericCommandHandler(eventStore),
}
// Register the product aggregate
handler.RegisterAggregate("product", NewProductAggregate)
return handler
}
// processCommand processes a command for a product aggregate
func (h *ProductCommandHandler) processCommand(ctx context.Context, aggregate eventsourcing.Aggregate, command eventsourcing.Command) error {
product, ok := aggregate.(*ProductAggregate)
if !ok {
return errors.New("invalid aggregate type")
}
switch command.CommandType {
case "CreateProduct":
var cmd CreateProductCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.CreateProduct(cmd.Name, cmd.Description, cmd.Price, cmd.StockLevel)
case "ChangePrice":
var cmd ChangePriceCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.ChangePrice(cmd.NewPrice)
case "AdjustStock":
var cmd AdjustStockCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.AdjustStock(cmd.Adjustment, cmd.AdjustmentType)
default:
return fmt.Errorf("unknown command type: %s", command.CommandType)
}
}