Error Handling and Recovery Patterns
Robust error handling is critical for production worker pools. Tasks can fail for transient reasons (a network timeout, a briefly unavailable dependency) or permanent ones (invalid input, a bug in the task itself), and the system must handle both gracefully: retry what is worth retrying, back off so it does not overwhelm downstream services, and report what cannot be recovered.
Worker Pool with Error Recovery
Let’s implement a worker pool with sophisticated error handling and recovery mechanisms:
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// TaskFunc represents a function that can be executed by the worker pool
type TaskFunc func(ctx context.Context) (interface{}, error)
// TaskResult contains the result of a task execution
type TaskResult struct {
Value interface{}
Err error
Retry int
}
// RetryablePool represents a worker pool with error handling and retry logic
type RetryablePool struct {
workers int
maxRetries int
tasks chan *taskWithMetadata
results chan TaskResult
wg sync.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
errorPolicy ErrorPolicy
}
// taskWithMetadata wraps a task with its metadata
type taskWithMetadata struct {
task TaskFunc
retries int
id int
createdAt time.Time // Recorded at submission time (not used elsewhere in this example)
}
// ErrorPolicy defines how to handle task errors
type ErrorPolicy interface {
// ShouldRetry determines if a failed task should be retried
ShouldRetry(err error, retries int, maxRetries int) bool
// RetryDelay returns how long to wait before retrying
RetryDelay(retries int) time.Duration
}
// DefaultErrorPolicy implements a simple error policy
type DefaultErrorPolicy struct{}
// ShouldRetry implements ErrorPolicy.ShouldRetry
func (p DefaultErrorPolicy) ShouldRetry(err error, retries, maxRetries int) bool {
// Don't retry if we've hit the max retries
if retries >= maxRetries {
return false
}
// Don't retry context cancellation errors
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
// Retry all other errors
return true
}
// RetryDelay implements ErrorPolicy.RetryDelay
func (p DefaultErrorPolicy) RetryDelay(retries int) time.Duration {
// Exponential backoff: 100ms * 2^retries (200ms, 400ms, 800ms, ... for retries 1, 2, 3)
return time.Duration(100*(1<<retries)) * time.Millisecond
}
// NewRetryablePool creates a new worker pool with error handling and retry logic
func NewRetryablePool(workers, maxRetries, queueSize int) *RetryablePool {
ctx, cancel := context.WithCancel(context.Background())
return &RetryablePool{
workers: workers,
maxRetries: maxRetries,
tasks: make(chan *taskWithMetadata, queueSize),
results: make(chan TaskResult, queueSize),
ctx: ctx,
cancelFunc: cancel,
errorPolicy: DefaultErrorPolicy{},
}
}
// SetErrorPolicy sets a custom error policy
func (p *RetryablePool) SetErrorPolicy(policy ErrorPolicy) {
p.errorPolicy = policy
}
// Start launches the worker pool
func (p *RetryablePool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return // Channel closed
}
// Give the task a 10-second timeout so a hung task cannot block this worker indefinitely (hardcoded for the example; a production pool would make it configurable)
taskCtx, cancel := context.WithTimeout(p.ctx, 10*time.Second)
// Execute the task
startTime := time.Now()
value, err := task.task(taskCtx)
duration := time.Since(startTime)
// Clean up the timeout context
cancel()
if err != nil {
fmt.Printf("Worker %d: Task %d failed after %v: %v (retry %d/%d)\n",
workerID, task.id, duration, err, task.retries, p.maxRetries)
// Check if we should retry
if p.errorPolicy.ShouldRetry(err, task.retries, p.maxRetries) {
// Increment retry counter
task.retries++
// Schedule retry after delay
delay := p.errorPolicy.RetryDelay(task.retries)
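// Resubmit from a separate goroutine so the worker can keep processing other tasks during the backoff delay; the retried task re-enters the queue behind any tasks already waiting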
go func(t *taskWithMetadata) {
select {
case <-time.After(delay):
// Resubmit the task
select {
case p.tasks <- t:
fmt.Printf("Resubmitted task %d for retry %d/%d after %v\n",
t.id, t.retries, p.maxRetries, delay)
case <-p.ctx.Done():
// Pool is shutting down
}
case <-p.ctx.Done():
// Pool is shutting down
}
}(task)
continue
}
// We're not retrying, send the error result
p.results <- TaskResult{
Value: nil,
Err: err,
Retry: task.retries,
}
continue
}
// Task succeeded
fmt.Printf("Worker %d: Task %d completed successfully in %v\n",
workerID, task.id, duration)
// Send the result
p.results <- TaskResult{
Value: value,
Err: nil,
Retry: task.retries,
}
case <-p.ctx.Done():
return // Context canceled: any tasks still queued or waiting to be retried are dropped
}
}
}(i + 1)
}
// Close results channel when all workers are done
go func() {
p.wg.Wait()
close(p.results)
}()
}
// Submit adds a task to the pool. It blocks while the queue is full and returns false if the pool is shutting down.
func (p *RetryablePool) Submit(id int, task TaskFunc) bool {
select {
case p.tasks <- &taskWithMetadata{
task: task,
retries: 0,
id: id,
createdAt: time.Now(),
}:
return true
case <-p.ctx.Done():
return false
}
}
// Results returns the channel of results
func (p *RetryablePool) Results() <-chan TaskResult {
return p.results
}
// Stop shuts down the pool. The shared context is canceled, which also cancels
// any in-flight task contexts; tasks still waiting in the queue are dropped.
func (p *RetryablePool) Stop() {
    p.cancelFunc() // Signal all workers and pending retries to stop
    p.wg.Wait()    // Wait for all workers to finish
    // The tasks channel is deliberately left open: closing it here could panic
    // a retry goroutine (or a concurrent Submit) that is still selecting on a
    // send to it. Workers exit via the canceled context instead.
}
func main() {
// Create a retryable pool with 3 workers, max 3 retries, and queue size 10
pool := NewRetryablePool(3, 3, 10)
// Start the pool
pool.Start()
// Process results in a separate goroutine
go func() {
for result := range pool.Results() {
if result.Err != nil {
fmt.Printf("Task failed after %d retries: %v\n", result.Retry, result.Err)
} else {
fmt.Printf("Task succeeded with result: %v (after %d retries)\n", result.Value, result.Retry)
}
}
}()
// Submit tasks with different error behaviors
for i := 0; i < 10; i++ {
taskID := i
// Create different task behaviors
var task TaskFunc
switch i % 4 {
case 0:
// Task that always succeeds
task = func(ctx context.Context) (interface{}, error) {
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("Result from task %d", taskID), nil
}
case 1:
    // Task that fails on its first attempt, then succeeds when retried
    attempts := 0
    task = func(ctx context.Context) (interface{}, error) {
        time.Sleep(100 * time.Millisecond)
        attempts++
        if attempts > 1 {
            return fmt.Sprintf("Retry success from task %d", taskID), nil
        }
        return nil, fmt.Errorf("transient error in task %d", taskID)
    }
case 2:
// Task that always fails
task = func(ctx context.Context) (interface{}, error) {
time.Sleep(100 * time.Millisecond)
return nil, fmt.Errorf("permanent error in task %d", taskID)
}
case 3:
// Task that times out
task = func(ctx context.Context) (interface{}, error) {
select {
case <-time.After(2 * time.Second):
return fmt.Sprintf("Late result from task %d", taskID), nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
pool.Submit(taskID, task)
}
// Give tasks (and their retries) time to complete; a real application would track completion instead of sleeping
time.Sleep(5 * time.Second)
// Stop the pool
pool.Stop()
fmt.Println("Pool shut down")
}
This worker pool implementation includes several pieces of error-handling machinery:
- Configurable retry policy: the ErrorPolicy interface lets callers decide which failures get retried (a custom policy sketch follows this list)
- Exponential backoff: retry delays double with each attempt, so repeated failures don't hammer an already struggling dependency
- Task timeouts: every task runs under a 10-second context timeout, so a hung task cannot tie up a worker
- Error differentiation: context cancellation and deadline errors are never retried, while other errors are retried up to the configured maximum
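As a sketch of how the ErrorPolicy hook can be customized, the hypothetical policy below retries only failures explicitly marked as transient (using a sentinel error and errors.Is) and adds random jitter to the backoff so that many failing tasks don't all retry at the same instant. The ErrTransient sentinel, the TransientOnlyPolicy type, and the 50% jitter factor are illustrative assumptions, not part of the pool above; the snippet also assumes the math/rand import in addition to the ones already shown.
// ErrTransient is a hypothetical sentinel that tasks wrap to mark a failure as retryable.
var ErrTransient = errors.New("transient failure")

// TransientOnlyPolicy retries only transient failures and jitters the backoff.
type TransientOnlyPolicy struct{}

// ShouldRetry retries only while the retry budget lasts and the error is marked transient.
func (TransientOnlyPolicy) ShouldRetry(err error, retries, maxRetries int) bool {
    if retries >= maxRetries {
        return false
    }
    return errors.Is(err, ErrTransient)
}

// RetryDelay returns exponential backoff with up to 50% random jitter.
func (TransientOnlyPolicy) RetryDelay(retries int) time.Duration {
    base := time.Duration(100*(1<<retries)) * time.Millisecond
    jitter := time.Duration(rand.Int63n(int64(base / 2)))
    return base + jitter
}
A task would then wrap recoverable failures, for example return nil, fmt.Errorf("fetch failed: %w", ErrTransient), and the pool would be wired up with pool.SetErrorPolicy(TransientOnlyPolicy{}) before Start. Any error not wrapped in ErrTransient fails immediately instead of burning retries on a problem that will never fix itself.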