Error Handling and Recovery Patterns

Robust error handling is critical for production worker pools. Tasks can fail for many reasons, such as transient network errors, timeouts, or bad input, and the pool must handle these failures gracefully rather than crashing a worker or silently dropping the work.

Worker Pool with Error Recovery

Let’s implement a worker pool with sophisticated error handling and recovery mechanisms:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// TaskFunc represents a function that can be executed by the worker pool
type TaskFunc func(ctx context.Context) (interface{}, error)

// TaskResult contains the result of a task execution
type TaskResult struct {
	Value interface{}
	Err   error
	Retry int
}

// RetryablePool represents a worker pool with error handling and retry logic
type RetryablePool struct {
	workers     int
	maxRetries  int
	tasks       chan *taskWithMetadata
	results     chan TaskResult
	wg          sync.WaitGroup
	ctx         context.Context
	cancelFunc  context.CancelFunc
	errorPolicy ErrorPolicy
}

// taskWithMetadata wraps a task with its metadata
type taskWithMetadata struct {
	task      TaskFunc
	retries   int
	id        int
	createdAt time.Time
}

// ErrorPolicy defines how to handle task errors
type ErrorPolicy interface {
	// ShouldRetry determines if a failed task should be retried
	ShouldRetry(err error, retries int, maxRetries int) bool
	
	// RetryDelay returns how long to wait before retrying
	RetryDelay(retries int) time.Duration
}

// DefaultErrorPolicy implements a simple error policy
type DefaultErrorPolicy struct{}

// ShouldRetry implements ErrorPolicy.ShouldRetry
func (p DefaultErrorPolicy) ShouldRetry(err error, retries, maxRetries int) bool {
	// Don't retry if we've hit the max retries
	if retries >= maxRetries {
		return false
	}
	
	// Don't retry context cancellation errors
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return false
	}
	
	// Retry all other errors
	return true
}

// RetryDelay implements ErrorPolicy.RetryDelay
func (p DefaultErrorPolicy) RetryDelay(retries int) time.Duration {
	// Exponential backoff: 100ms << retries; the pool calls this with the incremented retry count, so the first retry waits 200ms, the second 400ms, and so on
	return time.Duration(100*(1<<retries)) * time.Millisecond
}

// NewRetryablePool creates a new worker pool with error handling and retry logic
func NewRetryablePool(workers, maxRetries, queueSize int) *RetryablePool {
	ctx, cancel := context.WithCancel(context.Background())
	
	return &RetryablePool{
		workers:     workers,
		maxRetries:  maxRetries,
		tasks:       make(chan *taskWithMetadata, queueSize),
		results:     make(chan TaskResult, queueSize),
		ctx:         ctx,
		cancelFunc:  cancel,
		errorPolicy: DefaultErrorPolicy{},
	}
}

// SetErrorPolicy sets a custom error policy
func (p *RetryablePool) SetErrorPolicy(policy ErrorPolicy) {
	p.errorPolicy = policy
}

// Start launches the worker pool
func (p *RetryablePool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()
			
			for {
				select {
				case task, ok := <-p.tasks:
					if !ok {
						return // Channel closed
					}
					
					// Create a per-attempt timeout context derived from the pool
					// context; a task that runs longer than 1 second is cancelled
					taskCtx, cancel := context.WithTimeout(p.ctx, 1*time.Second)
					
					// Execute the task
					startTime := time.Now()
					value, err := task.task(taskCtx)
					duration := time.Since(startTime)
					
					// Clean up the timeout context
					cancel()
					
					if err != nil {
						fmt.Printf("Worker %d: Task %d failed after %v: %v (retry %d/%d)\n",
							workerID, task.id, duration, err, task.retries, p.maxRetries)
						
						// Check if we should retry
						if p.errorPolicy.ShouldRetry(err, task.retries, p.maxRetries) {
							// Increment retry counter
							task.retries++
							
							// Schedule retry after delay
							delay := p.errorPolicy.RetryDelay(task.retries)
							
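							// Resubmit from a separate goroutine so this worker is
							// immediately free to pick up other tasks while the
							// backoff delay elapses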
							go func(t *taskWithMetadata) {
								select {
								case <-time.After(delay):
									// Resubmit the task
									select {
									case p.tasks <- t:
										fmt.Printf("Resubmitted task %d for retry %d/%d after %v\n",
											t.id, t.retries, p.maxRetries, delay)
									case <-p.ctx.Done():
										// Pool is shutting down
									}
								case <-p.ctx.Done():
									// Pool is shutting down
								}
							}(task)
							
							continue
						}
						
						// We're not retrying, send the error result
						p.results <- TaskResult{
							Value: nil,
							Err:   err,
							Retry: task.retries,
						}
						continue
					}
					
					// Task succeeded
					fmt.Printf("Worker %d: Task %d completed successfully in %v\n",
						workerID, task.id, duration)
					
					// Send the result
					p.results <- TaskResult{
						Value: value,
						Err:   nil,
						Retry: task.retries,
					}
					
				case <-p.ctx.Done():
					return // Context canceled
				}
			}
		}(i + 1)
	}
	
	// Close results channel when all workers are done
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// Submit adds a task to the pool
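// It blocks while the task queue is full and returns false when the pool is
// shutting down and the task could not be queued.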
func (p *RetryablePool) Submit(id int, task TaskFunc) bool {
	select {
	case p.tasks <- &taskWithMetadata{
		task:      task,
		retries:   0,
		id:        id,
		createdAt: time.Now(),
	}:
		return true
	case <-p.ctx.Done():
		return false
	}
}

// Results returns the channel of results
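// The channel is closed once every worker has exited (i.e. after Stop), so it
// can be consumed with a range loop.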
func (p *RetryablePool) Results() <-chan TaskResult {
	return p.results
}

// Stop shuts down the pool by cancelling the shared context, which stops the
// workers and any pending retry goroutines. The tasks channel is deliberately
// left open: closing it while a delayed retry could still resubmit a task
// would risk a panic from sending on a closed channel.
func (p *RetryablePool) Stop() {
	p.cancelFunc() // Signal all workers and pending retries to stop
	p.wg.Wait()    // Wait for all workers to finish
}

func main() {
	// Create a retryable pool with 3 workers, max 3 retries, and queue size 10
	pool := NewRetryablePool(3, 3, 10)
	
	// Start the pool
	pool.Start()
	
	// Process results in a separate goroutine
	go func() {
		for result := range pool.Results() {
			if result.Err != nil {
				fmt.Printf("Task failed after %d retries: %v\n", result.Retry, result.Err)
			} else {
				fmt.Printf("Task succeeded with result: %v (after %d retries)\n", result.Value, result.Retry)
			}
		}
	}()
	
	// Submit tasks with different error behaviors
	for i := 0; i < 10; i++ {
		taskID := i
		
		// Create different task behaviors
		var task TaskFunc
		
		switch i % 4 {
		case 0:
			// Task that always succeeds
			task = func(ctx context.Context) (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				return fmt.Sprintf("Result from task %d", taskID), nil
			}
		case 1:
			// Task that fails on its first attempt, then succeeds when retried
			attempts := 0
			task = func(ctx context.Context) (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				
				// Safe to mutate without a mutex: a retry is handed back through
				// the tasks channel, which orders the two executions
				attempts++
				if attempts > 1 {
					return fmt.Sprintf("Retry success from task %d", taskID), nil
				}
				
				return nil, fmt.Errorf("transient error in task %d", taskID)
			}
		case 2:
			// Task that always fails
			task = func(ctx context.Context) (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				return nil, fmt.Errorf("permanent error in task %d", taskID)
			}
		case 3:
			// Task that exceeds the 1-second per-task timeout
			task = func(ctx context.Context) (interface{}, error) {
				select {
				case <-time.After(2 * time.Second):
					return fmt.Sprintf("Late result from task %d", taskID), nil
				case <-ctx.Done():
					return nil, ctx.Err()
				}
			}
		}
		
		pool.Submit(taskID, task)
	}
	
	// Wait for tasks to complete (a fixed sleep keeps the demo simple; a real program would track completion explicitly)
	time.Sleep(5 * time.Second)
	
	// Stop the pool
	pool.Stop()
	fmt.Println("Pool shut down")
}

This worker pool implementation combines several error-handling techniques:

  1. Configurable retry policy: tasks are retried according to a pluggable ErrorPolicy (a custom policy sketch follows this list)
  2. Exponential backoff: retry delays double with each attempt so repeated failures don't overwhelm the system
  3. Task timeouts: each attempt runs under a per-task timeout (1 second in this example), so a hung task cannot block a worker indefinitely
  4. Error differentiation: context cancellation and deadline errors are never retried, while other errors are retried up to the configured maximum
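
The ErrorPolicy interface makes the retry behavior pluggable. As a rough sketch of a custom policy, the code below retries only errors that wrap a transient sentinel and adds random jitter to the exponential backoff. ErrTransient, TransientOnlyPolicy, and BaseDelay are illustrative names introduced here, not part of the pool above, and the snippet assumes it lives in a second file of the same package as the pool:

package main

import (
	"errors"
	"math/rand"
	"time"
)

// ErrTransient is an illustrative sentinel; task functions wrap it in errors
// they consider worth retrying.
var ErrTransient = errors.New("transient failure")

// TransientOnlyPolicy retries only transient errors and jitters the backoff.
type TransientOnlyPolicy struct {
	BaseDelay time.Duration // e.g. 100 * time.Millisecond
}

// ShouldRetry implements ErrorPolicy.ShouldRetry
func (p TransientOnlyPolicy) ShouldRetry(err error, retries, maxRetries int) bool {
	if retries >= maxRetries {
		return false
	}
	// Only errors that wrap ErrTransient are retried
	return errors.Is(err, ErrTransient)
}

// RetryDelay implements ErrorPolicy.RetryDelay
func (p TransientOnlyPolicy) RetryDelay(retries int) time.Duration {
	backoff := p.BaseDelay * time.Duration(1<<retries)
	// Add up to 50% random jitter so concurrent retries do not fire in lockstep
	jitter := time.Duration(rand.Int63n(int64(backoff)/2 + 1))
	return backoff + jitter
}

Plugging it in is a one-liner before the pool starts: pool.SetErrorPolicy(TransientOnlyPolicy{BaseDelay: 100 * time.Millisecond}). A task would then return an error such as fmt.Errorf("fetch failed: %w", ErrTransient) to mark a failure as retryable.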