Build event-sourced systems in Go with CQRS patterns.

Most applications store only current state - when something changes, the old data is lost. Event sourcing takes a different approach: store every change as an immutable event, preserving the complete history.

The Problem with State-Based Storage

Traditional applications work like this:

// Traditional approach - only current state
type Account struct {
    ID      string
    Balance int
    Status  string
}

// When balance changes, old value is lost
func (a *Account) Withdraw(amount int) {
    a.Balance -= amount  // History is gone
}

Problems with this approach:

  • Lost History: Can’t see how the account reached its current state
  • No Audit Trail: Compliance and debugging become difficult
  • Limited Queries: Can’t ask “what was the balance yesterday?”
  • Concurrency Issues: Multiple updates can conflict

Event Sourcing Solution

Instead of storing state, store the events that create state:

// Event sourcing approach - store events
type Event struct {
    ID        string
    Type      string
    Data      interface{}
    Timestamp time.Time
}

type AccountCreated struct {
    AccountID string
    InitialBalance int
}

type MoneyWithdrawn struct {
    AccountID string
    Amount    int
}

Benefits:

  • Complete History: Every change is preserved
  • Audit Trail: Natural compliance and debugging
  • Time Travel: Reconstruct state at any point in time
  • Flexibility: New views of data without migration
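
To make the "time travel" benefit concrete, here is a minimal sketch that replays events to compute a balance as of a given instant. The names LedgerEvent, BalanceAt, and Day are illustrative and not part of the code above, and the payload is simplified to a signed amount.

```go
package main

import "time"

// LedgerEvent is a simplified, hypothetical event: a signed change to a balance.
type LedgerEvent struct {
	Type      string    // e.g. "AccountCreated", "MoneyWithdrawn"
	Amount    int       // positive for deposits, negative for withdrawals
	Timestamp time.Time // when the event occurred
}

// Day is a helper for this sketch: midnight UTC on the n-th day of January 2024.
func Day(n int) time.Time {
	return time.Date(2024, time.January, n, 0, 0, 0, 0, time.UTC)
}

// BalanceAt replays events in order and returns the balance as of the given
// instant. Events are assumed to be sorted by Timestamp.
func BalanceAt(events []LedgerEvent, at time.Time) int {
	balance := 0
	for _, e := range events {
		if e.Timestamp.After(at) {
			break // everything from here on happened later
		}
		balance += e.Amount
	}
	return balance
}
```

With an opening balance of 100 on day 1 and withdrawals of 30 and 20 on days 2 and 3, BalanceAt(events, Day(2)) yields 70, while the current-state model from earlier could only report the final 50.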

When to Use Event Sourcing

Event sourcing works well when you need:

  • Audit Requirements: Financial, healthcare, or legal systems
  • Complex Business Logic: Domain events matter to the business
  • Temporal Queries: “Show me last month’s state”
  • High Write Throughput: Events are append-only

Don’t use event sourcing for:

  • Simple CRUD applications
  • Systems where current state is all that matters
  • Teams unfamiliar with the complexity

This guide shows you how to implement event sourcing in Go, from basic concepts to production systems.


Event Sourcing Fundamentals

Before implementing anything, it helps to pin down the core concepts of event sourcing and how they differ from traditional data storage.

Core Concepts

Event sourcing revolves around several key concepts:

  1. Events: Immutable records of something that happened in the system. Events are always expressed in the past tense (e.g., OrderPlaced, PaymentProcessed).

  2. Aggregate: A cluster of domain objects that can be treated as a single unit. An aggregate has a root entity (the aggregate root) that enforces invariants for all objects within the aggregate.

  3. Event Store: A specialized database that stores events as the source of truth, rather than the current state.

  4. Projections: Read-optimized views built by consuming events and updating query-friendly data structures.

  5. Commands: Requests for the system to perform an action, which may generate events if successful.

Let’s start by defining the basic structures for our event sourcing system:

package eventsourcing

import (
	"context"
	"encoding/json"
	"time"
)

// Event represents something that happened in the system
type Event struct {
	ID            string          // Unique identifier for the event
	AggregateID   string          // ID of the aggregate this event belongs to
	AggregateType string          // Type of the aggregate
	EventType     string          // Type of the event
	Version       int             // Version of the aggregate after this event
	Timestamp     time.Time       // When the event occurred
	Data          json.RawMessage // Event payload data
	Metadata      json.RawMessage // Additional metadata about the event
}

// Command represents a request to change the system state
type Command struct {
	ID            string          // Unique identifier for the command
	AggregateID   string          // ID of the target aggregate
	AggregateType string          // Type of the target aggregate
	CommandType   string          // Type of the command
	Data          json.RawMessage // Command payload data
	Metadata      json.RawMessage // Additional metadata about the command
}

// Aggregate is the base interface for all aggregates
type Aggregate interface {
	ID() string
	Type() string
	Version() int
	ApplyEvent(event Event) error
	ApplyEvents(events []Event) error
}

// EventHandler processes events after they are stored
type EventHandler interface {
	HandleEvent(ctx context.Context, event Event) error
}

// CommandHandler processes commands and produces events
type CommandHandler interface {
	HandleCommand(ctx context.Context, command Command) ([]Event, error)
}

Event Sourcing vs. Traditional State Storage

To understand why event sourcing is valuable, let’s compare it with traditional state storage:

Aspect           | Traditional Storage                     | Event Sourcing
-----------------|-----------------------------------------|-------------------------------------
Data Model       | Current state only                      | Complete history of changes
Updates          | Destructive (overwrites previous state) | Non-destructive (appends new events)
History          | Limited or requires additional logging  | Built-in, comprehensive
Audit            | Requires additional implementation      | Inherent to the model
Temporal Queries | Difficult or impossible                 | Natural and straightforward
Debugging        | Limited historical context              | Complete historical context
Complexity       | Generally simpler                       | More complex initially

Event sourcing shines in domains where:

  • Historical data is as important as current state
  • Audit trails are required for compliance or debugging
  • Business processes evolve over time
  • Complex domain logic benefits from capturing intent

Event Modeling

Before implementing event sourcing, it’s crucial to model your domain events properly. Events should:

  1. Be named in the past tense (e.g., UserRegistered, OrderShipped)
  2. Contain all data relevant to the event
  3. Be immutable once created
  4. Be self-contained and understandable in isolation

Let’s model a simple e-commerce domain with events:

package ecommerce

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

// Product-related events
type ProductCreated struct {
	ProductID   string  `json:"product_id"`
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Price       float64 `json:"price"`
	StockLevel  int     `json:"stock_level"`
}

type ProductPriceChanged struct {
	ProductID  string  `json:"product_id"`
	OldPrice   float64 `json:"old_price"`
	NewPrice   float64 `json:"new_price"`
	ChangeDate time.Time `json:"change_date"`
}

type ProductStockAdjusted struct {
	ProductID      string `json:"product_id"`
	PreviousStock  int    `json:"previous_stock"`
	NewStockLevel  int    `json:"new_stock_level"`
	AdjustmentType string `json:"adjustment_type"` // "addition", "reduction", "inventory_correction"
}

// Order-related events
type OrderCreated struct {
	OrderID     string    `json:"order_id"`
	CustomerID  string    `json:"customer_id"`
	OrderDate   time.Time `json:"order_date"`
	TotalAmount float64   `json:"total_amount"`
}

type OrderItemAdded struct {
	OrderID   string  `json:"order_id"`
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	UnitPrice float64 `json:"unit_price"`
}

type OrderShipped struct {
	OrderID          string    `json:"order_id"`
	ShippingDate     time.Time `json:"shipping_date"`
	TrackingNumber   string    `json:"tracking_number"`
	EstimatedArrival time.Time `json:"estimated_arrival"`
}

type OrderCancelled struct {
	OrderID       string    `json:"order_id"`
	CancellationDate time.Time `json:"cancellation_date"`
	Reason        string    `json:"reason"`
}

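// CreateEvent wraps a specific event payload in a generic Event.
// Note: Event is defined in the eventsourcing package and BaseAggregate.AddChange
// calls this helper unqualified, so it belongs in that package rather than in ecommerce.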
func CreateEvent(aggregateID, aggregateType string, version int, eventType string, data interface{}) (Event, error) {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return Event{}, err
	}
	
	return Event{
		ID:            uuid.New().String(),
		AggregateID:   aggregateID,
		AggregateType: aggregateType,
		EventType:     eventType,
		Version:       version,
		Timestamp:     time.Now().UTC(),
		Data:          dataBytes,
	}, nil
}
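
The round trip between a typed payload and the generic envelope can be sketched as follows. This is a self-contained illustration, not the article's code: Envelope is a trimmed-down stand-in for Event, and PriceChanged, Wrap, and Unwrap are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a trimmed-down stand-in for the Event struct.
type Envelope struct {
	EventType string
	Data      json.RawMessage
}

// PriceChanged is a hypothetical payload used only in this sketch.
type PriceChanged struct {
	ProductID string  `json:"product_id"`
	NewPrice  float64 `json:"new_price"`
}

// Wrap marshals a typed payload into a generic envelope.
func Wrap(eventType string, payload interface{}) (Envelope, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("marshal %s: %w", eventType, err)
	}
	return Envelope{EventType: eventType, Data: data}, nil
}

// Unwrap decodes the payload back into its typed struct, chosen by EventType.
func Unwrap(e Envelope) (interface{}, error) {
	switch e.EventType {
	case "PriceChanged":
		var p PriceChanged
		if err := json.Unmarshal(e.Data, &p); err != nil {
			return nil, fmt.Errorf("unmarshal %s: %w", e.EventType, err)
		}
		return p, nil
	default:
		return nil, fmt.Errorf("unknown event type: %s", e.EventType)
	}
}
```

Keeping the payload as raw JSON lets the store stay ignorant of domain types; only the code that applies an event needs to know which struct to decode into.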

Event Store Design and Implementation

The event store is the heart of any event sourcing system. It’s responsible for persisting events and allowing them to be retrieved by aggregate ID or for building projections.

Event Store Interface

Let’s define a clean interface for our event store:

package eventsourcing

import (
	"context"
	"errors"
)

var (
	ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate has been modified")
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrEventNotFound       = errors.New("event not found")
)

// EventStore defines the interface for storing and retrieving events
type EventStore interface {
	// SaveEvents persists new events to the store
	SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error
	
	// GetEvents retrieves all events for a specific aggregate
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
	
	// GetEventsByType retrieves events of a specific type
	GetEventsByType(ctx context.Context, eventType string) ([]Event, error)
	
	// GetAllEvents retrieves all events, optionally with pagination
	GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error)
	
	// GetAggregateVersion returns the current version of an aggregate
	GetAggregateVersion(ctx context.Context, aggregateID string) (int, error)
}

In-Memory Event Store Implementation

For development and testing, an in-memory event store is useful:

package eventsourcing

import (
	"context"
	"sort"
	"sync"
)

// InMemoryEventStore is a simple in-memory implementation of EventStore
type InMemoryEventStore struct {
	mu     sync.RWMutex
	events map[string][]Event // Map of aggregateID to events
}

// NewInMemoryEventStore creates a new in-memory event store
func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		events: make(map[string][]Event),
	}
}

// SaveEvents persists new events to the store
func (s *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	s.mu.Lock()
	defer s.mu.Unlock()
	
	// Check if aggregate exists
	existingEvents, exists := s.events[aggregateID]
	
	// If aggregate exists, verify version
	if exists {
		currentVersion := len(existingEvents)
		if currentVersion != expectedVersion {
			return ErrConcurrencyConflict
		}
	} else if expectedVersion > 0 {
		// If aggregate doesn't exist but we expected a version > 0
		return ErrAggregateNotFound
	}
	
	// Append new events
	s.events[aggregateID] = append(existingEvents, events...)
	
	return nil
}

// GetEvents retrieves all events for a specific aggregate
func (s *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	events, exists := s.events[aggregateID]
	if !exists {
		return nil, ErrAggregateNotFound
	}
	
	// Return a copy to prevent modification of internal state
	result := make([]Event, len(events))
	copy(result, events)
	
	return result, nil
}

// GetEventsByType retrieves events of a specific type
func (s *InMemoryEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	var result []Event
	
	for _, aggregateEvents := range s.events {
		for _, event := range aggregateEvents {
			if event.EventType == eventType {
				result = append(result, event)
			}
		}
	}
	
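	// Sort by timestamp; break ties by aggregate ID and version, because map
	// iteration order is random and equal timestamps would otherwise reorder
	sort.Slice(result, func(i, j int) bool {
		a, b := result[i], result[j]
		if !a.Timestamp.Equal(b.Timestamp) {
			return a.Timestamp.Before(b.Timestamp)
		}
		if a.AggregateID != b.AggregateID {
			return a.AggregateID < b.AggregateID
		}
		return a.Version < b.Version
	})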
	
	return result, nil
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *InMemoryEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	var allEvents []Event
	
	// Collect all events
	for _, aggregateEvents := range s.events {
		allEvents = append(allEvents, aggregateEvents...)
	}
	
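	// Sort by timestamp; break ties by aggregate ID and version so that
	// pagination sees the same order on every call
	sort.Slice(allEvents, func(i, j int) bool {
		a, b := allEvents[i], allEvents[j]
		if !a.Timestamp.Equal(b.Timestamp) {
			return a.Timestamp.Before(b.Timestamp)
		}
		if a.AggregateID != b.AggregateID {
			return a.AggregateID < b.AggregateID
		}
		return a.Version < b.Version
	})
	
	// Apply pagination (a negative offset would panic when slicing)
	if offset < 0 {
		offset = 0
	}
	if offset >= len(allEvents) {
		return []Event{}, nil
	}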
	
	end := offset + limit
	if end > len(allEvents) {
		end = len(allEvents)
	}
	
	return allEvents[offset:end], nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *InMemoryEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	events, exists := s.events[aggregateID]
	if !exists {
		return 0, nil // New aggregates start at version 0
	}
	
	return len(events), nil
}
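
The expectedVersion check is what turns two competing writers into one success and one conflict. A minimal sketch of that behavior, plus a simple reload-and-retry loop, might look like the following. MiniStore, ErrConflict, and AppendWithRetry are hypothetical names for this illustration, and events are reduced to plain strings.

```go
package main

import (
	"errors"
	"sync"
)

// ErrConflict signals that the stream moved on since the caller loaded it.
var ErrConflict = errors.New("concurrency conflict")

// MiniStore is a stripped-down event store: aggregate ID -> event names.
type MiniStore struct {
	mu      sync.Mutex
	streams map[string][]string
}

// NewMiniStore returns an empty store.
func NewMiniStore() *MiniStore {
	return &MiniStore{streams: make(map[string][]string)}
}

// Version returns the number of events stored for the aggregate.
func (s *MiniStore) Version(id string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.streams[id])
}

// Append adds events only if the stream is still at expectedVersion.
func (s *MiniStore) Append(id string, expectedVersion int, events ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.streams[id]) != expectedVersion {
		return ErrConflict
	}
	s.streams[id] = append(s.streams[id], events...)
	return nil
}

// AppendWithRetry reloads the current version and re-runs decide after a
// conflict, giving up after maxAttempts.
func AppendWithRetry(s *MiniStore, id string, maxAttempts int, decide func(version int) []string) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		version := s.Version(id)
		err := s.Append(id, version, decide(version)...)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrConflict) {
			return err
		}
	}
	return ErrConflict
}
```

In a real handler, decide would rebuild the aggregate from its events and re-validate the command, since the state that justified the original decision may no longer hold.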

PostgreSQL Event Store Implementation

For production use, we need a persistent event store. Here’s an implementation using PostgreSQL:

package eventsourcing

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

// PostgresEventStore implements EventStore using PostgreSQL
type PostgresEventStore struct {
	db *sql.DB
}

// NewPostgresEventStore creates a new PostgreSQL-backed event store
func NewPostgresEventStore(connectionString string) (*PostgresEventStore, error) {
	db, err := sql.Open("postgres", connectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}
	
	// sql.Open does not connect on its own; Ping verifies the connection
	if err := db.Ping(); err != nil {
		db.Close() // release the pool before returning the error
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}
	
	return &PostgresEventStore{db: db}, nil
}

// Initialize creates the necessary tables if they don't exist
func (s *PostgresEventStore) Initialize(ctx context.Context) error {
	// Create events table
	_, err := s.db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS events (
			id UUID PRIMARY KEY,
			aggregate_id VARCHAR(255) NOT NULL,
			aggregate_type VARCHAR(255) NOT NULL,
			event_type VARCHAR(255) NOT NULL,
			version INT NOT NULL,
			timestamp TIMESTAMP NOT NULL,
			data JSONB NOT NULL,
			metadata JSONB,
			
			-- Unique constraint on (aggregate_id, version) enforces optimistic concurrency control
			UNIQUE (aggregate_id, version)
		);
		
		-- Create indexes for common query patterns
		CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON events(aggregate_id);
		CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
		CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
	`)
	
	if err != nil {
		return fmt.Errorf("failed to create events table: %w", err)
	}
	
	return nil
}

// SaveEvents persists new events to the store
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	// Start a transaction
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback() // Will be ignored if transaction is committed
	
	// Check current version
	var currentVersion int
	err = tx.QueryRowContext(ctx, 
		"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", 
		aggregateID).Scan(&currentVersion)
	
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}
	
	if currentVersion != expectedVersion {
		return ErrConcurrencyConflict
	}
	
	// Insert all events
	stmt, err := tx.PrepareContext(ctx, `
		INSERT INTO events (
			id, aggregate_id, aggregate_type, event_type, 
			version, timestamp, data, metadata
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
	`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()
	
	for i, event := range events {
		// Ensure event has a valid ID
		if event.ID == "" {
			event.ID = uuid.New().String()
		}
		
		// Set version based on expected version
		event.Version = expectedVersion + i + 1
		
		// Set timestamp if not already set
		if event.Timestamp.IsZero() {
			event.Timestamp = time.Now().UTC()
		}
		
		_, err = stmt.ExecContext(ctx,
			event.ID, event.AggregateID, event.AggregateType, event.EventType,
			event.Version, event.Timestamp, event.Data, event.Metadata)
		
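		if err != nil {
			// The version check above can race with another writer; in that case
			// the UNIQUE (aggregate_id, version) constraint rejects this insert,
			// and callers may want to treat that failure as a concurrency conflict.
			return fmt.Errorf("failed to insert event: %w", err)
		}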
	}
	
	// Commit the transaction
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	
	return nil
}
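
Because the version check and the insert are separate statements, two writers can both pass the check, and the losing writer then fails on the unique constraint. One way to surface that as a concurrency conflict is to inspect the SQLSTATE code, where 23505 is PostgreSQL's unique_violation. The sketch below assumes the driver's error type exposes a SQLState() string method; verify this for your driver and version. TranslateInsertError and FakeDriverError are hypothetical names, and the fake error exists only to exercise the function without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConcurrencyConflict mirrors the sentinel error defined for the event store.
var ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate has been modified")

// uniqueViolation is the PostgreSQL SQLSTATE code for a unique constraint violation.
const uniqueViolation = "23505"

// sqlStateError is satisfied by driver errors that expose a SQLSTATE code.
// Assumption: the driver in use provides this method.
type sqlStateError interface {
	error
	SQLState() string
}

// TranslateInsertError maps a unique violation to ErrConcurrencyConflict and
// wraps every other error unchanged.
func TranslateInsertError(err error) error {
	if err == nil {
		return nil
	}
	var stateErr sqlStateError
	if errors.As(err, &stateErr) && stateErr.SQLState() == uniqueViolation {
		return ErrConcurrencyConflict
	}
	return fmt.Errorf("failed to insert event: %w", err)
}

// FakeDriverError simulates a driver error so the sketch runs without a database.
type FakeDriverError struct{ Code string }

func (e FakeDriverError) Error() string    { return "driver error " + e.Code }
func (e FakeDriverError) SQLState() string { return e.Code }
```

With this mapping, callers can rely on errors.Is(err, ErrConcurrencyConflict) regardless of whether the conflict was caught by the explicit check or by the constraint.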

// GetEvents retrieves all events for a specific aggregate
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		WHERE aggregate_id = $1
		ORDER BY version ASC
	`, aggregateID)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query events: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetEventsByType retrieves events of a specific type
func (s *PostgresEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		WHERE event_type = $1
		ORDER BY timestamp ASC
	`, eventType)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query events by type: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *PostgresEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		ORDER BY timestamp ASC
		LIMIT $1 OFFSET $2
	`, limit, offset)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query all events: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetAggregateVersion returns the current version of an aggregate
func (s *PostgresEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	var version int
	err := s.db.QueryRowContext(ctx, 
		"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", 
		aggregateID).Scan(&version)
	
	if err != nil {
		return 0, fmt.Errorf("failed to get aggregate version: %w", err)
	}
	
	return version, nil
}

// scanEvents is a helper function to scan rows into Event structs
func (s *PostgresEventStore) scanEvents(rows *sql.Rows) ([]Event, error) {
	var events []Event
	
	for rows.Next() {
		var event Event
		var data, metadata []byte
		
		err := rows.Scan(
			&event.ID, &event.AggregateID, &event.AggregateType, &event.EventType,
			&event.Version, &event.Timestamp, &data, &metadata)
		
		if err != nil {
			return nil, fmt.Errorf("failed to scan event row: %w", err)
		}
		
		event.Data = json.RawMessage(data)
		event.Metadata = json.RawMessage(metadata)
		
		events = append(events, event)
	}
	
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating event rows: %w", err)
	}
	
	return events, nil
}

// Close closes the database connection
func (s *PostgresEventStore) Close() error {
	return s.db.Close()
}

CQRS Pattern Integration

Command Query Responsibility Segregation (CQRS) is a pattern that separates the read and write operations of a data store. When combined with event sourcing, it creates a powerful architecture for building scalable, responsive systems.

CQRS Principles

In CQRS:

  1. Commands modify state but don’t return data
  2. Queries return data but don’t modify state
  3. Write models are optimized for consistency and validation
  4. Read models (projections) are optimized for specific query needs

This separation allows each side to be optimized independently:

  • Write models can focus on business rules and consistency
  • Read models can be denormalized for performance
  • Different storage technologies can be used for each side

Implementing Command Handlers

Command handlers are responsible for validating commands, applying business rules, and generating events:

package eventsourcing

import (
	"context"
	"errors"
	"fmt"
)

// BaseAggregate provides common functionality for aggregates
type BaseAggregate struct {
	id      string
	typ     string
	version int
	changes []Event
}

// ID returns the aggregate's identifier
func (a *BaseAggregate) ID() string {
	return a.id
}

// Type returns the aggregate's type
func (a *BaseAggregate) Type() string {
	return a.typ
}

// Version returns the aggregate's current version
func (a *BaseAggregate) Version() int {
	return a.version
}

// Changes returns the uncommitted changes
func (a *BaseAggregate) Changes() []Event {
	return a.changes
}

// ClearChanges clears the uncommitted changes
func (a *BaseAggregate) ClearChanges() {
	a.changes = nil
}

// AddChange adds a new change event
func (a *BaseAggregate) AddChange(eventType string, data interface{}) error {
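	// Each pending change gets the next consecutive version, so several
	// changes produced by one command do not share a version number
	event, err := CreateEvent(a.id, a.typ, a.version+len(a.changes)+1, eventType, data)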
	if err != nil {
		return err
	}
	
	a.changes = append(a.changes, event)
	return nil
}

// Apply applies an event to the aggregate
func (a *BaseAggregate) Apply(event Event) {
	a.version = event.Version
}

// GenericCommandHandler provides a base implementation for command handlers
type GenericCommandHandler struct {
	eventStore EventStore
	aggregates map[string]func(id string) Aggregate
}

// NewGenericCommandHandler creates a new command handler
func NewGenericCommandHandler(eventStore EventStore) *GenericCommandHandler {
	return &GenericCommandHandler{
		eventStore: eventStore,
		aggregates: make(map[string]func(id string) Aggregate),
	}
}

// RegisterAggregate registers an aggregate type with the command handler
func (h *GenericCommandHandler) RegisterAggregate(aggregateType string, factory func(id string) Aggregate) {
	h.aggregates[aggregateType] = factory
}

// HandleCommand processes a command and generates events
func (h *GenericCommandHandler) HandleCommand(ctx context.Context, command Command) ([]Event, error) {
	// Get the aggregate factory
	factory, exists := h.aggregates[command.AggregateType]
	if !exists {
		return nil, fmt.Errorf("unknown aggregate type: %s", command.AggregateType)
	}
	
	// Create the aggregate instance
	aggregate := factory(command.AggregateID)
	
	// Load the aggregate's events
	events, err := h.eventStore.GetEvents(ctx, command.AggregateID)
	if err != nil && !errors.Is(err, ErrAggregateNotFound) {
		return nil, fmt.Errorf("failed to load aggregate events: %w", err)
	}
	
	// Apply existing events to rebuild the aggregate's state
	if err := aggregate.ApplyEvents(events); err != nil {
		return nil, fmt.Errorf("failed to apply events to aggregate: %w", err)
	}
	
	// Process the command (this is implemented by concrete command handlers)
	if err := h.processCommand(ctx, aggregate, command); err != nil {
		return nil, err
	}
	
	// Get the changes produced by the command
	changes := aggregate.(interface{ Changes() []Event }).Changes()
	if len(changes) == 0 {
		return nil, nil // No changes to save
	}
	
	// Save the new events
	if err := h.eventStore.SaveEvents(ctx, aggregate.ID(), aggregate.Version(), changes); err != nil {
		return nil, fmt.Errorf("failed to save events: %w", err)
	}
	
	// Clear the changes from the aggregate
	aggregate.(interface{ ClearChanges() }).ClearChanges()
	
	return changes, nil
}

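// processCommand is a placeholder for aggregate-specific logic.
// Note: Go embedding does not provide virtual dispatch, so HandleCommand above
// always calls this method, even when a type embedding GenericCommandHandler
// defines its own processCommand. Concrete handlers need explicit wiring,
// for example a function field that HandleCommand invokes.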
func (h *GenericCommandHandler) processCommand(ctx context.Context, aggregate Aggregate, command Command) error {
	return errors.New("processCommand must be implemented by concrete command handlers")
}

Implementing a Product Aggregate

Let’s implement a concrete aggregate for our e-commerce example:

package ecommerce

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/yourdomain/eventsourcing"
)

// ProductAggregate represents a product in our e-commerce system
type ProductAggregate struct {
	eventsourcing.BaseAggregate
	
	// Product state
	name        string
	description string
	price       float64
	stockLevel  int
	isActive    bool
}

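// NewProductAggregate creates a new product aggregate.
// Note: id and typ are unexported fields of eventsourcing.BaseAggregate, so the
// assignments below compile only if that package exposes them (for example
// through exported fields or a constructor); adjust to your package layout.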
func NewProductAggregate(id string) eventsourcing.Aggregate {
	product := &ProductAggregate{}
	product.id = id
	product.typ = "product"
	return product
}

// ApplyEvent applies a single event to the aggregate
func (p *ProductAggregate) ApplyEvent(event eventsourcing.Event) error {
	// Apply the event to the base aggregate
	p.BaseAggregate.Apply(event)
	
	// Apply event-specific logic
	switch event.EventType {
	case "ProductCreated":
		var data ProductCreated
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		p.name = data.Name
		p.description = data.Description
		p.price = data.Price
		p.stockLevel = data.StockLevel
		p.isActive = true
		
	case "ProductPriceChanged":
		var data ProductPriceChanged
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		p.price = data.NewPrice
		
	case "ProductStockAdjusted":
		var data ProductStockAdjusted
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		p.stockLevel = data.NewStockLevel
		
	case "ProductDeactivated":
		p.isActive = false
		
	case "ProductReactivated":
		p.isActive = true
		
	default:
		return fmt.Errorf("unknown event type: %s", event.EventType)
	}
	
	return nil
}

// ApplyEvents applies multiple events to the aggregate
func (p *ProductAggregate) ApplyEvents(events []eventsourcing.Event) error {
	for _, event := range events {
		if err := p.ApplyEvent(event); err != nil {
			return err
		}
	}
	return nil
}

// CreateProduct creates a new product
func (p *ProductAggregate) CreateProduct(name, description string, price float64, stockLevel int) error {
	// Validate inputs
	if name == "" {
		return errors.New("product name cannot be empty")
	}
	if price <= 0 {
		return errors.New("product price must be positive")
	}
	if stockLevel < 0 {
		return errors.New("stock level cannot be negative")
	}
	
	// Check if product already exists
	if p.name != "" {
		return errors.New("product already exists")
	}
	
	// Create the event
	return p.AddChange("ProductCreated", ProductCreated{
		ProductID:   p.ID(),
		Name:        name,
		Description: description,
		Price:       price,
		StockLevel:  stockLevel,
	})
}

// ChangePrice changes the product's price
func (p *ProductAggregate) ChangePrice(newPrice float64) error {
	// Validate inputs
	if newPrice <= 0 {
		return errors.New("product price must be positive")
	}
	
	// Check if product exists
	if p.name == "" {
		return errors.New("product does not exist")
	}
	
	// Check if price is actually changing
	if p.price == newPrice {
		return nil // No change needed
	}
	
	// Create the event
	return p.AddChange("ProductPriceChanged", ProductPriceChanged{
		ProductID:  p.ID(),
		OldPrice:   p.price,
		NewPrice:   newPrice,
		ChangeDate: time.Now().UTC(),
	})
}

// AdjustStock adjusts the product's stock level
func (p *ProductAggregate) AdjustStock(adjustment int, adjustmentType string) error {
	// Check if product exists
	if p.name == "" {
		return errors.New("product does not exist")
	}
	
	// Calculate new stock level
	newStockLevel := p.stockLevel + adjustment
	
	// Validate new stock level
	if newStockLevel < 0 {
		return errors.New("stock level cannot be negative")
	}
	
	// Create the event
	return p.AddChange("ProductStockAdjusted", ProductStockAdjusted{
		ProductID:      p.ID(),
		PreviousStock:  p.stockLevel,
		NewStockLevel:  newStockLevel,
		AdjustmentType: adjustmentType,
	})
}

Product Command Handler

Now let’s implement a command handler for our product aggregate:

package ecommerce

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/yourdomain/eventsourcing"
)

// ProductCommands
type CreateProductCommand struct {
	ProductID   string  `json:"product_id"`
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Price       float64 `json:"price"`
	StockLevel  int     `json:"stock_level"`
}

type ChangePriceCommand struct {
	ProductID string  `json:"product_id"`
	NewPrice  float64 `json:"new_price"`
}

type AdjustStockCommand struct {
	ProductID      string `json:"product_id"`
	Adjustment     int    `json:"adjustment"`
	AdjustmentType string `json:"adjustment_type"`
}

// ProductCommandHandler handles commands for the product aggregate
type ProductCommandHandler struct {
	*eventsourcing.GenericCommandHandler
}

// NewProductCommandHandler creates a new product command handler
func NewProductCommandHandler(eventStore eventsourcing.EventStore) *ProductCommandHandler {
	handler := &ProductCommandHandler{
		GenericCommandHandler: eventsourcing.NewGenericCommandHandler(eventStore),
	}
	
	// Register the product aggregate
	handler.RegisterAggregate("product", NewProductAggregate)
	
	return handler
}

// processCommand processes a command for a product aggregate
func (h *ProductCommandHandler) processCommand(ctx context.Context, aggregate eventsourcing.Aggregate, command eventsourcing.Command) error {
	product, ok := aggregate.(*ProductAggregate)
	if !ok {
		return errors.New("invalid aggregate type")
	}
	
	switch command.CommandType {
	case "CreateProduct":
		var cmd CreateProductCommand
		if err := json.Unmarshal(command.Data, &cmd); err != nil {
			return err
		}
		return product.CreateProduct(cmd.Name, cmd.Description, cmd.Price, cmd.StockLevel)
		
	case "ChangePrice":
		var cmd ChangePriceCommand
		if err := json.Unmarshal(command.Data, &cmd); err != nil {
			return err
		}
		return product.ChangePrice(cmd.NewPrice)
		
	case "AdjustStock":
		var cmd AdjustStockCommand
		if err := json.Unmarshal(command.Data, &cmd); err != nil {
			return err
		}
		return product.AdjustStock(cmd.Adjustment, cmd.AdjustmentType)
		
	default:
		return fmt.Errorf("unknown command type: %s", command.CommandType)
	}
}
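
One caveat with the embedding approach: a method on an embedded struct cannot call a method defined on the outer struct, so the generic handler never reaches a processCommand declared on the product handler. A common alternative is to inject the aggregate-specific step as a function. The sketch below shows only that wiring, under simplified assumptions: commands and events are plain strings, and Processor, GenericHandler, and ProductHandler are hypothetical names rather than the types defined above.

```go
package main

import "errors"

// Processor is the aggregate-specific step: it turns a command into events.
type Processor func(aggregateID, commandType string) ([]string, error)

// GenericHandler owns the shared workflow and delegates the
// aggregate-specific step to the injected Processor.
type GenericHandler struct {
	process Processor
}

// NewGenericHandler wires a processor into the shared workflow.
func NewGenericHandler(process Processor) *GenericHandler {
	return &GenericHandler{process: process}
}

// Handle runs the injected processor. Real code would load events before
// this call and save the resulting events after it.
func (h *GenericHandler) Handle(aggregateID, commandType string) ([]string, error) {
	if h.process == nil {
		return nil, errors.New("no processor configured")
	}
	return h.process(aggregateID, commandType)
}

// ProductHandler embeds the generic handler and supplies its own logic.
type ProductHandler struct {
	*GenericHandler
}

// NewProductHandler passes the method value h.processCommand to the generic
// handler, which is what makes the dispatch explicit.
func NewProductHandler() *ProductHandler {
	h := &ProductHandler{}
	h.GenericHandler = NewGenericHandler(h.processCommand)
	return h
}

// processCommand maps product commands to the events they produce.
func (h *ProductHandler) processCommand(aggregateID, commandType string) ([]string, error) {
	switch commandType {
	case "CreateProduct":
		return []string{"ProductCreated"}, nil
	case "ChangePrice":
		return []string{"ProductPriceChanged"}, nil
	default:
		return nil, errors.New("unknown command type: " + commandType)
	}
}
```

An interface with a single method would work equally well in place of the function type; the important part is that the generic workflow receives the concrete behavior as a value instead of expecting it to be overridden.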

Event Processing and Projections

In event sourcing, projections are read models built by consuming events and updating query-friendly data structures. They transform the event stream into optimized views for specific query needs.

Projection Fundamentals

Projections have several key characteristics:

  1. Derived Data: They contain no original data, only derived views of events
  2. Rebuildable: They can be rebuilt from scratch by replaying events
  3. Query-Optimized: They are structured for specific query patterns
  4. Eventually Consistent: They may lag slightly behind the write side

Let’s implement a simple projection system:

package eventsourcing

import (
	"context"
	"fmt"
	"sync"
)

// Projection defines the interface for a read model projection
type Projection interface {
	// Name returns the unique name of the projection
	Name() string
	
	// HandleEvent processes an event and updates the projection
	HandleEvent(ctx context.Context, event Event) error
	
	// Reset clears the projection state
	Reset(ctx context.Context) error
}

// ProjectionManager manages a set of projections
type ProjectionManager struct {
	eventStore  EventStore
	projections map[string]Projection
	mu          sync.RWMutex
}

// NewProjectionManager creates a new projection manager
func NewProjectionManager(eventStore EventStore) *ProjectionManager {
	return &ProjectionManager{
		eventStore:  eventStore,
		projections: make(map[string]Projection),
	}
}

// RegisterProjection registers a projection with the manager
func (m *ProjectionManager) RegisterProjection(projection Projection) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.projections[projection.Name()] = projection
}

// HandleEvent processes an event through all registered projections
func (m *ProjectionManager) HandleEvent(ctx context.Context, event Event) error {
	m.mu.RLock()
	defer m.mu.RUnlock()
	
	for _, projection := range m.projections {
		if err := projection.HandleEvent(ctx, event); err != nil {
			return fmt.Errorf("projection %s failed to handle event: %w", projection.Name(), err)
		}
	}
	
	return nil
}

// RebuildProjection rebuilds a specific projection from scratch
func (m *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error {
	m.mu.RLock()
	projection, exists := m.projections[projectionName]
	m.mu.RUnlock()
	
	if !exists {
		return fmt.Errorf("projection %s not found", projectionName)
	}
	
	// Reset the projection
	if err := projection.Reset(ctx); err != nil {
		return fmt.Errorf("failed to reset projection: %w", err)
	}
	
	// Get all events
	offset := 0
	batchSize := 1000
	
	for {
		events, err := m.eventStore.GetAllEvents(ctx, offset, batchSize)
		if err != nil {
			return fmt.Errorf("failed to get events: %w", err)
		}
		
		// Process each event
		for _, event := range events {
			if err := projection.HandleEvent(ctx, event); err != nil {
				return fmt.Errorf("failed to handle event during rebuild: %w", err)
			}
		}
		
		// If we got fewer events than the batch size, we're done
		if len(events) < batchSize {
			break
		}
		
		// Move to the next batch
		offset += batchSize
	}
	
	return nil
}

// RebuildAllProjections rebuilds all registered projections
func (m *ProjectionManager) RebuildAllProjections(ctx context.Context) error {
	m.mu.RLock()
	projectionNames := make([]string, 0, len(m.projections))
	for name := range m.projections {
		projectionNames = append(projectionNames, name)
	}
	m.mu.RUnlock()
	
	for _, name := range projectionNames {
		if err := m.RebuildProjection(ctx, name); err != nil {
			return err
		}
	}
	
	return nil
}
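
To see the rebuild loop in isolation, the sketch below replays a paginated in-memory event log into a small counting projection. The names `memStore`, `countByType`, and `rebuild` are hypothetical stand-ins rather than part of the code above, and the `Event` type is cut down to a single field. It is meant only to illustrate that the result of a rebuild does not depend on the batch size.

```go
package main

import "fmt"

// Event is a pared-down stand-in for the eventsourcing.Event type.
type Event struct {
	EventType string
}

// memStore is a hypothetical in-memory event log with offset/limit paging.
type memStore struct{ events []Event }

// GetAllEvents returns up to limit events starting at offset.
func (s *memStore) GetAllEvents(offset, limit int) []Event {
	if offset >= len(s.events) {
		return nil
	}
	end := offset + limit
	if end > len(s.events) {
		end = len(s.events)
	}
	return s.events[offset:end]
}

// countByType is a toy projection: event type -> number of events seen.
type countByType map[string]int

// HandleEvent updates the view for a single event.
func (c countByType) HandleEvent(e Event) { c[e.EventType]++ }

// rebuild starts from an empty view and replays the whole log in batches,
// stopping when a batch comes back short, as RebuildProjection does.
func rebuild(s *memStore, batchSize int) countByType {
	view := countByType{}
	for offset := 0; ; offset += batchSize {
		batch := s.GetAllEvents(offset, batchSize)
		for _, e := range batch {
			view.HandleEvent(e)
		}
		if len(batch) < batchSize {
			return view
		}
	}
}

// sampleStore builds a small five-event log.
func sampleStore() *memStore {
	return &memStore{events: []Event{
		{"ProductCreated"}, {"ProductPriceChanged"}, {"ProductCreated"},
		{"ProductStockAdjusted"}, {"ProductPriceChanged"},
	}}
}

func main() {
	view := rebuild(sampleStore(), 2)
	// prints "2 2 1"
	fmt.Println(view["ProductCreated"], view["ProductPriceChanged"], view["ProductStockAdjusted"])
}
```

Because the view is derived purely from the log, throwing it away and calling `rebuild` again is always safe, which is what makes projections disposable.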

Product Catalog Projection

Let’s implement a concrete projection for our e-commerce product catalog:

package ecommerce

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/yourdomain/eventsourcing"
)

// ProductCatalogProjection maintains a read-optimized view of products
type ProductCatalogProjection struct {
	db *sql.DB
}

// NewProductCatalogProjection creates a new product catalog projection
func NewProductCatalogProjection(db *sql.DB) *ProductCatalogProjection {
	return &ProductCatalogProjection{db: db}
}

// Name returns the unique name of the projection
func (p *ProductCatalogProjection) Name() string {
	return "product_catalog"
}

// Initialize creates the necessary tables if they don't exist
func (p *ProductCatalogProjection) Initialize(ctx context.Context) error {
	_, err := p.db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS product_catalog (
			product_id VARCHAR(255) PRIMARY KEY,
			name VARCHAR(255) NOT NULL,
			description TEXT,
			price DECIMAL(10, 2) NOT NULL,
			stock_level INT NOT NULL,
			is_active BOOLEAN NOT NULL DEFAULT TRUE,
			created_at TIMESTAMP NOT NULL,
			updated_at TIMESTAMP NOT NULL
		);
		
		CREATE INDEX IF NOT EXISTS idx_product_catalog_name ON product_catalog(name);
		CREATE INDEX IF NOT EXISTS idx_product_catalog_price ON product_catalog(price);
	`)
	
	return err
}

// HandleEvent processes an event and updates the projection
func (p *ProductCatalogProjection) HandleEvent(ctx context.Context, event eventsourcing.Event) error {
	switch event.EventType {
	case "ProductCreated":
		return p.handleProductCreated(ctx, event)
	case "ProductPriceChanged":
		return p.handleProductPriceChanged(ctx, event)
	case "ProductStockAdjusted":
		return p.handleProductStockAdjusted(ctx, event)
	}
	
	// Ignore events we don't care about
	return nil
}

// Reset clears the projection state
func (p *ProductCatalogProjection) Reset(ctx context.Context) error {
	_, err := p.db.ExecContext(ctx, "DELETE FROM product_catalog")
	return err
}

// handleProductCreated processes a ProductCreated event
func (p *ProductCatalogProjection) handleProductCreated(ctx context.Context, event eventsourcing.Event) error {
	var data ProductCreated
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return err
	}
	
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO product_catalog (
			product_id, name, description, price, stock_level, is_active, created_at, updated_at
		) VALUES (
			$1, $2, $3, $4, $5, TRUE, $6, $6
		)
	`, data.ProductID, data.Name, data.Description, data.Price, data.StockLevel, event.Timestamp)
	
	return err
}

// handleProductPriceChanged processes a ProductPriceChanged event
func (p *ProductCatalogProjection) handleProductPriceChanged(ctx context.Context, event eventsourcing.Event) error {
	var data ProductPriceChanged
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return err
	}
	
	_, err := p.db.ExecContext(ctx, `
		UPDATE product_catalog
		SET price = $1, updated_at = $2
		WHERE product_id = $3
	`, data.NewPrice, event.Timestamp, data.ProductID)
	
	return err
}

// handleProductStockAdjusted processes a ProductStockAdjusted event
func (p *ProductCatalogProjection) handleProductStockAdjusted(ctx context.Context, event eventsourcing.Event) error {
	var data ProductStockAdjusted
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return err
	}
	
	_, err := p.db.ExecContext(ctx, `
		UPDATE product_catalog
		SET stock_level = $1, updated_at = $2
		WHERE product_id = $3
	`, data.NewStockLevel, event.Timestamp, data.ProductID)
	
	return err
}
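
The same event-to-row mapping can be exercised without a database. The following is a minimal sketch that swaps the `product_catalog` table for a map; the `catalog`, `row`, and `payload` types and all JSON tag names are assumptions made for illustration, since the real event payload definitions live elsewhere.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors only the fields the projection reads.
type Event struct {
	EventType string
	Data      json.RawMessage
}

// row is one entry in the in-memory read model.
type row struct {
	Name       string
	Price      float64
	StockLevel int
}

// payload covers the fields used by all three events; the JSON tag
// names are assumptions made for this sketch.
type payload struct {
	ProductID     string  `json:"product_id"`
	Name          string  `json:"name"`
	Price         float64 `json:"price"`
	StockLevel    int     `json:"stock_level"`
	NewPrice      float64 `json:"new_price"`
	NewStockLevel int     `json:"new_stock_level"`
}

// catalog is a map-backed stand-in for the product_catalog table.
type catalog map[string]row

// HandleEvent decodes the payload and updates the matching row.
func (c catalog) HandleEvent(e Event) error {
	var p payload
	if err := json.Unmarshal(e.Data, &p); err != nil {
		return fmt.Errorf("decode %s: %w", e.EventType, err)
	}
	r := c[p.ProductID]
	switch e.EventType {
	case "ProductCreated":
		r = row{Name: p.Name, Price: p.Price, StockLevel: p.StockLevel}
	case "ProductPriceChanged":
		r.Price = p.NewPrice
	case "ProductStockAdjusted":
		r.StockLevel = p.NewStockLevel
	default:
		return nil // ignore events this view does not care about
	}
	c[p.ProductID] = r
	return nil
}

// replay applies a fixed sample stream and returns the resulting view.
func replay() (catalog, error) {
	events := []Event{
		{"ProductCreated", json.RawMessage(`{"product_id":"p1","name":"Mug","price":9.5,"stock_level":10}`)},
		{"ProductPriceChanged", json.RawMessage(`{"product_id":"p1","new_price":11}`)},
		{"ProductStockAdjusted", json.RawMessage(`{"product_id":"p1","new_stock_level":7}`)},
		{"OrderPlaced", json.RawMessage(`{"order_id":"o1"}`)},
	}
	c := catalog{}
	for _, e := range events {
		if err := c.HandleEvent(e); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := replay()
	if err != nil {
		panic(err)
	}
	// prints "{Name:Mug Price:11 StockLevel:7}"
	fmt.Printf("%+v\n", c["p1"])
}
```

A map-backed projection like this is also a convenient test double: the SQL version can be checked against it by feeding both the same event stream.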

Snapshotting and Performance Optimization

As event stores grow, rebuilding aggregates from their entire event history becomes increasingly expensive. Snapshotting addresses this by periodically capturing the aggregate’s state, allowing us to rebuild from the snapshot plus subsequent events.

Snapshot Implementation

Let’s implement a snapshotting system:

package eventsourcing

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"time"

	"github.com/google/uuid"
)

// Snapshot represents a point-in-time capture of an aggregate's state
type Snapshot struct {
	ID            string          // Unique identifier for the snapshot
	AggregateID   string          // ID of the aggregate
	AggregateType string          // Type of the aggregate
	Version       int             // Version of the aggregate at snapshot time
	Timestamp     time.Time       // When the snapshot was created
	Data          json.RawMessage // Serialized aggregate state
}

// Snapshotter defines the interface for snapshot storage
type Snapshotter interface {
	// SaveSnapshot persists a snapshot
	SaveSnapshot(ctx context.Context, snapshot Snapshot) error
	
	// GetLatestSnapshot retrieves the most recent snapshot for an aggregate
	GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error)
}

// SnapshotableAggregate is an aggregate that supports snapshotting
type SnapshotableAggregate interface {
	Aggregate
	
	// ToSnapshot serializes the aggregate state for snapshotting
	ToSnapshot() (json.RawMessage, error)
	
	// FromSnapshot deserializes a snapshot into the aggregate state
	FromSnapshot(data json.RawMessage) error
}

// PostgresSnapshotter implements Snapshotter using PostgreSQL
type PostgresSnapshotter struct {
	db *sql.DB
}

// NewPostgresSnapshotter creates a new PostgreSQL-backed snapshotter
func NewPostgresSnapshotter(db *sql.DB) *PostgresSnapshotter {
	return &PostgresSnapshotter{db: db}
}

// Initialize creates the necessary tables if they don't exist
func (s *PostgresSnapshotter) Initialize(ctx context.Context) error {
	_, err := s.db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS snapshots (
			id UUID PRIMARY KEY,
			aggregate_id VARCHAR(255) NOT NULL,
			aggregate_type VARCHAR(255) NOT NULL,
			version INT NOT NULL,
			timestamp TIMESTAMP NOT NULL,
			data JSONB NOT NULL,
			
			-- One snapshot per aggregate version; the unique constraint also provides the index used by GetLatestSnapshot
			UNIQUE (aggregate_id, version)
		);
		
		-- Create indexes for common query patterns
		CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_id ON snapshots(aggregate_id);
	`)
	
	return err
}

// SaveSnapshot persists a snapshot
func (s *PostgresSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
	// Ensure snapshot has a valid ID
	if snapshot.ID == "" {
		snapshot.ID = uuid.New().String()
	}
	
	// Set timestamp if not already set
	if snapshot.Timestamp.IsZero() {
		snapshot.Timestamp = time.Now().UTC()
	}
	
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO snapshots (
			id, aggregate_id, aggregate_type, version, timestamp, data
		) VALUES (
			$1, $2, $3, $4, $5, $6
		)
	`, snapshot.ID, snapshot.AggregateID, snapshot.AggregateType,
	   snapshot.Version, snapshot.Timestamp, snapshot.Data)
	
	return err
}

// GetLatestSnapshot retrieves the most recent snapshot for an aggregate
func (s *PostgresSnapshotter) GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error) {
	var snapshot Snapshot
	
	err := s.db.QueryRowContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, version, timestamp, data
		FROM snapshots
		WHERE aggregate_id = $1
		ORDER BY version DESC
		LIMIT 1
	`, aggregateID).Scan(
		&snapshot.ID, &snapshot.AggregateID, &snapshot.AggregateType,
		&snapshot.Version, &snapshot.Timestamp, &snapshot.Data,
	)
	
	if errors.Is(err, sql.ErrNoRows) {
		return Snapshot{}, errors.New("snapshot not found")
	}
	
	return snapshot, err
}

Snapshotting Strategy

Now let’s implement a strategy for when to create snapshots:

package eventsourcing

import (
	"context"
	"fmt"
)

// SnapshotStrategy defines when snapshots should be created
type SnapshotStrategy interface {
	// ShouldSnapshot determines if a snapshot should be created
	ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool
}

// IntervalSnapshotStrategy creates snapshots at fixed event intervals
type IntervalSnapshotStrategy struct {
	interval int // Number of events between snapshots
}

// NewIntervalSnapshotStrategy creates a new interval-based snapshot strategy
func NewIntervalSnapshotStrategy(interval int) *IntervalSnapshotStrategy {
	if interval <= 0 {
		interval = 100 // Default to 100 events
	}
	return &IntervalSnapshotStrategy{interval: interval}
}

// ShouldSnapshot determines if a snapshot should be created
func (s *IntervalSnapshotStrategy) ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool {
	return eventCount >= lastSnapshotVersion+s.interval
}

// SnapshotManager coordinates snapshot creation and loading
type SnapshotManager struct {
	eventStore         EventStore
	snapshotter        Snapshotter
	strategy           SnapshotStrategy
	aggregateFactories map[string]func(id string) SnapshotableAggregate
}

// NewSnapshotManager creates a new snapshot manager
func NewSnapshotManager(
	eventStore EventStore,
	snapshotter Snapshotter,
	strategy SnapshotStrategy,
) *SnapshotManager {
	return &SnapshotManager{
		eventStore:         eventStore,
		snapshotter:        snapshotter,
		strategy:           strategy,
		aggregateFactories: make(map[string]func(id string) SnapshotableAggregate),
	}
}

// RegisterAggregate registers an aggregate type with the snapshot manager
func (m *SnapshotManager) RegisterAggregate(
	aggregateType string,
	factory func(id string) SnapshotableAggregate,
) {
	m.aggregateFactories[aggregateType] = factory
}

// GetAggregate loads an aggregate, using snapshots if available
func (m *SnapshotManager) GetAggregate(
	ctx context.Context,
	aggregateID string,
	aggregateType string,
) (SnapshotableAggregate, error) {
	// Get the aggregate factory
	factory, exists := m.aggregateFactories[aggregateType]
	if !exists {
		return nil, fmt.Errorf("unknown aggregate type: %s", aggregateType)
	}
	
	// Create the aggregate instance
	aggregate := factory(aggregateID)
	
	// Try to load the latest snapshot
	snapshot, err := m.snapshotter.GetLatestSnapshot(ctx, aggregateID)
	
	var snapshotVersion int
	if err == nil {
		// Apply the snapshot
		if err := aggregate.FromSnapshot(snapshot.Data); err != nil {
			return nil, fmt.Errorf("failed to apply snapshot: %w", err)
		}
		snapshotVersion = snapshot.Version
	} else {
		// No snapshot available, start from scratch
		snapshotVersion = 0
	}
	
	// Load events after the snapshot
	events, err := m.eventStore.GetEventsAfterVersion(ctx, aggregateID, snapshotVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to load events: %w", err)
	}
	
	// Apply the events
	if err := aggregate.ApplyEvents(events); err != nil {
		return nil, fmt.Errorf("failed to apply events: %w", err)
	}
	
	// Check if we should create a new snapshot
	if m.strategy.ShouldSnapshot(aggregateID, aggregate.Version(), snapshotVersion) {
		if err := m.CreateSnapshot(ctx, aggregate); err != nil {
			// Log the error but continue
			fmt.Printf("Failed to create snapshot: %v\n", err)
		}
	}
	
	return aggregate, nil
}

// CreateSnapshot creates a snapshot of the aggregate's current state
func (m *SnapshotManager) CreateSnapshot(ctx context.Context, aggregate SnapshotableAggregate) error {
	// Serialize the aggregate state
	data, err := aggregate.ToSnapshot()
	if err != nil {
		return fmt.Errorf("failed to serialize aggregate: %w", err)
	}
	
	// Create the snapshot
	snapshot := Snapshot{
		AggregateID:   aggregate.ID(),
		AggregateType: aggregate.Type(),
		Version:       aggregate.Version(),
		Data:          data,
	}
	
	// Save the snapshot
	return m.snapshotter.SaveSnapshot(ctx, snapshot)
}
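
The payoff of loading from a snapshot is easiest to see with numbers. The sketch below uses a toy `counter` aggregate and an in-memory event log (both hypothetical, and assuming the event at index i carries version i+1) to compare a full replay with a load that starts from a snapshot taken at version 200. It also restates the interval rule as a plain function.

```go
package main

import "fmt"

// counter is a toy aggregate whose state is a running total.
type counter struct {
	Total   int
	Version int
}

// apply folds one event (an amount) into the aggregate.
func (c *counter) apply(amount int) {
	c.Total += amount
	c.Version++
}

// snapshot is a stored copy of the aggregate at some version.
type snapshot struct {
	State counter
}

// shouldSnapshot restates IntervalSnapshotStrategy.ShouldSnapshot.
func shouldSnapshot(version, lastSnapshotVersion, interval int) bool {
	return version >= lastSnapshotVersion+interval
}

// load restores from the snapshot (if any) and replays only later events.
// It returns the aggregate and the number of events it had to replay.
// It assumes the snapshot version never exceeds the length of the log.
func load(log []int, snap *snapshot) (counter, int) {
	var c counter
	if snap != nil {
		c = snap.State
	}
	replayed := 0
	for _, amount := range log[c.Version:] {
		c.apply(amount)
		replayed++
	}
	return c, replayed
}

// demo builds a 250-event log and loads it with and without a snapshot.
func demo() (full, fromSnap counter, fullReplayed, snapReplayed int) {
	log := make([]int, 250)
	for i := range log {
		log[i] = 1
	}
	full, fullReplayed = load(log, nil)
	// Pretend a snapshot was taken at version 200.
	snap := &snapshot{State: counter{Total: 200, Version: 200}}
	fromSnap, snapReplayed = load(log, snap)
	return
}

func main() {
	full, fromSnap, n1, n2 := demo()
	fmt.Println(full == fromSnap, n1, n2) // prints "true 250 50"
	// With the last snapshot at version 200 and an interval of 100,
	// version 250 is too early and version 300 is due.
	fmt.Println(shouldSnapshot(250, 200, 100), shouldSnapshot(300, 200, 100)) // prints "false true"
}
```

Both loads end in the same state; the snapshot only changes how many events must be read and applied to get there.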

Making ProductAggregate Snapshotable

Let’s update our ProductAggregate to support snapshotting:

package ecommerce

import (
	"encoding/json"

	"github.com/yourdomain/eventsourcing"
)

// ProductSnapshot represents the serialized state of a ProductAggregate
type ProductSnapshot struct {
	ID          string  `json:"id"`
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Price       float64 `json:"price"`
	StockLevel  int     `json:"stock_level"`
	IsActive    bool    `json:"is_active"`
	Version     int     `json:"version"`
}

// ToSnapshot serializes the aggregate state for snapshotting
func (p *ProductAggregate) ToSnapshot() (json.RawMessage, error) {
	snapshot := ProductSnapshot{
		ID:          p.id,
		Name:        p.name,
		Description: p.description,
		Price:       p.price,
		StockLevel:  p.stockLevel,
		IsActive:    p.isActive,
		Version:     p.version,
	}
	
	return json.Marshal(snapshot)
}

// FromSnapshot deserializes a snapshot into the aggregate state
func (p *ProductAggregate) FromSnapshot(data json.RawMessage) error {
	var snapshot ProductSnapshot
	if err := json.Unmarshal(data, &snapshot); err != nil {
		return err
	}
	
	p.id = snapshot.ID
	p.name = snapshot.Name
	p.description = snapshot.Description
	p.price = snapshot.Price
	p.stockLevel = snapshot.StockLevel
	p.isActive = snapshot.IsActive
	p.version = snapshot.Version
	
	return nil
}
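
A quick way to gain confidence in a snapshot format is a round-trip check: serialize an aggregate, restore it into a fresh instance, and compare. The sketch below does this for a cut-down `product` type (a hypothetical stand-in for ProductAggregate with fewer fields). The separate snapshot struct is needed because `encoding/json` skips unexported fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// product is a cut-down stand-in for ProductAggregate.
type product struct {
	id         string
	name       string
	price      float64
	stockLevel int
	version    int
}

// productSnapshot is the exported, JSON-friendly form of the state.
type productSnapshot struct {
	ID         string  `json:"id"`
	Name       string  `json:"name"`
	Price      float64 `json:"price"`
	StockLevel int     `json:"stock_level"`
	Version    int     `json:"version"`
}

// ToSnapshot copies the private fields into the exported struct and encodes it.
func (p *product) ToSnapshot() (json.RawMessage, error) {
	return json.Marshal(productSnapshot{p.id, p.name, p.price, p.stockLevel, p.version})
}

// FromSnapshot decodes the data and overwrites the aggregate state.
func (p *product) FromSnapshot(data json.RawMessage) error {
	var s productSnapshot
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	*p = product{s.ID, s.Name, s.Price, s.StockLevel, s.Version}
	return nil
}

// roundTrip serializes p and restores it into a fresh aggregate.
func roundTrip(p product) (product, error) {
	data, err := p.ToSnapshot()
	if err != nil {
		return product{}, err
	}
	var restored product
	err = restored.FromSnapshot(data)
	return restored, err
}

func main() {
	original := product{id: "p1", name: "Mug", price: 9.5, stockLevel: 3, version: 42}
	restored, err := roundTrip(original)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored == original) // prints "true"
}
```

A check like this tends to catch the most common snapshot bug, a field added to the aggregate but forgotten in the snapshot struct.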

Event Versioning and Migration

As your system evolves, event schemas will change. Event versioning and migration strategies are essential for maintaining compatibility with historical events.

Event Versioning Strategies

There are several approaches to event versioning:

  1. Explicit Versioning: Include a version field in the event type or payload
  2. Upcasting: Transform old event versions to new versions during loading
  3. Event Adapter Pattern: Use adapters to handle different event versions

Let’s implement an upcasting approach:

package eventsourcing

import (
	"encoding/json"
	"fmt"
)

// EventUpcaster transforms events from older versions to newer versions
type EventUpcaster interface {
	// CanUpcast returns true if this upcaster can handle the given event
	CanUpcast(event Event) bool
	
	// Upcast transforms the event to the latest version
	Upcast(event Event) (Event, error)
}

// UpcasterChain chains multiple upcasters together
type UpcasterChain struct {
	upcasters []EventUpcaster
}

// NewUpcasterChain creates a new upcaster chain
func NewUpcasterChain(upcasters ...EventUpcaster) *UpcasterChain {
	return &UpcasterChain{upcasters: upcasters}
}

// AddUpcaster adds an upcaster to the chain
func (c *UpcasterChain) AddUpcaster(upcaster EventUpcaster) {
	c.upcasters = append(c.upcasters, upcaster)
}

// Upcast transforms an event through all applicable upcasters
func (c *UpcasterChain) Upcast(event Event) (Event, error) {
	result := event
	
	for _, upcaster := range c.upcasters {
		if upcaster.CanUpcast(result) {
			var err error
			result, err = upcaster.Upcast(result)
			if err != nil {
				return Event{}, fmt.Errorf("upcasting failed: %w", err)
			}
		}
	}
	
	return result, nil
}

// Example upcaster for ProductCreated events
type ProductCreatedV1ToV2Upcaster struct{}

func (u *ProductCreatedV1ToV2Upcaster) CanUpcast(event Event) bool {
	return event.EventType == "ProductCreated" && !hasField(event.Data, "category")
}

func (u *ProductCreatedV1ToV2Upcaster) Upcast(event Event) (Event, error) {
	// Parse the original data
	var v1Data map[string]interface{}
	if err := json.Unmarshal(event.Data, &v1Data); err != nil {
		return Event{}, err
	}
	
	// Add the missing field with a default value
	v1Data["category"] = "uncategorized"
	
	// Serialize back to JSON
	newData, err := json.Marshal(v1Data)
	if err != nil {
		return Event{}, err
	}
	
	// Create the updated event
	result := event
	result.Data = json.RawMessage(newData)
	
	return result, nil
}

// Helper function to check if a JSON object has a field
func hasField(data json.RawMessage, field string) bool {
	var obj map[string]interface{}
	if err := json.Unmarshal(data, &obj); err != nil {
		return false
	}
	_, exists := obj[field]
	return exists
}
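
To illustrate how several upcasting steps compose, here is a minimal sketch of a variation on the chain above: it decodes the payload once, runs each step in order, and encodes the result. The `step` type, the `upcast` function, and the `addCurrency` step (including its "USD" default) are hypothetical and exist only for this example; `addCategory` follows the v1 to v2 rule from the upcaster above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event carries only the fields this sketch needs.
type Event struct {
	EventType string
	Data      json.RawMessage
}

// step rewrites a decoded payload in place if it applies to the event type.
type step func(eventType string, payload map[string]interface{})

// addCategory is the v1 -> v2 rule: default the missing category field.
func addCategory(eventType string, p map[string]interface{}) {
	if eventType != "ProductCreated" {
		return
	}
	if _, ok := p["category"]; !ok {
		p["category"] = "uncategorized"
	}
}

// addCurrency is a hypothetical v2 -> v3 rule used only for illustration.
func addCurrency(eventType string, p map[string]interface{}) {
	if eventType != "ProductCreated" {
		return
	}
	if _, ok := p["currency"]; !ok {
		p["currency"] = "USD"
	}
}

// upcast decodes the payload once, applies every step in order, and
// re-encodes it. The input event is not modified.
func upcast(e Event, steps ...step) (Event, error) {
	var p map[string]interface{}
	if err := json.Unmarshal(e.Data, &p); err != nil {
		return Event{}, fmt.Errorf("decode payload: %w", err)
	}
	if p == nil {
		return e, nil // payload was JSON null; nothing to upcast
	}
	for _, s := range steps {
		s(e.EventType, p)
	}
	data, err := json.Marshal(p)
	if err != nil {
		return Event{}, fmt.Errorf("encode payload: %w", err)
	}
	e.Data = data
	return e, nil
}

func main() {
	v1 := Event{"ProductCreated", json.RawMessage(`{"name":"Mug"}`)}
	out, err := upcast(v1, addCategory, addCurrency)
	if err != nil {
		panic(err)
	}
	// prints {"category":"uncategorized","currency":"USD","name":"Mug"}
	fmt.Println(string(out.Data))
}
```

One caveat with map-based upcasting in general: decoding into `map[string]interface{}` turns every JSON number into a `float64`, so payloads with very large integers may need `json.Decoder.UseNumber` or typed structs instead.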

Integrating Upcasters with Event Store

Now let’s update our event store to use upcasters:

package eventsourcing

import (
	"context"
)

// UpcasterEventStore wraps an EventStore and applies upcasting
type UpcasterEventStore struct {
	eventStore EventStore
	upcaster   *UpcasterChain
}

// NewUpcasterEventStore creates a new upcaster event store
func NewUpcasterEventStore(eventStore EventStore, upcaster *UpcasterChain) *UpcasterEventStore {
	return &UpcasterEventStore{
		eventStore: eventStore,
		upcaster:   upcaster,
	}
}

// SaveEvents persists new events to the store
func (s *UpcasterEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	// Pass through to the underlying store
	return s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
}

// GetEvents retrieves all events for a specific aggregate
func (s *UpcasterEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEvents(ctx, aggregateID)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetEventsByType retrieves events of a specific type
func (s *UpcasterEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEventsByType(ctx, eventType)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *UpcasterEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *UpcasterEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	// Pass through to the underlying store
	return s.eventStore.GetAggregateVersion(ctx, aggregateID)
}

// GetEventsAfterVersion retrieves events after a specific version
func (s *UpcasterEventStore) GetEventsAfterVersion(ctx context.Context, aggregateID string, version int) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEventsAfterVersion(ctx, aggregateID, version)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}
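
The four read methods above repeat the same upcasting loop. One possible way to reduce the repetition is a small helper; the sketch below is a suggestion, not part of the original design. `upcastAll`, `upcastFunc`, and the two toy upcasters are hypothetical names, and `Event` is reduced to two illustrative fields.

```go
package main

import (
	"errors"
	"fmt"
)

// Event carries only the fields this sketch needs.
type Event struct {
	EventType     string
	SchemaVersion int
}

// upcastFunc stands in for (*UpcasterChain).Upcast.
type upcastFunc func(Event) (Event, error)

// upcastAll applies up to each event in order and stops at the first
// error. The input slice is left untouched.
func upcastAll(events []Event, up upcastFunc) ([]Event, error) {
	result := make([]Event, len(events))
	for i, e := range events {
		upcasted, err := up(e)
		if err != nil {
			return nil, fmt.Errorf("upcast event %d (%s): %w", i, e.EventType, err)
		}
		result[i] = upcasted
	}
	return result, nil
}

// toV2 is a toy upcaster that bumps every event to schema version 2.
func toV2(e Event) (Event, error) {
	e.SchemaVersion = 2
	return e, nil
}

// rejectOrders is a toy upcaster that fails on one event type.
func rejectOrders(e Event) (Event, error) {
	if e.EventType == "OrderPlaced" {
		return Event{}, errors.New("no upcaster registered")
	}
	return e, nil
}

func main() {
	events := []Event{{"ProductCreated", 1}, {"OrderPlaced", 1}}

	out, err := upcastAll(events, toV2)
	fmt.Println(out, err)

	_, err = upcastAll(events, rejectOrders)
	fmt.Println(err)
}
```

With a helper like this, each read method would shrink to a call to the underlying store followed by `upcastAll(events, s.upcaster.Upcast)`.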

Saga Pattern Implementation

In distributed systems, transactions that span multiple services present a significant challenge. The Saga pattern addresses this by breaking a distributed transaction into a sequence of local transactions, each with a compensating action to handle failures.

Saga Fundamentals

A saga is a sequence of local transactions where:

  1. Each local transaction updates a single service
  2. Each local transaction publishes an event to trigger the next transaction
  3. If a transaction fails, compensating transactions undo the changes

Let’s implement a saga framework for our e-commerce system:

package eventsourcing

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// SagaStep represents a single step in a saga
type SagaStep struct {
	// Action to perform for this step
	Action func(ctx context.Context, data interface{}) error
	
	// Compensation to run if the saga needs to be rolled back
	Compensation func(ctx context.Context, data interface{}) error
}

// Saga coordinates a sequence of steps that should be executed as a unit
type Saga struct {
	name  string
	steps []SagaStep
	data  interface{}
	mu    sync.Mutex
	
	// Track which steps have been executed
	executedSteps int
}

// NewSaga creates a new saga with the given name
func NewSaga(name string, data interface{}) *Saga {
	return &Saga{
		name:  name,
		data:  data,
		steps: []SagaStep{},
	}
}

// AddStep adds a step to the saga
func (s *Saga) AddStep(action, compensation func(ctx context.Context, data interface{}) error) {
	s.steps = append(s.steps, SagaStep{
		Action:       action,
		Compensation: compensation,
	})
}

// Execute runs the saga, executing each step in sequence
func (s *Saga) Execute(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	// Start from where we left off (useful for resuming sagas)
	for i := s.executedSteps; i < len(s.steps); i++ {
		step := s.steps[i]
		
		// Execute the step's action
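		if err := step.Action(ctx, s.data); err != nil {
			// If the action fails, compensate the completed steps and return
			// the original failure together with any compensation errors
			actionErr := fmt.Errorf("saga %s: step %d failed: %w", s.name, i, err)
			return errors.Join(actionErr, s.compensate(ctx, i))
		}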
		
		// Mark this step as executed
		s.executedSteps = i + 1
	}
	
	return nil
}

// compensate runs the compensation functions for executed steps in reverse order
func (s *Saga) compensate(ctx context.Context, failedStepIndex int) error {
	var compensationErr error
	
	// Run compensations in reverse order
	for i := failedStepIndex - 1; i >= 0; i-- {
		step := s.steps[i]
		
		if step.Compensation != nil {
			if err := step.Compensation(ctx, s.data); err != nil {
				// Log compensation errors but continue with other compensations
				compensationErr = errors.Join(compensationErr,
					fmt.Errorf("compensation failed for step %d: %w", i, err))
			}
		}
	}
	
	return compensationErr
}
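
The order in which compensations run is the part of a saga that is easiest to get wrong, so it helps to trace it. The following is a stripped-down sketch, not the framework above: `step`, `run`, and `demo` are hypothetical names, and the actions only record that they were called. The last of three steps fails, and the trace shows the earlier steps being undone in reverse.

```go
package main

import (
	"errors"
	"fmt"
)

// step pairs an action with the compensation that undoes it.
type step struct {
	name       string
	action     func() error
	compensate func() error
}

// run executes steps in order. On the first failure it compensates the
// completed steps in reverse order and returns the original error
// joined with any compensation errors.
func run(steps []step) error {
	for i, s := range steps {
		if err := s.action(); err != nil {
			failure := fmt.Errorf("step %q failed: %w", s.name, err)
			for j := i - 1; j >= 0; j-- {
				if cerr := steps[j].compensate(); cerr != nil {
					failure = errors.Join(failure,
						fmt.Errorf("compensating %q: %w", steps[j].name, cerr))
				}
			}
			return failure
		}
	}
	return nil
}

// demo runs a three-step saga whose last step fails and returns the
// trace of calls plus the resulting error.
func demo() ([]string, error) {
	var trace []string
	record := func(msg string, err error) func() error {
		return func() error {
			trace = append(trace, msg)
			return err
		}
	}
	steps := []step{
		{"inventory", record("reserve", nil), record("release", nil)},
		{"payment", record("charge", nil), record("refund", nil)},
		{"shipping", record("ship", errors.New("carrier unavailable")), record("cancel", nil)},
	}
	err := run(steps)
	return trace, err
}

func main() {
	trace, err := demo()
	fmt.Println(trace) // prints "[reserve charge ship refund release]"
	fmt.Println(err)   // prints: step "shipping" failed: carrier unavailable
}
```

The failed step itself is not compensated, since its action never completed; only the steps before it are rolled back.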

Order Processing Saga Example

Let’s implement a concrete saga for processing orders in our e-commerce system:

package ecommerce

import (
	"context"
	"fmt"

	"github.com/yourdomain/eventsourcing"
)

// OrderData contains all the data needed for the order processing saga
type OrderData struct {
	OrderID       string
	CustomerID    string
	Items         []OrderItem
	TotalAmount   float64
	PaymentID     string
	ShipmentID    string
	InventoryLock string
}

type OrderItem struct {
	ProductID string
	Quantity  int
	UnitPrice float64
}

// OrderProcessingSaga coordinates the order processing workflow
type OrderProcessingSaga struct {
	saga       *eventsourcing.Saga
	eventStore eventsourcing.EventStore
	commandBus CommandBus
	orderData  *OrderData
}

// NewOrderProcessingSaga creates a new order processing saga
func NewOrderProcessingSaga(
	orderData *OrderData,
	eventStore eventsourcing.EventStore,
	commandBus CommandBus,
) *OrderProcessingSaga {
	s := &OrderProcessingSaga{
		orderData:  orderData,
		eventStore: eventStore,
		commandBus: commandBus,
	}
	
	// Create the underlying saga
	s.saga = eventsourcing.NewSaga("OrderProcessing", orderData)
	
	// Define the saga steps
	s.defineSagaSteps()
	
	return s
}

// defineSagaSteps sets up the steps for the order processing saga
func (s *OrderProcessingSaga) defineSagaSteps() {
	// Step 1: Reserve inventory
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Create a command to reserve inventory
			cmd := ReserveInventoryCommand{
				OrderID: orderData.OrderID,
				Items:   orderData.Items,
			}
			
			// Send the command
			result, err := s.commandBus.Send(ctx, "inventory", "ReserveInventory", cmd)
			if err != nil {
				return fmt.Errorf("failed to reserve inventory: %w", err)
			}
			
			// Store the inventory lock ID
			orderData.InventoryLock = result.(string)
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Only compensate if we actually got a lock
			if orderData.InventoryLock == "" {
				return nil
			}
			
			// Create a command to release inventory
			cmd := ReleaseInventoryCommand{
				OrderID: orderData.OrderID,
				LockID:  orderData.InventoryLock,
			}
			
			// Send the command
			_, err := s.commandBus.Send(ctx, "inventory", "ReleaseInventory", cmd)
			if err != nil {
				return fmt.Errorf("failed to release inventory: %w", err)
			}
			
			return nil
		},
	)
	
	// Step 2: Process payment
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Create a command to process payment
			cmd := ProcessPaymentCommand{
				OrderID:    orderData.OrderID,
				CustomerID: orderData.CustomerID,
				Amount:     orderData.TotalAmount,
			}
			
			// Send the command
			result, err := s.commandBus.Send(ctx, "payment", "ProcessPayment", cmd)
			if err != nil {
				return fmt.Errorf("failed to process payment: %w", err)
			}
			
			// Store the payment ID
			orderData.PaymentID = result.(string)
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Only compensate if we actually processed a payment
			if orderData.PaymentID == "" {
				return nil
			}
			
			// Create a command to refund payment
			cmd := RefundPaymentCommand{
				OrderID:   orderData.OrderID,
				PaymentID: orderData.PaymentID,
			}
			
			// Send the command
			_, err := s.commandBus.Send(ctx, "payment", "RefundPayment", cmd)
			if err != nil {
				return fmt.Errorf("failed to refund payment: %w", err)
			}
			
			return nil
		},
	)
	
	// Step 3: Create shipment
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Create a command to create shipment
			cmd := CreateShipmentCommand{
				OrderID:    orderData.OrderID,
				CustomerID: orderData.CustomerID,
				Items:      orderData.Items,
			}
			
			// Send the command
			result, err := s.commandBus.Send(ctx, "shipping", "CreateShipment", cmd)
			if err != nil {
				return fmt.Errorf("failed to create shipment: %w", err)
			}
			
			// Store the shipment ID
			orderData.ShipmentID = result.(string)
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Only compensate if we actually created a shipment
			if orderData.ShipmentID == "" {
				return nil
			}
			
			// Create a command to cancel shipment
			cmd := CancelShipmentCommand{
				OrderID:    orderData.OrderID,
				ShipmentID: orderData.ShipmentID,
			}
			
			// Send the command
			_, err := s.commandBus.Send(ctx, "shipping", "CancelShipment", cmd)
			if err != nil {
				return fmt.Errorf("failed to cancel shipment: %w", err)
			}
			
			return nil
		},
	)
	
	// Step 4: Complete order
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Create a command to complete the order
			cmd := CompleteOrderCommand{
				OrderID:    orderData.OrderID,
				PaymentID:  orderData.PaymentID,
				ShipmentID: orderData.ShipmentID,
			}
			
			// Send the command
			_, err := s.commandBus.Send(ctx, "order", "CompleteOrder", cmd)
			if err != nil {
				return fmt.Errorf("failed to complete order: %w", err)
			}
			
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			orderData := data.(*OrderData)
			
			// Create a command to cancel the order
			cmd := CancelOrderCommand{
				OrderID: orderData.OrderID,
				Reason:  "Saga rollback",
			}
			
			// Send the command
			_, err := s.commandBus.Send(ctx, "order", "CancelOrder", cmd)
			if err != nil {
				return fmt.Errorf("failed to cancel order: %w", err)
			}
			
			return nil
		},
	)
}

// Execute runs the order processing saga
func (s *OrderProcessingSaga) Execute(ctx context.Context) error {
	return s.saga.Execute(ctx)
}

// CommandBus is a simple interface for sending commands to different services
type CommandBus interface {
	Send(ctx context.Context, service, commandType string, command interface{}) (interface{}, error)
}
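
Because the saga only talks to the CommandBus interface, it can be exercised without real services. The sketch below shows one possible test double; `fakeBus`, its `failOn` field, and the `asString` helper are hypothetical names introduced here. The helper also shows a checked alternative to the bare `result.(string)` assertions in the steps above, which would panic if a service returned a different type.

```go
package main

import (
	"context"
	"fmt"
)

// CommandBus matches the interface defined above.
type CommandBus interface {
	Send(ctx context.Context, service, commandType string, command interface{}) (interface{}, error)
}

// fakeBus is a hypothetical test double: it records every command and
// fails the command types listed in failOn.
type fakeBus struct {
	sent   []string
	failOn map[string]bool
}

// Send records the call and returns either an error or a string ID.
func (b *fakeBus) Send(_ context.Context, service, commandType string, _ interface{}) (interface{}, error) {
	b.sent = append(b.sent, service+"/"+commandType)
	if b.failOn[commandType] {
		return nil, fmt.Errorf("%s rejected", commandType)
	}
	// The saga steps expect a string ID as the result.
	return commandType + "-id", nil
}

// Compile-time check that fakeBus satisfies CommandBus.
var _ CommandBus = (*fakeBus)(nil)

// asString converts a bus result to a string without panicking.
func asString(result interface{}) (string, error) {
	s, ok := result.(string)
	if !ok {
		return "", fmt.Errorf("unexpected result type %T", result)
	}
	return s, nil
}

// demo sends one command that succeeds and one that is configured to fail.
func demo() (sent []string, lockID string, payErr error) {
	bus := &fakeBus{failOn: map[string]bool{"ProcessPayment": true}}
	ctx := context.Background()

	res, err := bus.Send(ctx, "inventory", "ReserveInventory", nil)
	if err != nil {
		return bus.sent, "", err
	}
	if lockID, err = asString(res); err != nil {
		return bus.sent, "", err
	}
	_, payErr = bus.Send(ctx, "payment", "ProcessPayment", nil)
	return bus.sent, lockID, payErr
}

func main() {
	sent, lockID, err := demo()
	fmt.Println(sent)   // prints "[inventory/ReserveInventory payment/ProcessPayment]"
	fmt.Println(lockID) // prints "ReserveInventory-id"
	fmt.Println(err)    // prints "ProcessPayment rejected"
}
```

Passing a bus like this to NewOrderProcessingSaga and then inspecting the recorded commands is one way to assert that a payment failure triggers ReleaseInventory.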

Production Deployment and Monitoring

Deploying event-sourced systems to production requires careful consideration of performance, scalability, and observability. Let’s explore key aspects of running event sourcing in production.

Event Store Scaling

Event stores must scale to handle high throughput and growing event history. Consider these strategies:

  1. Partitioning: Divide events by aggregate type or ID to distribute load
  2. Read Replicas: Use separate read-only replicas for projections
  3. Event Pruning: Archive old events that are no longer needed for active aggregates

Here’s an example of a partitioned event store:

package eventsourcing

import (
	"context"
	"errors"
	"hash/fnv"
	"sort"
)

// PartitionedEventStore distributes events across multiple event stores
type PartitionedEventStore struct {
	partitions    []EventStore
	numPartitions int
}

// NewPartitionedEventStore creates a new partitioned event store
func NewPartitionedEventStore(partitions []EventStore) *PartitionedEventStore {
	return &PartitionedEventStore{
		partitions:    partitions,
		numPartitions: len(partitions),
	}
}

// getPartition determines which partition to use for a given aggregate ID
func (s *PartitionedEventStore) getPartition(aggregateID string) (EventStore, error) {
	if s.numPartitions == 0 {
		return nil, errors.New("no partitions available")
	}
	
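	// Hash the aggregate ID to determine the partition. Take the modulo on
	// the unsigned hash so the index cannot go negative where int is 32 bits.
	h := fnv.New32a()
	h.Write([]byte(aggregateID))
	partitionIndex := int(h.Sum32() % uint32(s.numPartitions))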
	
	return s.partitions[partitionIndex], nil
}

// SaveEvents persists new events to the appropriate partition
func (s *PartitionedEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return err
	}
	
	return partition.SaveEvents(ctx, aggregateID, expectedVersion, events)
}

// GetEvents retrieves all events for a specific aggregate from the appropriate partition
func (s *PartitionedEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return nil, err
	}
	
	return partition.GetEvents(ctx, aggregateID)
}

// GetEventsByType retrieves events of a specific type from all partitions
func (s *PartitionedEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	var allEvents []Event
	
	// Query each partition
	for _, partition := range s.partitions {
		events, err := partition.GetEventsByType(ctx, eventType)
		if err != nil {
			return nil, err
		}
		
		allEvents = append(allEvents, events...)
	}
	
	// Sort events by timestamp
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})
	
	return allEvents, nil
}

// GetAllEvents retrieves all events from all partitions
func (s *PartitionedEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
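	// This is inefficient for large event stores: every partition returns its
	// first offset+limit events, and all of them are merged and sorted in memory.
	// In a real implementation, you would need a more sophisticated approach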
	var allEvents []Event
	
	// Query each partition
	for _, partition := range s.partitions {
		events, err := partition.GetAllEvents(ctx, 0, offset+limit)
		if err != nil {
			return nil, err
		}
		
		allEvents = append(allEvents, events...)
	}
	
	// Sort events by timestamp
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})
	
	// Apply pagination
	if offset >= len(allEvents) {
		return []Event{}, nil
	}
	
	end := offset + limit
	if end > len(allEvents) {
		end = len(allEvents)
	}
	
	return allEvents[offset:end], nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *PartitionedEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return 0, err
	}
	
	return partition.GetAggregateVersion(ctx, aggregateID)
}
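
The routing rule used by getPartition can be exercised on its own. The following is a minimal sketch, assuming a standalone helper named partitionIndex (a hypothetical name, not part of the store above) that applies the same FNV-1a hash and modulo step:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionIndex is a hypothetical standalone version of the routing rule:
// hash the aggregate ID with FNV-1a, then reduce it modulo the partition count.
func partitionIndex(aggregateID string, numPartitions int) (int, error) {
	if numPartitions <= 0 {
		return 0, fmt.Errorf("invalid partition count %d", numPartitions)
	}
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // Write on a hash.Hash never returns an error
	// Reduce in unsigned space so the index cannot be negative.
	return int(h.Sum32() % uint32(numPartitions)), nil
}

func main() {
	for _, id := range []string{"account-1", "account-2", "account-3"} {
		idx, err := partitionIndex(id, 4)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> partition %d of 4\n", id, idx)
	}
}
```

The same aggregate ID always maps to the same partition, which keeps every event of an aggregate in one store. With plain modulo routing, changing the number of partitions remaps most aggregate IDs, so resizing means migrating existing events or switching to a scheme such as consistent hashing.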

Monitoring and Metrics

Effective monitoring is crucial for event-sourced systems. Here’s a metrics collector for event stores:

package eventsourcing

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// MetricsEventStore wraps an EventStore and collects metrics
type MetricsEventStore struct {
	eventStore EventStore
	
	// Metrics
	eventSaveCount    *prometheus.CounterVec
	eventSaveDuration *prometheus.HistogramVec
	eventReadCount    *prometheus.CounterVec
	eventReadDuration *prometheus.HistogramVec
}

// NewMetricsEventStore creates a new metrics-collecting event store
func NewMetricsEventStore(eventStore EventStore, registry *prometheus.Registry) *MetricsEventStore {
	store := &MetricsEventStore{
		eventStore: eventStore,
		
		eventSaveCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "event_store_save_total",
				Help: "Total number of events saved",
			},
			[]string{"aggregate_type"},
		),
		
		eventSaveDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "event_store_save_duration_seconds",
				Help:    "Duration of event save operations",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~512ms
			},
			[]string{"aggregate_type"},
		),
		
		eventReadCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "event_store_read_total",
				Help: "Total number of event read operations",
			},
			[]string{"operation"},
		),
		
		eventReadDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "event_store_read_duration_seconds",
				Help:    "Duration of event read operations",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~512ms
			},
			[]string{"operation"},
		),
	}
	
	// Register metrics with Prometheus
	registry.MustRegister(
		store.eventSaveCount,
		store.eventSaveDuration,
		store.eventReadCount,
		store.eventReadDuration,
	)
	
	return store
}

// SaveEvents persists new events to the store
func (s *MetricsEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	// Get the aggregate type from the first event
	aggregateType := events[0].AggregateType
	
	// Record metrics
	timer := prometheus.NewTimer(s.eventSaveDuration.WithLabelValues(aggregateType))
	defer timer.ObserveDuration()
	
	err := s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
	
	if err == nil {
		s.eventSaveCount.WithLabelValues(aggregateType).Add(float64(len(events)))
	}
	
	return err
}

// GetEvents retrieves all events for a specific aggregate
func (s *MetricsEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEvents"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetEvents(ctx, aggregateID)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetEvents").Inc()
	}
	
	return events, err
}

// GetEventsByType retrieves events of a specific type
func (s *MetricsEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEventsByType"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetEventsByType(ctx, eventType)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetEventsByType").Inc()
	}
	
	return events, err
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *MetricsEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAllEvents"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetAllEvents").Inc()
	}
	
	return events, err
}

// GetAggregateVersion returns the current version of an aggregate
func (s *MetricsEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAggregateVersion"))
	defer timer.ObserveDuration()
	
	version, err := s.eventStore.GetAggregateVersion(ctx, aggregateID)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetAggregateVersion").Inc()
	}
	
	return version, err
}

Deployment Architecture

A robust event-sourced system typically includes these components:

  1. Command API: Handles incoming commands and validates them
  2. Event Store: Persists events durably
  3. Event Bus: Distributes events to subscribers
  4. Projection Workers: Build and maintain read models
  5. Query API: Serves read models to clients

Here’s a diagram of a typical deployment architecture:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│             │     │             │     │             │
│  Command    │────▶│   Event     │────▶│   Event     │
│    API      │     │   Store     │     │    Bus      │
│             │     │             │     │             │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │             │     │             │
                    │   Query     │◀────│ Projection  │
                    │    API      │     │  Workers    │
                    │             │     │             │
                    └─────────────┘     └─────────────┘
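
The flow in the diagram can be sketched in a single process. This is a minimal in-memory illustration, not a production layout: the names MemoryStore, EventBus, BalanceProjection, HandleWithdraw and newSystem are assumptions made for this example, the Event struct is simplified, and the bus delivers events synchronously rather than through a message broker.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified event type used only in this sketch.
type Event struct {
	AggregateID string
	Type        string
	Amount      int
}

// EventBus delivers each published event to every subscriber, synchronously.
type EventBus struct {
	mu       sync.RWMutex
	handlers []func(Event)
}

func (b *EventBus) Subscribe(h func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers = append(b.handlers, h)
}

func (b *EventBus) Publish(e Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, h := range b.handlers {
		h(e)
	}
}

// MemoryStore appends events to a slice, then hands them to the bus.
type MemoryStore struct {
	mu     sync.Mutex
	events []Event
	bus    *EventBus
}

func (s *MemoryStore) Append(e Event) {
	s.mu.Lock()
	s.events = append(s.events, e)
	s.mu.Unlock()
	s.bus.Publish(e)
}

// BalanceProjection is the read model kept up to date by the projection worker.
type BalanceProjection struct {
	mu       sync.RWMutex
	balances map[string]int
}

// Apply is the projection worker: it folds one event into the read model.
func (p *BalanceProjection) Apply(e Event) {
	p.mu.Lock()
	defer p.mu.Unlock()
	switch e.Type {
	case "AccountCreated":
		p.balances[e.AggregateID] = e.Amount
	case "MoneyWithdrawn":
		p.balances[e.AggregateID] -= e.Amount
	}
}

// Balance is the query side: it reads only from the read model.
func (p *BalanceProjection) Balance(id string) int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.balances[id]
}

// HandleWithdraw is the command side: validate the command, then append an event.
func HandleWithdraw(store *MemoryStore, id string, amount int) error {
	if amount <= 0 {
		return fmt.Errorf("withdrawal amount must be positive, got %d", amount)
	}
	store.Append(Event{AggregateID: id, Type: "MoneyWithdrawn", Amount: amount})
	return nil
}

// newSystem wires store -> bus -> projection, as in the diagram.
func newSystem() (*MemoryStore, *BalanceProjection) {
	bus := &EventBus{}
	proj := &BalanceProjection{balances: make(map[string]int)}
	bus.Subscribe(proj.Apply)
	return &MemoryStore{bus: bus}, proj
}

func main() {
	store, proj := newSystem()
	store.Append(Event{AggregateID: "acc-1", Type: "AccountCreated", Amount: 100})
	if err := HandleWithdraw(store, "acc-1", 30); err != nil {
		panic(err)
	}
	fmt.Println(proj.Balance("acc-1")) // prints 70
}
```

In a real deployment each box usually runs as its own process, and the bus is asynchronous, so the query side can briefly lag behind the event store.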

Performance Considerations

To ensure optimal performance in production:

  1. Batch Processing: Process events in batches for efficiency
  2. Caching: Cache aggregates and projections to reduce database load
  3. Asynchronous Projections: Update read models asynchronously
  4. Event Versioning: Handle schema evolution gracefully
  5. Monitoring: Track event store metrics and projection lag

Here’s an example of a batch event processor:

package eventsourcing

import (
	"context"
	"fmt"
	"time"
)

// BatchEventProcessor processes events in batches
type BatchEventProcessor struct {
	eventStore      EventStore
	eventHandler    EventHandler
	batchSize       int
	processingDelay time.Duration
	lastEventID     string
}

// NewBatchEventProcessor creates a new batch event processor
func NewBatchEventProcessor(
	eventStore EventStore,
	eventHandler EventHandler,
	batchSize int,
	processingDelay time.Duration,
) *BatchEventProcessor {
	if batchSize <= 0 {
		batchSize = 100
	}
	
	if processingDelay <= 0 {
		processingDelay = 100 * time.Millisecond
	}
	
	return &BatchEventProcessor{
		eventStore:      eventStore,
		eventHandler:    eventHandler,
		batchSize:       batchSize,
		processingDelay: processingDelay,
	}
}

// Start begins processing events in batches
func (p *BatchEventProcessor) Start(ctx context.Context) error {
	ticker := time.NewTicker(p.processingDelay)
	defer ticker.Stop()
	
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := p.processBatch(ctx); err != nil {
				// Log the error but continue processing
				fmt.Printf("Error processing batch: %v\n", err)
			}
		}
	}
}

// processBatch processes a single batch of events
func (p *BatchEventProcessor) processBatch(ctx context.Context) error {
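	// Get a batch of events after the last processed event.
	// GetEventsAfterID is assumed to be part of the EventStore interface;
	// any wrapper placed around the store has to forward it as well.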
	events, err := p.eventStore.GetEventsAfterID(ctx, p.lastEventID, p.batchSize)
	if err != nil {
		return err
	}
	
	// If no events, nothing to do
	if len(events) == 0 {
		return nil
	}
	
	// Process each event
	for _, event := range events {
		if err := p.eventHandler.HandleEvent(ctx, event); err != nil {
			return err
		}
		
		// Update the last processed event ID
		p.lastEventID = event.ID
	}
	
	return nil
}

Disaster Recovery

Because the event log is the source of truth, event sourcing gives you several recovery options that state-based storage lacks:

  1. Event Store Backups: Regular backups of the event store
  2. Projection Rebuilding: Ability to rebuild projections from events
  3. Event Replay: Replay events to recover from data corruption
  4. Audit Trail: Complete history for forensic analysis

Implement a disaster recovery plan that includes:

  1. Regular backups of the event store
  2. Automated projection rebuilding
  3. Testing of recovery procedures
  4. Documentation of recovery processes
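
Projection rebuilding amounts to replaying the event log into an empty read model. The following is a minimal sketch, assuming a simplified Event struct and a hypothetical rebuildBalances function; the event type names follow the account example from the start of this guide.

```go
package main

import "fmt"

// Event is a simplified event type used only in this sketch.
type Event struct {
	AggregateID string
	Type        string
	Amount      int
}

// rebuildBalances replays events in order into a fresh read model.
// It stops at the first unknown event type instead of skipping it silently.
func rebuildBalances(events []Event) (map[string]int, error) {
	balances := make(map[string]int)
	for i, e := range events {
		switch e.Type {
		case "AccountCreated":
			balances[e.AggregateID] = e.Amount
		case "MoneyWithdrawn":
			balances[e.AggregateID] -= e.Amount
		default:
			return nil, fmt.Errorf("event %d: unknown type %q", i, e.Type)
		}
	}
	return balances, nil
}

func main() {
	eventLog := []Event{
		{AggregateID: "acc-1", Type: "AccountCreated", Amount: 100},
		{AggregateID: "acc-2", Type: "AccountCreated", Amount: 40},
		{AggregateID: "acc-1", Type: "MoneyWithdrawn", Amount: 30},
	}
	balances, err := rebuildBalances(eventLog)
	if err != nil {
		panic(err)
	}
	fmt.Println(balances["acc-1"], balances["acc-2"]) // prints 70 40
}
```

Building the new read model separately and switching queries over only after the replay succeeds keeps a failed rebuild from replacing a working projection.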