Event Sourcing Fundamentals
Before diving into implementation details, let’s establish a solid understanding of event sourcing concepts and how they differ from traditional data storage approaches.
Core Concepts
Event sourcing revolves around several key concepts:
-
Events: Immutable records of something that happened in the system. Events are always expressed in the past tense (e.g.,
OrderPlaced
,PaymentProcessed
). -
Aggregate: A cluster of domain objects that can be treated as a single unit. An aggregate has a root entity (the aggregate root) that enforces invariants for all objects within the aggregate.
-
Event Store: A specialized database that stores events as the source of truth, rather than the current state.
-
Projections: Read-optimized views built by consuming events and updating query-friendly data structures.
-
Commands: Requests for the system to perform an action, which may generate events if successful.
Let’s start by defining the basic structures for our event sourcing system:
package eventsourcing
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
)
// Event represents something that happened in the system
type Event struct {
ID string // Unique identifier for the event
AggregateID string // ID of the aggregate this event belongs to
AggregateType string // Type of the aggregate
EventType string // Type of the event
Version int // Version of the aggregate after this event
Timestamp time.Time // When the event occurred
Data json.RawMessage // Event payload data
Metadata json.RawMessage // Additional metadata about the event
}
// Command represents a request to change the system state
type Command struct {
ID string // Unique identifier for the command
AggregateID string // ID of the target aggregate
AggregateType string // Type of the target aggregate
CommandType string // Type of the command
Data json.RawMessage // Command payload data
Metadata json.RawMessage // Additional metadata about the command
}
// Aggregate is the base interface for all aggregates
type Aggregate interface {
ID() string
Type() string
Version() int
ApplyEvent(event Event) error
ApplyEvents(events []Event) error
}
// EventHandler processes events after they are stored
type EventHandler interface {
HandleEvent(ctx context.Context, event Event) error
}
// CommandHandler processes commands and produces events
type CommandHandler interface {
HandleCommand(ctx context.Context, command Command) ([]Event, error)
}
Event Sourcing vs. Traditional State Storage
To understand why event sourcing is valuable, let’s compare it with traditional state storage:
Aspect | Traditional Storage | Event Sourcing |
---|---|---|
Data Model | Current state only | Complete history of changes |
Updates | Destructive (overwrites previous state) | Non-destructive (appends new events) |
History | Limited or requires additional logging | Built-in, comprehensive |
Audit | Requires additional implementation | Inherent to the model |
Temporal Queries | Difficult or impossible | Natural and straightforward |
Debugging | Limited historical context | Complete historical context |
Complexity | Generally simpler | More complex initially |
Event sourcing shines in domains where:
- Historical data is as important as current state
- Audit trails are required for compliance or debugging
- Business processes evolve over time
- Complex domain logic benefits from capturing intent
Event Modeling
Before implementing event sourcing, it’s crucial to model your domain events properly. Events should:
- Be named in the past tense (e.g.,
UserRegistered
,OrderShipped
) - Contain all data relevant to the event
- Be immutable once created
- Be self-contained and understandable in isolation
Let’s model a simple e-commerce domain with events:
package ecommerce
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// Product-related events
type ProductCreated struct {
ProductID string `json:"product_id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
StockLevel int `json:"stock_level"`
}
type ProductPriceChanged struct {
ProductID string `json:"product_id"`
OldPrice float64 `json:"old_price"`
NewPrice float64 `json:"new_price"`
ChangeDate time.Time `json:"change_date"`
}
type ProductStockAdjusted struct {
ProductID string `json:"product_id"`
PreviousStock int `json:"previous_stock"`
NewStockLevel int `json:"new_stock_level"`
AdjustmentType string `json:"adjustment_type"` // "addition", "reduction", "inventory_correction"
}
// Order-related events
type OrderCreated struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
OrderDate time.Time `json:"order_date"`
TotalAmount float64 `json:"total_amount"`
}
type OrderItemAdded struct {
OrderID string `json:"order_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
UnitPrice float64 `json:"unit_price"`
}
type OrderShipped struct {
OrderID string `json:"order_id"`
ShippingDate time.Time `json:"shipping_date"`
TrackingNumber string `json:"tracking_number"`
EstimatedArrival time.Time `json:"estimated_arrival"`
}
type OrderCancelled struct {
OrderID string `json:"order_id"`
CancellationDate time.Time `json:"cancellation_date"`
Reason string `json:"reason"`
}
// Helper function to create a generic event from a specific event type
func CreateEvent(aggregateID, aggregateType string, version int, eventType string, data interface{}) (Event, error) {
dataBytes, err := json.Marshal(data)
if err != nil {
return Event{}, err
}
return Event{
ID: uuid.New().String(),
AggregateID: aggregateID,
AggregateType: aggregateType,
EventType: eventType,
Version: version,
Timestamp: time.Now().UTC(),
Data: dataBytes,
}, nil
}