Event Processing and Projections

In event sourcing, projections are read models built by consuming events and updating query-friendly data structures. They transform the event stream into optimized views for specific query needs.

Projection Fundamentals

Projections have several key characteristics:

  1. Derived Data: They contain no original data, only derived views of events
  2. Rebuildable: They can be rebuilt from scratch by replaying events
  3. Query-Optimized: They are structured for specific query patterns
  4. Eventually Consistent: They may lag slightly behind the write side

Let’s implement a simple projection system:

package eventsourcing

import (
	"context"
	"fmt"
	"sync"
)

// Projection defines the interface for a read model projection.
//
// ProjectionManager stores implementations keyed by Name, feeds them events
// through HandleEvent, and calls Reset before replaying events in a rebuild.
type Projection interface {
	// Name returns the unique name of the projection. The manager uses it as
	// the registry key, so registering a second projection under the same
	// name replaces the first.
	Name() string
	
	// HandleEvent processes an event and updates the projection. A non-nil
	// error makes the manager stop its current dispatch or rebuild.
	HandleEvent(ctx context.Context, event Event) error
	
	// Reset clears the projection state. The manager calls it at the start
	// of RebuildProjection, before any event is replayed.
	Reset(ctx context.Context) error
}

// ProjectionManager manages a set of projections: it keeps a registry of
// projections by name, dispatches events to them, and rebuilds them by
// replaying events read from the event store.
type ProjectionManager struct {
	// eventStore is the source of events replayed during rebuilds.
	eventStore  EventStore
	// projections holds the registered projections keyed by Projection.Name().
	projections map[string]Projection
	// mu guards projections.
	mu          sync.RWMutex
}

// NewProjectionManager returns a manager bound to eventStore whose projection
// registry starts out empty.
func NewProjectionManager(eventStore EventStore) *ProjectionManager {
	manager := new(ProjectionManager)
	manager.eventStore = eventStore
	manager.projections = map[string]Projection{}
	return manager
}

// RegisterProjection adds projection to the manager under the key returned by
// its Name method. A projection already stored under that name is replaced.
func (m *ProjectionManager) RegisterProjection(projection Projection) {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := projection.Name()
	m.projections[key] = projection
}

// HandleEvent processes an event through all registered projections.
//
// The registered projections are copied into a slice under the read lock and
// the lock is released before any projection runs. Projection handlers are
// arbitrary (and possibly slow) code; running them without the lock held
// means they cannot stall RegisterProjection and can safely call back into
// the manager.
//
// Projections are visited in map iteration order, which is unspecified.
// Dispatch stops at the first projection that returns an error: that error is
// returned wrapped with the projection's name, and projections not yet
// visited do not receive the event. A projection registered after the
// snapshot is taken does not receive this event.
func (m *ProjectionManager) HandleEvent(ctx context.Context, event Event) error {
	m.mu.RLock()
	targets := make([]Projection, 0, len(m.projections))
	for _, projection := range m.projections {
		targets = append(targets, projection)
	}
	m.mu.RUnlock()

	for _, projection := range targets {
		if err := projection.HandleEvent(ctx, event); err != nil {
			return fmt.Errorf("projection %s failed to handle event: %w", projection.Name(), err)
		}
	}

	return nil
}

