Event Sourcing Fundamentals

Before diving into implementation details, let’s establish a solid understanding of event sourcing concepts and how they differ from traditional data storage approaches.

Core Concepts

Event sourcing revolves around several key concepts:

  1. Events: Immutable records of something that happened in the system. By convention, events are named in the past tense (e.g., OrderPlaced, PaymentProcessed).

  2. Aggregate: A cluster of domain objects that can be treated as a single unit. An aggregate has a root entity (the aggregate root) that enforces invariants for all objects within the aggregate.

  3. Event Store: An append-only store that persists events as the source of truth. Current state is not stored directly; it is derived by replaying an aggregate's events in order.

  4. Projections: Read-optimized views built by consuming events and updating query-friendly data structures.

  5. Commands: Requests for the system to perform an action, which may generate events if successful.

Let’s start by defining the basic structures for our event sourcing system:

package eventsourcing

import (
	"context"
	"encoding/json"
	"time"
)

// Event represents something that happened in the system
type Event struct {
	ID            string          // Unique identifier for the event
	AggregateID   string          // ID of the aggregate this event belongs to
	AggregateType string          // Type of the aggregate
	EventType     string          // Type of the event
	Version       int             // Version of the aggregate after this event
	Timestamp     time.Time       // When the event occurred
	Data          json.RawMessage // Event payload data
	Metadata      json.RawMessage // Additional metadata about the event
}

// Command represents a request to change the system state
type Command struct {
	ID            string          // Unique identifier for the command
	AggregateID   string          // ID of the target aggregate
	AggregateType string          // Type of the target aggregate
	CommandType   string          // Type of the command
	Data          json.RawMessage // Command payload data
	Metadata      json.RawMessage // Additional metadata about the command
}

// Aggregate is the base interface for all aggregates
type Aggregate interface {
	ID() string
	Type() string
	Version() int
	ApplyEvent(event Event) error
	ApplyEvents(events []Event) error
}

// EventHandler processes events after they are stored
type EventHandler interface {
	HandleEvent(ctx context.Context, event Event) error
}

// CommandHandler processes commands and produces events
type CommandHandler interface {
	HandleCommand(ctx context.Context, command Command) ([]Event, error)
}
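
To make the Aggregate interface more concrete, here is a minimal sketch of an aggregate that rebuilds its state by replaying events and validates a command before emitting a new event. The Account type, the MoneyDeposited and MoneyWithdrawn event names, and the amountPayload shape are hypothetical, chosen only for illustration; the Event struct is a trimmed-down local copy so that the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Event is a trimmed-down local copy of the Event struct defined above.
type Event struct {
	AggregateID string
	EventType   string
	Version     int
	Data        json.RawMessage
}

// Account is a hypothetical aggregate used only for this sketch.
type Account struct {
	id      string
	version int
	balance int64 // minor units, e.g. cents
}

// amountPayload is the assumed JSON payload shared by both event types.
type amountPayload struct {
	Amount int64 `json:"amount"`
}

// ErrInsufficientFunds is returned when a withdrawal would overdraw the account.
var ErrInsufficientFunds = errors.New("insufficient funds")

func (a *Account) ID() string   { return a.id }
func (a *Account) Type() string { return "account" }
func (a *Account) Version() int { return a.version }

// ApplyEvent updates state from a single event. It does not check business
// rules: the event has already happened, so it only needs to be applied.
func (a *Account) ApplyEvent(e Event) error {
	if e.Version != a.version+1 {
		return fmt.Errorf("version gap: at %d, got event version %d", a.version, e.Version)
	}
	var p amountPayload
	if err := json.Unmarshal(e.Data, &p); err != nil {
		return fmt.Errorf("decode %s: %w", e.EventType, err)
	}
	switch e.EventType {
	case "MoneyDeposited":
		a.balance += p.Amount
	case "MoneyWithdrawn":
		a.balance -= p.Amount
	default:
		return fmt.Errorf("unknown event type %q", e.EventType)
	}
	a.id = e.AggregateID
	a.version = e.Version
	return nil
}

// ApplyEvents replays a history in order and stops at the first error.
func (a *Account) ApplyEvents(events []Event) error {
	for _, e := range events {
		if err := a.ApplyEvent(e); err != nil {
			return err
		}
	}
	return nil
}

// Withdraw plays the role of a command handler: it checks the invariant
// against current state and returns a new event instead of mutating state.
func (a *Account) Withdraw(amount int64) (Event, error) {
	if amount > a.balance {
		return Event{}, ErrInsufficientFunds
	}
	data, err := json.Marshal(amountPayload{Amount: amount})
	if err != nil {
		return Event{}, err
	}
	return Event{AggregateID: a.id, EventType: "MoneyWithdrawn", Version: a.version + 1, Data: data}, nil
}

func main() {
	history := []Event{
		{AggregateID: "acc-1", EventType: "MoneyDeposited", Version: 1, Data: json.RawMessage(`{"amount":1000}`)},
		{AggregateID: "acc-1", EventType: "MoneyWithdrawn", Version: 2, Data: json.RawMessage(`{"amount":300}`)},
	}
	acc := &Account{}
	if err := acc.ApplyEvents(history); err != nil {
		panic(err)
	}
	fmt.Println(acc.Version(), acc.balance) // 2 700

	_, err := acc.Withdraw(5000)
	fmt.Println(err) // insufficient funds
}
```

Note the split of responsibilities in this sketch: Withdraw decides whether something may happen, while ApplyEvent only records that it did. Keeping validation out of ApplyEvent means that replaying old events cannot fail because a business rule changed later.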

Event Sourcing vs. Traditional State Storage

To understand why event sourcing is valuable, let’s compare it with traditional state storage:

Aspect           | Traditional Storage                     | Event Sourcing
-----------------|-----------------------------------------|-------------------------------------
Data Model       | Current state only                      | Complete history of changes
Updates          | Destructive (overwrites previous state) | Non-destructive (appends new events)
History          | Limited or requires additional logging  | Built-in, comprehensive
Audit            | Requires additional implementation      | Inherent to the model
Temporal Queries | Difficult or impossible                 | Natural and straightforward
Debugging        | Limited historical context              | Complete historical context
Complexity       | Generally simpler                       | More complex initially
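
The "Updates" and "Temporal Queries" rows can be illustrated with a small in-memory store. This is a sketch under stated assumptions rather than a production design: MemoryStore, Append, LoadAsOf, ErrVersionConflict, and the day helper are hypothetical names, and the Event struct is a reduced local copy of the one defined earlier.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Event is a reduced local copy of the Event struct defined earlier.
type Event struct {
	AggregateID string
	EventType   string
	Version     int
	Timestamp   time.Time
}

// ErrVersionConflict signals that the caller's view of the stream is stale.
var ErrVersionConflict = errors.New("version conflict")

// MemoryStore is a hypothetical in-memory event store for illustration.
type MemoryStore struct {
	mu      sync.RWMutex
	streams map[string][]Event // events per aggregate, in version order
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{streams: make(map[string][]Event)}
}

// Append adds events to the end of an aggregate's stream and never modifies
// earlier entries. expectedVersion is the version the caller last observed;
// a mismatch means another writer got there first (optimistic concurrency).
func (s *MemoryStore) Append(aggregateID string, expectedVersion int, events ...Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.streams[aggregateID]
	if len(stream) != expectedVersion {
		return ErrVersionConflict
	}
	for i, e := range events {
		e.AggregateID = aggregateID
		e.Version = expectedVersion + i + 1
		stream = append(stream, e)
	}
	s.streams[aggregateID] = stream
	return nil
}

// LoadAsOf returns the events recorded at or before t. Replaying the result
// yields the aggregate's state as of that moment: a temporal query.
func (s *MemoryStore) LoadAsOf(aggregateID string, t time.Time) []Event {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var out []Event
	for _, e := range s.streams[aggregateID] {
		if !e.Timestamp.After(t) {
			out = append(out, e)
		}
	}
	return out
}

// day is a small helper that builds a UTC date in January 2024.
func day(d int) time.Time {
	return time.Date(2024, time.January, d, 0, 0, 0, 0, time.UTC)
}

func main() {
	store := NewMemoryStore()
	if err := store.Append("order-1", 0, Event{EventType: "OrderCreated", Timestamp: day(1)}); err != nil {
		panic(err)
	}
	if err := store.Append("order-1", 1, Event{EventType: "OrderShipped", Timestamp: day(5)}); err != nil {
		panic(err)
	}

	// A writer holding a stale version is rejected instead of overwriting.
	err := store.Append("order-1", 1, Event{EventType: "OrderCancelled", Timestamp: day(6)})
	fmt.Println(err) // version conflict

	// What did the order look like on January 3rd?
	for _, e := range store.LoadAsOf("order-1", day(3)) {
		fmt.Println(e.Version, e.EventType) // 1 OrderCreated
	}
}
```

In a traditional table, the January 3rd question could only be answered if someone had thought to keep a history table. Here it falls out of the storage model, at the cost of replaying events to answer it.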

Event sourcing shines in domains where:

  • Historical data is as important as current state
  • Audit trails are required for compliance or debugging
  • Business processes evolve over time
  • Complex domain logic benefits from capturing intent

Event Modeling

Before implementing event sourcing, it’s crucial to model your domain events properly. Events should:

  1. Be named in the past tense (e.g., UserRegistered, OrderShipped)
  2. Contain all data relevant to the event
  3. Be immutable once created
  4. Be self-contained and understandable in isolation

Let’s model a simple e-commerce domain with events:

package ecommerce

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

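// Product-related events.
// Prices use float64 to keep the example short; production code usually
// represents money as integer minor units (e.g., cents) or a decimal type.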
type ProductCreated struct {
	ProductID   string  `json:"product_id"`
	Name        string  `json:"name"`
	Description string  `json:"description"`
	Price       float64 `json:"price"`
	StockLevel  int     `json:"stock_level"`
}

type ProductPriceChanged struct {
	ProductID  string    `json:"product_id"`
	OldPrice   float64   `json:"old_price"`
	NewPrice   float64   `json:"new_price"`
	ChangeDate time.Time `json:"change_date"`
}

type ProductStockAdjusted struct {
	ProductID      string `json:"product_id"`
	PreviousStock  int    `json:"previous_stock"`
	NewStockLevel  int    `json:"new_stock_level"`
	AdjustmentType string `json:"adjustment_type"` // "addition", "reduction", "inventory_correction"
}

// Order-related events
type OrderCreated struct {
	OrderID     string    `json:"order_id"`
	CustomerID  string    `json:"customer_id"`
	OrderDate   time.Time `json:"order_date"`
	TotalAmount float64   `json:"total_amount"`
}

type OrderItemAdded struct {
	OrderID   string  `json:"order_id"`
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	UnitPrice float64 `json:"unit_price"`
}

type OrderShipped struct {
	OrderID          string    `json:"order_id"`
	ShippingDate     time.Time `json:"shipping_date"`
	TrackingNumber   string    `json:"tracking_number"`
	EstimatedArrival time.Time `json:"estimated_arrival"`
}

type OrderCancelled struct {
	OrderID          string    `json:"order_id"`
	CancellationDate time.Time `json:"cancellation_date"`
	Reason           string    `json:"reason"`
}

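// CreateEvent wraps a specific domain event (data) in the generic Event
// envelope. The payload is serialized to JSON; Metadata is left empty.
// Note: Event is declared in the eventsourcing package shown earlier, so this
// helper must either live in that package or import it for the name to resolve.
func CreateEvent(aggregateID, aggregateType string, version int, eventType string, data interface{}) (Event, error) {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return Event{}, err
	}
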
	return Event{
		ID:            uuid.New().String(),
		AggregateID:   aggregateID,
		AggregateType: aggregateType,
		EventType:     eventType,
		Version:       version,
		Timestamp:     time.Now().UTC(),
		Data:          dataBytes,
	}, nil
}
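
As a rough illustration of how these typed events travel through the generic envelope, the sketch below wraps two order events and feeds them to a small projection that decodes each payload by its EventType. The wrap function is a cut-down stand-in for CreateEvent, and OrderStatusView is a hypothetical read model; the structs are reduced local copies so that the example runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event holds only the envelope fields this sketch needs.
type Event struct {
	AggregateID string
	EventType   string
	Version     int
	Data        json.RawMessage
}

// Reduced copies of the order events modeled above.
type OrderCreated struct {
	OrderID    string `json:"order_id"`
	CustomerID string `json:"customer_id"`
}

type OrderShipped struct {
	OrderID        string    `json:"order_id"`
	ShippingDate   time.Time `json:"shipping_date"`
	TrackingNumber string    `json:"tracking_number"`
}

// wrap is a cut-down stand-in for CreateEvent: it serializes the typed
// payload and places it in the generic envelope.
func wrap(aggregateID string, version int, eventType string, payload interface{}) (Event, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return Event{}, err
	}
	return Event{AggregateID: aggregateID, EventType: eventType, Version: version, Data: data}, nil
}

// OrderStatusView is a hypothetical read model mapping order ID to status.
type OrderStatusView map[string]string

// HandleEvent decodes the payload according to EventType and updates the
// view. Event types the view does not care about are ignored.
func (v OrderStatusView) HandleEvent(e Event) error {
	switch e.EventType {
	case "OrderCreated":
		var p OrderCreated
		if err := json.Unmarshal(e.Data, &p); err != nil {
			return err
		}
		v[p.OrderID] = "created"
	case "OrderShipped":
		var p OrderShipped
		if err := json.Unmarshal(e.Data, &p); err != nil {
			return err
		}
		v[p.OrderID] = "shipped (" + p.TrackingNumber + ")"
	}
	return nil
}

func main() {
	created, err := wrap("order-42", 1, "OrderCreated", OrderCreated{OrderID: "order-42", CustomerID: "cust-7"})
	if err != nil {
		panic(err)
	}
	shipped, err := wrap("order-42", 2, "OrderShipped", OrderShipped{
		OrderID:        "order-42",
		ShippingDate:   time.Date(2024, time.March, 1, 0, 0, 0, 0, time.UTC),
		TrackingNumber: "TRK-123",
	})
	if err != nil {
		panic(err)
	}

	view := OrderStatusView{}
	for _, e := range []Event{created, shipped} {
		if err := view.HandleEvent(e); err != nil {
			panic(err)
		}
	}
	fmt.Println(view["order-42"]) // shipped (TRK-123)
}
```

Storing the event type as a string next to a raw JSON payload keeps the envelope generic, but it also means that every consumer needs a mapping from type name to Go struct. A switch is enough for a handful of events; a registry keyed by EventType may scale better as the catalogue grows.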