Saga Pattern Implementation

In distributed systems, transactions that span multiple services present a significant challenge. The Saga pattern addresses this by breaking a distributed transaction into a sequence of local transactions, each with a compensating action to handle failures.

Saga Fundamentals

A saga is a sequence of local transactions where:

  1. Each local transaction updates a single service
  2. Each local transaction publishes an event to trigger the next transaction
  3. If a transaction fails, compensating transactions undo the changes made by the transactions that already completed, in reverse order

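Let’s implement a saga framework for our e-commerce system. To keep the example small, this version is orchestrated: a single coordinator calls each step directly and in order, rather than having steps trigger one another through published events: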

package eventsourcing

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// SagaStep represents a single step in a saga: a forward action paired with
// the compensation that undoes it.
type SagaStep struct {
	// Action performs the step's work. It is called with the saga's shared
	// data value; a non-nil error makes Saga.Execute stop and compensate the
	// steps that ran before this one.
	Action func(ctx context.Context, data interface{}) error
	
	// Compensation is run if the saga needs to be rolled back after this
	// step's Action has succeeded. It receives the same shared data value.
	// A nil Compensation is allowed and is simply skipped during rollback.
	Compensation func(ctx context.Context, data interface{}) error
}

// Saga coordinates a sequence of steps that should be executed as a unit.
// Steps are registered with AddStep and run, in registration order, by
// Execute.
type Saga struct {
	// name is the label supplied to NewSaga.
	name  string
	// steps holds the registered steps in the order AddStep was called.
	steps []SagaStep
	// data is the shared value handed to every Action and Compensation.
	data  interface{}
	// mu is held for the whole of Execute. AddStep does not take it, so
	// steps are expected to be registered before Execute is called.
	mu    sync.Mutex
	
	// executedSteps is the number of leading steps whose Action has
	// completed successfully; Execute starts at this index.
	executedSteps int
}

// NewSaga returns an empty saga labelled name. The data value is stored
// as-is and later handed to every step's Action and Compensation.
func NewSaga(name string, data interface{}) *Saga {
	saga := new(Saga)
	saga.name = name
	saga.data = data
	// Start with an empty, non-nil step list.
	saga.steps = []SagaStep{}
	return saga
}

// AddStep appends a new step, built from the given action and its matching
// compensation, to the end of the saga's step list. The compensation may be
// nil.
func (s *Saga) AddStep(action, compensation func(ctx context.Context, data interface{}) error) {
	next := SagaStep{Action: action, Compensation: compensation}
	s.steps = append(s.steps, next)
}

// Execute runs the saga's steps in order, starting at the first step that has
// not yet completed successfully (so a saga interrupted between steps can be
// resumed by calling Execute again).
//
// If a step's Action returns an error, the compensations of all previously
// completed steps are run in reverse order and Execute returns a non-nil
// error that wraps the action's error, joined with any errors reported by the
// compensations. A successful rollback therefore still reports the failure
// that triggered it. After a rollback the progress counter is reset, so a
// later call to Execute starts again from the first step instead of resuming
// on top of work that has been undone.
//
// Execute returns nil only when every step succeeded. The saga's mutex is
// held for the whole call.
func (s *Saga) Execute(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := s.executedSteps; i < len(s.steps); i++ {
		err := s.steps[i].Action(ctx, s.data)
		if err == nil {
			// Record progress so a later call resumes after this step.
			s.executedSteps = i + 1
			continue
		}

		actionErr := fmt.Errorf("saga %q: step %d failed: %w", s.name, i, err)

		// Undo steps 0..i-1. errors.Join drops a nil compensation result,
		// so the action error is always part of what the caller sees.
		rollbackErr := s.compensate(ctx, i)

		// The completed steps have been compensated; do not resume past them.
		s.executedSteps = 0

		return errors.Join(actionErr, rollbackErr)
	}

	return nil
}

// compensate rolls back the steps that ran before failedStepIndex by invoking
// their Compensation functions from the most recent step down to step 0.
// Steps without a Compensation are skipped. A failing compensation does not
// stop the rollback; every failure is accumulated and the combined error is
// returned, or nil when all compensations succeeded.
func (s *Saga) compensate(ctx context.Context, failedStepIndex int) error {
	var rollbackErr error

	for idx := failedStepIndex - 1; idx >= 0; idx-- {
		undo := s.steps[idx].Compensation
		if undo == nil {
			continue
		}

		err := undo(ctx, s.data)
		if err == nil {
			continue
		}

		// Keep going: the remaining steps still need to be undone.
		wrapped := fmt.Errorf("compensation failed for step %d: %w", idx, err)
		rollbackErr = errors.Join(rollbackErr, wrapped)
	}

	return rollbackErr
}

Order Processing Saga Example

Let’s implement a concrete saga for processing orders in our e-commerce system:

package ecommerce

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/yourdomain/eventsourcing"
)

// OrderData contains all the data needed for the order processing saga.
// A pointer to it is shared by every saga step: the steps read the order
// fields and fill in InventoryLock, PaymentID and ShipmentID as the
// corresponding commands succeed.
type OrderData struct {
	OrderID       string
	CustomerID    string
	Items         []OrderItem
	// NOTE(review): float64 can be lossy for currency amounts — confirm the
	// payment service tolerates this or switch to an integer minor unit.
	TotalAmount   float64
	// PaymentID is set by the "process payment" step; empty until then.
	PaymentID     string
	// ShipmentID is set by the "create shipment" step; empty until then.
	ShipmentID    string
	// InventoryLock is set by the "reserve inventory" step; empty until then.
	InventoryLock string
}

// OrderItem is a single line of an order: a product, how many units of it
// were ordered, and the price of one unit.
type OrderItem struct {
	ProductID string
	Quantity  int
	UnitPrice float64
}

// OrderProcessingSaga coordinates the order processing workflow by wrapping a
// generic eventsourcing.Saga whose steps send commands over a CommandBus.
type OrderProcessingSaga struct {
	// saga is the underlying step runner; its shared data is orderData.
	saga          *eventsourcing.Saga
	// eventStore is stored by the constructor; the saga steps shown here do
	// not use it.
	eventStore    eventsourcing.EventStore
	// commandBus delivers the step commands to the individual services.
	commandBus    CommandBus
	// orderData is the order being processed.
	orderData     *OrderData
}

// NewOrderProcessingSaga builds an order processing saga for orderData.
// It creates the underlying "OrderProcessing" saga with orderData as its
// shared data and registers all workflow steps, so the returned value is
// ready for Execute.
func NewOrderProcessingSaga(
	orderData *OrderData,
	eventStore eventsourcing.EventStore,
	commandBus CommandBus,
) *OrderProcessingSaga {
	ops := new(OrderProcessingSaga)
	ops.orderData = orderData
	ops.eventStore = eventStore
	ops.commandBus = commandBus

	// The generic saga carries the order data to every step.
	ops.saga = eventsourcing.NewSaga("OrderProcessing", orderData)
	ops.defineSagaSteps()

	return ops
}

// defineSagaSteps sets up the steps for the order processing saga. The steps
// are registered in execution order, each as an action paired with the
// compensation that undoes it:
//
//  1. reserve inventory  / release inventory
//  2. process payment    / refund payment
//  3. create shipment    / cancel shipment
//  4. complete order     / cancel order
//
// Every step validates the saga data and the command result with checked type
// assertions and returns an error, rather than panicking, when a value has an
// unexpected type.
func (s *OrderProcessingSaga) defineSagaSteps() {
	s.saga.AddStep(s.reserveInventory, s.releaseInventory)
	s.saga.AddStep(s.processPayment, s.refundPayment)
	s.saga.AddStep(s.createShipment, s.cancelShipment)
	s.saga.AddStep(s.completeOrder, s.cancelOrder)
}

// orderDataFrom converts the saga's untyped data value to *OrderData,
// reporting an error when it has another type or is a nil pointer.
func orderDataFrom(data interface{}) (*OrderData, error) {
	order, ok := data.(*OrderData)
	if !ok || order == nil {
		return nil, fmt.Errorf("order processing saga: expected non-nil *OrderData, got %T", data)
	}
	return order, nil
}

// stringResult converts a command result to the string identifier the saga
// stores, reporting an error that names the command when it is not a string.
func stringResult(result interface{}, commandType string) (string, error) {
	id, ok := result.(string)
	if !ok {
		return "", fmt.Errorf("%s: expected string result, got %T", commandType, result)
	}
	return id, nil
}

// reserveInventory sends ReserveInventory to the inventory service and stores
// the returned lock ID in the order data.
func (s *OrderProcessingSaga) reserveInventory(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}

	cmd := ReserveInventoryCommand{
		OrderID: order.OrderID,
		Items:   order.Items,
	}
	result, err := s.commandBus.Send(ctx, "inventory", "ReserveInventory", cmd)
	if err != nil {
		return fmt.Errorf("failed to reserve inventory: %w", err)
	}

	lockID, err := stringResult(result, "ReserveInventory")
	if err != nil {
		return err
	}
	order.InventoryLock = lockID
	return nil
}

// releaseInventory sends ReleaseInventory for the stored lock. It is a no-op
// when no lock was ever obtained.
func (s *OrderProcessingSaga) releaseInventory(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}
	if order.InventoryLock == "" {
		// Nothing was reserved, so there is nothing to release.
		return nil
	}

	cmd := ReleaseInventoryCommand{
		OrderID: order.OrderID,
		LockID:  order.InventoryLock,
	}
	if _, err := s.commandBus.Send(ctx, "inventory", "ReleaseInventory", cmd); err != nil {
		return fmt.Errorf("failed to release inventory: %w", err)
	}
	return nil
}

// processPayment sends ProcessPayment to the payment service and stores the
// returned payment ID in the order data.
func (s *OrderProcessingSaga) processPayment(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}

	cmd := ProcessPaymentCommand{
		OrderID:    order.OrderID,
		CustomerID: order.CustomerID,
		Amount:     order.TotalAmount,
	}
	result, err := s.commandBus.Send(ctx, "payment", "ProcessPayment", cmd)
	if err != nil {
		return fmt.Errorf("failed to process payment: %w", err)
	}

	paymentID, err := stringResult(result, "ProcessPayment")
	if err != nil {
		return err
	}
	order.PaymentID = paymentID
	return nil
}

// refundPayment sends RefundPayment for the stored payment. It is a no-op
// when no payment was ever processed.
func (s *OrderProcessingSaga) refundPayment(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}
	if order.PaymentID == "" {
		// No payment was taken, so there is nothing to refund.
		return nil
	}

	cmd := RefundPaymentCommand{
		OrderID:   order.OrderID,
		PaymentID: order.PaymentID,
	}
	if _, err := s.commandBus.Send(ctx, "payment", "RefundPayment", cmd); err != nil {
		return fmt.Errorf("failed to refund payment: %w", err)
	}
	return nil
}

// createShipment sends CreateShipment to the shipping service and stores the
// returned shipment ID in the order data.
func (s *OrderProcessingSaga) createShipment(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}

	cmd := CreateShipmentCommand{
		OrderID:    order.OrderID,
		CustomerID: order.CustomerID,
		Items:      order.Items,
	}
	result, err := s.commandBus.Send(ctx, "shipping", "CreateShipment", cmd)
	if err != nil {
		return fmt.Errorf("failed to create shipment: %w", err)
	}

	shipmentID, err := stringResult(result, "CreateShipment")
	if err != nil {
		return err
	}
	order.ShipmentID = shipmentID
	return nil
}

// cancelShipment sends CancelShipment for the stored shipment. It is a no-op
// when no shipment was ever created.
func (s *OrderProcessingSaga) cancelShipment(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}
	if order.ShipmentID == "" {
		// No shipment exists, so there is nothing to cancel.
		return nil
	}

	cmd := CancelShipmentCommand{
		OrderID:    order.OrderID,
		ShipmentID: order.ShipmentID,
	}
	if _, err := s.commandBus.Send(ctx, "shipping", "CancelShipment", cmd); err != nil {
		return fmt.Errorf("failed to cancel shipment: %w", err)
	}
	return nil
}

// completeOrder sends CompleteOrder to the order service, referencing the
// payment and shipment recorded by the earlier steps.
func (s *OrderProcessingSaga) completeOrder(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}

	cmd := CompleteOrderCommand{
		OrderID:    order.OrderID,
		PaymentID:  order.PaymentID,
		ShipmentID: order.ShipmentID,
	}
	if _, err := s.commandBus.Send(ctx, "order", "CompleteOrder", cmd); err != nil {
		return fmt.Errorf("failed to complete order: %w", err)
	}
	return nil
}

// cancelOrder sends CancelOrder to the order service with the reason
// "Saga rollback". Unlike the other compensations it has no guard and always
// sends the command.
func (s *OrderProcessingSaga) cancelOrder(ctx context.Context, data interface{}) error {
	order, err := orderDataFrom(data)
	if err != nil {
		return err
	}

	cmd := CancelOrderCommand{
		OrderID: order.OrderID,
		Reason:  "Saga rollback",
	}
	if _, err := s.commandBus.Send(ctx, "order", "CancelOrder", cmd); err != nil {
		return fmt.Errorf("failed to cancel order: %w", err)
	}
	return nil
}

// Execute runs the order processing workflow by delegating to the underlying
// saga and passes its result straight back to the caller.
func (s *OrderProcessingSaga) Execute(ctx context.Context) error {
	err := s.saga.Execute(ctx)
	return err
}

// CommandBus is a simple interface for sending commands to different services.
type CommandBus interface {
	// Send delivers command to the named service (for example "inventory")
	// as a command of the given commandType (for example "ReserveInventory")
	// and returns the service's result. The saga steps in this file expect a
	// string identifier as the result of the reserve, payment and shipment
	// commands and ignore the result of the others.
	Send(ctx context.Context, service, commandType string, command interface{}) (interface{}, error)
}