Event Sourcing Implementation in Go
Build event-sourced systems in Go with CQRS patterns.
#Most applications store only current state - when something changes, the old data is lost. Event sourcing takes a different approach: store every change as an immutable event, preserving the complete history.
The Problem with State-Based Storage
Traditional applications work like this:
// Traditional approach - only current state
type Account struct {
ID string
Balance int
Status string
}
// When balance changes, old value is lost
func (a *Account) Withdraw(amount int) {
a.Balance -= amount // History is gone
}
Problems with this approach:
- Lost History: Can’t see how the account reached its current state
- No Audit Trail: Compliance and debugging become difficult
- Limited Queries: Can’t ask “what was the balance yesterday?”
- Concurrency Issues: Multiple updates can conflict
Event Sourcing Solution
Instead of storing state, store the events that create state:
// Event sourcing approach - store events
type Event struct {
ID string
Type string
Data interface{}
Timestamp time.Time
}
type AccountCreated struct {
AccountID string
InitialBalance int
}
type MoneyWithdrawn struct {
AccountID string
Amount int
}
Benefits:
- Complete History: Every change is preserved
- Audit Trail: Natural compliance and debugging
- Time Travel: Reconstruct state at any point in time
- Flexibility: New views of data without migration
When to Use Event Sourcing
Event sourcing works well when you need:
- Audit Requirements: Financial, healthcare, or legal systems
- Complex Business Logic: Domain events matter to the business
- Temporal Queries: “Show me last month’s state”
- High Write Throughput: Events are append-only
Don’t use event sourcing for:
- Simple CRUD applications
- Systems where current state is all that matters
- Teams unfamiliar with the complexity
This guide shows you how to implement event sourcing in Go, from basic concepts to production systems.
Event Sourcing Fundamentals
Before diving into implementation details, let’s establish a solid understanding of event sourcing concepts and how they differ from traditional data storage approaches.
Core Concepts
Event sourcing revolves around several key concepts:
-
Events: Immutable records of something that happened in the system. Events are always expressed in the past tense (e.g.,
OrderPlaced
,PaymentProcessed
). -
Aggregate: A cluster of domain objects that can be treated as a single unit. An aggregate has a root entity (the aggregate root) that enforces invariants for all objects within the aggregate.
-
Event Store: A specialized database that stores events as the source of truth, rather than the current state.
-
Projections: Read-optimized views built by consuming events and updating query-friendly data structures.
-
Commands: Requests for the system to perform an action, which may generate events if successful.
Let’s start by defining the basic structures for our event sourcing system:
package eventsourcing
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
)
// Event represents something that happened in the system
type Event struct {
ID string // Unique identifier for the event
AggregateID string // ID of the aggregate this event belongs to
AggregateType string // Type of the aggregate
EventType string // Type of the event
Version int // Version of the aggregate after this event
Timestamp time.Time // When the event occurred
Data json.RawMessage // Event payload data
Metadata json.RawMessage // Additional metadata about the event
}
// Command represents a request to change the system state
type Command struct {
ID string // Unique identifier for the command
AggregateID string // ID of the target aggregate
AggregateType string // Type of the target aggregate
CommandType string // Type of the command
Data json.RawMessage // Command payload data
Metadata json.RawMessage // Additional metadata about the command
}
// Aggregate is the base interface for all aggregates
type Aggregate interface {
ID() string
Type() string
Version() int
ApplyEvent(event Event) error
ApplyEvents(events []Event) error
}
// EventHandler processes events after they are stored
type EventHandler interface {
HandleEvent(ctx context.Context, event Event) error
}
// CommandHandler processes commands and produces events
type CommandHandler interface {
HandleCommand(ctx context.Context, command Command) ([]Event, error)
}
Event Sourcing vs. Traditional State Storage
To understand why event sourcing is valuable, let’s compare it with traditional state storage:
Aspect | Traditional Storage | Event Sourcing |
---|---|---|
Data Model | Current state only | Complete history of changes |
Updates | Destructive (overwrites previous state) | Non-destructive (appends new events) |
History | Limited or requires additional logging | Built-in, comprehensive |
Audit | Requires additional implementation | Inherent to the model |
Temporal Queries | Difficult or impossible | Natural and straightforward |
Debugging | Limited historical context | Complete historical context |
Complexity | Generally simpler | More complex initially |
Event sourcing shines in domains where:
- Historical data is as important as current state
- Audit trails are required for compliance or debugging
- Business processes evolve over time
- Complex domain logic benefits from capturing intent
Event Modeling
Before implementing event sourcing, it’s crucial to model your domain events properly. Events should:
- Be named in the past tense (e.g.,
UserRegistered
,OrderShipped
) - Contain all data relevant to the event
- Be immutable once created
- Be self-contained and understandable in isolation
Let’s model a simple e-commerce domain with events:
package ecommerce
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// Product-related events
type ProductCreated struct {
ProductID string `json:"product_id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
StockLevel int `json:"stock_level"`
}
type ProductPriceChanged struct {
ProductID string `json:"product_id"`
OldPrice float64 `json:"old_price"`
NewPrice float64 `json:"new_price"`
ChangeDate time.Time `json:"change_date"`
}
type ProductStockAdjusted struct {
ProductID string `json:"product_id"`
PreviousStock int `json:"previous_stock"`
NewStockLevel int `json:"new_stock_level"`
AdjustmentType string `json:"adjustment_type"` // "addition", "reduction", "inventory_correction"
}
// Order-related events
type OrderCreated struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
OrderDate time.Time `json:"order_date"`
TotalAmount float64 `json:"total_amount"`
}
type OrderItemAdded struct {
OrderID string `json:"order_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
UnitPrice float64 `json:"unit_price"`
}
type OrderShipped struct {
OrderID string `json:"order_id"`
ShippingDate time.Time `json:"shipping_date"`
TrackingNumber string `json:"tracking_number"`
EstimatedArrival time.Time `json:"estimated_arrival"`
}
type OrderCancelled struct {
OrderID string `json:"order_id"`
CancellationDate time.Time `json:"cancellation_date"`
Reason string `json:"reason"`
}
// Helper function to create a generic event from a specific event type
func CreateEvent(aggregateID, aggregateType string, version int, eventType string, data interface{}) (Event, error) {
dataBytes, err := json.Marshal(data)
if err != nil {
return Event{}, err
}
return Event{
ID: uuid.New().String(),
AggregateID: aggregateID,
AggregateType: aggregateType,
EventType: eventType,
Version: version,
Timestamp: time.Now().UTC(),
Data: dataBytes,
}, nil
}
Event Store Design and Implementation
The event store is the heart of any event sourcing system. It’s responsible for persisting events and allowing them to be retrieved by aggregate ID or for building projections.
Event Store Interface
Let’s define a clean interface for our event store:
package eventsourcing
import (
"context"
"errors"
)
var (
ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate has been modified")
ErrAggregateNotFound = errors.New("aggregate not found")
ErrEventNotFound = errors.New("event not found")
)
// EventStore defines the interface for storing and retrieving events
type EventStore interface {
// SaveEvents persists new events to the store
SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error
// GetEvents retrieves all events for a specific aggregate
GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
// GetEventsByType retrieves events of a specific type
GetEventsByType(ctx context.Context, eventType string) ([]Event, error)
// GetAllEvents retrieves all events, optionally with pagination
GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error)
// GetAggregateVersion returns the current version of an aggregate
GetAggregateVersion(ctx context.Context, aggregateID string) (int, error)
}
In-Memory Event Store Implementation
For development and testing, an in-memory event store is useful:
package eventsourcing
import (
"context"
"sort"
"sync"
)
// InMemoryEventStore is a simple in-memory implementation of EventStore
type InMemoryEventStore struct {
mu sync.RWMutex
events map[string][]Event // Map of aggregateID to events
}
// NewInMemoryEventStore creates a new in-memory event store
func NewInMemoryEventStore() *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]Event),
}
}
// SaveEvents persists new events to the store
func (s *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
if len(events) == 0 {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
// Check if aggregate exists
existingEvents, exists := s.events[aggregateID]
// If aggregate exists, verify version
if exists {
currentVersion := len(existingEvents)
if currentVersion != expectedVersion {
return ErrConcurrencyConflict
}
} else if expectedVersion > 0 {
// If aggregate doesn't exist but we expected a version > 0
return ErrAggregateNotFound
}
// Append new events
s.events[aggregateID] = append(existingEvents, events...)
return nil
}
// GetEvents retrieves all events for a specific aggregate
func (s *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
s.mu.RLock()
defer s.mu.RUnlock()
events, exists := s.events[aggregateID]
if !exists {
return nil, ErrAggregateNotFound
}
// Return a copy to prevent modification of internal state
result := make([]Event, len(events))
copy(result, events)
return result, nil
}
// GetEventsByType retrieves events of a specific type
func (s *InMemoryEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var result []Event
for _, aggregateEvents := range s.events {
for _, event := range aggregateEvents {
if event.EventType == eventType {
result = append(result, event)
}
}
}
// Sort by timestamp for consistent ordering
sort.Slice(result, func(i, j int) bool {
return result[i].Timestamp.Before(result[j].Timestamp)
})
return result, nil
}
// GetAllEvents retrieves all events, optionally with pagination
func (s *InMemoryEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var allEvents []Event
// Collect all events
for _, aggregateEvents := range s.events {
allEvents = append(allEvents, aggregateEvents...)
}
// Sort by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
// Apply pagination
if offset >= len(allEvents) {
return []Event{}, nil
}
end := offset + limit
if end > len(allEvents) {
end = len(allEvents)
}
return allEvents[offset:end], nil
}
// GetAggregateVersion returns the current version of an aggregate
func (s *InMemoryEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
events, exists := s.events[aggregateID]
if !exists {
return 0, nil // New aggregates start at version 0
}
return len(events), nil
}
PostgreSQL Event Store Implementation
For production use, we need a persistent event store. Here’s an implementation using PostgreSQL:
package eventsourcing
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
)
// PostgresEventStore implements EventStore using PostgreSQL
type PostgresEventStore struct {
db *sql.DB
}
// NewPostgresEventStore creates a new PostgreSQL-backed event store
func NewPostgresEventStore(connectionString string) (*PostgresEventStore, error) {
db, err := sql.Open("postgres", connectionString)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
// Test connection
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping database: %w", err)
}
return &PostgresEventStore{db: db}, nil
}
// Initialize creates the necessary tables if they don't exist
func (s *PostgresEventStore) Initialize(ctx context.Context) error {
// Create events table
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
version INT NOT NULL,
timestamp TIMESTAMP NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
-- Create an index on aggregate_id and version for optimistic concurrency control
UNIQUE (aggregate_id, version)
);
-- Create indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON events(aggregate_id);
CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
`)
if err != nil {
return fmt.Errorf("failed to create events table: %w", err)
}
return nil
}
// SaveEvents persists new events to the store
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
if len(events) == 0 {
return nil
}
// Start a transaction
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() // Will be ignored if transaction is committed
// Check current version
var currentVersion int
err = tx.QueryRowContext(ctx,
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
aggregateID).Scan(¤tVersion)
if err != nil {
return fmt.Errorf("failed to get current version: %w", err)
}
if currentVersion != expectedVersion {
return ErrConcurrencyConflict
}
// Insert all events
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO events (
id, aggregate_id, aggregate_type, event_type,
version, timestamp, data, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()
for i, event := range events {
// Ensure event has a valid ID
if event.ID == "" {
event.ID = uuid.New().String()
}
// Set version based on expected version
event.Version = expectedVersion + i + 1
// Set timestamp if not already set
if event.Timestamp.IsZero() {
event.Timestamp = time.Now().UTC()
}
_, err = stmt.ExecContext(ctx,
event.ID, event.AggregateID, event.AggregateType, event.EventType,
event.Version, event.Timestamp, event.Data, event.Metadata)
if err != nil {
return fmt.Errorf("failed to insert event: %w", err)
}
}
// Commit the transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// GetEvents retrieves all events for a specific aggregate
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
WHERE aggregate_id = $1
ORDER BY version ASC
`, aggregateID)
if err != nil {
return nil, fmt.Errorf("failed to query events: %w", err)
}
defer rows.Close()
return s.scanEvents(rows)
}
// GetEventsByType retrieves events of a specific type
func (s *PostgresEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
WHERE event_type = $1
ORDER BY timestamp ASC
`, eventType)
if err != nil {
return nil, fmt.Errorf("failed to query events by type: %w", err)
}
defer rows.Close()
return s.scanEvents(rows)
}
// GetAllEvents retrieves all events, optionally with pagination
func (s *PostgresEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, version, timestamp, data, metadata
FROM events
ORDER BY timestamp ASC
LIMIT $1 OFFSET $2
`, limit, offset)
if err != nil {
return nil, fmt.Errorf("failed to query all events: %w", err)
}
defer rows.Close()
return s.scanEvents(rows)
}
// GetAggregateVersion returns the current version of an aggregate
func (s *PostgresEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
var version int
err := s.db.QueryRowContext(ctx,
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
aggregateID).Scan(&version)
if err != nil {
return 0, fmt.Errorf("failed to get aggregate version: %w", err)
}
return version, nil
}
// scanEvents is a helper function to scan rows into Event structs
func (s *PostgresEventStore) scanEvents(rows *sql.Rows) ([]Event, error) {
var events []Event
for rows.Next() {
var event Event
var data, metadata []byte
err := rows.Scan(
&event.ID, &event.AggregateID, &event.AggregateType, &event.EventType,
&event.Version, &event.Timestamp, &data, &metadata)
if err != nil {
return nil, fmt.Errorf("failed to scan event row: %w", err)
}
event.Data = json.RawMessage(data)
event.Metadata = json.RawMessage(metadata)
events = append(events, event)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating event rows: %w", err)
}
return events, nil
}
// Close closes the database connection
func (s *PostgresEventStore) Close() error {
return s.db.Close()
}
CQRS Pattern Integration
Command Query Responsibility Segregation (CQRS) is a pattern that separates the read and write operations of a data store. When combined with event sourcing, it creates a powerful architecture for building scalable, responsive systems.
CQRS Principles
In CQRS:
- Commands modify state but don’t return data
- Queries return data but don’t modify state
- Write models are optimized for consistency and validation
- Read models (projections) are optimized for specific query needs
This separation allows each side to be optimized independently:
- Write models can focus on business rules and consistency
- Read models can be denormalized for performance
- Different storage technologies can be used for each side
Implementing Command Handlers
Command handlers are responsible for validating commands, applying business rules, and generating events:
package eventsourcing
import (
"context"
"errors"
"fmt"
)
// BaseAggregate provides common functionality for aggregates
type BaseAggregate struct {
id string
typ string
version int
changes []Event
}
// ID returns the aggregate's identifier
func (a *BaseAggregate) ID() string {
return a.id
}
// Type returns the aggregate's type
func (a *BaseAggregate) Type() string {
return a.typ
}
// Version returns the aggregate's current version
func (a *BaseAggregate) Version() int {
return a.version
}
// Changes returns the uncommitted changes
func (a *BaseAggregate) Changes() []Event {
return a.changes
}
// ClearChanges clears the uncommitted changes
func (a *BaseAggregate) ClearChanges() {
a.changes = nil
}
// AddChange adds a new change event
func (a *BaseAggregate) AddChange(eventType string, data interface{}) error {
event, err := CreateEvent(a.id, a.typ, a.version+1, eventType, data)
if err != nil {
return err
}
a.changes = append(a.changes, event)
return nil
}
// Apply applies an event to the aggregate
func (a *BaseAggregate) Apply(event Event) {
a.version = event.Version
}
// GenericCommandHandler provides a base implementation for command handlers
type GenericCommandHandler struct {
eventStore EventStore
aggregates map[string]func(id string) Aggregate
}
// NewGenericCommandHandler creates a new command handler
func NewGenericCommandHandler(eventStore EventStore) *GenericCommandHandler {
return &GenericCommandHandler{
eventStore: eventStore,
aggregates: make(map[string]func(id string) Aggregate),
}
}
// RegisterAggregate registers an aggregate type with the command handler
func (h *GenericCommandHandler) RegisterAggregate(aggregateType string, factory func(id string) Aggregate) {
h.aggregates[aggregateType] = factory
}
// HandleCommand processes a command and generates events
func (h *GenericCommandHandler) HandleCommand(ctx context.Context, command Command) ([]Event, error) {
// Get the aggregate factory
factory, exists := h.aggregates[command.AggregateType]
if !exists {
return nil, fmt.Errorf("unknown aggregate type: %s", command.AggregateType)
}
// Create the aggregate instance
aggregate := factory(command.AggregateID)
// Load the aggregate's events
events, err := h.eventStore.GetEvents(ctx, command.AggregateID)
if err != nil && !errors.Is(err, ErrAggregateNotFound) {
return nil, fmt.Errorf("failed to load aggregate events: %w", err)
}
// Apply existing events to rebuild the aggregate's state
if err := aggregate.ApplyEvents(events); err != nil {
return nil, fmt.Errorf("failed to apply events to aggregate: %w", err)
}
// Process the command (this is implemented by concrete command handlers)
if err := h.processCommand(ctx, aggregate, command); err != nil {
return nil, err
}
// Get the changes produced by the command
changes := aggregate.(interface{ Changes() []Event }).Changes()
if len(changes) == 0 {
return nil, nil // No changes to save
}
// Save the new events
if err := h.eventStore.SaveEvents(ctx, aggregate.ID(), aggregate.Version(), changes); err != nil {
return nil, fmt.Errorf("failed to save events: %w", err)
}
// Clear the changes from the aggregate
aggregate.(interface{ ClearChanges() }).ClearChanges()
return changes, nil
}
// processCommand is implemented by concrete command handlers
func (h *GenericCommandHandler) processCommand(ctx context.Context, aggregate Aggregate, command Command) error {
return errors.New("processCommand must be implemented by concrete command handlers")
}
Implementing a Product Aggregate
Let’s implement a concrete aggregate for our e-commerce example:
package ecommerce
import (
"encoding/json"
"errors"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductAggregate represents a product in our e-commerce system
type ProductAggregate struct {
eventsourcing.BaseAggregate
// Product state
name string
description string
price float64
stockLevel int
isActive bool
}
// NewProductAggregate creates a new product aggregate
func NewProductAggregate(id string) eventsourcing.Aggregate {
product := &ProductAggregate{}
product.id = id
product.typ = "product"
return product
}
// ApplyEvent applies a single event to the aggregate
func (p *ProductAggregate) ApplyEvent(event eventsourcing.Event) error {
// Apply the event to the base aggregate
p.BaseAggregate.Apply(event)
// Apply event-specific logic
switch event.EventType {
case "ProductCreated":
var data ProductCreated
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.name = data.Name
p.description = data.Description
p.price = data.Price
p.stockLevel = data.StockLevel
p.isActive = true
case "ProductPriceChanged":
var data ProductPriceChanged
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.price = data.NewPrice
case "ProductStockAdjusted":
var data ProductStockAdjusted
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
p.stockLevel = data.NewStockLevel
case "ProductDeactivated":
p.isActive = false
case "ProductReactivated":
p.isActive = true
default:
return fmt.Errorf("unknown event type: %s", event.EventType)
}
return nil
}
// ApplyEvents applies multiple events to the aggregate
func (p *ProductAggregate) ApplyEvents(events []eventsourcing.Event) error {
for _, event := range events {
if err := p.ApplyEvent(event); err != nil {
return err
}
}
return nil
}
// CreateProduct creates a new product
func (p *ProductAggregate) CreateProduct(name, description string, price float64, stockLevel int) error {
// Validate inputs
if name == "" {
return errors.New("product name cannot be empty")
}
if price <= 0 {
return errors.New("product price must be positive")
}
if stockLevel < 0 {
return errors.New("stock level cannot be negative")
}
// Check if product already exists
if p.name != "" {
return errors.New("product already exists")
}
// Create the event
return p.AddChange("ProductCreated", ProductCreated{
ProductID: p.id,
Name: name,
Description: description,
Price: price,
StockLevel: stockLevel,
})
}
// ChangePrice changes the product's price
func (p *ProductAggregate) ChangePrice(newPrice float64) error {
// Validate inputs
if newPrice <= 0 {
return errors.New("product price must be positive")
}
// Check if product exists
if p.name == "" {
return errors.New("product does not exist")
}
// Check if price is actually changing
if p.price == newPrice {
return nil // No change needed
}
// Create the event
return p.AddChange("ProductPriceChanged", ProductPriceChanged{
ProductID: p.id,
OldPrice: p.price,
NewPrice: newPrice,
ChangeDate: time.Now().UTC(),
})
}
// AdjustStock adjusts the product's stock level
func (p *ProductAggregate) AdjustStock(adjustment int, adjustmentType string) error {
// Check if product exists
if p.name == "" {
return errors.New("product does not exist")
}
// Calculate new stock level
newStockLevel := p.stockLevel + adjustment
// Validate new stock level
if newStockLevel < 0 {
return errors.New("stock level cannot be negative")
}
// Create the event
return p.AddChange("ProductStockAdjusted", ProductStockAdjusted{
ProductID: p.id,
PreviousStock: p.stockLevel,
NewStockLevel: newStockLevel,
AdjustmentType: adjustmentType,
})
}
Product Command Handler
Now let’s implement a command handler for our product aggregate:
package ecommerce
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductCommands
type CreateProductCommand struct {
ProductID string `json:"product_id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
StockLevel int `json:"stock_level"`
}
type ChangePriceCommand struct {
ProductID string `json:"product_id"`
NewPrice float64 `json:"new_price"`
}
type AdjustStockCommand struct {
ProductID string `json:"product_id"`
Adjustment int `json:"adjustment"`
AdjustmentType string `json:"adjustment_type"`
}
// ProductCommandHandler handles commands for the product aggregate
type ProductCommandHandler struct {
*eventsourcing.GenericCommandHandler
}
// NewProductCommandHandler creates a new product command handler
func NewProductCommandHandler(eventStore eventsourcing.EventStore) *ProductCommandHandler {
handler := &ProductCommandHandler{
GenericCommandHandler: eventsourcing.NewGenericCommandHandler(eventStore),
}
// Register the product aggregate
handler.RegisterAggregate("product", NewProductAggregate)
return handler
}
// processCommand processes a command for a product aggregate
func (h *ProductCommandHandler) processCommand(ctx context.Context, aggregate eventsourcing.Aggregate, command eventsourcing.Command) error {
product, ok := aggregate.(*ProductAggregate)
if !ok {
return errors.New("invalid aggregate type")
}
switch command.CommandType {
case "CreateProduct":
var cmd CreateProductCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.CreateProduct(cmd.Name, cmd.Description, cmd.Price, cmd.StockLevel)
case "ChangePrice":
var cmd ChangePriceCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.ChangePrice(cmd.NewPrice)
case "AdjustStock":
var cmd AdjustStockCommand
if err := json.Unmarshal(command.Data, &cmd); err != nil {
return err
}
return product.AdjustStock(cmd.Adjustment, cmd.AdjustmentType)
default:
return fmt.Errorf("unknown command type: %s", command.CommandType)
}
}
Event Processing and Projections
In event sourcing, projections are read models built by consuming events and updating query-friendly data structures. They transform the event stream into optimized views for specific query needs.
Projection Fundamentals
Projections have several key characteristics:
- Derived Data: They contain no original data, only derived views of events
- Rebuilable: They can be rebuilt from scratch by replaying events
- Query-Optimized: They are structured for specific query patterns
- Eventually Consistent: They may lag slightly behind the write side
Let’s implement a simple projection system:
package eventsourcing
import (
"context"
"fmt"
"sync"
)
// Projection defines the interface for a read model projection
type Projection interface {
// Name returns the unique name of the projection
Name() string
// HandleEvent processes an event and updates the projection
HandleEvent(ctx context.Context, event Event) error
// Reset clears the projection state
Reset(ctx context.Context) error
}
// ProjectionManager manages a set of projections
type ProjectionManager struct {
eventStore EventStore
projections map[string]Projection
mu sync.RWMutex
}
// NewProjectionManager creates a new projection manager
func NewProjectionManager(eventStore EventStore) *ProjectionManager {
return &ProjectionManager{
eventStore: eventStore,
projections: make(map[string]Projection),
}
}
// RegisterProjection registers a projection with the manager
func (m *ProjectionManager) RegisterProjection(projection Projection) {
m.mu.Lock()
defer m.mu.Unlock()
m.projections[projection.Name()] = projection
}
// HandleEvent processes an event through all registered projections
func (m *ProjectionManager) HandleEvent(ctx context.Context, event Event) error {
m.mu.RLock()
defer m.mu.RUnlock()
for _, projection := range m.projections {
if err := projection.HandleEvent(ctx, event); err != nil {
return fmt.Errorf("projection %s failed to handle event: %w", projection.Name(), err)
}
}
return nil
}
// RebuildProjection rebuilds a specific projection from scratch
func (m *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error {
m.mu.RLock()
projection, exists := m.projections[projectionName]
m.mu.RUnlock()
if !exists {
return fmt.Errorf("projection %s not found", projectionName)
}
// Reset the projection
if err := projection.Reset(ctx); err != nil {
return fmt.Errorf("failed to reset projection: %w", err)
}
// Get all events
offset := 0
batchSize := 1000
for {
events, err := m.eventStore.GetAllEvents(ctx, offset, batchSize)
if err != nil {
return fmt.Errorf("failed to get events: %w", err)
}
// Process each event
for _, event := range events {
if err := projection.HandleEvent(ctx, event); err != nil {
return fmt.Errorf("failed to handle event during rebuild: %w", err)
}
}
// If we got fewer events than the batch size, we're done
if len(events) < batchSize {
break
}
// Move to the next batch
offset += batchSize
}
return nil
}
// RebuildAllProjections rebuilds all registered projections
func (m *ProjectionManager) RebuildAllProjections(ctx context.Context) error {
m.mu.RLock()
projectionNames := make([]string, 0, len(m.projections))
for name := range m.projections {
projectionNames = append(projectionNames, name)
}
m.mu.RUnlock()
for _, name := range projectionNames {
if err := m.RebuildProjection(ctx, name); err != nil {
return err
}
}
return nil
}
Product Catalog Projection
Let’s implement a concrete projection for our e-commerce product catalog:
package ecommerce
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductCatalogProjection maintains a read-optimized view of products
type ProductCatalogProjection struct {
db *sql.DB
}
// NewProductCatalogProjection creates a new product catalog projection
func NewProductCatalogProjection(db *sql.DB) *ProductCatalogProjection {
return &ProductCatalogProjection{db: db}
}
// Name returns the unique name of the projection
func (p *ProductCatalogProjection) Name() string {
return "product_catalog"
}
// Initialize creates the necessary tables if they don't exist
func (p *ProductCatalogProjection) Initialize(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS product_catalog (
product_id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL,
stock_level INT NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_product_catalog_name ON product_catalog(name);
CREATE INDEX IF NOT EXISTS idx_product_catalog_price ON product_catalog(price);
`)
return err
}
// HandleEvent processes an event and updates the projection
func (p *ProductCatalogProjection) HandleEvent(ctx context.Context, event eventsourcing.Event) error {
switch event.EventType {
case "ProductCreated":
return p.handleProductCreated(ctx, event)
case "ProductPriceChanged":
return p.handleProductPriceChanged(ctx, event)
case "ProductStockAdjusted":
return p.handleProductStockAdjusted(ctx, event)
}
// Ignore events we don't care about
return nil
}
// Reset clears the projection state
func (p *ProductCatalogProjection) Reset(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, "DELETE FROM product_catalog")
return err
}
// handleProductCreated processes a ProductCreated event
func (p *ProductCatalogProjection) handleProductCreated(ctx context.Context, event eventsourcing.Event) error {
var data ProductCreated
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
_, err := p.db.ExecContext(ctx, `
INSERT INTO product_catalog (
product_id, name, description, price, stock_level, is_active, created_at, updated_at
) VALUES (
$1, $2, $3, $4, $5, TRUE, $6, $6
)
`, data.ProductID, data.Name, data.Description, data.Price, data.StockLevel, event.Timestamp)
return err
}
// handleProductPriceChanged processes a ProductPriceChanged event
func (p *ProductCatalogProjection) handleProductPriceChanged(ctx context.Context, event eventsourcing.Event) error {
var data ProductPriceChanged
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
_, err := p.db.ExecContext(ctx, `
UPDATE product_catalog
SET price = $1, updated_at = $2
WHERE product_id = $3
`, data.NewPrice, event.Timestamp, data.ProductID)
return err
}
// handleProductStockAdjusted processes a ProductStockAdjusted event
func (p *ProductCatalogProjection) handleProductStockAdjusted(ctx context.Context, event eventsourcing.Event) error {
var data ProductStockAdjusted
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
_, err := p.db.ExecContext(ctx, `
UPDATE product_catalog
SET stock_level = $1, updated_at = $2
WHERE product_id = $3
`, data.NewStockLevel, event.Timestamp, data.ProductID)
return err
}
Snapshotting and Performance Optimization
As event stores grow, rebuilding aggregates from their entire event history becomes increasingly expensive. Snapshotting addresses this by periodically capturing the aggregate’s state, allowing us to rebuild from the snapshot plus subsequent events.
Snapshot Implementation
Let’s implement a snapshotting system:
package eventsourcing
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
)
// Snapshot represents a point-in-time capture of an aggregate's state
type Snapshot struct {
ID string // Unique identifier for the snapshot
AggregateID string // ID of the aggregate
AggregateType string // Type of the aggregate
Version int // Version of the aggregate at snapshot time
Timestamp time.Time // When the snapshot was created
Data json.RawMessage // Serialized aggregate state
}
// Snapshotter defines the interface for snapshot storage
type Snapshotter interface {
// SaveSnapshot persists a snapshot
SaveSnapshot(ctx context.Context, snapshot Snapshot) error
// GetLatestSnapshot retrieves the most recent snapshot for an aggregate
GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error)
}
// SnapshotableAggregate is an aggregate that supports snapshotting
type SnapshotableAggregate interface {
Aggregate
// ToSnapshot serializes the aggregate state for snapshotting
ToSnapshot() (json.RawMessage, error)
// FromSnapshot deserializes a snapshot into the aggregate state
FromSnapshot(data json.RawMessage) error
}
// PostgresSnapshotter implements Snapshotter using PostgreSQL
type PostgresSnapshotter struct {
db *sql.DB
}
// NewPostgresSnapshotter creates a new PostgreSQL-backed snapshotter
func NewPostgresSnapshotter(db *sql.DB) *PostgresSnapshotter {
return &PostgresSnapshotter{db: db}
}
// Initialize creates the necessary tables if they don't exist
func (s *PostgresSnapshotter) Initialize(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS snapshots (
id UUID PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
version INT NOT NULL,
timestamp TIMESTAMP NOT NULL,
data JSONB NOT NULL,
-- Create an index on aggregate_id and version for efficient retrieval
UNIQUE (aggregate_id, version)
);
-- Create indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_id ON snapshots(aggregate_id);
`)
return err
}
// SaveSnapshot persists a snapshot
func (s *PostgresSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
// Ensure snapshot has a valid ID
if snapshot.ID == "" {
snapshot.ID = uuid.New().String()
}
// Set timestamp if not already set
if snapshot.Timestamp.IsZero() {
snapshot.Timestamp = time.Now().UTC()
}
_, err := s.db.ExecContext(ctx, `
INSERT INTO snapshots (
id, aggregate_id, aggregate_type, version, timestamp, data
) VALUES (
$1, $2, $3, $4, $5, $6
)
`, snapshot.ID, snapshot.AggregateID, snapshot.AggregateType,
snapshot.Version, snapshot.Timestamp, snapshot.Data)
return err
}
// GetLatestSnapshot retrieves the most recent snapshot for an aggregate
func (s *PostgresSnapshotter) GetLatestSnapshot(ctx context.Context, aggregateID string) (Snapshot, error) {
var snapshot Snapshot
err := s.db.QueryRowContext(ctx, `
SELECT id, aggregate_id, aggregate_type, version, timestamp, data
FROM snapshots
WHERE aggregate_id = $1
ORDER BY version DESC
LIMIT 1
`, aggregateID).Scan(
&snapshot.ID, &snapshot.AggregateID, &snapshot.AggregateType,
&snapshot.Version, &snapshot.Timestamp, &snapshot.Data,
)
if err == sql.ErrNoRows {
return Snapshot{}, errors.New("snapshot not found")
}
return snapshot, err
}
Snapshotting Strategy
Now let’s implement a strategy for when to create snapshots:
package eventsourcing
import (
"context"
"fmt"
)
// SnapshotStrategy defines when snapshots should be created
type SnapshotStrategy interface {
// ShouldSnapshot determines if a snapshot should be created
ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool
}
// IntervalSnapshotStrategy creates snapshots at fixed event intervals
type IntervalSnapshotStrategy struct {
interval int // Number of events between snapshots
}
// NewIntervalSnapshotStrategy creates a new interval-based snapshot strategy
func NewIntervalSnapshotStrategy(interval int) *IntervalSnapshotStrategy {
if interval <= 0 {
interval = 100 // Default to 100 events
}
return &IntervalSnapshotStrategy{interval: interval}
}
// ShouldSnapshot determines if a snapshot should be created
func (s *IntervalSnapshotStrategy) ShouldSnapshot(aggregateID string, eventCount int, lastSnapshotVersion int) bool {
return eventCount >= lastSnapshotVersion+s.interval
}
// SnapshotManager coordinates snapshot creation and loading
type SnapshotManager struct {
eventStore EventStore
snapshotter Snapshotter
strategy SnapshotStrategy
aggregateFactories map[string]func(id string) SnapshotableAggregate
}
// NewSnapshotManager creates a new snapshot manager
func NewSnapshotManager(
eventStore EventStore,
snapshotter Snapshotter,
strategy SnapshotStrategy,
) *SnapshotManager {
return &SnapshotManager{
eventStore: eventStore,
snapshotter: snapshotter,
strategy: strategy,
aggregateFactories: make(map[string]func(id string) SnapshotableAggregate),
}
}
// RegisterAggregate registers an aggregate type with the snapshot manager
func (m *SnapshotManager) RegisterAggregate(
aggregateType string,
factory func(id string) SnapshotableAggregate,
) {
m.aggregateFactories[aggregateType] = factory
}
// GetAggregate loads an aggregate, using snapshots if available
func (m *SnapshotManager) GetAggregate(
ctx context.Context,
aggregateID string,
aggregateType string,
) (SnapshotableAggregate, error) {
// Get the aggregate factory
factory, exists := m.aggregateFactories[aggregateType]
if !exists {
return nil, fmt.Errorf("unknown aggregate type: %s", aggregateType)
}
// Create the aggregate instance
aggregate := factory(aggregateID)
// Try to load the latest snapshot
snapshot, err := m.snapshotter.GetLatestSnapshot(ctx, aggregateID)
var snapshotVersion int
if err == nil {
// Apply the snapshot
if err := aggregate.FromSnapshot(snapshot.Data); err != nil {
return nil, fmt.Errorf("failed to apply snapshot: %w", err)
}
snapshotVersion = snapshot.Version
} else {
// No snapshot available, start from scratch
snapshotVersion = 0
}
// Load events after the snapshot
events, err := m.eventStore.GetEventsAfterVersion(ctx, aggregateID, snapshotVersion)
if err != nil {
return nil, fmt.Errorf("failed to load events: %w", err)
}
// Apply the events
if err := aggregate.ApplyEvents(events); err != nil {
return nil, fmt.Errorf("failed to apply events: %w", err)
}
// Check if we should create a new snapshot
if m.strategy.ShouldSnapshot(aggregateID, aggregate.Version(), snapshotVersion) {
if err := m.CreateSnapshot(ctx, aggregate); err != nil {
// Log the error but continue
fmt.Printf("Failed to create snapshot: %v\n", err)
}
}
return aggregate, nil
}
// CreateSnapshot creates a snapshot of the aggregate's current state
func (m *SnapshotManager) CreateSnapshot(ctx context.Context, aggregate SnapshotableAggregate) error {
// Serialize the aggregate state
data, err := aggregate.ToSnapshot()
if err != nil {
return fmt.Errorf("failed to serialize aggregate: %w", err)
}
// Create the snapshot
snapshot := Snapshot{
AggregateID: aggregate.ID(),
AggregateType: aggregate.Type(),
Version: aggregate.Version(),
Data: data,
}
// Save the snapshot
return m.snapshotter.SaveSnapshot(ctx, snapshot)
}
Making ProductAggregate Snapshotable
Let’s update our ProductAggregate to support snapshotting:
package ecommerce
import (
"encoding/json"
"github.com/yourdomain/eventsourcing"
)
// ProductSnapshot represents the serialized state of a ProductAggregate
type ProductSnapshot struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
StockLevel int `json:"stock_level"`
IsActive bool `json:"is_active"`
Version int `json:"version"`
}
// ToSnapshot serializes the aggregate state for snapshotting
func (p *ProductAggregate) ToSnapshot() (json.RawMessage, error) {
snapshot := ProductSnapshot{
ID: p.id,
Name: p.name,
Description: p.description,
Price: p.price,
StockLevel: p.stockLevel,
IsActive: p.isActive,
Version: p.version,
}
return json.Marshal(snapshot)
}
// FromSnapshot deserializes a snapshot into the aggregate state
func (p *ProductAggregate) FromSnapshot(data json.RawMessage) error {
var snapshot ProductSnapshot
if err := json.Unmarshal(data, &snapshot); err != nil {
return err
}
p.id = snapshot.ID
p.name = snapshot.Name
p.description = snapshot.Description
p.price = snapshot.Price
p.stockLevel = snapshot.StockLevel
p.isActive = snapshot.IsActive
p.version = snapshot.Version
return nil
}
Event Versioning and Migration
As your system evolves, event schemas will change. Event versioning and migration strategies are essential for maintaining compatibility with historical events.
Event Versioning Strategies
There are several approaches to event versioning:
- Explicit Versioning: Include a version field in the event type or payload
- Upcasting: Transform old event versions to new versions during loading
- Event Adapter Pattern: Use adapters to handle different event versions
Let’s implement an upcasting approach:
package eventsourcing
import (
"encoding/json"
"fmt"
)
// EventUpcaster transforms events from older versions to newer versions
type EventUpcaster interface {
// CanUpcast returns true if this upcaster can handle the given event
CanUpcast(event Event) bool
// Upcast transforms the event to the latest version
Upcast(event Event) (Event, error)
}
// UpcasterChain chains multiple upcasters together
type UpcasterChain struct {
upcasters []EventUpcaster
}
// NewUpcasterChain creates a new upcaster chain
func NewUpcasterChain(upcasters ...EventUpcaster) *UpcasterChain {
return &UpcasterChain{upcasters: upcasters}
}
// AddUpcaster adds an upcaster to the chain
func (c *UpcasterChain) AddUpcaster(upcaster EventUpcaster) {
c.upcasters = append(c.upcasters, upcaster)
}
// Upcast transforms an event through all applicable upcasters
func (c *UpcasterChain) Upcast(event Event) (Event, error) {
result := event
for _, upcaster := range c.upcasters {
if upcaster.CanUpcast(result) {
var err error
result, err = upcaster.Upcast(result)
if err != nil {
return Event{}, fmt.Errorf("upcasting failed: %w", err)
}
}
}
return result, nil
}
// Example upcaster for ProductCreated events
type ProductCreatedV1ToV2Upcaster struct{}
func (u *ProductCreatedV1ToV2Upcaster) CanUpcast(event Event) bool {
return event.EventType == "ProductCreated" && !hasField(event.Data, "category")
}
func (u *ProductCreatedV1ToV2Upcaster) Upcast(event Event) (Event, error) {
// Parse the original data
var v1Data map[string]interface{}
if err := json.Unmarshal(event.Data, &v1Data); err != nil {
return Event{}, err
}
// Add the missing field with a default value
v1Data["category"] = "uncategorized"
// Serialize back to JSON
newData, err := json.Marshal(v1Data)
if err != nil {
return Event{}, err
}
// Create the updated event
result := event
result.Data = json.RawMessage(newData)
return result, nil
}
// Helper function to check if a JSON object has a field
func hasField(data json.RawMessage, field string) bool {
var obj map[string]interface{}
if err := json.Unmarshal(data, &obj); err != nil {
return false
}
_, exists := obj[field]
return exists
}
Integrating Upcasters with Event Store
Now let’s update our event store to use upcasters:
package eventsourcing
import (
"context"
)
// UpcasterEventStore wraps an EventStore and applies upcasting
type UpcasterEventStore struct {
eventStore EventStore
upcaster *UpcasterChain
}
// NewUpcasterEventStore creates a new upcaster event store
func NewUpcasterEventStore(eventStore EventStore, upcaster *UpcasterChain) *UpcasterEventStore {
return &UpcasterEventStore{
eventStore: eventStore,
upcaster: upcaster,
}
}
// SaveEvents persists new events to the store
func (s *UpcasterEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
// Pass through to the underlying store
return s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
}
// GetEvents retrieves all events for a specific aggregate
func (s *UpcasterEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
// Get events from the underlying store
events, err := s.eventStore.GetEvents(ctx, aggregateID)
if err != nil {
return nil, err
}
// Upcast each event
result := make([]Event, len(events))
for i, event := range events {
upcasted, err := s.upcaster.Upcast(event)
if err != nil {
return nil, err
}
result[i] = upcasted
}
return result, nil
}
// GetEventsByType retrieves events of a specific type
func (s *UpcasterEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
// Get events from the underlying store
events, err := s.eventStore.GetEventsByType(ctx, eventType)
if err != nil {
return nil, err
}
// Upcast each event
result := make([]Event, len(events))
for i, event := range events {
upcasted, err := s.upcaster.Upcast(event)
if err != nil {
return nil, err
}
result[i] = upcasted
}
return result, nil
}
// GetAllEvents retrieves all events, optionally with pagination
func (s *UpcasterEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
// Get events from the underlying store
events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
if err != nil {
return nil, err
}
// Upcast each event
result := make([]Event, len(events))
for i, event := range events {
upcasted, err := s.upcaster.Upcast(event)
if err != nil {
return nil, err
}
result[i] = upcasted
}
return result, nil
}
// GetAggregateVersion returns the current version of an aggregate
func (s *UpcasterEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
// Pass through to the underlying store
return s.eventStore.GetAggregateVersion(ctx, aggregateID)
}
// GetEventsAfterVersion retrieves events after a specific version
func (s *UpcasterEventStore) GetEventsAfterVersion(ctx context.Context, aggregateID string, version int) ([]Event, error) {
// Get events from the underlying store
events, err := s.eventStore.GetEventsAfterVersion(ctx, aggregateID, version)
if err != nil {
return nil, err
}
// Upcast each event
result := make([]Event, len(events))
for i, event := range events {
upcasted, err := s.upcaster.Upcast(event)
if err != nil {
return nil, err
}
result[i] = upcasted
}
return result, nil
}
Saga Pattern Implementation
In distributed systems, transactions that span multiple services present a significant challenge. The Saga pattern addresses this by breaking a distributed transaction into a sequence of local transactions, each with a compensating action to handle failures.
Saga Fundamentals
A saga is a sequence of local transactions where:
- Each local transaction updates a single service
- Each local transaction publishes an event to trigger the next transaction
- If a transaction fails, compensating transactions undo the changes
Let’s implement a saga framework for our e-commerce system:
package eventsourcing
import (
"context"
"errors"
"fmt"
"sync"
)
// SagaStep represents a single step in a saga
type SagaStep struct {
// Action to perform for this step
Action func(ctx context.Context, data interface{}) error
// Compensation to run if the saga needs to be rolled back
Compensation func(ctx context.Context, data interface{}) error
}
// Saga coordinates a sequence of steps that should be executed as a unit
type Saga struct {
name string
steps []SagaStep
data interface{}
mu sync.Mutex
// Track which steps have been executed
executedSteps int
}
// NewSaga creates a new saga with the given name
func NewSaga(name string, data interface{}) *Saga {
return &Saga{
name: name,
data: data,
steps: []SagaStep{},
}
}
// AddStep adds a step to the saga
func (s *Saga) AddStep(action, compensation func(ctx context.Context, data interface{}) error) {
s.steps = append(s.steps, SagaStep{
Action: action,
Compensation: compensation,
})
}
// Execute runs the saga, executing each step in sequence
func (s *Saga) Execute(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
// Start from where we left off (useful for resuming sagas)
for i := s.executedSteps; i < len(s.steps); i++ {
step := s.steps[i]
// Execute the step's action
if err := step.Action(ctx, s.data); err != nil {
// If the action fails, compensate
return s.compensate(ctx, i)
}
// Mark this step as executed
s.executedSteps = i + 1
}
return nil
}
// compensate runs the compensation functions for executed steps in reverse order
func (s *Saga) compensate(ctx context.Context, failedStepIndex int) error {
var compensationErr error
// Run compensations in reverse order
for i := failedStepIndex - 1; i >= 0; i-- {
step := s.steps[i]
if step.Compensation != nil {
if err := step.Compensation(ctx, s.data); err != nil {
// Log compensation errors but continue with other compensations
compensationErr = errors.Join(compensationErr,
fmt.Errorf("compensation failed for step %d: %w", i, err))
}
}
}
return compensationErr
}
Order Processing Saga Example
Let’s implement a concrete saga for processing orders in our e-commerce system:
package ecommerce
import (
"context"
"errors"
"fmt"
"time"
"github.com/yourdomain/eventsourcing"
)
// OrderData contains all the data needed for the order processing saga
type OrderData struct {
OrderID string
CustomerID string
Items []OrderItem
TotalAmount float64
PaymentID string
ShipmentID string
InventoryLock string
}
type OrderItem struct {
ProductID string
Quantity int
UnitPrice float64
}
// OrderProcessingSaga coordinates the order processing workflow
type OrderProcessingSaga struct {
saga *eventsourcing.Saga
eventStore eventsourcing.EventStore
commandBus CommandBus
orderData *OrderData
}
// NewOrderProcessingSaga creates a new order processing saga
func NewOrderProcessingSaga(
orderData *OrderData,
eventStore eventsourcing.EventStore,
commandBus CommandBus,
) *OrderProcessingSaga {
s := &OrderProcessingSaga{
orderData: orderData,
eventStore: eventStore,
commandBus: commandBus,
}
// Create the underlying saga
s.saga = eventsourcing.NewSaga("OrderProcessing", orderData)
// Define the saga steps
s.defineSagaSteps()
return s
}
// defineSagaSteps sets up the steps for the order processing saga
func (s *OrderProcessingSaga) defineSagaSteps() {
// Step 1: Reserve inventory
s.saga.AddStep(
// Action
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Create a command to reserve inventory
cmd := ReserveInventoryCommand{
OrderID: orderData.OrderID,
Items: orderData.Items,
}
// Send the command
result, err := s.commandBus.Send(ctx, "inventory", "ReserveInventory", cmd)
if err != nil {
return fmt.Errorf("failed to reserve inventory: %w", err)
}
// Store the inventory lock ID
orderData.InventoryLock = result.(string)
return nil
},
// Compensation
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Only compensate if we actually got a lock
if orderData.InventoryLock == "" {
return nil
}
// Create a command to release inventory
cmd := ReleaseInventoryCommand{
OrderID: orderData.OrderID,
LockID: orderData.InventoryLock,
}
// Send the command
_, err := s.commandBus.Send(ctx, "inventory", "ReleaseInventory", cmd)
if err != nil {
return fmt.Errorf("failed to release inventory: %w", err)
}
return nil
},
)
// Step 2: Process payment
s.saga.AddStep(
// Action
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Create a command to process payment
cmd := ProcessPaymentCommand{
OrderID: orderData.OrderID,
CustomerID: orderData.CustomerID,
Amount: orderData.TotalAmount,
}
// Send the command
result, err := s.commandBus.Send(ctx, "payment", "ProcessPayment", cmd)
if err != nil {
return fmt.Errorf("failed to process payment: %w", err)
}
// Store the payment ID
orderData.PaymentID = result.(string)
return nil
},
// Compensation
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Only compensate if we actually processed a payment
if orderData.PaymentID == "" {
return nil
}
// Create a command to refund payment
cmd := RefundPaymentCommand{
OrderID: orderData.OrderID,
PaymentID: orderData.PaymentID,
}
// Send the command
_, err := s.commandBus.Send(ctx, "payment", "RefundPayment", cmd)
if err != nil {
return fmt.Errorf("failed to refund payment: %w", err)
}
return nil
},
)
// Step 3: Create shipment
s.saga.AddStep(
// Action
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Create a command to create shipment
cmd := CreateShipmentCommand{
OrderID: orderData.OrderID,
CustomerID: orderData.CustomerID,
Items: orderData.Items,
}
// Send the command
result, err := s.commandBus.Send(ctx, "shipping", "CreateShipment", cmd)
if err != nil {
return fmt.Errorf("failed to create shipment: %w", err)
}
// Store the shipment ID
orderData.ShipmentID = result.(string)
return nil
},
// Compensation
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Only compensate if we actually created a shipment
if orderData.ShipmentID == "" {
return nil
}
// Create a command to cancel shipment
cmd := CancelShipmentCommand{
OrderID: orderData.OrderID,
ShipmentID: orderData.ShipmentID,
}
// Send the command
_, err := s.commandBus.Send(ctx, "shipping", "CancelShipment", cmd)
if err != nil {
return fmt.Errorf("failed to cancel shipment: %w", err)
}
return nil
},
)
// Step 4: Complete order
s.saga.AddStep(
// Action
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Create a command to complete the order
cmd := CompleteOrderCommand{
OrderID: orderData.OrderID,
PaymentID: orderData.PaymentID,
ShipmentID: orderData.ShipmentID,
}
// Send the command
_, err := s.commandBus.Send(ctx, "order", "CompleteOrder", cmd)
if err != nil {
return fmt.Errorf("failed to complete order: %w", err)
}
return nil
},
// Compensation
func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
// Create a command to cancel the order
cmd := CancelOrderCommand{
OrderID: orderData.OrderID,
Reason: "Saga rollback",
}
// Send the command
_, err := s.commandBus.Send(ctx, "order", "CancelOrder", cmd)
if err != nil {
return fmt.Errorf("failed to cancel order: %w", err)
}
return nil
},
)
}
// Execute runs the order processing saga
func (s *OrderProcessingSaga) Execute(ctx context.Context) error {
return s.saga.Execute(ctx)
}
// CommandBus is a simple interface for sending commands to different services
type CommandBus interface {
Send(ctx context.Context, service, commandType string, command interface{}) (interface{}, error)
}
Production Deployment and Monitoring
Deploying event-sourced systems to production requires careful consideration of performance, scalability, and observability. Let’s explore key aspects of running event sourcing in production.
Event Store Scaling
Event stores must scale to handle high throughput and growing event history. Consider these strategies:
- Partitioning: Divide events by aggregate type or ID to distribute load
- Read Replicas: Use separate read-only replicas for projections
- Event Pruning: Archive old events that are no longer needed for active aggregates
Here’s an example of a partitioned event store:
package eventsourcing
import (
"context"
"fmt"
"hash/fnv"
)
// PartitionedEventStore distributes events across multiple event stores
type PartitionedEventStore struct {
partitions []EventStore
numPartitions int
}
// NewPartitionedEventStore creates a new partitioned event store
func NewPartitionedEventStore(partitions []EventStore) *PartitionedEventStore {
return &PartitionedEventStore{
partitions: partitions,
numPartitions: len(partitions),
}
}
// getPartition determines which partition to use for a given aggregate ID
func (s *PartitionedEventStore) getPartition(aggregateID string) (EventStore, error) {
if s.numPartitions == 0 {
return nil, errors.New("no partitions available")
}
// Hash the aggregate ID to determine the partition
h := fnv.New32a()
h.Write([]byte(aggregateID))
partitionIndex := int(h.Sum32()) % s.numPartitions
return s.partitions[partitionIndex], nil
}
// SaveEvents persists new events to the appropriate partition
func (s *PartitionedEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
partition, err := s.getPartition(aggregateID)
if err != nil {
return err
}
return partition.SaveEvents(ctx, aggregateID, expectedVersion, events)
}
// GetEvents retrieves all events for a specific aggregate from the appropriate partition
func (s *PartitionedEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
partition, err := s.getPartition(aggregateID)
if err != nil {
return nil, err
}
return partition.GetEvents(ctx, aggregateID)
}
// GetEventsByType retrieves events of a specific type from all partitions
func (s *PartitionedEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
var allEvents []Event
// Query each partition
for _, partition := range s.partitions {
events, err := partition.GetEventsByType(ctx, eventType)
if err != nil {
return nil, err
}
allEvents = append(allEvents, events...)
}
// Sort events by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
return allEvents, nil
}
// GetAllEvents retrieves all events from all partitions
func (s *PartitionedEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
// This is inefficient for large event stores
// In a real implementation, you would need a more sophisticated approach
var allEvents []Event
// Query each partition
for _, partition := range s.partitions {
events, err := partition.GetAllEvents(ctx, 0, offset+limit)
if err != nil {
return nil, err
}
allEvents = append(allEvents, events...)
}
// Sort events by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
// Apply pagination
if offset >= len(allEvents) {
return []Event{}, nil
}
end := offset + limit
if end > len(allEvents) {
end = len(allEvents)
}
return allEvents[offset:end], nil
}
// GetAggregateVersion returns the current version of an aggregate
func (s *PartitionedEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
partition, err := s.getPartition(aggregateID)
if err != nil {
return 0, err
}
return partition.GetAggregateVersion(ctx, aggregateID)
}
Monitoring and Metrics
Effective monitoring is crucial for event-sourced systems. Here’s a metrics collector for event stores:
package eventsourcing
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// MetricsEventStore wraps an EventStore and collects metrics
type MetricsEventStore struct {
eventStore EventStore
// Metrics
eventSaveCount *prometheus.CounterVec
eventSaveDuration *prometheus.HistogramVec
eventReadCount *prometheus.CounterVec
eventReadDuration *prometheus.HistogramVec
}
// NewMetricsEventStore creates a new metrics-collecting event store
func NewMetricsEventStore(eventStore EventStore, registry *prometheus.Registry) *MetricsEventStore {
store := &MetricsEventStore{
eventStore: eventStore,
eventSaveCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "event_store_save_total",
Help: "Total number of events saved",
},
[]string{"aggregate_type"},
),
eventSaveDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_store_save_duration_seconds",
Help: "Duration of event save operations",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~1s
},
[]string{"aggregate_type"},
),
eventReadCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "event_store_read_total",
Help: "Total number of event read operations",
},
[]string{"operation"},
),
eventReadDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_store_read_duration_seconds",
Help: "Duration of event read operations",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~1s
},
[]string{"operation"},
),
}
// Register metrics with Prometheus
registry.MustRegister(
store.eventSaveCount,
store.eventSaveDuration,
store.eventReadCount,
store.eventReadDuration,
)
return store
}
// SaveEvents persists new events to the store
func (s *MetricsEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
if len(events) == 0 {
return nil
}
// Get the aggregate type from the first event
aggregateType := events[0].AggregateType
// Record metrics
timer := prometheus.NewTimer(s.eventSaveDuration.WithLabelValues(aggregateType))
defer timer.ObserveDuration()
err := s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
if err == nil {
s.eventSaveCount.WithLabelValues(aggregateType).Add(float64(len(events)))
}
return err
}
// GetEvents retrieves all events for a specific aggregate
func (s *MetricsEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEvents"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetEvents(ctx, aggregateID)
if err == nil {
s.eventReadCount.WithLabelValues("GetEvents").Inc()
}
return events, err
}
// GetEventsByType retrieves events of a specific type
func (s *MetricsEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEventsByType"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetEventsByType(ctx, eventType)
if err == nil {
s.eventReadCount.WithLabelValues("GetEventsByType").Inc()
}
return events, err
}
// GetAllEvents retrieves all events, optionally with pagination
func (s *MetricsEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAllEvents"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
if err == nil {
s.eventReadCount.WithLabelValues("GetAllEvents").Inc()
}
return events, err
}
// GetAggregateVersion returns the current version of an aggregate
func (s *MetricsEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAggregateVersion"))
defer timer.ObserveDuration()
version, err := s.eventStore.GetAggregateVersion(ctx, aggregateID)
if err == nil {
s.eventReadCount.WithLabelValues("GetAggregateVersion").Inc()
}
return version, err
}
Deployment Architecture
A robust event-sourced system typically includes these components:
- Command API: Handles incoming commands and validates them
- Event Store: Persists events durably
- Event Bus: Distributes events to subscribers
- Projection Workers: Build and maintain read models
- Query API: Serves read models to clients
Here’s a diagram of a typical deployment architecture:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ │ │ │ │
│ Command │────▶│ Event │────▶│ Event │
│ API │ │ Store │ │ Bus │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ │ │ │
│ Query │◀────│ Projection │
│ API │ │ Workers │
│ │ │ │
└─────────────┘ └─────────────┘
Performance Considerations
To ensure optimal performance in production:
- Batch Processing: Process events in batches for efficiency
- Caching: Cache aggregates and projections to reduce database load
- Asynchronous Projections: Update read models asynchronously
- Event Versioning: Handle schema evolution gracefully
- Monitoring: Track event store metrics and projection lag
Here’s an example of a batch event processor:
package eventsourcing
import (
"context"
"time"
)
// BatchEventProcessor processes events in batches
type BatchEventProcessor struct {
eventStore EventStore
eventHandler EventHandler
batchSize int
processingDelay time.Duration
lastEventID string
}
// NewBatchEventProcessor creates a new batch event processor
func NewBatchEventProcessor(
eventStore EventStore,
eventHandler EventHandler,
batchSize int,
processingDelay time.Duration,
) *BatchEventProcessor {
if batchSize <= 0 {
batchSize = 100
}
if processingDelay <= 0 {
processingDelay = 100 * time.Millisecond
}
return &BatchEventProcessor{
eventStore: eventStore,
eventHandler: eventHandler,
batchSize: batchSize,
processingDelay: processingDelay,
}
}
// Start begins processing events in batches
func (p *BatchEventProcessor) Start(ctx context.Context) error {
ticker := time.NewTicker(p.processingDelay)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := p.processBatch(ctx); err != nil {
// Log the error but continue processing
fmt.Printf("Error processing batch: %v\n", err)
}
}
}
}
// processBatch processes a single batch of events
func (p *BatchEventProcessor) processBatch(ctx context.Context) error {
// Get a batch of events after the last processed event
events, err := p.eventStore.GetEventsAfterID(ctx, p.lastEventID, p.batchSize)
if err != nil {
return err
}
// If no events, nothing to do
if len(events) == 0 {
return nil
}
// Process each event
for _, event := range events {
if err := p.eventHandler.HandleEvent(ctx, event); err != nil {
return err
}
// Update the last processed event ID
p.lastEventID = event.ID
}
return nil
}
Disaster Recovery
Event sourcing provides excellent disaster recovery capabilities:
- Event Store Backups: Regular backups of the event store
- Projection Rebuilding: Ability to rebuild projections from events
- Event Replay: Replay events to recover from data corruption
- Audit Trail: Complete history for forensic analysis
Implement a disaster recovery plan that includes:
- Regular backups of the event store
- Automated projection rebuilding
- Testing of recovery procedures
- Documentation of recovery processes