// RebuildProjection rebuilds a specific projection from scratch.
//
// It looks up projectionName in the registry, resets the projection, and then
// replays events from the event store in pages of batchSize, passing each one
// to the projection's HandleEvent. Replay ends when a page comes back with
// fewer than batchSize events.
//
// The context is checked before the reset and before every page, so a caller
// that has already cancelled does not get its read model wiped, and a
// cancelled rebuild stops at the next page boundary even if the event store
// or projection ignores ctx.
//
// It returns an error if the projection is not registered, if ctx is done,
// or if Reset, GetAllEvents, or HandleEvent fails. Underlying errors are
// wrapped with %w. After a failure the projection may be left empty or
// partially rebuilt.
func (m *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error {
	// Number of events requested from the event store per page.
	const batchSize = 1000

	// Only the registry lookup needs the lock; the replay below runs without it.
	m.mu.RLock()
	projection, exists := m.projections[projectionName]
	m.mu.RUnlock()

	if !exists {
		return fmt.Errorf("projection %s not found", projectionName)
	}

	// Do not clear the projection for a caller that has already given up.
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("rebuild of projection %s aborted: %w", projectionName, err)
	}

	// Reset the projection
	if err := projection.Reset(ctx); err != nil {
		return fmt.Errorf("failed to reset projection: %w", err)
	}

	for offset := 0; ; offset += batchSize {
		// Honor cancellation between pages.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("rebuild of projection %s aborted: %w", projectionName, err)
		}

		events, err := m.eventStore.GetAllEvents(ctx, offset, batchSize)
		if err != nil {
			return fmt.Errorf("failed to get events: %w", err)
		}

		// Process each event
		for _, event := range events {
			if err := projection.HandleEvent(ctx, event); err != nil {
				return fmt.Errorf("failed to handle event during rebuild: %w", err)
			}
		}

		// A short page means the end of the stream has been reached.
		if len(events) < batchSize {
			return nil
		}
	}
}

// RebuildAllProjections rebuilds every registered projection, one after the
// other, by calling RebuildProjection for each registered name.
//
// The names are collected under the read lock, which is released before any
// rebuild starts. The first rebuild error is returned as-is and the remaining
// projections are not rebuilt.
func (m *ProjectionManager) RebuildAllProjections(ctx context.Context) error {
	var names []string
	func() {
		m.mu.RLock()
		defer m.mu.RUnlock()

		names = make([]string, 0, len(m.projections))
		for registered := range m.projections {
			names = append(names, registered)
		}
	}()

	for i := 0; i < len(names); i++ {
		err := m.RebuildProjection(ctx, names[i])
		if err != nil {
			return err
		}
	}

	return nil
}

Product Catalog Projection

Let’s implement a concrete projection for our e-commerce product catalog:

package ecommerce

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/yourdomain/eventsourcing"
)

// ProductCatalogProjection maintains a read-optimized view of products.
// Its methods read and write the product_catalog table through db.
type ProductCatalogProjection struct {
	// db is the database handle used for every statement the projection executes.
	db *sql.DB
}

// NewProductCatalogProjection returns a product catalog projection that runs
// its statements against db.
func NewProductCatalogProjection(db *sql.DB) *ProductCatalogProjection {
	projection := new(ProductCatalogProjection)
	projection.db = db
	return projection
}

// Name reports the fixed identifier of this projection, "product_catalog".
func (p *ProductCatalogProjection) Name() string {
	const projectionName = "product_catalog"
	return projectionName
}

// Initialize creates the product_catalog table and its name and price indexes
// if they do not already exist. All statements are sent to the database in a
// single ExecContext call, and that call's error is returned unchanged.
func (p *ProductCatalogProjection) Initialize(ctx context.Context) error {
	const schema = `
		CREATE TABLE IF NOT EXISTS product_catalog (
			product_id VARCHAR(255) PRIMARY KEY,
			name VARCHAR(255) NOT NULL,
			description TEXT,
			price DECIMAL(10, 2) NOT NULL,
			stock_level INT NOT NULL,
			is_active BOOLEAN NOT NULL DEFAULT TRUE,
			created_at TIMESTAMP NOT NULL,
			updated_at TIMESTAMP NOT NULL
		);
		
		CREATE INDEX IF NOT EXISTS idx_product_catalog_name ON product_catalog(name);
		CREATE INDEX IF NOT EXISTS idx_product_catalog_price ON product_catalog(price);
	`

	if _, err := p.db.ExecContext(ctx, schema); err != nil {
		return err
	}
	return nil
}

