Production Deployment and Monitoring

Deploying event-sourced systems to production requires careful consideration of performance, scalability, and observability. Let’s explore key aspects of running event sourcing in production.

Event Store Scaling

Event stores must scale to handle high throughput and growing event history. Consider these strategies:

  1. Partitioning: Divide events by aggregate type or ID to distribute load
  2. Read Replicas: Use separate read-only replicas for projections
  3. Event Pruning: Archive old events that are no longer needed for active aggregates
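
Read replicas (strategy 2) are often the simplest of these to add: a thin wrapper routes writes to the primary store and reads to a replica that the database keeps in sync. Here’s a minimal sketch, assuming the same EventStore interface used by the other examples in this guide; note that replica reads may lag slightly behind the primary:

package eventsourcing

import "context"

// ReadReplicaEventStore routes writes to the primary event store and reads to
// a read-only replica. It assumes the replica is kept in sync by the
// underlying database's replication mechanism.
type ReadReplicaEventStore struct {
	primary EventStore
	replica EventStore
}

// NewReadReplicaEventStore creates a new read/write routing wrapper
func NewReadReplicaEventStore(primary, replica EventStore) *ReadReplicaEventStore {
	return &ReadReplicaEventStore{primary: primary, replica: replica}
}

// SaveEvents always goes to the primary
func (s *ReadReplicaEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	return s.primary.SaveEvents(ctx, aggregateID, expectedVersion, events)
}

// GetEvents and the other bulk reads are served by the replica
func (s *ReadReplicaEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	return s.replica.GetEvents(ctx, aggregateID)
}

func (s *ReadReplicaEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	return s.replica.GetEventsByType(ctx, eventType)
}

func (s *ReadReplicaEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	return s.replica.GetAllEvents(ctx, offset, limit)
}

// GetAggregateVersion is served by the primary because it backs optimistic
// concurrency checks, where a stale answer would cause spurious conflicts.
func (s *ReadReplicaEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	return s.primary.GetAggregateVersion(ctx, aggregateID)
}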

Here’s an example of a partitioned event store:

package eventsourcing

import (
	"context"
	"errors"
	"hash/fnv"
	"sort"
)

// PartitionedEventStore distributes events across multiple event stores
type PartitionedEventStore struct {
	partitions []EventStore
	numPartitions int
}

// NewPartitionedEventStore creates a new partitioned event store
func NewPartitionedEventStore(partitions []EventStore) *PartitionedEventStore {
	return &PartitionedEventStore{
		partitions:    partitions,
		numPartitions: len(partitions),
	}
}

// getPartition determines which partition to use for a given aggregate ID
func (s *PartitionedEventStore) getPartition(aggregateID string) (EventStore, error) {
	if s.numPartitions == 0 {
		return nil, errors.New("no partitions available")
	}
	
	// Hash the aggregate ID to determine the partition. Doing the modulo in
	// uint32 avoids a negative index on platforms where int is 32 bits wide.
	h := fnv.New32a()
	h.Write([]byte(aggregateID))
	partitionIndex := int(h.Sum32() % uint32(s.numPartitions))
	
	return s.partitions[partitionIndex], nil
}

// SaveEvents persists new events to the appropriate partition
func (s *PartitionedEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return err
	}
	
	return partition.SaveEvents(ctx, aggregateID, expectedVersion, events)
}

// GetEvents retrieves all events for a specific aggregate from the appropriate partition
func (s *PartitionedEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return nil, err
	}
	
	return partition.GetEvents(ctx, aggregateID)
}

// GetEventsByType retrieves events of a specific type from all partitions
func (s *PartitionedEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	var allEvents []Event
	
	// Query each partition
	for _, partition := range s.partitions {
		events, err := partition.GetEventsByType(ctx, eventType)
		if err != nil {
			return nil, err
		}
		
		allEvents = append(allEvents, events...)
	}
	
	// Sort events by timestamp
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})
	
	return allEvents, nil
}

// GetAllEvents retrieves all events from all partitions
func (s *PartitionedEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	// Note: this fans out to every partition, merges everything in memory, and
	// only then paginates, which is expensive for large stores. A production
	// implementation would use a global sequence number or per-partition cursors.
	var allEvents []Event
	
	// Query each partition
	for _, partition := range s.partitions {
		events, err := partition.GetAllEvents(ctx, 0, offset+limit)
		if err != nil {
			return nil, err
		}
		
		allEvents = append(allEvents, events...)
	}
	
	// Sort events by timestamp
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
	})
	
	// Apply pagination
	if offset >= len(allEvents) {
		return []Event{}, nil
	}
	
	end := offset + limit
	if end > len(allEvents) {
		end = len(allEvents)
	}
	
	return allEvents[offset:end], nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *PartitionedEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	partition, err := s.getPartition(aggregateID)
	if err != nil {
		return 0, err
	}
	
	return partition.GetAggregateVersion(ctx, aggregateID)
}

Monitoring and Metrics

Effective monitoring is crucial for event-sourced systems. Here’s a wrapper that collects Prometheus metrics for any event store:

package eventsourcing

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
)

// MetricsEventStore wraps an EventStore and collects metrics
type MetricsEventStore struct {
	eventStore EventStore
	
	// Metrics
	eventSaveCount    *prometheus.CounterVec
	eventSaveDuration *prometheus.HistogramVec
	eventReadCount    *prometheus.CounterVec
	eventReadDuration *prometheus.HistogramVec
}

// NewMetricsEventStore creates a new metrics-collecting event store
func NewMetricsEventStore(eventStore EventStore, registry *prometheus.Registry) *MetricsEventStore {
	store := &MetricsEventStore{
		eventStore: eventStore,
		
		eventSaveCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "event_store_save_total",
				Help: "Total number of events saved",
			},
			[]string{"aggregate_type"},
		),
		
		eventSaveDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "event_store_save_duration_seconds",
				Help:    "Duration of event save operations",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~0.5s
			},
			[]string{"aggregate_type"},
		),
		
		eventReadCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "event_store_read_total",
				Help: "Total number of event read operations",
			},
			[]string{"operation"},
		),
		
		eventReadDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "event_store_read_duration_seconds",
				Help:    "Duration of event read operations",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~0.5s
			},
			[]string{"operation"},
		),
	}
	
	// Register metrics with Prometheus
	registry.MustRegister(
		store.eventSaveCount,
		store.eventSaveDuration,
		store.eventReadCount,
		store.eventReadDuration,
	)
	
	return store
}

// SaveEvents persists new events to the store
func (s *MetricsEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if len(events) == 0 {
		return nil
	}
	
	// Get the aggregate type from the first event
	aggregateType := events[0].AggregateType
	
	// Record metrics
	timer := prometheus.NewTimer(s.eventSaveDuration.WithLabelValues(aggregateType))
	defer timer.ObserveDuration()
	
	err := s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
	
	if err == nil {
		s.eventSaveCount.WithLabelValues(aggregateType).Add(float64(len(events)))
	}
	
	return err
}

// GetEvents retrieves all events for a specific aggregate
func (s *MetricsEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEvents"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetEvents(ctx, aggregateID)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetEvents").Inc()
	}
	
	return events, err
}

// GetEventsByType retrieves events of a specific type
func (s *MetricsEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEventsByType"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetEventsByType(ctx, eventType)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetEventsByType").Inc()
	}
	
	return events, err
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *MetricsEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAllEvents"))
	defer timer.ObserveDuration()
	
	events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetAllEvents").Inc()
	}
	
	return events, err
}

// GetAggregateVersion returns the current version of an aggregate
func (s *MetricsEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAggregateVersion"))
	defer timer.ObserveDuration()
	
	version, err := s.eventStore.GetAggregateVersion(ctx, aggregateID)
	
	if err == nil {
		s.eventReadCount.WithLabelValues("GetAggregateVersion").Inc()
	}
	
	return version, err
}
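
To expose these metrics, register the wrapper against a prometheus.Registry and serve that registry over HTTP with promhttp. The helper below is an illustrative sketch; the ServeMetrics name and the listen address are not part of the store itself:

package eventsourcing

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// ServeMetrics wraps an event store with metrics collection and serves the
// collected metrics on addr (for example ":9090") under /metrics.
func ServeMetrics(underlying EventStore, addr string) *MetricsEventStore {
	registry := prometheus.NewRegistry()
	store := NewMetricsEventStore(underlying, registry)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

	// Serve the endpoint in the background; a real deployment would manage
	// this server's lifecycle alongside the rest of the application.
	go func() {
		if err := http.ListenAndServe(addr, mux); err != nil && err != http.ErrServerClosed {
			log.Printf("metrics server stopped: %v", err)
		}
	}()

	return store
}

With these series in Prometheus, you can graph save throughput and latency percentiles per aggregate type and alert on regressions.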

Deployment Architecture

A robust event-sourced system typically includes these components:

  1. Command API: Handles incoming commands and validates them
  2. Event Store: Persists events durably
  3. Event Bus: Distributes events to subscribers
  4. Projection Workers: Build and maintain read models
  5. Query API: Serves read models to clients

Here’s a diagram of a typical deployment architecture:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│             │     │             │     │             │
│  Command    │────▶│   Event     │────▶│   Event     │
│    API      │     │   Store     │     │    Bus      │
│             │     │             │     │             │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │             │     │             │
                    │   Query     │◀────│ Projection  │
                    │    API      │     │  Workers    │
                    │             │     │             │
                    └─────────────┘     └─────────────┘
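
The event store at the center of this diagram is usually a stack of the wrappers shown earlier in this chapter. Here’s a brief, illustrative sketch of composing them (the concrete backing stores, command API, event bus, and projection workers are omitted):

package eventsourcing

import "github.com/prometheus/client_golang/prometheus"

// BuildEventStore composes the wrappers from this chapter into the single
// EventStore the command API writes to: partitioning spreads write load, and
// the metrics wrapper observes every operation that passes through it.
func BuildEventStore(backing []EventStore, registry *prometheus.Registry) EventStore {
	partitioned := NewPartitionedEventStore(backing)
	return NewMetricsEventStore(partitioned, registry)
}

Decorating in this order means the recorded latencies include partition routing, which is usually what you want when watching end-to-end write performance.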

Performance Considerations

To ensure optimal performance in production:

  1. Batch Processing: Process events in batches for efficiency
  2. Caching: Cache aggregates and projections to reduce database load
  3. Asynchronous Projections: Update read models asynchronously
  4. Event Versioning: Handle schema evolution gracefully
  5. Monitoring: Track event store metrics and projection lag
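
Caching (consideration 2) can be layered in as yet another event-store decorator: per-aggregate event streams are kept in memory and invalidated whenever new events are appended through the same wrapper. Here’s a minimal sketch, assuming all writes for an aggregate flow through a single instance:

package eventsourcing

import (
	"context"
	"sync"
)

// CachingEventStore keeps recently loaded event streams in memory so that
// rebuilding an aggregate does not hit the database on every command.
type CachingEventStore struct {
	store EventStore

	mu    sync.RWMutex
	cache map[string][]Event // aggregateID -> cached event stream
}

// NewCachingEventStore creates a caching wrapper around an event store
func NewCachingEventStore(store EventStore) *CachingEventStore {
	return &CachingEventStore{
		store: store,
		cache: make(map[string][]Event),
	}
}

// SaveEvents writes through to the underlying store and invalidates the
// cached stream so the next read reloads a consistent view.
func (s *CachingEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	if err := s.store.SaveEvents(ctx, aggregateID, expectedVersion, events); err != nil {
		return err
	}

	s.mu.Lock()
	delete(s.cache, aggregateID)
	s.mu.Unlock()

	return nil
}

// GetEvents serves the stream from memory when possible
func (s *CachingEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	s.mu.RLock()
	cached, ok := s.cache[aggregateID]
	s.mu.RUnlock()
	if ok {
		return cached, nil
	}

	events, err := s.store.GetEvents(ctx, aggregateID)
	if err != nil {
		return nil, err
	}

	s.mu.Lock()
	s.cache[aggregateID] = events
	s.mu.Unlock()

	return events, nil
}

// The remaining read operations are delegated unchanged
func (s *CachingEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	return s.store.GetEventsByType(ctx, eventType)
}

func (s *CachingEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	return s.store.GetAllEvents(ctx, offset, limit)
}

func (s *CachingEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	return s.store.GetAggregateVersion(ctx, aggregateID)
}

An unbounded map grows forever, so in production you would cap it with an LRU policy or a TTL.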

Here’s an example of a batch event processor:

package eventsourcing

import (
	"context"
	"log"
	"time"
)

// BatchEventProcessor processes events in batches
type BatchEventProcessor struct {
	eventStore      EventStore
	eventHandler    EventHandler
	batchSize       int
	processingDelay time.Duration
	lastEventID     string
}

// NewBatchEventProcessor creates a new batch event processor
func NewBatchEventProcessor(
	eventStore EventStore,
	eventHandler EventHandler,
	batchSize int,
	processingDelay time.Duration,
) *BatchEventProcessor {
	if batchSize <= 0 {
		batchSize = 100
	}
	
	if processingDelay <= 0 {
		processingDelay = 100 * time.Millisecond
	}
	
	return &BatchEventProcessor{
		eventStore:      eventStore,
		eventHandler:    eventHandler,
		batchSize:       batchSize,
		processingDelay: processingDelay,
	}
}

// Start begins processing events in batches
func (p *BatchEventProcessor) Start(ctx context.Context) error {
	ticker := time.NewTicker(p.processingDelay)
	defer ticker.Stop()
	
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := p.processBatch(ctx); err != nil {
				// Log the error but continue processing
				fmt.Printf("Error processing batch: %v\n", err)
			}
		}
	}
}

// processBatch processes a single batch of events
func (p *BatchEventProcessor) processBatch(ctx context.Context) error {
	// Get a batch of events after the last processed event. This assumes the
	// underlying store also exposes an ordered GetEventsAfterID method in
	// addition to the EventStore interface used elsewhere in this chapter.
	events, err := p.eventStore.GetEventsAfterID(ctx, p.lastEventID, p.batchSize)
	if err != nil {
		return err
	}
	
	// If no events, nothing to do
	if len(events) == 0 {
		return nil
	}
	
	// Process each event
	for _, event := range events {
		if err := p.eventHandler.HandleEvent(ctx, event); err != nil {
			return err
		}
		
		// Update the last processed event ID
		p.lastEventID = event.ID
	}
	
	return nil
}

Disaster Recovery

Event sourcing provides excellent disaster recovery capabilities:

  1. Event Store Backups: Regular backups of the event store
  2. Projection Rebuilding: Ability to rebuild projections from events
  3. Event Replay: Replay events to recover from data corruption
  4. Audit Trail: Complete history for forensic analysis

Implement a disaster recovery plan that includes:

  1. Regular backups of the event store
  2. Automated projection rebuilding
  3. Testing of recovery procedures
  4. Documentation of recovery processes
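
Projection rebuilding, in particular, is worth automating: reset the read model, then replay the full event history through the projection in order. Here’s a sketch; the Projection interface is a hypothetical stand-in for however your projections consume events:

package eventsourcing

import "context"

// Projection is a hypothetical stand-in for a read-model builder that can be
// reset and then fed events in order.
type Projection interface {
	Reset(ctx context.Context) error
	HandleEvent(ctx context.Context, event Event) error
}

// RebuildProjection clears a projection and replays the entire event history
// into it, paging through the event store in fixed-size batches.
func RebuildProjection(ctx context.Context, store EventStore, projection Projection, batchSize int) error {
	if err := projection.Reset(ctx); err != nil {
		return err
	}

	offset := 0
	for {
		events, err := store.GetAllEvents(ctx, offset, batchSize)
		if err != nil {
			return err
		}
		if len(events) == 0 {
			return nil // history exhausted; the projection is up to date
		}

		for _, event := range events {
			if err := projection.HandleEvent(ctx, event); err != nil {
				return err
			}
		}

		offset += len(events)
	}
}

Because the event store is the system of record, a rebuilt read model is as trustworthy as the original; the main cost is replay time, which is exactly why recovery procedures like this should be rehearsed regularly.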

Key Takeaways

Event sourcing offers a powerful approach to building scalable, auditable, and flexible systems. By storing every state change as an immutable event, you gain a complete history of your system’s evolution, enabling advanced capabilities like temporal queries, audit trails, and robust recovery mechanisms.

In this guide, we’ve explored the implementation of event sourcing in Go, covering:

  1. Core Concepts: Events, aggregates, and the event store
  2. CQRS Integration: Separating read and write models for optimal performance
  3. Projections: Building specialized read models from events
  4. Snapshotting: Optimizing aggregate loading with periodic state captures
  5. Event Versioning: Handling schema evolution over time
  6. Saga Pattern: Coordinating distributed transactions
  7. Production Deployment: Scaling, monitoring, and disaster recovery

While event sourcing introduces additional complexity compared to traditional state-based storage, the benefits it provides—especially for domains where history and auditability are crucial—make it a valuable pattern in your architectural toolkit.

As you implement event sourcing in your Go applications, remember that it’s not an all-or-nothing approach. You can start with a single bounded context or aggregate type, gradually expanding as you become more comfortable with the pattern. Focus on modeling your domain events carefully, as they form the foundation of your event-sourced system and will likely outlive any particular implementation of your aggregates or projections.

By combining event sourcing with other patterns like CQRS and sagas, you can build highly scalable, resilient systems that adapt well to changing business requirements while maintaining a complete and accurate record of your domain’s history.