Snapshotting and Performance Optimization
As event streams grow, rebuilding an aggregate by replaying its entire event history becomes increasingly expensive. Snapshotting addresses this by periodically persisting the aggregate’s state together with the version it corresponds to, so the aggregate can later be rebuilt by loading the latest snapshot and replaying only the events recorded after that version.
Snapshot Implementation
Let’s start with the snapshot model, the storage interface, and a PostgreSQL-backed implementation:
package eventsourcing
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)
// Snapshot is a serialized, point-in-time copy of an aggregate's state,
// tagged with the aggregate version it reflects so that only events after
// that version need to be replayed on top of it.
type Snapshot struct {
	// ID uniquely identifies this snapshot record.
	ID string
	// AggregateID and AggregateType identify the aggregate that was captured.
	AggregateID, AggregateType string
	// Version is the aggregate version at the moment the snapshot was taken.
	Version int
	// Timestamp records when the snapshot was created.
	Timestamp time.Time
	// Data holds the aggregate state as produced by its ToSnapshot method.
	Data json.RawMessage
}
// Snapshotter is the storage abstraction for aggregate snapshots: it can
// write a snapshot and read back the newest one for a given aggregate.
type Snapshotter interface {
	// GetLatestSnapshot returns the most recent snapshot stored for
	// aggregateID, or a non-nil error if none can be returned.
	GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error)
	// SaveSnapshot writes snapshot to the underlying store.
	SaveSnapshot(ctx context.Context, snapshot Snapshot) error
}
// SnapshotableAggregate extends Aggregate with conversion to and from a
// serialized snapshot, so it can be restored without replaying its whole
// event history.
type SnapshotableAggregate interface {
	Aggregate

	// FromSnapshot restores the aggregate's state from data previously
	// produced by ToSnapshot.
	FromSnapshot(data json.RawMessage) error
	// ToSnapshot encodes the aggregate's current state.
	ToSnapshot() (json.RawMessage, error)
}
// PostgresSnapshotter stores snapshots in a PostgreSQL "snapshots" table
// (created by Initialize) and satisfies Snapshotter.
type PostgresSnapshotter struct {
	db *sql.DB // handle used for every snapshot query
}

// NewPostgresSnapshotter wraps db in a PostgresSnapshotter. The handle is
// used as-is; it is neither opened nor closed here.
func NewPostgresSnapshotter(db *sql.DB) *PostgresSnapshotter {
	s := new(PostgresSnapshotter)
	s.db = db
	return s
}
// Initialize creates the snapshots table if it does not already exist. It is
// safe to call repeatedly; an existing table (including its column types) is
// left untouched.
//
// The UNIQUE (aggregate_id, version) constraint is backed by a composite
// index whose leading column is aggregate_id. That index already serves
// per-aggregate lookups and the ORDER BY version DESC LIMIT 1 query in
// GetLatestSnapshot, so no separate aggregate_id index is created. Keeping
// the DDL to a single statement also avoids relying on driver support for
// multi-statement Exec.
//
// Timestamps are stored as TIMESTAMPTZ so the written instant is preserved
// regardless of the time zone of the Go value.
func (s *PostgresSnapshotter) Initialize(ctx context.Context) error {
	const ddl = `
		CREATE TABLE IF NOT EXISTS snapshots (
			id             UUID PRIMARY KEY,
			aggregate_id   VARCHAR(255) NOT NULL,
			aggregate_type VARCHAR(255) NOT NULL,
			version        INT NOT NULL,
			timestamp      TIMESTAMPTZ NOT NULL,
			data           JSONB NOT NULL,
			-- One snapshot per aggregate version; the backing index also
			-- serves latest-snapshot lookups by aggregate_id.
			UNIQUE (aggregate_id, version)
		)`
	if _, err := s.db.ExecContext(ctx, ddl); err != nil {
		return fmt.Errorf("creating snapshots table: %w", err)
	}
	return nil
}
// SaveSnapshot writes snapshot to the snapshots table.
//
// A missing ID is filled with a fresh UUID and a zero Timestamp with the
// current time. The timestamp is normalized to UTC before writing, so the
// same instant is stored even if the column is a zone-less TIMESTAMP.
//
// Saving is idempotent per (aggregate_id, version). If a snapshot for that
// version already exists, for example because two readers decided to
// snapshot the same aggregate concurrently, the call succeeds without
// writing. This relies on the event-sourcing assumption that state at a
// given version is fully determined by the events up to that version, so the
// existing row is equivalent.
func (s *PostgresSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
	if snapshot.ID == "" {
		snapshot.ID = uuid.New().String()
	}
	if snapshot.Timestamp.IsZero() {
		snapshot.Timestamp = time.Now()
	}
	// snapshot is a copy, so normalizing here does not affect the caller.
	snapshot.Timestamp = snapshot.Timestamp.UTC()

	_, err := s.db.ExecContext(ctx, `
		INSERT INTO snapshots (
			id, aggregate_id, aggregate_type, version, timestamp, data
		) VALUES (
			$1, $2, $3, $4, $5, $6
		)
		ON CONFLICT (aggregate_id, version) DO NOTHING
	`, snapshot.ID, snapshot.AggregateID, snapshot.AggregateType,
		snapshot.Version, snapshot.Timestamp, snapshot.Data)
	if err != nil {
		return fmt.Errorf("saving snapshot of aggregate %q at version %d: %w",
			snapshot.AggregateID, snapshot.Version, err)
	}
	return nil
}
// ErrSnapshotNotFound is returned by PostgresSnapshotter.GetLatestSnapshot
// when no snapshot exists for the requested aggregate. Match it with
// errors.Is.
var ErrSnapshotNotFound = errors.New("snapshot not found")

// GetLatestSnapshot returns the highest-version snapshot stored for
// aggregateID.
//
// It returns ErrSnapshotNotFound when the aggregate has no snapshot, and a
// wrapped database error for any other failure. In both error cases the
// returned Snapshot is the zero value, never a partially scanned one.
func (s *PostgresSnapshotter) GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error) {
	var snapshot Snapshot
	err := s.db.QueryRowContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, version, timestamp, data
		FROM snapshots
		WHERE aggregate_id = $1
		ORDER BY version DESC
		LIMIT 1
	`, aggregateID).Scan(
		&snapshot.ID, &snapshot.AggregateID, &snapshot.AggregateType,
		&snapshot.Version, &snapshot.Timestamp, &snapshot.Data,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return Snapshot{}, ErrSnapshotNotFound
	case err != nil:
		return Snapshot{}, fmt.Errorf("loading latest snapshot of aggregate %q: %w", aggregateID, err)
	}
	return snapshot, nil
}
Snapshotting Strategy
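Next, let’s define a strategy that decides when to take a snapshot. We also add a SnapshotManager, which loads aggregates from the latest snapshot plus any newer events and creates new snapshots according to that strategy: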
package eventsourcing
import (
"context"
"fmt"
)
// SnapshotStrategy is the policy hook that decides whether a freshly loaded
// aggregate should be snapshotted.
type SnapshotStrategy interface {
	// ShouldSnapshot reports whether a new snapshot should be taken, given
	// the aggregate's ID, its current event count/version, and the version of
	// the snapshot it was loaded from (0 if none).
	ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool
}
// defaultSnapshotInterval is the number of events between snapshots used
// when an IntervalSnapshotStrategy has a non-positive interval.
const defaultSnapshotInterval = 100

// IntervalSnapshotStrategy requests a snapshot once at least interval events
// have been applied since the last snapshot. Its zero value is usable and
// behaves like NewIntervalSnapshotStrategy(0).
type IntervalSnapshotStrategy struct {
	interval int // events between snapshots; <= 0 means defaultSnapshotInterval
}

// NewIntervalSnapshotStrategy returns a strategy that snapshots every
// interval events. A non-positive interval selects the default of 100.
func NewIntervalSnapshotStrategy(interval int) *IntervalSnapshotStrategy {
	if interval <= 0 {
		interval = defaultSnapshotInterval
	}
	return &IntervalSnapshotStrategy{interval: interval}
}

// ShouldSnapshot reports whether eventCount, treated as the aggregate's
// current version, is at least interval past lastSnapshotVersion.
// aggregateID is not consulted.
//
// A non-positive interval (reachable only through the zero value) is treated
// as the default. Otherwise an interval of 0 would report true even when no
// events were applied since the last snapshot, causing the same version to be
// re-snapshotted on every load.
func (s *IntervalSnapshotStrategy) ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool {
	interval := s.interval
	if interval <= 0 {
		interval = defaultSnapshotInterval
	}
	// Subtracting avoids overflowing lastSnapshotVersion+interval near MaxInt.
	return eventCount-lastSnapshotVersion >= interval
}
// SnapshotManager rebuilds aggregates from the newest snapshot plus later
// events, and writes new snapshots when its strategy asks for one.
type SnapshotManager struct {
	eventStore         EventStore
	snapshotter        Snapshotter
	strategy           SnapshotStrategy
	aggregateFactories map[string]func(id string) SnapshotableAggregate // keyed by aggregate type
}

// NewSnapshotManager wires an event store, a snapshot store and a snapshot
// strategy together. Aggregate types must be added with RegisterAggregate
// before GetAggregate can load them.
func NewSnapshotManager(
	eventStore EventStore,
	snapshotter Snapshotter,
	strategy SnapshotStrategy,
) *SnapshotManager {
	m := &SnapshotManager{
		aggregateFactories: map[string]func(id string) SnapshotableAggregate{},
	}
	m.eventStore, m.snapshotter, m.strategy = eventStore, snapshotter, strategy
	return m
}
// RegisterAggregate associates aggregateType with the factory used to build
// empty instances of it. Registering the same type again replaces the
// previous factory. The registry map is not guarded by any lock.
func (m *SnapshotManager) RegisterAggregate(
	aggregateType string,
	factory func(id string) SnapshotableAggregate,
) {
	registry := m.aggregateFactories
	registry[aggregateType] = factory
}
// GetAggregate rebuilds the aggregate identified by aggregateID.
//
// Loading proceeds in three steps:
//   - It instantiates the aggregate with the factory registered for
//     aggregateType.
//   - It restores the latest snapshot, if one can be loaded.
//   - It applies only the events recorded after that snapshot's version.
//
// Any error from the snapshot store is treated as "no snapshot", and the
// aggregate is rebuilt from its full event history instead. An error while
// applying a snapshot that was loaded is returned.
//
// After loading, a new snapshot is written only when at least one event was
// applied past the snapshot used and the strategy asks for one. Snapshot
// creation is best effort: a failure is printed and the aggregate is still
// returned.
func (m *SnapshotManager) GetAggregate(
	ctx context.Context,
	aggregateID string,
	aggregateType string,
) (SnapshotableAggregate, error) {
	factory, exists := m.aggregateFactories[aggregateType]
	if !exists {
		return nil, fmt.Errorf("unknown aggregate type: %s", aggregateType)
	}
	aggregate := factory(aggregateID)

	// Version 0 means "no snapshot applied": replay the whole event stream.
	snapshotVersion := 0
	if snapshot, err := m.snapshotter.GetLatestSnapshot(ctx, aggregateID); err == nil {
		if err := aggregate.FromSnapshot(snapshot.Data); err != nil {
			return nil, fmt.Errorf("failed to apply snapshot: %w", err)
		}
		snapshotVersion = snapshot.Version
	}

	events, err := m.eventStore.GetEventsAfterVersion(ctx, aggregateID, snapshotVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to load events: %w", err)
	}
	if err := aggregate.ApplyEvents(events); err != nil {
		return nil, fmt.Errorf("failed to apply events: %w", err)
	}

	// Only snapshot when the aggregate moved past the snapshot we started
	// from. Otherwise a permissive strategy would re-save an already stored
	// version on every read, which a store enforcing one snapshot per
	// (aggregate, version) rejects.
	current := aggregate.Version()
	if current > snapshotVersion && m.strategy.ShouldSnapshot(aggregateID, current, snapshotVersion) {
		if err := m.CreateSnapshot(ctx, aggregate); err != nil {
			// Snapshots are an optimization; a failed write must not fail the read.
			fmt.Printf("Failed to create snapshot: %v\n", err)
		}
	}
	return aggregate, nil
}
// CreateSnapshot encodes aggregate with ToSnapshot and hands the result to
// the configured Snapshotter. The snapshot is stamped with the aggregate's
// ID, type and current version. ID and Timestamp are left zero;
// PostgresSnapshotter fills them in when saving.
func (m *SnapshotManager) CreateSnapshot(ctx context.Context, aggregate SnapshotableAggregate) error {
	state, err := aggregate.ToSnapshot()
	if err != nil {
		return fmt.Errorf("failed to serialize aggregate: %w", err)
	}
	return m.snapshotter.SaveSnapshot(ctx, Snapshot{
		AggregateID:   aggregate.ID(),
		AggregateType: aggregate.Type(),
		Version:       aggregate.Version(),
		Data:          state,
	})
}
Making ProductAggregate Snapshotable
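To make ProductAggregate snapshotable, we add a serializable ProductSnapshot type. We then implement the ToSnapshot and FromSnapshot methods required by eventsourcing.SnapshotableAggregate: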
package ecommerce
import (
"encoding/json"
"github.com/yourdomain/eventsourcing"
)
// ProductSnapshot is the JSON document written by ProductAggregate.ToSnapshot
// and read back by ProductAggregate.FromSnapshot. The JSON keys form the
// stored format, so renaming them would make existing snapshots unreadable.
type ProductSnapshot struct {
	ID          string  `json:"id"`          // mirrors ProductAggregate.id
	Name        string  `json:"name"`        // mirrors ProductAggregate.name
	Description string  `json:"description"` // mirrors ProductAggregate.description
	Price       float64 `json:"price"`       // mirrors ProductAggregate.price
	StockLevel  int     `json:"stock_level"` // mirrors ProductAggregate.stockLevel
	IsActive    bool    `json:"is_active"`   // mirrors ProductAggregate.isActive
	Version     int     `json:"version"`     // aggregate version at capture time
}
// ToSnapshot encodes the product's current fields as JSON using the
// ProductSnapshot layout.
func (p *ProductAggregate) ToSnapshot() (json.RawMessage, error) {
	return json.Marshal(&ProductSnapshot{
		ID:          p.id,
		Name:        p.name,
		Description: p.description,
		Price:       p.price,
		StockLevel:  p.stockLevel,
		IsActive:    p.isActive,
		Version:     p.version,
	})
}
// FromSnapshot restores the product's fields from JSON in the ProductSnapshot
// layout. If decoding fails, the error is returned and no field is modified.
func (p *ProductAggregate) FromSnapshot(data json.RawMessage) error {
	var s ProductSnapshot
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	p.id, p.name, p.description = s.ID, s.Name, s.Description
	p.price, p.stockLevel, p.isActive = s.Price, s.StockLevel, s.IsActive
	p.version = s.Version
	return nil
}