Error Handling and Recovery Patterns
Robust error handling is crucial in distributed systems, where partial failures are common and must be contained before they spread.
Circuit Breaker Pattern
The circuit breaker pattern prevents cascading failures in distributed systems:
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// CircuitBreakerState represents the state of the circuit breaker
type CircuitBreakerState int
const (
StateClosed CircuitBreakerState = iota // Normal operation, requests pass through
StateOpen // Circuit is open, requests fail fast
StateHalfOpen // Testing if the service is healthy again
)
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
name string
state CircuitBreakerState
failureThreshold int64
successThreshold int64
resetTimeout time.Duration
failureCount int64
successCount int64
lastStateChange time.Time
mutex sync.RWMutex
onStateChange func(name string, from, to CircuitBreakerState)
consecutiveFailures int64
consecutiveSuccesses int64
totalRequests int64
totalSuccesses int64
totalFailures int64
totalTimeouts int64
totalShortCircuits int64
cumulativeResponseTime int64 // in nanoseconds
}
// CircuitBreakerOption defines a function that configures a CircuitBreaker
type CircuitBreakerOption func(*CircuitBreaker)
// WithFailureThreshold sets the threshold for failures before opening the circuit
func WithFailureThreshold(threshold int64) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.failureThreshold = threshold
}
}
// WithSuccessThreshold sets the threshold for successes before closing the circuit
func WithSuccessThreshold(threshold int64) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.successThreshold = threshold
}
}
// WithResetTimeout sets the timeout before trying to close the circuit again
func WithResetTimeout(timeout time.Duration) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.resetTimeout = timeout
}
}
// WithOnStateChange sets a callback for state changes
func WithOnStateChange(callback func(name string, from, to CircuitBreakerState)) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.onStateChange = callback
}
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(name string, options ...CircuitBreakerOption) *CircuitBreaker {
cb := &CircuitBreaker{
name: name,
state: StateClosed,
failureThreshold: 5,
successThreshold: 3,
resetTimeout: 10 * time.Second,
lastStateChange: time.Now(),
}
// Apply options
for _, option := range options {
option(cb)
}
return cb
}
// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
// Check if the circuit is open
if !cb.allowRequest() {
atomic.AddInt64(&cb.totalShortCircuits, 1)
return nil, errors.New("circuit breaker is open")
}
// Track metrics
atomic.AddInt64(&cb.totalRequests, 1)
startTime := time.Now()
// Execute the protected function
result, err := fn()
// Update metrics based on the result
latency := time.Since(startTime)
atomic.AddInt64(&cb.cumulativeResponseTime, int64(latency))
// Check for timeout
if errors.Is(err, context.DeadlineExceeded) {
atomic.AddInt64(&cb.totalTimeouts, 1)
cb.recordFailure()
return nil, err
}
// Record success or failure
if err != nil {
atomic.AddInt64(&cb.totalFailures, 1)
cb.recordFailure()
return nil, err
}
atomic.AddInt64(&cb.totalSuccesses, 1)
cb.recordSuccess()
return result, nil
}
// allowRequest checks if a request should be allowed based on the circuit state
func (cb *CircuitBreaker) allowRequest() bool {
cb.mutex.RLock()
state := cb.state
lastChange := cb.lastStateChange
cb.mutex.RUnlock()
switch state {
case StateClosed:
return true
case StateOpen:
// Check if it's time to try again
if time.Since(lastChange) > cb.resetTimeout {
// Re-check and transition under the write lock to avoid racing other goroutines
cb.mutex.Lock()
if cb.state == StateOpen && time.Since(cb.lastStateChange) > cb.resetTimeout {
cb.transitionState(StateHalfOpen)
}
cb.mutex.Unlock()
return true
}
return false
case StateHalfOpen:
// In half-open state, allow a limited number of probe requests to test the service
return atomic.LoadInt64(&cb.successCount)+atomic.LoadInt64(&cb.failureCount) < cb.successThreshold
default:
return true
}
}
// recordSuccess records a successful request
func (cb *CircuitBreaker) recordSuccess() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
atomic.AddInt64(&cb.successCount, 1)
atomic.StoreInt64(&cb.failureCount, 0)
atomic.AddInt64(&cb.consecutiveSuccesses, 1)
atomic.StoreInt64(&cb.consecutiveFailures, 0)
// If we're in half-open state and have enough successes, close the circuit
if cb.state == StateHalfOpen && atomic.LoadInt64(&cb.successCount) >= cb.successThreshold {
cb.transitionState(StateClosed)
}
}
// recordFailure records a failed request
func (cb *CircuitBreaker) recordFailure() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
atomic.AddInt64(&cb.failureCount, 1)
atomic.StoreInt64(&cb.successCount, 0)
atomic.AddInt64(&cb.consecutiveFailures, 1)
atomic.StoreInt64(&cb.consecutiveSuccesses, 0)
// If we have too many failures, open the circuit
if (cb.state == StateClosed && atomic.LoadInt64(&cb.failureCount) >= cb.failureThreshold) ||
(cb.state == StateHalfOpen && atomic.LoadInt64(&cb.failureCount) > 0) {
cb.transitionState(StateOpen)
}
}
// transitionState changes the state of the circuit breaker
func (cb *CircuitBreaker) transitionState(newState CircuitBreakerState) {
oldState := cb.state
cb.state = newState
cb.lastStateChange = time.Now()
// Reset counters
atomic.StoreInt64(&cb.failureCount, 0)
atomic.StoreInt64(&cb.successCount, 0)
// Notify state change if callback is set
if cb.onStateChange != nil {
cb.onStateChange(cb.name, oldState, newState)
}
log.Printf("Circuit breaker %s state changed from %v to %v", cb.name, oldState, newState)
}
// GetState returns the current state of the circuit breaker
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.state
}
// GetMetrics returns the current metrics of the circuit breaker
func (cb *CircuitBreaker) GetMetrics() map[string]interface{} {
// Read the state and last transition time under the read lock to avoid a data race
cb.mutex.RLock()
state := cb.state
lastChange := cb.lastStateChange
cb.mutex.RUnlock()
return map[string]interface{}{
"state": state,
"total_requests": atomic.LoadInt64(&cb.totalRequests),
"total_successes": atomic.LoadInt64(&cb.totalSuccesses),
"total_failures": atomic.LoadInt64(&cb.totalFailures),
"total_timeouts": atomic.LoadInt64(&cb.totalTimeouts),
"total_short_circuits": atomic.LoadInt64(&cb.totalShortCircuits),
"consecutive_successes": atomic.LoadInt64(&cb.consecutiveSuccesses),
"consecutive_failures": atomic.LoadInt64(&cb.consecutiveFailures),
"average_response_time": time.Duration(atomic.LoadInt64(&cb.cumulativeResponseTime) / max(1, atomic.LoadInt64(&cb.totalRequests))),
"last_state_change_ago": time.Since(lastChange),
}
}
// Helper function for max of two int64s
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
// simulateService simulates a remote service with variable reliability
func simulateService(ctx context.Context, serviceID string, failureRate int, latency time.Duration) (string, error) {
// Simulate random failures
if rand.Intn(100) < failureRate {
return "", fmt.Errorf("service %s failed", serviceID)
}
// Simulate processing time
select {
case <-time.After(latency):
return fmt.Sprintf("Response from %s", serviceID), nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a circuit breaker
cb := NewCircuitBreaker("example-service",
WithFailureThreshold(3),
WithSuccessThreshold(2),
WithResetTimeout(5*time.Second),
WithOnStateChange(func(name string, from, to CircuitBreakerState) {
fmt.Printf("Circuit breaker %s changed from %v to %v\n", name, from, to)
}),
)
// Simulate a series of requests
for i := 0; i < 20; i++ {
fmt.Printf("\nRequest %d:\n", i+1)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Execute the request through the circuit breaker
result, err := cb.Execute(ctx, func() (interface{}, error) {
// Simulate different failure rates and latencies based on the iteration
var failureRate, latencyMs int
if i < 5 {
// First few requests are successful
failureRate = 0
latencyMs = 100
} else if i < 10 {
// Next few requests have high failure rate
failureRate = 80
latencyMs = 300
} else if i < 15 {
// Then we simulate a recovery
failureRate = 0
latencyMs = 100
} else {
// Finally, simulate timeouts
failureRate = 0
latencyMs = 1500 // This will exceed our timeout
}
return simulateService(ctx, "example-service", failureRate, time.Duration(latencyMs)*time.Millisecond)
})
cancel() // Always cancel the context
// Print the result
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Success: %v\n", result)
}
// Print current metrics
metrics := cb.GetMetrics()
fmt.Printf("Circuit state: %v\n", metrics["state"])
fmt.Printf("Success/Failure: %d/%d\n", metrics["total_successes"], metrics["total_failures"])
// Wait a bit between requests
time.Sleep(500 * time.Millisecond)
}
}
This circuit breaker pattern is essential for distributed systems because it:
- Prevents cascading failures across services
- Fails fast when a service is unhealthy
- Allows for graceful recovery
- Provides detailed metrics for monitoring
- Makes state management explicit through the closed, open, and half-open transitions (a usage sketch follows this list)
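For example, a minimal sketch of putting the breaker in front of an outbound HTTP call. This assumes the CircuitBreaker type from the listing above plus io and net/http imports; the helper name fetchWithBreaker, the 2-second timeout, and the 5xx-as-failure policy are illustrative choices, not part of the pattern itself.

// fetchWithBreaker is a hypothetical helper showing how Execute can guard an
// outbound HTTP call made with the standard library client.
func fetchWithBreaker(cb *CircuitBreaker, client *http.Client, url string) ([]byte, error) {
    // Per-call timeout so a slow upstream surfaces as context.DeadlineExceeded.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    result, err := cb.Execute(ctx, func() (interface{}, error) {
        req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        if reqErr != nil {
            return nil, reqErr
        }
        resp, doErr := client.Do(req)
        if doErr != nil {
            return nil, doErr // transport errors and timeouts count as failures
        }
        defer resp.Body.Close()
        if resp.StatusCode >= 500 {
            // Treating 5xx responses as failures is a policy choice.
            return nil, fmt.Errorf("upstream returned %d", resp.StatusCode)
        }
        body, readErr := io.ReadAll(resp.Body)
        if readErr != nil {
            return nil, readErr
        }
        return body, nil
    })
    if err != nil {
        return nil, err // includes the fast "circuit breaker is open" error
    }
    return result.([]byte), nil
}

Callers see the same error whether the upstream actually failed or the breaker short-circuited, so a single fallback path (cached data, a default response, or retry after backoff) covers both cases.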
Graceful Shutdown Pattern
Proper shutdown handling is essential for maintaining data integrity when a service stops or restarts:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// GracefulShutdown manages the graceful shutdown process
type GracefulShutdown struct {
timeout time.Duration
shutdownFuncs []ShutdownFunc
wg sync.WaitGroup
once sync.Once
shutdownCh chan struct{}
doneCh chan struct{}
}
// ShutdownFunc represents a function to be called during shutdown
type ShutdownFunc func(ctx context.Context) error
// NewGracefulShutdown creates a new graceful shutdown manager
func NewGracefulShutdown(timeout time.Duration) *GracefulShutdown {
return &GracefulShutdown{
timeout: timeout,
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
// AddShutdownFunc adds a function to be called during shutdown
func (gs *GracefulShutdown) AddShutdownFunc(name string, fn ShutdownFunc) {
gs.shutdownFuncs = append(gs.shutdownFuncs, func(ctx context.Context) error {
log.Printf("Shutting down %s...", name)
err := fn(ctx)
if err != nil {
log.Printf("Error shutting down %s: %v", name, err)
return err
}
log.Printf("%s shutdown complete", name)
return nil
})
}
// Start begins listening for shutdown signals
func (gs *GracefulShutdown) Start() {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
select {
case sig := <-signalCh:
log.Printf("Received signal: %v", sig)
gs.Shutdown()
case <-gs.shutdownCh:
// Shutdown triggered programmatically
}
}()
}
// Shutdown initiates the graceful shutdown process
func (gs *GracefulShutdown) Shutdown() {
// Ensure we only shut down once, even if Shutdown is called concurrently
first := false
gs.once.Do(func() {
first = true
close(gs.shutdownCh)
})
if !first {
// Another caller already started the shutdown
return
}
log.Println("Starting graceful shutdown...")
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), gs.timeout)
defer cancel()
// Execute all shutdown functions
for _, fn := range gs.shutdownFuncs {
gs.wg.Add(1)
go func(shutdownFn ShutdownFunc) {
defer gs.wg.Done()
_ = shutdownFn(ctx)
}(fn)
}
// Wait for all shutdown functions to complete or timeout
shutdownComplete := make(chan struct{})
go func() {
gs.wg.Wait()
close(shutdownComplete)
}()
select {
case <-shutdownComplete:
log.Println("Graceful shutdown completed successfully")
case <-ctx.Done():
log.Println("Graceful shutdown timed out")
}
close(gs.doneCh)
}
// Wait blocks until shutdown is complete
func (gs *GracefulShutdown) Wait() {
<-gs.doneCh
}
// IsShuttingDown returns whether shutdown has been initiated
func (gs *GracefulShutdown) IsShuttingDown() bool {
select {
case <-gs.shutdownCh:
return true
default:
return false
}
}
// Example HTTP server with graceful shutdown
type Server struct {
server *http.Server
shutdown *GracefulShutdown
}
// NewServer creates a new HTTP server with graceful shutdown
func NewServer(addr string, shutdown *GracefulShutdown) *Server {
server := &http.Server{
Addr: addr,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if we're shutting down
if shutdown.IsShuttingDown() {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Server is shutting down"))
return
}
// Normal request handling
fmt.Fprintf(w, "Hello, World!")
}),
}
s := &Server{
server: server,
shutdown: shutdown,
}
// Register shutdown handler
shutdown.AddShutdownFunc("http-server", func(ctx context.Context) error {
return server.Shutdown(ctx)
})
return s
}
// Start starts the HTTP server
func (s *Server) Start() error {
log.Printf("Starting HTTP server on %s", s.server.Addr)
return s.server.ListenAndServe()
}
// Example worker pool with graceful shutdown
type WorkerPool struct {
jobCh chan string
shutdown *GracefulShutdown
wg sync.WaitGroup
mu sync.Mutex // guards closed and protects sends against closing jobCh
closed bool
}
// NewWorkerPool creates a new worker pool with graceful shutdown
func NewWorkerPool(workerCount int, jobBufferSize int, shutdown *GracefulShutdown) *WorkerPool {
wp := &WorkerPool{
jobCh: make(chan string, jobBufferSize),
shutdown: shutdown,
}
// Start workers
for i := 0; i < workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
// Register shutdown handler
shutdown.AddShutdownFunc("worker-pool", func(ctx context.Context) error {
log.Println("Closing job channel...")
// Mark the pool closed under the lock so SubmitJob cannot send on a closed channel
wp.mu.Lock()
wp.closed = true
close(wp.jobCh)
wp.mu.Unlock()
// Wait for all workers to finish
doneCh := make(chan struct{})
go func() {
wp.wg.Wait()
close(doneCh)
}()
select {
case <-doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
})
return wp
}
// worker processes jobs
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
log.Printf("Worker %d started", id)
for job := range wp.jobCh {
// Check if we're shutting down
if wp.shutdown.IsShuttingDown() {
log.Printf("Worker %d processing final jobs before shutdown", id)
}
// Process the job
log.Printf("Worker %d processing job: %s", id, job)
time.Sleep(100 * time.Millisecond) // Simulate work
}
log.Printf("Worker %d stopped", id)
}
// SubmitJob submits a job to the worker pool
func (wp *WorkerPool) SubmitJob(job string) error {
wp.mu.Lock()
defer wp.mu.Unlock()
if wp.closed || wp.shutdown.IsShuttingDown() {
return fmt.Errorf("worker pool is shutting down")
}
select {
case wp.jobCh <- job:
return nil
default:
return fmt.Errorf("job queue is full")
}
}
func main() {
// Create a graceful shutdown manager with 5 second timeout
shutdown := NewGracefulShutdown(5 * time.Second)
shutdown.Start()
// Create and start HTTP server
server := NewServer(":8080", shutdown)
go func() {
if err := server.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server error: %v", err)
}
}()
// Create worker pool
workerPool := NewWorkerPool(3, 10, shutdown)
// Submit some jobs
go func() {
for i := 0; i < 20; i++ {
job := fmt.Sprintf("Job %d", i)
if err := workerPool.SubmitJob(job); err != nil {
log.Printf("Failed to submit job: %v", err)
}
time.Sleep(200 * time.Millisecond)
}
}()
// Wait for shutdown signal
log.Println("Server is running. Press Ctrl+C to shutdown.")
shutdown.Wait()
log.Println("Server exited")
}
This graceful shutdown pattern is valuable for distributed systems because it:
- Ensures in-flight operations complete before shutdown
- Prevents data loss during service termination
- Provides a clean shutdown sequence for dependent components
- Implements timeout handling for stuck operations
- Allows for health check integration during shutdown (see the readiness sketch after this list)
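As a sketch of the last point, a readiness endpoint can begin failing as soon as shutdown starts, so load balancers stop sending new traffic while in-flight work drains. This assumes the GracefulShutdown type above; the /healthz path and the registerHealthCheck name are illustrative.

// registerHealthCheck wires a readiness endpoint onto an existing mux.
// It reports 503 once graceful shutdown has been initiated.
func registerHealthCheck(mux *http.ServeMux, shutdown *GracefulShutdown) {
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if shutdown.IsShuttingDown() {
            // Signal load balancers to drain this instance.
            w.WriteHeader(http.StatusServiceUnavailable)
            w.Write([]byte("shutting down"))
            return
        }
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("ok"))
    })
}

In the server above, such a mux could be passed as the http.Server Handler so the readiness probe flips to 503 before the listener itself is shut down.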