Event Versioning and Migration

As your system evolves, event schemas will change. Event versioning and migration strategies are essential for maintaining compatibility with historical events.

Event Versioning Strategies

There are several approaches to event versioning:

  1. Explicit Versioning: Include a version field in the event type or payload
  2. Upcasting: Transform old event versions to new versions during loading
  3. Event Adapter Pattern: Use adapters to handle different event versions

Let’s implement an upcasting approach:

package eventsourcing

import (
	"encoding/json"
	"fmt"
)

// EventUpcaster transforms events from older versions to newer versions
type EventUpcaster interface {
	// CanUpcast returns true if this upcaster can handle the given event
	CanUpcast(event Event) bool
	
	// Upcast transforms the event to the latest version
	Upcast(event Event) (Event, error)
}

// UpcasterChain chains multiple upcasters together
type UpcasterChain struct {
	upcasters []EventUpcaster
}

// NewUpcasterChain creates a new upcaster chain
func NewUpcasterChain(upcasters ...EventUpcaster) *UpcasterChain {
	return &UpcasterChain{upcasters: upcasters}
}

// AddUpcaster adds an upcaster to the chain
func (c *UpcasterChain) AddUpcaster(upcaster EventUpcaster) {
	c.upcasters = append(c.upcasters, upcaster)
}

// Upcast transforms an event through all applicable upcasters
func (c *UpcasterChain) Upcast(event Event) (Event, error) {
	result := event
	
	for _, upcaster := range c.upcasters {
		if upcaster.CanUpcast(result) {
			var err error
			result, err = upcaster.Upcast(result)
			if err != nil {
				return Event{}, fmt.Errorf("upcasting failed: %w", err)
			}
		}
	}
	
	return result, nil
}

// Example upcaster for ProductCreated events
type ProductCreatedV1ToV2Upcaster struct{}

func (u *ProductCreatedV1ToV2Upcaster) CanUpcast(event Event) bool {
	return event.EventType == "ProductCreated" && !hasField(event.Data, "category")
}

func (u *ProductCreatedV1ToV2Upcaster) Upcast(event Event) (Event, error) {
	// Parse the original data
	var v1Data map[string]interface{}
	if err := json.Unmarshal(event.Data, &v1Data); err != nil {
		return Event{}, err
	}
	
	// Add the missing field with a default value
	v1Data["category"] = "uncategorized"
	
	// Serialize back to JSON
	newData, err := json.Marshal(v1Data)
	if err != nil {
		return Event{}, err
	}
	
	// Create the updated event
	result := event
	result.Data = json.RawMessage(newData)
	
	return result, nil
}

// Helper function to check if a JSON object has a field
func hasField(data json.RawMessage, field string) bool {
	var obj map[string]interface{}
	if err := json.Unmarshal(data, &obj); err != nil {
		return false
	}
	_, exists := obj[field]
	return exists
}

Integrating Upcasters with Event Store

Now let’s update our event store to use upcasters:

package eventsourcing

import (
	"context"
)

// UpcasterEventStore wraps an EventStore and applies upcasting
type UpcasterEventStore struct {
	eventStore EventStore
	upcaster   *UpcasterChain
}

// NewUpcasterEventStore creates a new upcaster event store
func NewUpcasterEventStore(eventStore EventStore, upcaster *UpcasterChain) *UpcasterEventStore {
	return &UpcasterEventStore{
		eventStore: eventStore,
		upcaster:   upcaster,
	}
}

// SaveEvents persists new events to the store
func (s *UpcasterEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
	// Pass through to the underlying store
	return s.eventStore.SaveEvents(ctx, aggregateID, expectedVersion, events)
}

// GetEvents retrieves all events for a specific aggregate
func (s *UpcasterEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEvents(ctx, aggregateID)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetEventsByType retrieves events of a specific type
func (s *UpcasterEventStore) GetEventsByType(ctx context.Context, eventType string) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEventsByType(ctx, eventType)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetAllEvents retrieves all events, optionally with pagination
func (s *UpcasterEventStore) GetAllEvents(ctx context.Context, offset, limit int) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetAllEvents(ctx, offset, limit)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}

// GetAggregateVersion returns the current version of an aggregate
func (s *UpcasterEventStore) GetAggregateVersion(ctx context.Context, aggregateID string) (int, error) {
	// Pass through to the underlying store
	return s.eventStore.GetAggregateVersion(ctx, aggregateID)
}

// GetEventsAfterVersion retrieves events after a specific version
func (s *UpcasterEventStore) GetEventsAfterVersion(ctx context.Context, aggregateID string, version int) ([]Event, error) {
	// Get events from the underlying store
	events, err := s.eventStore.GetEventsAfterVersion(ctx, aggregateID, version)
	if err != nil {
		return nil, err
	}
	
	// Upcast each event
	result := make([]Event, len(events))
	for i, event := range events {
		upcasted, err := s.upcaster.Upcast(event)
		if err != nil {
			return nil, err
		}
		result[i] = upcasted
	}
	
	return result, nil
}