Snapshotting and Performance Optimization

As event stores grow, rebuilding aggregates from their entire event history becomes increasingly expensive. Snapshotting addresses this by periodically capturing the aggregate’s state, allowing us to rebuild from the snapshot plus subsequent events.

Snapshot Implementation

Let’s implement a snapshotting system:

package eventsourcing

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

// Snapshot is a point-in-time capture of one aggregate's serialized state,
// tagged with the aggregate version it reflects so that only the events
// recorded after that version need to be replayed when loading.
type Snapshot struct {
	// ID uniquely identifies this snapshot record.
	ID string

	// AggregateID identifies the aggregate the snapshot belongs to.
	AggregateID string

	// AggregateType names the kind of aggregate that was captured.
	AggregateType string

	// Version is the aggregate version at the moment of capture.
	Version int

	// Timestamp records when the snapshot was taken.
	Timestamp time.Time

	// Data holds the aggregate state as produced by its serializer.
	Data json.RawMessage
}

// Snapshotter abstracts the storage backend used to persist aggregate
// snapshots and to fetch the most recent one back.
type Snapshotter interface {
	// SaveSnapshot stores snap.
	SaveSnapshot(ctx context.Context, snap Snapshot) error
	// GetLatestSnapshot returns the most recent snapshot recorded for
	// aggregateID, or an error when none can be retrieved.
	GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error)
}

// SnapshotableAggregate extends Aggregate with the ability to export its
// state as a JSON snapshot payload and to restore itself from one.
type SnapshotableAggregate interface {
	Aggregate

	// ToSnapshot encodes the aggregate's current state as a snapshot payload.
	ToSnapshot() (json.RawMessage, error)
	// FromSnapshot restores the aggregate's state from a payload previously
	// produced by ToSnapshot.
	FromSnapshot(raw json.RawMessage) error
}

// PostgresSnapshotter stores snapshots in a PostgreSQL "snapshots" table,
// reached through the database handle it was constructed with.
type PostgresSnapshotter struct {
	db *sql.DB // connection pool used for every snapshot query
}

// NewPostgresSnapshotter wraps db in a PostgresSnapshotter. The handle is
// stored as-is; Initialize can be used to create the required schema.
func NewPostgresSnapshotter(db *sql.DB) *PostgresSnapshotter {
	s := new(PostgresSnapshotter)
	s.db = db
	return s
}

// Initialize creates the snapshots table if it does not already exist.
//
// The UNIQUE (aggregate_id, version) constraint is backed by a B-tree index
// whose leading column is aggregate_id. That index already serves the
// "WHERE aggregate_id = $1 ORDER BY version DESC LIMIT 1" lookup used by
// GetLatestSnapshot (via a backward index scan). A separate single-column
// index on aggregate_id would be redundant and would only add write and
// storage overhead, so none is created.
func (s *PostgresSnapshotter) Initialize(ctx context.Context) error {
	_, err := s.db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS snapshots (
			id UUID PRIMARY KEY,
			aggregate_id VARCHAR(255) NOT NULL,
			aggregate_type VARCHAR(255) NOT NULL,
			version INT NOT NULL,
			timestamp TIMESTAMP NOT NULL,
			data JSONB NOT NULL,

			-- One snapshot per aggregate version; the implicit index also
			-- serves latest-snapshot lookups by aggregate_id.
			UNIQUE (aggregate_id, version)
		)
	`)
	if err != nil {
		return fmt.Errorf("creating snapshots table: %w", err)
	}
	return nil
}

// SaveSnapshot persists snapshot to the snapshots table.
//
// An empty ID is replaced with a freshly generated UUID, and a zero Timestamp
// with the current UTC time. A snapshot with empty Data is rejected before
// touching the database, because the data column is NOT NULL JSONB.
//
// If a snapshot already exists for the same (aggregate_id, version) pair, the
// stored type, timestamp and data are overwritten; the existing row's id is
// kept. Without this, concurrent or repeated loads that snapshot the same
// version would fail on the UNIQUE constraint. State at a given version is
// expected to be identical however often it is captured, so overwriting is
// safe. It also lets a re-taken snapshot replace one stored in an outdated
// format.
func (s *PostgresSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
	if len(snapshot.Data) == 0 {
		return fmt.Errorf("saving snapshot for aggregate %q: empty snapshot data", snapshot.AggregateID)
	}

	if snapshot.ID == "" {
		snapshot.ID = uuid.New().String()
	}
	if snapshot.Timestamp.IsZero() {
		snapshot.Timestamp = time.Now().UTC()
	}

	_, err := s.db.ExecContext(ctx, `
		INSERT INTO snapshots (
			id, aggregate_id, aggregate_type, version, timestamp, data
		) VALUES (
			$1, $2, $3, $4, $5, $6
		)
		ON CONFLICT (aggregate_id, version) DO UPDATE SET
			aggregate_type = EXCLUDED.aggregate_type,
			timestamp = EXCLUDED.timestamp,
			data = EXCLUDED.data
	`,
		snapshot.ID, snapshot.AggregateID, snapshot.AggregateType,
		snapshot.Version, snapshot.Timestamp, snapshot.Data,
	)
	if err != nil {
		return fmt.Errorf("saving snapshot for aggregate %q at version %d: %w",
			snapshot.AggregateID, snapshot.Version, err)
	}
	return nil
}

// ErrSnapshotNotFound is returned by PostgresSnapshotter.GetLatestSnapshot when
// no snapshot exists for the requested aggregate. Test for it with errors.Is.
var ErrSnapshotNotFound = errors.New("snapshot not found")

// GetLatestSnapshot retrieves the highest-version snapshot stored for
// aggregateID.
//
// It returns ErrSnapshotNotFound when the aggregate has no snapshot, and a
// wrapped database error for any other failure. On error the returned
// Snapshot is always the zero value, never a partially scanned one.
func (s *PostgresSnapshotter) GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error) {
	var snapshot Snapshot

	err := s.db.QueryRowContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, version, timestamp, data
		FROM snapshots
		WHERE aggregate_id = $1
		ORDER BY version DESC
		LIMIT 1
	`, aggregateID).Scan(
		&snapshot.ID, &snapshot.AggregateID, &snapshot.AggregateType,
		&snapshot.Version, &snapshot.Timestamp, &snapshot.Data,
	)

	switch {
	case errors.Is(err, sql.ErrNoRows):
		return Snapshot{}, ErrSnapshotNotFound
	case err != nil:
		return Snapshot{}, fmt.Errorf("querying latest snapshot for aggregate %q: %w", aggregateID, err)
	}

	return snapshot, nil
}

Snapshotting Strategy

Now let’s implement a strategy for when to create snapshots:

package eventsourcing

import (
	"context"
	"fmt"
)

// SnapshotStrategy decides when a new snapshot of an aggregate is due.
type SnapshotStrategy interface {
	// ShouldSnapshot reports whether a snapshot should be taken for the
	// aggregate identified by aggregateID, given its current event count and
	// the version of its most recent snapshot (SnapshotManager passes 0 when
	// there is none).
	ShouldSnapshot(aggregateID string, eventCount, lastSnapshotVersion int) bool
}

// defaultSnapshotInterval is the number of events between snapshots used when
// no positive interval has been configured.
const defaultSnapshotInterval = 100

// IntervalSnapshotStrategy creates snapshots at fixed event intervals: a new
// snapshot is due once the aggregate has advanced at least one full interval
// past its last snapshot.
//
// The zero value is usable and behaves like NewIntervalSnapshotStrategy(0),
// i.e. it uses defaultSnapshotInterval.
type IntervalSnapshotStrategy struct {
	interval int // Number of events between snapshots; <= 0 means the default
}

// NewIntervalSnapshotStrategy creates an interval-based snapshot strategy.
// A non-positive interval is replaced by defaultSnapshotInterval (100).
func NewIntervalSnapshotStrategy(interval int) *IntervalSnapshotStrategy {
	if interval <= 0 {
		interval = defaultSnapshotInterval
	}
	return &IntervalSnapshotStrategy{interval: interval}
}

// ShouldSnapshot reports whether eventCount is at least one full interval
// beyond lastSnapshotVersion. SnapshotManager passes the aggregate's current
// version as eventCount. aggregateID is not used by this strategy.
//
// A non-positive interval (as in the zero value) is treated as
// defaultSnapshotInterval. Otherwise an interval of 0 would make every call
// with eventCount >= lastSnapshotVersion return true, re-snapshotting an
// unchanged aggregate on every load.
func (s *IntervalSnapshotStrategy) ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool {
	interval := s.interval
	if interval <= 0 {
		interval = defaultSnapshotInterval
	}
	return eventCount >= lastSnapshotVersion+interval
}

// SnapshotManager loads snapshot-capable aggregates by combining a stored
// snapshot with the events recorded after it, and uses its strategy to decide
// when a new snapshot should be written.
type SnapshotManager struct {
	eventStore  EventStore       // source of the events replayed on load
	snapshotter Snapshotter      // storage for snapshots
	strategy    SnapshotStrategy // policy deciding when to snapshot

	// aggregateFactories maps an aggregate type name to a constructor that
	// produces an empty instance for a given aggregate ID.
	aggregateFactories map[string]func(id string) SnapshotableAggregate
}

// NewSnapshotManager builds a SnapshotManager around the given event store,
// snapshot storage and snapshot strategy, with no aggregate types registered.
// Types must be added with RegisterAggregate before GetAggregate can load them.
func NewSnapshotManager(
	eventStore EventStore,
	snapshotter Snapshotter,
	strategy SnapshotStrategy,
) *SnapshotManager {
	m := &SnapshotManager{
		eventStore:  eventStore,
		snapshotter: snapshotter,
		strategy:    strategy,
	}
	m.aggregateFactories = map[string]func(id string) SnapshotableAggregate{}
	return m
}

// RegisterAggregate associates aggregateType with factory, which GetAggregate
// calls to create an empty instance before restoring its state. Registering
// the same type again replaces the earlier factory.
//
// NOTE: this writes a plain map without locking, so register all types before
// the manager is used concurrently.
func (m *SnapshotManager) RegisterAggregate(aggregateType string, factory func(id string) SnapshotableAggregate) {
	factories := m.aggregateFactories
	factories[aggregateType] = factory
}

// GetAggregate loads the aggregate identified by aggregateID, using the
// factory registered for aggregateType.
//
// State is restored from the latest snapshot when one can be fetched and
// decoded. After that, only the events recorded after the snapshot's version
// are applied. Snapshots are treated as a disposable cache. If none can be
// fetched (any error from the snapshotter is treated this way), or the stored
// one fails to decode (for example because the snapshot format changed since
// it was written), the aggregate is rebuilt from its complete event history
// instead of failing the load.
//
// After loading, a new snapshot is written when the strategy asks for one.
// A failure to write it is printed and does not fail the load.
//
// It returns an error if aggregateType is not registered, or if loading or
// applying the events fails.
func (m *SnapshotManager) GetAggregate(
	ctx context.Context,
	aggregateID string,
	aggregateType string,
) (SnapshotableAggregate, error) {
	factory, exists := m.aggregateFactories[aggregateType]
	if !exists {
		return nil, fmt.Errorf("unknown aggregate type: %s", aggregateType)
	}

	aggregate := factory(aggregateID)

	// 0 means "replay from the first event"; it is only raised once a
	// snapshot has been applied successfully.
	snapshotVersion := 0
	if snapshot, err := m.snapshotter.GetLatestSnapshot(ctx, aggregateID); err == nil {
		if err := aggregate.FromSnapshot(snapshot.Data); err != nil {
			// FromSnapshot may have partially mutated the instance, so start
			// over with a fresh one and fall back to a full replay.
			fmt.Printf("Failed to apply snapshot for %s, rebuilding from events: %v\n", aggregateID, err)
			aggregate = factory(aggregateID)
		} else {
			snapshotVersion = snapshot.Version
		}
	}

	events, err := m.eventStore.GetEventsAfterVersion(ctx, aggregateID, snapshotVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to load events: %w", err)
	}

	if err := aggregate.ApplyEvents(events); err != nil {
		return nil, fmt.Errorf("failed to apply events: %w", err)
	}

	if m.strategy.ShouldSnapshot(aggregateID, aggregate.Version(), snapshotVersion) {
		if err := m.CreateSnapshot(ctx, aggregate); err != nil {
			// Snapshotting is only an optimization: report the failure but
			// still return the correctly loaded aggregate.
			fmt.Printf("Failed to create snapshot: %v\n", err)
		}
	}

	return aggregate, nil
}

// CreateSnapshot serializes aggregate with ToSnapshot and hands the result to
// the configured Snapshotter, tagged with the aggregate's ID, type and current
// version. ID and Timestamp are left zero; PostgresSnapshotter fills them in
// when saving. Serialization errors are wrapped; save errors are returned
// as-is.
func (m *SnapshotManager) CreateSnapshot(ctx context.Context, aggregate SnapshotableAggregate) error {
	payload, err := aggregate.ToSnapshot()
	if err != nil {
		return fmt.Errorf("failed to serialize aggregate: %w", err)
	}

	return m.snapshotter.SaveSnapshot(ctx, Snapshot{
		AggregateID:   aggregate.ID(),
		AggregateType: aggregate.Type(),
		Version:       aggregate.Version(),
		Data:          payload,
	})
}

Making ProductAggregate Snapshotable

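Let’s update our ProductAggregate to support snapshotting by implementing the two methods that SnapshotableAggregate adds on top of Aggregate, ToSnapshot and FromSnapshot: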

package ecommerce

import (
	"encoding/json"

	"github.com/yourdomain/eventsourcing"
)

// ProductSnapshot is the JSON shape in which a ProductAggregate's state is
// stored inside a snapshot. The struct tags fix the persisted key names.
// Renaming a tag makes snapshots that were already written silently decode
// that field as its zero value.
type ProductSnapshot struct {
	// Identity and descriptive attributes.
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`

	// Commercial state.
	Price      float64 `json:"price"`
	StockLevel int     `json:"stock_level"`
	IsActive   bool    `json:"is_active"`

	// Version is the aggregate version the snapshot reflects.
	Version int `json:"version"`
}

// ToSnapshot copies every product field, including the aggregate version,
// into a ProductSnapshot and returns its JSON encoding.
func (p *ProductAggregate) ToSnapshot() (json.RawMessage, error) {
	return json.Marshal(&ProductSnapshot{
		ID:          p.id,
		Name:        p.name,
		Description: p.description,
		Price:       p.price,
		StockLevel:  p.stockLevel,
		IsActive:    p.isActive,
		Version:     p.version,
	})
}

// FromSnapshot decodes data as a ProductSnapshot and overwrites every product
// field, including the version, with the decoded values. If decoding fails,
// the aggregate is left untouched and the json error is returned unchanged.
func (p *ProductAggregate) FromSnapshot(data json.RawMessage) error {
	var s ProductSnapshot
	err := json.Unmarshal(data, &s)
	if err == nil {
		p.id, p.name, p.description = s.ID, s.Name, s.Description
		p.price, p.stockLevel, p.isActive = s.Price, s.StockLevel, s.IsActive
		p.version = s.Version
	}
	return err
}