Production Deployment and Monitoring
Deploying event-sourced systems to production requires careful consideration of performance, scalability, and observability. Let’s explore key aspects of running event sourcing in production.
Event Store Scaling
Event stores must scale to handle high throughput and growing event history. Consider these strategies:
- Partitioning: Divide events by aggregate type or ID to distribute load
- Read Replicas: Use separate read-only replicas for projections
- Event Pruning: Archive old events that are no longer needed for active aggregates (an archival sketch appears at the end of this section)
Here’s an example of a partitioned event store:
package eventsourcing
import (
"context"
"errors"
"hash/fnv"
"sort"
)
// PartitionedEventStore distributes events across multiple event stores
type PartitionedEventStore struct {
partitions []EventStore
numPartitions int
}
// NewPartitionedEventStore creates a new partitioned event store
func NewPartitionedEventStore(partitions []EventStore) *PartitionedEventStore {
return &PartitionedEventStore{
partitions: partitions,
numPartitions: len(partitions),
}
}
// getPartition determines which partition to use for a given aggregate ID
func (s *PartitionedEventStore) getPartition(aggregateID string) (EventStore, error) {
if s.numPartitions == 0 {
return nil, errors.New("no partitions available")
}
// Hash the aggregate ID to determine the partition
h := fnv.New32a()
h.Write([]byte(aggregateID))
partitionIndex := int(h.Sum32() % uint32(s.numPartitions)) // modulo in uint32 so the index can never be negative
return s.partitions[partitionIndex], nil
}
// SaveEvents persists new events to the appropriate partition
func (s *PartitionedEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
partition, err := s.getPartition(aggregateID)
if err != nil {
return err
}
return partition.SaveEvents(ctx, aggregateID, expectedVersion, events)
}
// GetEvents retrieves all events for a specific aggregate from the appropriate partition
func (s *PartitionedEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
partition, err := s.getPartition(aggregateID)
if err != nil {
return nil, err
}
return partition.GetEvents(ctx, aggregateID)
}
// GetEventsByType retrieves events of a specific type from all partitions
func (s *PartitionedEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
var allEvents []Event
// Query each partition
for _, partition := range s.partitions {
events, err := partition.GetEventsByType(ctx, eventType)
if err != nil {
return nil, err
}
allEvents = append(allEvents, events...)
}
// Sort events by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
return allEvents, nil
}
// GetAllEvents retrieves all events from all partitions
func (s *PartitionedEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
// Note: fanning out to every partition and merging in memory is inefficient for
// large event stores; a production implementation would need a global ordering
// (for example, a store-wide sequence) or per-partition cursors
var allEvents []Event
// Query each partition
for _, partition := range s.partitions {
events, err := partition.GetAllEvents(ctx, 0, offset+limit)
if err != nil {
return nil, err
}
allEvents = append(allEvents, events...)
}
// Sort events by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
// Apply pagination
if offset >= len(allEvents) {
return []Event{}, nil
}
end := offset + limit
if end > len(allEvents) {
end = len(allEvents)
}
return allEvents[offset:end], nil
}
// GetAggregateVersion returns the current version of an aggregate
func (s *PartitionedEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
partition, err := s.getPartition(aggregateID)
if err != nil {
return 0, err
}
return partition.GetAggregateVersion(ctx, aggregateID)
}
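Partitioning addresses write throughput; event pruning addresses unbounded growth. The sketch below moves the full history of an inactive aggregate into cold storage. The ArchivalStore interface is an assumption for illustration, not part of the EventStore interface used elsewhere in this guide, and deciding which aggregates are eligible is left to a read model or business rule.

package eventsourcing

import (
	"context"
)

// ArchivalStore is a hypothetical destination for cold events, such as object
// storage or a cheaper database tier.
type ArchivalStore interface {
	ArchiveEvents(ctx context.Context, events []Event) error
}

// EventArchiver copies the history of inactive aggregates to cold storage.
// Which aggregates are eligible (for example, accounts closed more than a
// year ago) is expected to be decided elsewhere, typically from a read model.
type EventArchiver struct {
	eventStore EventStore
	archive    ArchivalStore
}

// ArchiveAggregate copies one aggregate's full history to the archival store.
// Deleting the archived events from the hot store is intentionally omitted;
// it requires support from the underlying EventStore implementation.
func (a *EventArchiver) ArchiveAggregate(ctx context.Context, aggregateID string) error {
	events, err := a.eventStore.GetEvents(ctx, aggregateID)
	if err != nil {
		return err
	}
	if len(events) == 0 {
		return nil
	}
	return a.archive.ArchiveEvents(ctx, events)
}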
Monitoring and Metrics
Effective monitoring is crucial for event-sourced systems. Here’s a metrics collector for event stores:
package eventsourcing
import (
"context"
"github.com/prometheus/client_golang/prometheus"
)
// MetricsEventStore wraps an EventStore and collects metrics
type MetricsEventStore struct {
eventStore EventStore
// Metrics
eventSaveCount *prometheus.CounterVec
eventSaveDuration *prometheus.HistogramVec
eventReadCount *prometheus.CounterVec
eventReadDuration *prometheus.HistogramVec
}
// NewMetricsEventStore creates a new metrics-collecting event store
func NewMetricsEventStore(eventStore EventStore, registry *prometheus.Registry) *MetricsEventStore {
store := &MetricsEventStore{
eventStore: eventStore,
eventSaveCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "event_store_save_total",
Help: "Total number of events saved",
},
[]string{"aggregate_type"},
),
eventSaveDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_store_save_duration_seconds",
Help: "Duration of event save operations",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~512ms
},
[]string{"aggregate_type"},
),
eventReadCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "event_store_read_total",
Help: "Total number of event read operations",
},
[]string{"operation"},
),
eventReadDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_store_read_duration_seconds",
Help: "Duration of event read operations",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~512ms
},
[]string{"operation"},
),
}
// Register metrics with Prometheus
registry.MustRegister(
store.eventSaveCount,
store.eventSaveDuration,
store.eventReadCount,
store.eventReadDuration,
)
return store
}
// SaveEvents persists new events to the store
func (s *MetricsEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
if len(events) == 0 {
return nil
}
// Get the aggregate type from the first event
aggregateType := events[0].AggregateType
// Record metrics
timer := prometheus.NewTimer(s.eventSaveDuration.WithLabelValues(aggregateType))
defer timer.ObserveDuration()
err := s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
if err == nil {
s.eventSaveCount.WithLabelValues(aggregateType).Add(float64(len(events)))
}
return err
}
// GetEvents retrieves all events for a specific aggregate
func (s *MetricsEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEvents"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetEvents(ctx, aggregateID)
if err == nil {
s.eventReadCount.WithLabelValues("GetEvents").Inc()
}
return events, err
}
// GetEventsByType retrieves events of a specific type
func (s *MetricsEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetEventsByType"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetEventsByType(ctx, eventType)
if err == nil {
s.eventReadCount.WithLabelValues("GetEventsByType").Inc()
}
return events, err
}
// GetAllEvents retrieves all events, optionally with pagination
func (s *MetricsEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAllEvents"))
defer timer.ObserveDuration()
events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
if err == nil {
s.eventReadCount.WithLabelValues("GetAllEvents").Inc()
}
return events, err
}
// GetAggregateVersion returns the current version of an aggregate
func (s *MetricsEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
timer := prometheus.NewTimer(s.eventReadDuration.WithLabelValues("GetAggregateVersion"))
defer timer.ObserveDuration()
version, err := s.eventStore.GetAggregateVersion(ctx, aggregateID)
if err == nil {
s.eventReadCount.WithLabelValues("GetAggregateVersion").Inc()
}
return version, err
}
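The wrapper above covers reads and writes against the event store. The other signal worth watching is projection lag: how far each read model is behind the store. The gauge below is a minimal sketch; the metric name, the "projection" label, and the idea of a numeric event position are assumptions to adapt to however your projections track progress.

package eventsourcing

import (
	"github.com/prometheus/client_golang/prometheus"
)

// ProjectionMetrics exposes how far each projection lags behind the event store.
type ProjectionMetrics struct {
	lag *prometheus.GaugeVec
}

// NewProjectionMetrics creates and registers the projection lag gauge.
func NewProjectionMetrics(registry *prometheus.Registry) *ProjectionMetrics {
	m := &ProjectionMetrics{
		lag: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "projection_lag_events",
				Help: "Number of events a projection is behind the event store",
			},
			[]string{"projection"},
		),
	}
	registry.MustRegister(m.lag)
	return m
}

// Observe records the current lag for a named projection: the difference
// between the latest position in the event store and the last position the
// projection has applied.
func (m *ProjectionMetrics) Observe(projection string, latestPosition, appliedPosition int64) {
	m.lag.WithLabelValues(projection).Set(float64(latestPosition - appliedPosition))
}

Alert on sustained lag growth rather than on absolute values; a projection that briefly falls behind during a burst is normal, one that never catches up is not.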
Deployment Architecture
A robust event-sourced system typically includes these components:
- Command API: Handles incoming commands and validates them
- Event Store: Persists events durably
- Event Bus: Distributes events to subscribers
- Projection Workers: Build and maintain read models
- Query API: Serves read models to clients
Here’s a diagram of a typical deployment architecture:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│             │     │             │     │             │
│   Command   │────▶│    Event    │────▶│    Event    │
│     API     │     │    Store    │     │     Bus     │
│             │     │             │     │             │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │             │     │             │
                    │    Query    │◀────│ Projection  │
                    │     API     │     │   Workers   │
                    │             │     │             │
                    └─────────────┘     └─────────────┘
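The stores built earlier in this section slot directly into this architecture. The sketch below shows how a service entry point might stack the decorators; the import path and the openPartitions helper are placeholders for your own wiring.

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	es "example.com/yourapp/eventsourcing" // placeholder import path for this guide's package
)

// openPartitions is a placeholder: construct one concrete EventStore per
// physical partition, for example one per database shard.
func openPartitions() []es.EventStore {
	return nil
}

func main() {
	registry := prometheus.NewRegistry()

	// Compose the decorators from this section: a partitioned store for
	// scale, wrapped by the metrics store for observability.
	partitioned := es.NewPartitionedEventStore(openPartitions())
	store := es.NewMetricsEventStore(partitioned, registry)

	// The command API, event bus, projection workers, and query API from the
	// diagram above would all share this single EventStore value.
	_ = store
}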
Performance Considerations
To ensure optimal performance in production:
- Batch Processing: Process events in batches for efficiency
- Caching: Cache aggregates and projections to reduce database load
- Asynchronous Projections: Update read models asynchronously
- Event Versioning: Handle schema evolution gracefully
- Monitoring: Track event store metrics and projection lag
Here’s an example of a batch event processor:
package eventsourcing
import (
"context"
"fmt"
"time"
)
// BatchEventProcessor processes events in batches
type BatchEventProcessor struct {
eventStore EventStore
eventHandler EventHandler
batchSize int
processingDelay time.Duration
lastEventID string
}
// NewBatchEventProcessor creates a new batch event processor
func NewBatchEventProcessor(
eventStore EventStore,
eventHandler EventHandler,
batchSize int,
processingDelay time.Duration,
) *BatchEventProcessor {
if batchSize <= 0 {
batchSize = 100
}
if processingDelay <= 0 {
processingDelay = 100 * time.Millisecond
}
return &BatchEventProcessor{
eventStore: eventStore,
eventHandler: eventHandler,
batchSize: batchSize,
processingDelay: processingDelay,
}
}
// Start begins processing events in batches
func (p *BatchEventProcessor) Start(ctx context.Context) error {
ticker := time.NewTicker(p.processingDelay)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := p.processBatch(ctx); err != nil {
// Log the error but continue processing
fmt.Printf("Error processing batch: %v\n", err)
}
}
}
}
// processBatch processes a single batch of events
// Note: GetEventsAfterID is assumed to be part of the EventStore interface; it
// returns up to batchSize events, in global store order, recorded after the
// given event ID. The lastEventID checkpoint is held in memory here; a
// production processor would persist it so processing resumes after a restart.
func (p *BatchEventProcessor) processBatch(ctx context.Context) error {
// Get a batch of events after the last processed event
events, err := p.eventStore.GetEventsAfterID(ctx, p.lastEventID, p.batchSize)
if err != nil {
return err
}
// If no events, nothing to do
if len(events) == 0 {
return nil
}
// Process each event
for _, event := range events {
if err := p.eventHandler.HandleEvent(ctx, event); err != nil {
return err
}
// Update the last processed event ID
p.lastEventID = event.ID
}
return nil
}
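Wiring the processor into a projection worker is straightforward; the helper below shows one way to do it, assuming the EventHandler interface introduced earlier in this guide. The batch size and polling interval are illustrative values to tune against your projection lag metrics.

package eventsourcing

import (
	"context"
	"errors"
	"log"
	"time"
)

// RunProjectionWorker runs a batch processor until the context is cancelled,
// for example during graceful shutdown.
func RunProjectionWorker(ctx context.Context, store EventStore, handler EventHandler) {
	processor := NewBatchEventProcessor(store, handler, 500, 250*time.Millisecond)
	if err := processor.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("projection worker stopped: %v", err)
	}
}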
Disaster Recovery
Event sourcing provides excellent disaster recovery capabilities:
- Event Store Backups: Regular backups of the event store
- Projection Rebuilding: Ability to rebuild projections from events
- Event Replay: Replay events to recover from data corruption
- Audit Trail: Complete history for forensic analysis
Implement a disaster recovery plan that includes:
- Regular backups of the event store
- Automated projection rebuilding (see the sketch after this list)
- Testing of recovery procedures
- Documentation of recovery processes
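Projection rebuilding deserves to be automated rather than improvised during an incident. The function below is a minimal sketch that replays the full history through a projection handler using the paginated GetAllEvents method; a production rebuild would also persist its progress so it can resume after interruption.

package eventsourcing

import (
	"context"
)

// RebuildProjection replays the entire event history into a projection
// handler, paging through the store to bound memory use. The handler is
// expected to have reset its read model before this is called.
func RebuildProjection(ctx context.Context, store EventStore, handler EventHandler, pageSize int) error {
	if pageSize <= 0 {
		pageSize = 1000
	}
	offset := 0
	for {
		events, err := store.GetAllEvents(ctx, offset, pageSize)
		if err != nil {
			return err
		}
		if len(events) == 0 {
			return nil
		}
		for _, event := range events {
			if err := handler.HandleEvent(ctx, event); err != nil {
				return err
			}
		}
		offset += len(events)
	}
}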
Key Takeaways
Event sourcing offers a powerful approach to building scalable, auditable, and flexible systems. By storing every state change as an immutable event, you gain a complete history of your system’s evolution, enabling advanced capabilities like temporal queries, audit trails, and robust recovery mechanisms.
In this guide, we’ve explored the implementation of event sourcing in Go, covering:
- Core Concepts: Events, aggregates, and the event store
- CQRS Integration: Separating read and write models for optimal performance
- Projections: Building specialized read models from events
- Snapshotting: Optimizing aggregate loading with periodic state captures
- Event Versioning: Handling schema evolution over time
- Saga Pattern: Coordinating distributed transactions
- Production Deployment: Scaling, monitoring, and disaster recovery
While event sourcing introduces additional complexity compared to traditional state-based storage, the benefits it provides—especially for domains where history and auditability are crucial—make it a valuable pattern in your architectural toolkit.
As you implement event sourcing in your Go applications, remember that it’s not an all-or-nothing approach. You can start with a single bounded context or aggregate type, gradually expanding as you become more comfortable with the pattern. Focus on modeling your domain events carefully, as they form the foundation of your event-sourced system and will likely outlive any particular implementation of your aggregates or projections.
By combining event sourcing with other patterns like CQRS and sagas, you can build highly scalable, resilient systems that adapt well to changing business requirements while maintaining a complete and accurate record of your domain’s history.