// HandleEvent routes event to the handler matching its EventType:
// ProductCreated, ProductPriceChanged, or ProductStockAdjusted. Events of any
// other type are ignored and nil is returned.
func (p *ProductCatalogProjection) HandleEvent(ctx context.Context, event eventsourcing.Event) error {
	eventType := event.EventType

	if eventType == "ProductCreated" {
		return p.handleProductCreated(ctx, event)
	}
	if eventType == "ProductPriceChanged" {
		return p.handleProductPriceChanged(ctx, event)
	}
	if eventType == "ProductStockAdjusted" {
		return p.handleProductStockAdjusted(ctx, event)
	}

	// Not an event this projection tracks.
	return nil
}

// Reset empties the projection by deleting every row from product_catalog.
// The table itself is left in place.
func (p *ProductCatalogProjection) Reset(ctx context.Context) error {
	const deleteAll = "DELETE FROM product_catalog"

	if _, err := p.db.ExecContext(ctx, deleteAll); err != nil {
		return err
	}
	return nil
}

// handleProductCreated processes a ProductCreated event by inserting a new
// row into product_catalog.
//
// The event payload is decoded from JSON into a ProductCreated value. The new
// row is marked active, and both created_at and updated_at are set to the
// event's timestamp.
//
// Decode and database errors are returned wrapped with %w, so callers can
// still inspect the cause with errors.Is / errors.As.
//
// NOTE(review): the INSERT has no conflict handling and product_id is the
// primary key, so the same ProductCreated event delivered twice will
// presumably fail on the second delivery. Confirm events are never
// redelivered outside a Reset-then-replay rebuild.
func (p *ProductCatalogProjection) handleProductCreated(ctx context.Context, event eventsourcing.Event) error {
	var data ProductCreated
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductCreated event: %w", err)
	}

	// $6 (the event timestamp) fills both created_at and updated_at.
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO product_catalog (
			product_id, name, description, price, stock_level, is_active, created_at, updated_at
		) VALUES (
			$1, $2, $3, $4, $5, TRUE, $6, $6
		)
	`, data.ProductID, data.Name, data.Description, data.Price, data.StockLevel, event.Timestamp)
	if err != nil {
		return fmt.Errorf("inserting product %v into product_catalog: %w", data.ProductID, err)
	}

	return nil
}

// handleProductPriceChanged processes a ProductPriceChanged event by setting
// the product's price to the event's new price and updated_at to the event's
// timestamp.
//
// The event payload is decoded from JSON into a ProductPriceChanged value.
// The number of affected rows is not checked, so an event for a product_id
// that is not in product_catalog updates nothing and returns nil.
//
// Decode and database errors are returned wrapped with %w, so callers can
// still inspect the cause with errors.Is / errors.As.
func (p *ProductCatalogProjection) handleProductPriceChanged(ctx context.Context, event eventsourcing.Event) error {
	var data ProductPriceChanged
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductPriceChanged event: %w", err)
	}

	_, err := p.db.ExecContext(ctx, `
		UPDATE product_catalog
		SET price = $1, updated_at = $2
		WHERE product_id = $3
	`, data.NewPrice, event.Timestamp, data.ProductID)
	if err != nil {
		return fmt.Errorf("updating price of product %v in product_catalog: %w", data.ProductID, err)
	}

	return nil
}

// handleProductStockAdjusted processes a ProductStockAdjusted event by
// setting the product's stock_level to the event's new stock level and
// updated_at to the event's timestamp.
//
// The event payload is decoded from JSON into a ProductStockAdjusted value.
// The number of affected rows is not checked, so an event for a product_id
// that is not in product_catalog updates nothing and returns nil.
//
// Decode and database errors are returned wrapped with %w, so callers can
// still inspect the cause with errors.Is / errors.As.
func (p *ProductCatalogProjection) handleProductStockAdjusted(ctx context.Context, event eventsourcing.Event) error {
	var data ProductStockAdjusted
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductStockAdjusted event: %w", err)
	}

	_, err := p.db.ExecContext(ctx, `
		UPDATE product_catalog
		SET stock_level = $1, updated_at = $2
		WHERE product_id = $3
	`, data.NewStockLevel, event.Timestamp, data.ProductID)
	if err != nil {
		return fmt.Errorf("updating stock level of product %v in product_catalog: %w", data.ProductID, err)
	}

	return nil
}