Event Store Design and Implementation

The event store is the heart of any event sourcing system. It’s responsible for persisting events and allowing them to be retrieved by aggregate ID or for building projections.

Event Store Interface

Let’s define a clean interface for our event store:

package eventsourcing

import (
	"context"
	"errors"
)

var (
	ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate has been modified")
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrEventNotFound       = errors.New("event not found")
)

// EventStore defines the interface for storing and retrieving events
type EventStore interface {
	// SaveEvents persists new events to the store
	SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error
	
	// GetEvents retrieves all events for a specific aggregate
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
	
	// GetEventsByType retrieves events of a specific type
	GetEventsByType(ctx context.Context, eventType string) ([]Event, error)
	
	// GetAllEvents retrieves all events, optionally with pagination
	GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error)
	
	// GetAggregateVersion returns the current version of an aggregate
	GetAggregateVersion(ctx context.Context, aggregateID string) (int, error)
}

In-Memory Event Store Implementation

For development and testing, an in-memory event store is useful:

package eventsourcing

import (
	"context"
	"sort"
	"sync"
)

// InMemoryEventStore is a simple in-memory implementation of EventStore
type InMemoryEventStore struct {
	mu     sync.RWMutex
	events map[string][]Event // Map of aggregateID to events
}

// NewInMemoryEventStore creates a new in-memory event store
func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		events: make(map[string][]Event),
	}
}

// SaveEvents persists new events to the store
func (s *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	s.mu.Lock()
	defer s.mu.Unlock()
	
	// Check if aggregate exists
	existingEvents, exists := s.events[aggregateID]
	
	// If aggregate exists, verify version
	if exists {
		currentVersion := len(existingEvents)
		if currentVersion != expectedVersion {
			return ErrConcurrencyConflict
		}
	} else if expectedVersion > 0 {
		// If aggregate doesn't exist but we expected a version > 0
		return ErrAggregateNotFound
	}
	
	// Append new events
	s.events[aggregateID] = append(existingEvents, events...)
	
	return nil
}

// GetEvents retrieves all events for a specific aggregate
func (s *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	events, exists := s.events[aggregateID]
	if !exists {
		return nil, ErrAggregateNotFound
	}
	
	// Return a copy to prevent modification of internal state
	result := make([]Event, len(events))
	copy(result, events)
	
	return result, nil
}

// GetEventsByType retrieves events of a specific type
func (s *InMemoryEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	var result []Event
	
	for _, aggregateEvents := range s.events {
		for _, event := range aggregateEvents {
			if event.EventType == eventType {
				result = append(result, event)
			}
		}
	}
	
	// Sort by timestamp for consistent ordering
	sort.Slice(result, func(i, j int) bool {
		return result[i].Timestamp.Before(result[j].Timestamp)
	})
	
	return result, nil
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *InMemoryEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	var allEvents []Event
	
	// Collect all events
	for _, aggregateEvents := range s.events {
		allEvents = append(allEvents, aggregateEvents...)
	}
	
	// Sort by timestamp
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})
	
	// Apply pagination
	if offset >= len(allEvents) {
		return []Event{}, nil
	}
	
	end := offset + limit
	if end > len(allEvents) {
		end = len(allEvents)
	}
	
	return allEvents[offset:end], nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *InMemoryEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	events, exists := s.events[aggregateID]
	if !exists {
		return 0, nil // New aggregates start at version 0
	}
	
	return len(events), nil
}

PostgreSQL Event Store Implementation

For production use, we need a persistent event store. Here’s an implementation using PostgreSQL:

package eventsourcing

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

// PostgresEventStore implements EventStore using PostgreSQL
type PostgresEventStore struct {
	db *sql.DB
}

// NewPostgresEventStore creates a new PostgreSQL-backed event store
func NewPostgresEventStore(connectionString string) (*PostgresEventStore, error) {
	db, err := sql.Open("postgres", connectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}
	
	// Test connection
	if err := db.Ping(); err != nil {
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}
	
	return &PostgresEventStore{db: db}, nil
}

