Testing in Production and Observability
While pre-deployment testing is essential, modern systems also require testing and monitoring in production environments.
Feature Flags and Canary Deployments
Feature flags enable controlled rollouts and testing in production:
package featureflags
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// FeatureFlag represents a configurable feature flag
type FeatureFlag struct {
Name string
Description string
Enabled bool
Percentage float64 // 0.0 to 1.0 for percentage rollout
UserGroups []string
}
// FeatureFlagService manages feature flags
type FeatureFlagService struct {
flags map[string]*FeatureFlag
userGroups map[string][]string
mu sync.RWMutex
}
// NewFeatureFlagService creates a new feature flag service
func NewFeatureFlagService() *FeatureFlagService {
return &FeatureFlagService{
flags: make(map[string]*FeatureFlag),
userGroups: make(map[string][]string),
}
}
// RegisterFlag adds a new feature flag
func (s *FeatureFlagService) RegisterFlag(flag *FeatureFlag) {
s.mu.Lock()
defer s.mu.Unlock()
s.flags[flag.Name] = flag
}
// AssignUserToGroup assigns a user to a group
func (s *FeatureFlagService) AssignUserToGroup(userID, group string) {
s.mu.Lock()
defer s.mu.Unlock()
if groups, ok := s.userGroups[userID]; ok {
// Check if user is already in the group
for _, g := range groups {
if g == group {
return
}
}
s.userGroups[userID] = append(groups, group)
} else {
s.userGroups[userID] = []string{group}
}
}
// IsEnabled checks if a feature flag is enabled for a specific user
func (s *FeatureFlagService) IsEnabled(ctx context.Context, flagName, userID string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
flag, ok := s.flags[flagName]
if !ok || !flag.Enabled {
return false
}
// Check if user is in an enabled group
if userGroups, ok := s.userGroups[userID]; ok {
for _, userGroup := range userGroups {
for _, flagGroup := range flag.UserGroups {
if userGroup == flagGroup {
return true
}
}
}
}
// Check percentage rollout
if flag.Percentage > 0 {
// Use consistent hashing to ensure the same user always gets the same result
hash := consistentHash(flagName + userID)
return hash <= flag.Percentage
}
return false
}
// consistentHash generates a consistent hash value between 0.0 and 1.0
func consistentHash(s string) float64 {
h := 0
for i := 0; i < len(s); i++ {
h = 31*h + int(s[i])
}
// Convert to a value between 0.0 and 1.0
return float64(h&0x7fffffff) / float64(0x7fffffff)
}
// CanaryDeployment demonstrates a canary deployment strategy
func CanaryDeployment(ctx context.Context, flagService *FeatureFlagService) {
// Register feature flag for new algorithm
flagService.RegisterFlag(&FeatureFlag{
Name: "new-recommendation-algorithm",
Description: "New machine learning recommendation algorithm",
Enabled: true,
Percentage: 0.05, // Start with 5% of users
UserGroups: []string{"beta-testers", "internal-users"},
})
// Monitor the canary deployment
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Collect metrics for the canary deployment
metrics := collectMetrics("new-recommendation-algorithm")
// If metrics look good, increase the rollout percentage
if metrics.ErrorRate < 0.01 && metrics.Latency < 100*time.Millisecond {
flagService.mu.Lock()
flag, ok := flagService.flags["new-recommendation-algorithm"]
if ok && flag.Percentage < 1.0 {
// Increase by 10 percentage points each step; read and mutate the
// flag only while holding the lock to avoid a data race
flag.Percentage = min(flag.Percentage+0.1, 1.0)
fmt.Printf("Increased rollout to %.1f%%\n", flag.Percentage*100)
}
flagService.mu.Unlock()
} else {
// If metrics look bad, roll back
flagService.mu.Lock()
flag, ok := flagService.flags["new-recommendation-algorithm"]
if ok {
flag.Enabled = false
fmt.Println("Rolling back due to poor metrics")
}
flagService.mu.Unlock()
return
}
case <-ctx.Done():
return
}
}
}()
}
// DeploymentMetrics represents metrics for a deployment
type DeploymentMetrics struct {
ErrorRate float64
Latency time.Duration
Throughput int
}
// collectMetrics simulates collecting metrics for a feature
func collectMetrics(featureName string) DeploymentMetrics {
// In a real system, this would collect actual metrics from monitoring systems
return DeploymentMetrics{
ErrorRate: rand.Float64() * 0.02, // 0-2% error rate
Latency: time.Duration(rand.Intn(200)) * time.Millisecond,
Throughput: rand.Intn(1000),
}
}
// min returns the smaller of two float64 values. Go 1.21 added a built-in
// min; defining it here keeps the example compatible with older toolchains.
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
This feature flag implementation demonstrates:
- Gradual rollouts: Controlling feature exposure with percentage-based rollouts
- User targeting: Enabling features for specific user groups
- Consistent hashing: Ensuring users get consistent experiences (verified in the test sketch after this list)
- Metric-based decisions: Automatically adjusting rollout based on metrics
- Rollback capability: Quickly disabling features if problems arise
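To see the consistent-hashing guarantee in action, here is a minimal test sketch. It assumes the code above lives in a featureflags package; the flag name and user IDs are hypothetical:

package featureflags

import (
	"context"
	"testing"
)

// TestRolloutIsDeterministic checks that percentage-based rollout decisions
// are stable: the same user always lands on the same side of the split.
func TestRolloutIsDeterministic(t *testing.T) {
	svc := NewFeatureFlagService()
	svc.RegisterFlag(&FeatureFlag{
		Name:       "new-checkout", // hypothetical flag name
		Enabled:    true,
		Percentage: 0.5,
	})
	ctx := context.Background()
	for _, userID := range []string{"user-1", "user-2", "user-3"} {
		first := svc.IsEnabled(ctx, "new-checkout", userID)
		for i := 0; i < 100; i++ {
			if svc.IsEnabled(ctx, "new-checkout", userID) != first {
				t.Fatalf("rollout decision flipped for %s", userID)
			}
		}
	}
}

Because the decision is a pure function of the flag name and user ID, the same property holds across processes and restarts, which is what makes percentage rollouts safe to serve from multiple service instances.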
Distributed Tracing and Observability
Instrumenting services with distributed tracing, and exercising that instrumentation in tests, makes the system observable in production:
package observability
import (
"context"
"fmt"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.opentelemetry.io/otel/trace"
)
// OrderService handles order processing
type OrderService struct {
inventoryClient *InventoryClient
paymentClient *PaymentClient
shippingClient *ShippingClient
notificationClient *NotificationClient
tracer trace.Tracer
}
// NewOrderService creates a new order service with tracing
func NewOrderService() (*OrderService, error) {
// Initialize tracer
exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
return nil, fmt.Errorf("failed to create exporter: %w", err)
}
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("order-service"),
semconv.ServiceVersionKey.String("1.0.0"),
)
provider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(provider)
tracer := otel.Tracer("order-service")
return &OrderService{
inventoryClient: NewInventoryClient(),
paymentClient: NewPaymentClient(),
shippingClient: NewShippingClient(),
notificationClient: NewNotificationClient(),
tracer: tracer,
}, nil
}
// ProcessOrder handles the end-to-end order processing flow
func (s *OrderService) ProcessOrder(ctx context.Context, order *Order) error {
ctx, span := s.tracer.Start(ctx, "ProcessOrder",
trace.WithAttributes(
attribute.String("order.id", order.ID),
attribute.Float64("order.total", order.Total),
))
defer span.End()
// Check inventory
if err := s.checkInventory(ctx, order); err != nil {
span.RecordError(err)
return fmt.Errorf("inventory check failed: %w", err)
}
// Process payment
if err := s.processPayment(ctx, order); err != nil {
span.RecordError(err)
return fmt.Errorf("payment processing failed: %w", err)
}
// Create shipment
if err := s.createShipment(ctx, order); err != nil {
span.RecordError(err)
return fmt.Errorf("shipment creation failed: %w", err)
}
// Send notification
if err := s.sendNotification(ctx, order); err != nil {
// Non-critical error, just log it
span.RecordError(err)
log.Printf("Failed to send notification: %v", err)
}
return nil
}
// checkInventory verifies that all items are in stock
func (s *OrderService) checkInventory(ctx context.Context, order *Order) error {
ctx, span := s.tracer.Start(ctx, "CheckInventory")
defer span.End()
// Add order items as span attributes for better debugging
for i, item := range order.Items {
span.SetAttributes(
attribute.String(fmt.Sprintf("item.%d.id", i), item.ProductID),
attribute.Int(fmt.Sprintf("item.%d.quantity", i), item.Quantity),
)
}
return s.inventoryClient.CheckAvailability(ctx, order.Items)
}
// processPayment handles payment processing
func (s *OrderService) processPayment(ctx context.Context, order *Order) error {
ctx, span := s.tracer.Start(ctx, "ProcessPayment",
trace.WithAttributes(
attribute.String("payment.method", order.PaymentMethod),
attribute.Float64("payment.amount", order.Total),
))
defer span.End()
return s.paymentClient.ProcessPayment(ctx, order.ID, order.Total, order.PaymentMethod)
}
// createShipment creates a shipment for the order
func (s *OrderService) createShipment(ctx context.Context, order *Order) error {
ctx, span := s.tracer.Start(ctx, "CreateShipment")
defer span.End()
return s.shippingClient.CreateShipment(ctx, order.ID, order.ShippingAddress)
}
// sendNotification sends an order confirmation
func (s *OrderService) sendNotification(ctx context.Context, order *Order) error {
ctx, span := s.tracer.Start(ctx, "SendNotification")
defer span.End()
return s.notificationClient.SendOrderConfirmation(ctx, order.ID, order.CustomerEmail)
}
// Order represents a customer order
type Order struct {
ID string
CustomerID string
CustomerEmail string
Items []OrderItem
Total float64
PaymentMethod string
ShippingAddress string
}
// OrderItem represents an item in an order
type OrderItem struct {
ProductID string
Quantity int
Price float64
}
// Mock clients for demonstration
type InventoryClient struct{}
type PaymentClient struct{}
type ShippingClient struct{}
type NotificationClient struct{}
func NewInventoryClient() *InventoryClient { return &InventoryClient{} }
func NewPaymentClient() *PaymentClient { return &PaymentClient{} }
func NewShippingClient() *ShippingClient { return &ShippingClient{} }
func NewNotificationClient() *NotificationClient { return &NotificationClient{} }
func (c *InventoryClient) CheckAvailability(ctx context.Context, items []OrderItem) error {
// Simulate API call with tracing
tracer := otel.Tracer("inventory-client")
_, span := tracer.Start(ctx, "InventoryAPI.CheckAvailability")
defer span.End()
// Simulate processing time
time.Sleep(50 * time.Millisecond)
return nil
}
func (c *PaymentClient) ProcessPayment(ctx context.Context, orderID string, amount float64, method string) error {
tracer := otel.Tracer("payment-client")
_, span := tracer.Start(ctx, "PaymentAPI.ProcessPayment")
defer span.End()
// Simulate processing time
time.Sleep(100 * time.Millisecond)
return nil
}
func (c *ShippingClient) CreateShipment(ctx context.Context, orderID, address string) error {
tracer := otel.Tracer("shipping-client")
_, span := tracer.Start(ctx, "ShippingAPI.CreateShipment")
defer span.End()
// Simulate processing time
time.Sleep(75 * time.Millisecond)
return nil
}
func (c *NotificationClient) SendOrderConfirmation(ctx context.Context, orderID, email string) error {
tracer := otel.Tracer("notification-client")
_, span := tracer.Start(ctx, "NotificationAPI.SendOrderConfirmation")
defer span.End()
// Simulate processing time
time.Sleep(25 * time.Millisecond)
return nil
}
This tracing implementation demonstrates:
- Distributed tracing: Tracking requests across service boundaries (see the test sketch after this list)
- Context propagation: Passing trace context between components
- Span attributes: Adding metadata to traces for debugging
- Error recording: Capturing errors in the trace
- Sampling control: Configuring trace sampling rates
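The stdout exporter above is handy for demos, but in tests you can swap in the OpenTelemetry SDK's in-memory span recorder and assert on the spans a request produces. A minimal sketch; the test name and sample order data are illustrative:

package observability

import (
	"context"
	"testing"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

// TestProcessOrderEmitsSpans records spans in memory and verifies that the
// whole order flow is traced, without writing anything to stdout.
func TestProcessOrderEmitsSpans(t *testing.T) {
	sr := tracetest.NewSpanRecorder()
	provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

	svc := &OrderService{
		inventoryClient:    NewInventoryClient(),
		paymentClient:      NewPaymentClient(),
		shippingClient:     NewShippingClient(),
		notificationClient: NewNotificationClient(),
		tracer:             provider.Tracer("order-service-test"),
	}

	order := &Order{ID: "order-123", Total: 42.50} // illustrative data
	if err := svc.ProcessOrder(context.Background(), order); err != nil {
		t.Fatalf("ProcessOrder failed: %v", err)
	}

	// Every step of the flow should have ended a span.
	got := make(map[string]bool)
	for _, span := range sr.Ended() {
		got[span.Name()] = true
	}
	for _, want := range []string{"ProcessOrder", "CheckInventory", "ProcessPayment", "CreateShipment", "SendNotification"} {
		if !got[want] {
			t.Errorf("missing span %q", want)
		}
	}
}

The mock clients start their own spans through the global tracer, which this test deliberately leaves untouched; only the service-level spans started from svc.tracer are recorded and asserted on.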
Chaos Testing
Chaos testing verifies system resilience by deliberately introducing failures:
package chaos
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// ChaosMonkey introduces controlled failures into the system
type ChaosMonkey struct {
enabled bool
failureRate float64
latencyRange [2]time.Duration // Min and max latency
mu sync.RWMutex
targetService string
}
// NewChaosMonkey creates a new chaos monkey
func NewChaosMonkey(targetService string) *ChaosMonkey {
return &ChaosMonkey{
enabled: false,
failureRate: 0.05, // 5% failure rate by default
latencyRange: [2]time.Duration{50 * time.Millisecond, 500 * time.Millisecond},
targetService: targetService,
}
}
// Enable activates the chaos monkey
func (c *ChaosMonkey) Enable() {
c.mu.Lock()
defer c.mu.Unlock()
c.enabled = true
fmt.Printf("Chaos Monkey enabled for %s\n", c.targetService)
}
// Disable deactivates the chaos monkey
func (c *ChaosMonkey) Disable() {
c.mu.Lock()
defer c.mu.Unlock()
c.enabled = false
fmt.Printf("Chaos Monkey disabled for %s\n", c.targetService)
}
// SetFailureRate sets the probability of failures
func (c *ChaosMonkey) SetFailureRate(rate float64) {
c.mu.Lock()
defer c.mu.Unlock()
c.failureRate = rate
fmt.Printf("Chaos Monkey failure rate set to %.1f%% for %s\n", rate*100, c.targetService)
}
// SetLatencyRange sets the min and max latency range
func (c *ChaosMonkey) SetLatencyRange(min, max time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.latencyRange = [2]time.Duration{min, max}
fmt.Printf("Chaos Monkey latency range set to %v-%v for %s\n", min, max, c.targetService)
}
// MaybeInjectFailure potentially injects a failure based on configuration
func (c *ChaosMonkey) MaybeInjectFailure(ctx context.Context) error {
// Snapshot the configuration under the read lock, then release it so
// that injected latency below does not hold the lock against config updates
c.mu.RLock()
enabled := c.enabled
failureRate := c.failureRate
latencyRange := c.latencyRange
c.mu.RUnlock()
if !enabled {
return nil
}
// Check if context is already canceled
if err := ctx.Err(); err != nil {
return err
}
// Randomly inject failures
if rand.Float64() < failureRate {
failureType := rand.Intn(3)
switch failureType {
case 0:
// Error injection
return errors.New("chaos monkey injected error")
case 1:
// Latency injection
latency := latencyRange[0]
// Guard the delta: rand.Int63n panics when its argument is not positive
if delta := int64(latencyRange[1] - latencyRange[0]); delta > 0 {
latency += time.Duration(rand.Int63n(delta))
}
fmt.Printf("Chaos Monkey injecting %v latency in %s\n", latency, c.targetService)
select {
case <-time.After(latency):
return nil
case <-ctx.Done():
return ctx.Err()
}
case 2:
// Context cancellation
fmt.Printf("Chaos Monkey canceling context in %s\n", c.targetService)
return context.Canceled
}
}
return nil
}
// ResilientService demonstrates resilience patterns
type ResilientService struct {
dependencies map[string]Service
chaos map[string]*ChaosMonkey
}
// Service represents a dependency service
type Service interface {
Call(ctx context.Context, request interface{}) (interface{}, error)
}
// NewResilientService creates a new resilient service
func NewResilientService(dependencies map[string]Service) *ResilientService {
chaos := make(map[string]*ChaosMonkey)
for name := range dependencies {
chaos[name] = NewChaosMonkey(name)
}
return &ResilientService{
dependencies: dependencies,
chaos: chaos,
}
}
// EnableChaosFor enables chaos testing for a specific dependency
func (s *ResilientService) EnableChaosFor(dependency string) {
if monkey, ok := s.chaos[dependency]; ok {
monkey.Enable()
}
}
// DisableChaosFor disables chaos testing for a specific dependency
func (s *ResilientService) DisableChaosFor(dependency string) {
if monkey, ok := s.chaos[dependency]; ok {
monkey.Disable()
}
}
// CallWithRetry demonstrates resilient service calls with retries
func (s *ResilientService) CallWithRetry(ctx context.Context, dependency string, request interface{}) (interface{}, error) {
service, ok := s.dependencies[dependency]
if !ok {
return nil, fmt.Errorf("unknown dependency: %s", dependency)
}
monkey := s.chaos[dependency]
// Retry configuration
maxRetries := 3
backoff := 100 * time.Millisecond
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
// Maybe inject chaos
if err := monkey.MaybeInjectFailure(ctx); err != nil {
fmt.Printf("Chaos injected failure in %s: %v (attempt %d)\n", dependency, err, attempt)
lastErr = err
// Exponential backoff; abandon the wait if the context is canceled
if attempt < maxRetries {
select {
case <-time.After(backoff * time.Duration(1<<attempt)):
case <-ctx.Done():
return nil, ctx.Err()
}
}
continue
}
// Make the actual service call
result, err := service.Call(ctx, request)
if err != nil {
fmt.Printf("Service call to %s failed: %v (attempt %d)\n", dependency, err, attempt)
lastErr = err
// Exponential backoff; abandon the wait if the context is canceled
if attempt < maxRetries {
select {
case <-time.After(backoff * time.Duration(1<<attempt)):
case <-ctx.Done():
return nil, ctx.Err()
}
}
continue
}
// Success
return result, nil
}
return nil, fmt.Errorf("service call to %s failed after %d attempts: %w", dependency, maxRetries+1, lastErr)
}
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
name string
state string // "closed", "open", "half-open"
failureCount int
failureThreshold int
resetTimeout time.Duration
lastFailure time.Time
mu sync.RWMutex
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(name string) *CircuitBreaker {
return &CircuitBreaker{
name: name,
state: "closed",
failureCount: 0,
failureThreshold: 5,
resetTimeout: 30 * time.Second,
}
}
// Execute runs a function with circuit breaker protection
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func(context.Context) error) error {
cb.mu.RLock()
state := cb.state
cb.mu.RUnlock()
// If the circuit is open, only probe again after the reset timeout has
// elapsed. Re-check the state under the write lock so concurrent callers
// cannot race the open -> half-open transition.
if state == "open" {
cb.mu.Lock()
if cb.state == "open" {
if time.Since(cb.lastFailure) < cb.resetTimeout {
cb.mu.Unlock()
return fmt.Errorf("circuit breaker %s is open", cb.name)
}
cb.state = "half-open"
fmt.Printf("Circuit breaker %s transitioning to half-open\n", cb.name)
}
cb.mu.Unlock()
}
// Execute the function
err := fn(ctx)
// Update circuit breaker state based on result
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
// Handle failure
cb.failureCount++
cb.lastFailure = time.Now()
// Check if we should open the circuit
if cb.state == "closed" && cb.failureCount >= cb.failureThreshold {
cb.state = "open"
fmt.Printf("Circuit breaker %s opened after %d failures\n", cb.name, cb.failureCount)
} else if cb.state == "half-open" {
cb.state = "open"
fmt.Printf("Circuit breaker %s reopened after failure in half-open state\n", cb.name)
}
return err
}
// Handle success
if cb.state == "half-open" {
cb.state = "closed"
cb.failureCount = 0
fmt.Printf("Circuit breaker %s closed after success in half-open state\n", cb.name)
} else if cb.state == "closed" {
cb.failureCount = 0
}
return nil
}
This chaos testing implementation demonstrates:
- Controlled failure injection: Introducing errors, latency, and cancellations
- Resilience patterns: Implementing retries and circuit breakers (exercised in the test sketch after this list)
- Configurable chaos: Adjusting failure rates and types
- Service targeting: Applying chaos to specific dependencies
- Failure recovery: Testing system recovery from failures
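Putting the pieces together, a chaos test can assert that retries absorb a realistic injected failure rate. A minimal sketch; the echoService stub, the "echo" dependency name, and the thresholds are hypothetical:

package chaos

import (
	"context"
	"testing"
)

// echoService is a stub dependency that always succeeds.
type echoService struct{}

func (echoService) Call(ctx context.Context, request interface{}) (interface{}, error) {
	return request, nil
}

// TestRetriesAbsorbChaos enables a 30% injection rate and checks that
// retries keep the observed failure rate near zero.
func TestRetriesAbsorbChaos(t *testing.T) {
	svc := NewResilientService(map[string]Service{"echo": echoService{}})
	svc.chaos["echo"].SetFailureRate(0.3) // same-package access for the sketch
	svc.EnableChaosFor("echo")

	failures := 0
	for i := 0; i < 50; i++ {
		if _, err := svc.CallWithRetry(context.Background(), "echo", i); err != nil {
			failures++
		}
	}
	// Only two of the three injected failure modes return an error, so each
	// attempt fails roughly 20% of the time; with four attempts per call,
	// exhausting all retries should be rare (~0.2% per call).
	if failures > 5 {
		t.Fatalf("too many calls failed despite retries: %d/50", failures)
	}
}

Because the chaos source is random, the assertion uses a deliberately generous threshold; for fully deterministic chaos tests you could inject a seeded rand.Rand into ChaosMonkey instead of relying on the package-level math/rand functions.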