Saga Pattern Implementation
In distributed systems, transactions that span multiple services present a significant challenge. The Saga pattern addresses this by breaking a distributed transaction into a sequence of local transactions, each with a compensating action to handle failures.
Saga Fundamentals
A saga is a sequence of local transactions where:
- Each local transaction updates a single service
- Each local transaction publishes an event to trigger the next transaction
- If a transaction fails, compensating transactions undo the changes made by the transactions that already completed, in reverse order
Let’s implement a saga framework for our e-commerce system:
package eventsourcing
import (
"context"
"errors"
"fmt"
"sync"
)
// SagaStep represents a single step in a saga: a forward Action paired with
// the Compensation that undoes it.
type SagaStep struct {
// Action performs the work of this step. Saga.Execute calls it with the
// caller's context and the saga's shared data value; a non-nil error stops
// the run and triggers compensation of the earlier steps.
Action func(ctx context.Context, data interface{}) error
// Compensation is run when the saga needs to be rolled back. It receives the
// same context and data as Action. A nil Compensation is allowed and is
// simply skipped during rollback.
Compensation func(ctx context.Context, data interface{}) error
}
// Saga coordinates a sequence of steps that should be executed as a unit.
type Saga struct {
// name is the label supplied to NewSaga.
name string
// steps holds the steps in the order they were added via AddStep.
steps []SagaStep
// data is the value handed to every step's Action and Compensation.
data interface{}
// mu is held by Execute for the whole duration of a run.
mu sync.Mutex
// executedSteps counts the steps whose Action has completed successfully;
// Execute begins its loop at this index.
executedSteps int
}
// NewSaga returns a saga labelled name that has no steps yet. The data value
// is stored as given and is later passed to every step's Action and
// Compensation.
func NewSaga(name string, data interface{}) *Saga {
	saga := new(Saga)
	saga.name = name
	saga.data = data
	// Start with an empty, non-nil step list.
	saga.steps = make([]SagaStep, 0)
	return saga
}
// AddStep appends a new step, built from the given action and compensation, to
// the end of the saga. Steps run in the order they are added.
//
// AddStep does not take the saga's mutex, so it is presumably meant to be
// called while the saga is being set up, before Execute runs.
func (s *Saga) AddStep(action, compensation func(ctx context.Context, data interface{}) error) {
	step := SagaStep{Action: action, Compensation: compensation}
	s.steps = append(s.steps, step)
}
// Execute runs the saga's steps in order, starting after the last step that
// completed successfully in a previous call.
//
// If a step's Action returns an error, the compensations of all previously
// executed steps are run in reverse order and Execute returns a non-nil error
// that wraps the failing step's error (usable with errors.Is / errors.As). Any
// compensation failures are joined onto that error. The failing step itself is
// not compensated.
//
// After a rollback the progress counter is reset, so a later call to Execute
// starts again from the first step rather than resuming in the middle of a
// saga whose earlier steps have been undone.
//
// Execute returns nil only when every remaining step succeeded. The saga's
// mutex is held for the entire run.
func (s *Saga) Execute(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := s.executedSteps; i < len(s.steps); i++ {
		if err := s.steps[i].Action(ctx, s.data); err != nil {
			// The step error must always reach the caller: returning only the
			// compensation result would report success whenever rollback
			// itself went fine.
			stepErr := fmt.Errorf("saga %q: step %d failed: %w", s.name, i, err)
			compErr := s.compensate(ctx, i)

			// Compensation has been attempted for every executed step, so the
			// recorded progress no longer describes the system's state.
			s.executedSteps = 0

			if compErr != nil {
				return errors.Join(stepErr, compErr)
			}
			return stepErr
		}

		// Record progress only after the action has succeeded.
		s.executedSteps = i + 1
	}

	return nil
}
// compensate walks backwards from the step just before failedStepIndex down to
// the first step, invoking each non-nil Compensation with the saga's data.
//
// A failing compensation does not stop the walk: its error is wrapped with the
// step index and accumulated, and the remaining compensations still run. The
// accumulated error (nil when every compensation succeeded) is returned.
func (s *Saga) compensate(ctx context.Context, failedStepIndex int) error {
	var failures error

	for idx := failedStepIndex - 1; idx >= 0; idx-- {
		undo := s.steps[idx].Compensation
		if undo == nil {
			// Nothing registered to undo this step.
			continue
		}

		err := undo(ctx, s.data)
		if err == nil {
			continue
		}

		// Collect the failure and keep going with the earlier steps.
		wrapped := fmt.Errorf("compensation failed for step %d: %w", idx, err)
		failures = errors.Join(failures, wrapped)
	}

	return failures
}
Order Processing Saga Example
Let’s implement a concrete saga for processing orders in our e-commerce system:
package ecommerce
import (
"context"
"errors"
"fmt"
"time"
"github.com/yourdomain/eventsourcing"
)
// OrderData contains all the data needed for the order processing saga. A
// pointer to it is shared by every step, so values written by one step are
// visible to later steps and to the compensations.
type OrderData struct {
// OrderID is sent with every command issued by the saga.
OrderID string
// CustomerID is sent with the payment and shipment commands.
CustomerID string
// Items is sent with the inventory reservation and shipment commands.
Items []OrderItem
// TotalAmount is sent as the Amount of the payment command.
// NOTE(review): float64 for a monetary amount can lose precision — confirm
// this is acceptable for the payment service.
TotalAmount float64
// PaymentID is filled in from the result of the ProcessPayment command; it
// stays empty until that step succeeds.
PaymentID string
// ShipmentID is filled in from the result of the CreateShipment command; it
// stays empty until that step succeeds.
ShipmentID string
// InventoryLock is filled in from the result of the ReserveInventory
// command; it stays empty until that step succeeds.
InventoryLock string
}
// OrderItem is one entry of OrderData.Items.
type OrderItem struct {
// ProductID names the product for this entry.
ProductID string
// Quantity is presumably the number of units ordered — confirm with callers.
Quantity int
// UnitPrice is presumably the price of a single unit; the currency/unit is
// not established here — TODO confirm.
UnitPrice float64
}
// OrderProcessingSaga coordinates the order processing workflow by wrapping a
// generic eventsourcing.Saga whose steps send commands over a CommandBus.
type OrderProcessingSaga struct {
// saga is the underlying generic saga that stores and runs the steps.
saga *eventsourcing.Saga
// eventStore is supplied through the constructor; none of the methods visible
// in this section read it.
eventStore eventsourcing.EventStore
// commandBus delivers each step's command to its target service.
commandBus CommandBus
// orderData is the order being processed; the same pointer is handed to the
// underlying saga as its shared data.
orderData *OrderData
}
// NewOrderProcessingSaga builds an order processing saga for orderData.
//
// It creates the underlying saga (named "OrderProcessing") with orderData as
// its shared data, stores the supplied event store and command bus, and
// registers the workflow steps before returning.
func NewOrderProcessingSaga(
	orderData *OrderData,
	eventStore eventsourcing.EventStore,
	commandBus CommandBus,
) *OrderProcessingSaga {
	ops := &OrderProcessingSaga{
		saga:       eventsourcing.NewSaga("OrderProcessing", orderData),
		eventStore: eventStore,
		commandBus: commandBus,
		orderData:  orderData,
	}

	// Register the workflow steps on the freshly created saga.
	ops.defineSagaSteps()

	return ops
}
// defineSagaSteps registers the four steps of the order processing workflow on
// the underlying saga, in execution order:
//
//  1. reserve inventory  (compensation: release the inventory lock)
//  2. process payment    (compensation: refund the payment)
//  3. create shipment    (compensation: cancel the shipment)
//  4. complete the order (compensation: cancel the order)
//
// Every step sends a command through the command bus. Steps 1-3 store the
// identifier returned by the bus on the shared *OrderData so that later steps
// and compensations can refer to it.
//
// The saga data and the command results arrive as interface{} values. They are
// converted with checked type assertions, so an unexpected type (or a nil
// *OrderData) makes the step return an error instead of panicking.
func (s *OrderProcessingSaga) defineSagaSteps() {
	// orderDataOf recovers the *OrderData that the saga passes to each step.
	orderDataOf := func(data interface{}) (*OrderData, error) {
		od, ok := data.(*OrderData)
		if !ok {
			return nil, fmt.Errorf("order processing saga: unexpected saga data type %T", data)
		}
		if od == nil {
			return nil, fmt.Errorf("order processing saga: saga data is a nil %T", data)
		}
		return od, nil
	}

	// idOf extracts the string identifier a command is expected to return.
	// what names the command for the error message.
	idOf := func(result interface{}, what string) (string, error) {
		id, ok := result.(string)
		if !ok {
			return "", fmt.Errorf("%s: unexpected result type %T, want string", what, result)
		}
		return id, nil
	}

	// Step 1: Reserve inventory
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			cmd := ReserveInventoryCommand{
				OrderID: od.OrderID,
				Items:   od.Items,
			}

			result, err := s.commandBus.Send(ctx, "inventory", "ReserveInventory", cmd)
			if err != nil {
				return fmt.Errorf("failed to reserve inventory: %w", err)
			}

			// Remember the lock so the compensation can release it.
			lockID, err := idOf(result, "ReserveInventory")
			if err != nil {
				return err
			}
			od.InventoryLock = lockID
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			// Only compensate if we actually got a lock
			if od.InventoryLock == "" {
				return nil
			}

			cmd := ReleaseInventoryCommand{
				OrderID: od.OrderID,
				LockID:  od.InventoryLock,
			}

			if _, err := s.commandBus.Send(ctx, "inventory", "ReleaseInventory", cmd); err != nil {
				return fmt.Errorf("failed to release inventory: %w", err)
			}
			return nil
		},
	)

	// Step 2: Process payment
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			cmd := ProcessPaymentCommand{
				OrderID:    od.OrderID,
				CustomerID: od.CustomerID,
				Amount:     od.TotalAmount,
			}

			result, err := s.commandBus.Send(ctx, "payment", "ProcessPayment", cmd)
			if err != nil {
				return fmt.Errorf("failed to process payment: %w", err)
			}

			// Remember the payment so the compensation can refund it.
			paymentID, err := idOf(result, "ProcessPayment")
			if err != nil {
				return err
			}
			od.PaymentID = paymentID
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			// Only compensate if we actually processed a payment
			if od.PaymentID == "" {
				return nil
			}

			cmd := RefundPaymentCommand{
				OrderID:   od.OrderID,
				PaymentID: od.PaymentID,
			}

			if _, err := s.commandBus.Send(ctx, "payment", "RefundPayment", cmd); err != nil {
				return fmt.Errorf("failed to refund payment: %w", err)
			}
			return nil
		},
	)

	// Step 3: Create shipment
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			cmd := CreateShipmentCommand{
				OrderID:    od.OrderID,
				CustomerID: od.CustomerID,
				Items:      od.Items,
			}

			result, err := s.commandBus.Send(ctx, "shipping", "CreateShipment", cmd)
			if err != nil {
				return fmt.Errorf("failed to create shipment: %w", err)
			}

			// Remember the shipment so the compensation can cancel it.
			shipmentID, err := idOf(result, "CreateShipment")
			if err != nil {
				return err
			}
			od.ShipmentID = shipmentID
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			// Only compensate if we actually created a shipment
			if od.ShipmentID == "" {
				return nil
			}

			cmd := CancelShipmentCommand{
				OrderID:    od.OrderID,
				ShipmentID: od.ShipmentID,
			}

			if _, err := s.commandBus.Send(ctx, "shipping", "CancelShipment", cmd); err != nil {
				return fmt.Errorf("failed to cancel shipment: %w", err)
			}
			return nil
		},
	)

	// Step 4: Complete order
	s.saga.AddStep(
		// Action
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			cmd := CompleteOrderCommand{
				OrderID:    od.OrderID,
				PaymentID:  od.PaymentID,
				ShipmentID: od.ShipmentID,
			}

			if _, err := s.commandBus.Send(ctx, "order", "CompleteOrder", cmd); err != nil {
				return fmt.Errorf("failed to complete order: %w", err)
			}
			return nil
		},
		// Compensation
		func(ctx context.Context, data interface{}) error {
			od, err := orderDataOf(data)
			if err != nil {
				return err
			}

			cmd := CancelOrderCommand{
				OrderID: od.OrderID,
				Reason:  "Saga rollback",
			}

			if _, err := s.commandBus.Send(ctx, "order", "CancelOrder", cmd); err != nil {
				return fmt.Errorf("failed to cancel order: %w", err)
			}
			return nil
		},
	)
}
// Execute runs the order processing workflow by delegating to the underlying
// saga, and returns whatever error that saga's Execute reports.
func (s *OrderProcessingSaga) Execute(ctx context.Context) error {
	err := s.saga.Execute(ctx)
	return err
}
// CommandBus is a simple interface for sending commands to different services.
type CommandBus interface {
// Send delivers command to the named service under the given commandType and
// returns the service's result or an error. The result type is not fixed by
// this interface; the order processing saga expects a string identifier from
// the ReserveInventory, ProcessPayment and CreateShipment commands.
Send(ctx context.Context, service, commandType string, command interface{}) (interface{}, error)
}