// Initialize creates the necessary tables if they don't exist
func (s *PostgresEventStore) Initialize(ctx context.Context) error {
	// Create events table
	_, err := s.db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS events (
			id UUID PRIMARY KEY,
			aggregate_id VARCHAR(255) NOT NULL,
			aggregate_type VARCHAR(255) NOT NULL,
			event_type VARCHAR(255) NOT NULL,
			version INT NOT NULL,
			timestamp TIMESTAMP NOT NULL,
			data JSONB NOT NULL,
			metadata JSONB,
			
			-- Create an index on aggregate_id and version for optimistic concurrency control
			UNIQUE (aggregate_id, version)
		);
		
		-- Create indexes for common query patterns
		CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON events(aggregate_id);
		CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
		CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
	`)
	
	if err != nil {
		return fmt.Errorf("failed to create events table: %w", err)
	}
	
	return nil
}

// SaveEvents persists new events to the store
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	// Start a transaction
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback() // Will be ignored if transaction is committed
	
	// Check current version
	var currentVersion int
	err = tx.QueryRowContext(ctx, 
		"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", 
		aggregateID).Scan(&currentVersion)
	
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}
	
	if currentVersion != expectedVersion {
		return ErrConcurrencyConflict
	}
	
	// Insert all events
	stmt, err := tx.PrepareContext(ctx, `
		INSERT INTO events (
			id, aggregate_id, aggregate_type, event_type, 
			version, timestamp, data, metadata
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
	`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()
	
	for i, event := range events {
		// Ensure event has a valid ID
		if event.ID == "" {
			event.ID = uuid.New().String()
		}
		
		// Set version based on expected version
		event.Version = expectedVersion + i + 1
		
		// Set timestamp if not already set
		if event.Timestamp.IsZero() {
			event.Timestamp = time.Now().UTC()
		}
		
		_, err = stmt.ExecContext(ctx,
			event.ID, event.AggregateID, event.AggregateType, event.EventType,
			event.Version, event.Timestamp, event.Data, event.Metadata)
		
		if err != nil {
			return fmt.Errorf("failed to insert event: %w", err)
		}
	}
	
	// Commit the transaction
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	
	return nil
}

// GetEvents retrieves all events for a specific aggregate
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		WHERE aggregate_id = $1
		ORDER BY version ASC
	`, aggregateID)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query events: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetEventsByType retrieves events of a specific type
func (s *PostgresEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		WHERE event_type = $1
		ORDER BY timestamp ASC
	`, eventType)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query events by type: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *PostgresEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
		FROM events
		ORDER BY timestamp ASC
		LIMIT $1 OFFSET $2
	`, limit, offset)
	
	if err != nil {
		return nil, fmt.Errorf("failed to query all events: %w", err)
	}
	defer rows.Close()
	
	return s.scanEvents(rows)
}

// GetAggregateVersion returns the current version of an aggregate
func (s *PostgresEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	var version int
	err := s.db.QueryRowContext(ctx, 
		"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", 
		aggregateID).Scan(&version)
	
	if err != nil {
		return 0, fmt.Errorf("failed to get aggregate version: %w", err)
	}
	
	return version, nil
}

// scanEvents is a helper function to scan rows into Event structs
func (s *PostgresEventStore) scanEvents(rows *sql.Rows) ([]Event, error) {
	var events []Event
	
	for rows.Next() {
		var event Event
		var data, metadata []byte
		
		err := rows.Scan(
			&event.ID, &event.AggregateID, &event.AggregateType, &event.EventType,
			&event.Version, &event.Timestamp, &data, &metadata)
		
		if err != nil {
			return nil, fmt.Errorf("failed to scan event row: %w", err)
		}
		
		event.Data = json.RawMessage(data)
		event.Metadata = json.RawMessage(metadata)
		
		events = append(events, event)
	}
	
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating event rows: %w", err)
	}
	
	return events, nil
}

// Close closes the database connection
func (s *PostgresEventStore) Close() error {
	return s.db.Close()
}