Event Store Design and Implementation
The event store is the heart of any event sourcing system. It is responsible for persisting events and for reading them back, either by aggregate ID (to rebuild an aggregate's state) or across aggregates (to build projections).
Event Store Interface
Let’s define a clean interface for our event store:
package eventsourcing
import (
"context"
"errors"
)
// Sentinel errors shared by the event store implementations. Compare against
// them with errors.Is, since stores may wrap them.

// ErrConcurrencyConflict reports that the caller's expected version does not
// match the version currently stored for the aggregate.
var ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate has been modified")

// ErrAggregateNotFound reports that no events are stored for the requested
// aggregate.
var ErrAggregateNotFound = errors.New("aggregate not found")

// ErrEventNotFound reports that a requested event does not exist.
var ErrEventNotFound = errors.New("event not found")
// EventStore is the persistence contract for an event-sourced system: it
// appends events to per-aggregate streams and reads them back either per
// aggregate or across all aggregates.
type EventStore interface {
	// SaveEvents appends events to the stream identified by aggregateID.
	// expectedVersion is the version the caller last observed for that
	// aggregate; it is what implementations use for optimistic concurrency
	// control.
	SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error

	// GetEvents returns every event recorded for aggregateID.
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)

	// GetEventsByType returns the events whose type equals eventType,
	// regardless of which aggregate they belong to.
	GetEventsByType(ctx context.Context, eventType string) ([]Event, error)

	// GetAllEvents returns one page of the events of all aggregates:
	// at most limit events, starting after skipping offset events.
	GetAllEvents(ctx context.Context, offset int, limit int) ([]Event, error)

	// GetAggregateVersion returns the current version of aggregateID.
	GetAggregateVersion(ctx context.Context, aggregateID string) (int, error)
}
In-Memory Event Store Implementation
For development and testing, an in-memory event store is useful:
package eventsourcing
import (
"context"
"sort"
"sync"
)
// InMemoryEventStore keeps every event stream in process memory. Nothing is
// persisted, which makes it suitable for development and tests only.
type InMemoryEventStore struct {
	// mu guards events; readers take the read lock, SaveEvents the write lock.
	mu sync.RWMutex

	// events holds one append-only stream per aggregate, keyed by aggregate ID.
	events map[string][]Event
}
// NewInMemoryEventStore returns an empty, ready-to-use in-memory store.
// The stream map is allocated up front so SaveEvents can write to it.
func NewInMemoryEventStore() *InMemoryEventStore {
	store := new(InMemoryEventStore)
	store.events = map[string][]Event{}
	return store
}
// SaveEvents appends events to the stream of aggregateID using optimistic
// concurrency control.
//
// expectedVersion must equal the number of events already stored for the
// aggregate (0 for an aggregate that does not exist yet). Events are stored
// exactly as supplied; this store does not assign IDs, versions or timestamps.
//
// It returns:
//   - nil without touching the store when events is empty;
//   - ErrAggregateNotFound when the aggregate is unknown but a positive
//     expectedVersion was given;
//   - ErrConcurrencyConflict for any other version mismatch, including a
//     negative expectedVersion.
//
// ctx is accepted to satisfy EventStore and is not consulted.
func (s *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	existingEvents, exists := s.events[aggregateID]

	// Keep the dedicated error for "caller thinks this aggregate already has
	// history, but nothing is stored for it".
	if !exists && expectedVersion > 0 {
		return ErrAggregateNotFound
	}

	// A missing stream is a nil slice of length 0, so this single comparison
	// covers existing aggregates and also rejects a negative expectedVersion
	// for a new aggregate, which previously slipped through unchecked.
	if len(existingEvents) != expectedVersion {
		return ErrConcurrencyConflict
	}

	s.events[aggregateID] = append(existingEvents, events...)
	return nil
}
// GetEvents returns the events stored for aggregateID in the order they were
// appended, or ErrAggregateNotFound when nothing is stored under that ID.
// The returned slice is a fresh copy, so callers cannot alter the store's
// internal stream through it.
func (s *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stored, ok := s.events[aggregateID]
	if !ok {
		return nil, ErrAggregateNotFound
	}

	// Appending onto a new backing array detaches the result from the store.
	snapshot := append(make([]Event, 0, len(stored)), stored...)
	return snapshot, nil
}
// GetEventsByType returns every stored event whose EventType equals
// eventType, across all aggregates, ordered by Timestamp ascending.
//
// The order is deterministic: events with equal timestamps are ordered by
// aggregate ID, and within one aggregate by the order they were appended.
// The result is nil when nothing matches. ctx is not consulted.
func (s *InMemoryEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Map iteration order is random, so walk the aggregates in sorted key
	// order to give the stable sort below a reproducible starting point.
	aggregateIDs := make([]string, 0, len(s.events))
	for id := range s.events {
		aggregateIDs = append(aggregateIDs, id)
	}
	sort.Strings(aggregateIDs)

	var result []Event
	for _, id := range aggregateIDs {
		for _, event := range s.events[id] {
			if event.EventType == eventType {
				result = append(result, event)
			}
		}
	}

	// SliceStable keeps the collection order for equal timestamps;
	// sort.Slice would be free to shuffle them.
	sort.SliceStable(result, func(i, j int) bool {
		return result[i].Timestamp.Before(result[j].Timestamp)
	})
	return result, nil
}
// GetAllEvents returns one page of the events of all aggregates, ordered by
// Timestamp ascending.
//
// The order is deterministic so that consecutive pages neither repeat nor
// skip events: equal timestamps are ordered by aggregate ID, and within one
// aggregate by the order the events were appended.
//
// offset is the number of events to skip; a negative offset is treated as 0.
// limit is the maximum number of events to return; a limit of 0 or less, or
// an offset at or beyond the end, yields an empty non-nil slice. The returned
// slice does not share memory with the store. ctx is not consulted.
func (s *InMemoryEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Map iteration order is random, so walk the aggregates in sorted key
	// order to give the stable sort below a reproducible starting point.
	aggregateIDs := make([]string, 0, len(s.events))
	total := 0
	for id, aggregateEvents := range s.events {
		aggregateIDs = append(aggregateIDs, id)
		total += len(aggregateEvents)
	}
	sort.Strings(aggregateIDs)

	allEvents := make([]Event, 0, total)
	for _, id := range aggregateIDs {
		allEvents = append(allEvents, s.events[id]...)
	}

	// SliceStable keeps the collection order for equal timestamps.
	sort.SliceStable(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})

	// Clamp the window instead of slicing blindly: a negative offset or
	// limit used to panic with a slice-bounds error.
	if offset < 0 {
		offset = 0
	}
	if limit <= 0 || offset >= len(allEvents) {
		return []Event{}, nil
	}
	// Compare against the remaining count rather than computing
	// offset+limit first, which could overflow for a very large limit.
	if remaining := len(allEvents) - offset; limit > remaining {
		limit = remaining
	}
	return allEvents[offset : offset+limit], nil
}
// GetAggregateVersion reports the version of aggregateID, defined here as the
// number of events stored for it. An aggregate with no stored events is at
// version 0; this method never returns an error.
func (s *InMemoryEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Indexing a missing key yields a nil slice, whose length is 0.
	return len(s.events[aggregateID]), nil
}
PostgreSQL Event Store Implementation
For production use, we need a persistent event store. Here’s an implementation using PostgreSQL:
package eventsourcing
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
)
// PostgresEventStore is an EventStore backed by a PostgreSQL "events" table.
type PostgresEventStore struct {
	// db is the database handle used for every query; Close releases it.
	db *sql.DB
}
// NewPostgresEventStore opens a PostgreSQL handle for connectionString using
// the "postgres" driver and verifies it with a ping before returning the
// store.
//
// It returns a wrapped error when the handle cannot be opened or the ping
// fails. On a ping failure the handle is closed first, so a failed call
// leaves no database resources behind. On success the caller owns the store
// and should call Close when done with it.
func NewPostgresEventStore(connectionString string) (*PostgresEventStore, error) {
	db, err := sql.Open("postgres", connectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}

	if err := db.Ping(); err != nil {
		// The handle was opened successfully and must be released here; the
		// Close error is dropped because the ping failure is what the caller
		// needs to see.
		_ = db.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return &PostgresEventStore{db: db}, nil
}
// Initialize creates the events table and its supporting indexes when they do
// not exist yet. All statements are sent in a single ExecContext call, and
// every statement uses IF NOT EXISTS, so calling it again on an initialized
// database is harmless. Any failure is returned wrapped.
func (s *PostgresEventStore) Initialize(ctx context.Context) error {
	// The UNIQUE (aggregate_id, version) constraint is what prevents two
	// writers from storing the same version of one aggregate.
	const schema = `
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
version INT NOT NULL,
timestamp TIMESTAMP NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
-- Create an index on aggregate_id and version for optimistic concurrency control
UNIQUE (aggregate_id, version)
);
-- Create indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON events(aggregate_id);
CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
`
	if _, err := s.db.ExecContext(ctx, schema); err != nil {
		return fmt.Errorf("failed to create events table: %w", err)
	}
	return nil
}
// SaveEvents persists new events to the store
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
if len(events) == 0 {
return nil
}
// Start a transaction
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() // Will be ignored if transaction is committed
// Check current version
var currentVersion int
err = tx.QueryRowContext(ctx,
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
aggregateID).Scan(¤tVersion)
if err != nil {
return fmt.Errorf("failed to get current version: %w", err)
}
if currentVersion != expectedVersion {
return ErrConcurrencyConflict
}
// Insert all events
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO events (
id, aggregate_id, aggregate_type, event_type,
version, timestamp, data, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()
for i, event := range events {
// Ensure event has a valid ID
if event.ID == "" {
event.ID = uuid.New().String()
}
// Set version based on expected version
event.Version = expectedVersion + i + 1
// Set timestamp if not already set
if event.Timestamp.IsZero() {
event.Timestamp = time.Now().UTC()
}
_, err = stmt.ExecContext(ctx,
event.ID, event.AggregateID, event.AggregateType, event.EventType,
event.Version, event.Timestamp, event.Data, event.Metadata)
if err != nil {
return fmt.Errorf("failed to insert event: %w", err)
}
}
// Commit the transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// GetEvents loads the full event stream of aggregateID, ordered by ascending
// version. An aggregate without stored events produces an empty result and
// no error. Query and scan failures are returned wrapped.
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	const query = `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
WHERE aggregate_id = $1
ORDER BY version ASC
`
	stream, queryErr := s.db.QueryContext(ctx, query, aggregateID)
	if queryErr != nil {
		return nil, fmt.Errorf("failed to query events: %w", queryErr)
	}
	defer stream.Close()

	return s.scanEvents(stream)
}
// GetEventsByType returns every event whose event_type equals eventType,
// across all aggregates, ordered by timestamp ascending.
//
// Events with equal timestamps are ordered by (aggregate_id, version). That
// pair is unique in the events table, so the result order is fully
// deterministic. Query and scan failures are returned wrapped.
func (s *PostgresEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	// timestamp alone is not unique; without the tie-breaker the database
	// may return equal-timestamp rows in any order.
	rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
WHERE event_type = $1
ORDER BY timestamp ASC, aggregate_id ASC, version ASC
`, eventType)
	if err != nil {
		return nil, fmt.Errorf("failed to query events by type: %w", err)
	}
	defer rows.Close()

	return s.scanEvents(rows)
}
// GetAllEvents returns one page of the events of all aggregates, ordered by
// timestamp ascending. offset is the number of events to skip and limit the
// maximum number to return; both are passed to the database as given.
//
// Events with equal timestamps are ordered by (aggregate_id, version). That
// pair is unique in the events table, so the order is total and consecutive
// pages neither repeat nor skip events. Query and scan failures are returned
// wrapped.
func (s *PostgresEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	// LIMIT/OFFSET paging is only consistent over a unique ordering;
	// timestamp alone can tie, hence the extra sort keys.
	rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
ORDER BY timestamp ASC, aggregate_id ASC, version ASC
LIMIT $1 OFFSET $2
`, limit, offset)
	if err != nil {
		return nil, fmt.Errorf("failed to query all events: %w", err)
	}
	defer rows.Close()

	return s.scanEvents(rows)
}
// GetAggregateVersion reports the highest version stored for aggregateID.
// COALESCE turns the NULL that MAX yields for an aggregate without events
// into 0. Query failures are returned wrapped together with a zero version.
func (s *PostgresEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	const query = "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1"

	var latest int
	if err := s.db.QueryRowContext(ctx, query, aggregateID).Scan(&latest); err != nil {
		return 0, fmt.Errorf("failed to get aggregate version: %w", err)
	}
	return latest, nil
}
// scanEvents drains rows into a slice of Event values. It expects the columns
// in the order id, aggregate_id, aggregate_type, event_type, version,
// timestamp, data, metadata. It does not close rows; callers do that. The
// result is nil when rows is empty, and any scan or iteration error is
// returned wrapped.
func (s *PostgresEventStore) scanEvents(rows *sql.Rows) ([]Event, error) {
	var collected []Event

	for rows.Next() {
		var (
			current          Event
			payload, context []byte
		)
		scanErr := rows.Scan(
			&current.ID, &current.AggregateID, &current.AggregateType, &current.EventType,
			&current.Version, &current.Timestamp, &payload, &context,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("failed to scan event row: %w", scanErr)
		}

		// The two JSON columns are kept as raw bytes; decoding is left to
		// the consumer of the event.
		current.Data = json.RawMessage(payload)
		current.Metadata = json.RawMessage(context)
		collected = append(collected, current)
	}

	// Next returning false can mean "done" or "failed"; Err tells them apart.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("error iterating event rows: %w", iterErr)
	}
	return collected, nil
}
// Close releases the underlying database handle and returns whatever error
// the handle reports while closing.
func (s *PostgresEventStore) Close() error {
	closeErr := s.db.Close()
	return closeErr
}