Error Handling and Recovery in Pipelines

Robust error handling is critical for production-grade pipeline systems. Let’s explore patterns for propagating, recovering from, and reporting errors in pipeline architectures.

Error Propagation Patterns

There are several approaches to propagating errors through a pipeline:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// Result wraps a value and an error
type Result struct {
	Value interface{}
	Err   error
}

// source generates integers with occasional errors
func source() <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for i := 0; i < 10; i++ {
			// Randomly generate errors
			if rand.Float64() < 0.3 {
				out <- Result{Err: fmt.Errorf("error generating value %d", i)}
				continue
			}
			
			out <- Result{Value: i, Err: nil}
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// transform applies a transformation, propagating any errors
func transform(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for res := range in {
			// If we received an error, propagate it
			if res.Err != nil {
				out <- res
				continue
			}
			
			// Try to process the value
			val, ok := res.Value.(int)
			if !ok {
				out <- Result{Err: errors.New("expected integer value")}
				continue
			}
			
			// Randomly fail during processing
			if rand.Float64() < 0.2 {
				out <- Result{Err: fmt.Errorf("error processing value %d", val)}
				continue
			}
			
			// Success case
			out <- Result{Value: val * val, Err: nil}
		}
	}()
	return out
}

// sink consumes results and handles errors
func sink(in <-chan Result) {
	for res := range in {
		if res.Err != nil {
			fmt.Printf("Error: %v\n", res.Err)
		} else {
			fmt.Printf("Result: %v\n", res.Value)
		}
	}
}

func main() {
	// Build and run the pipeline
	results := transform(source())
	sink(results)
}

This pattern demonstrates:

  1. Error Wrapping: Each value is wrapped with potential error information (a type-safe generic variant is sketched just after this list)
  2. Error Propagation: Errors from upstream stages are passed downstream
  3. Error Generation: Stages can generate their own errors
  4. Error Handling: The final stage decides how to handle errors
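
The Result wrapper above stores its value as an interface{}, which forces a type assertion in every downstream stage. On Go 1.18 or later (an assumption, not something the original example requires), generics keep the same error-wrapping pattern while making it type-safe. A minimal standalone sketch of that variant:

package main

import "fmt"

// Result wraps a value of any type together with an error.
type Result[T any] struct {
	Value T
	Err   error
}

// square squares integers, propagating upstream errors unchanged.
func square(in <-chan Result[int]) <-chan Result[int] {
	out := make(chan Result[int])
	go func() {
		defer close(out)
		for res := range in {
			if res.Err != nil {
				out <- res // Propagate the error; no type assertion needed
				continue
			}
			out <- Result[int]{Value: res.Value * res.Value}
		}
	}()
	return out
}

func main() {
	in := make(chan Result[int])
	go func() {
		defer close(in)
		in <- Result[int]{Value: 3}
		in <- Result[int]{Err: fmt.Errorf("upstream failure")}
	}()
	for res := range square(in) {
		if res.Err != nil {
			fmt.Printf("Error: %v\n", res.Err)
		} else {
			fmt.Printf("Result: %v\n", res.Value)
		}
	}
}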

Error Recovery Strategies

For long-running pipelines, recovering from errors rather than failing is often desirable:

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryableStage attempts to process items with retries
func retryableStage(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Try to process with retries
			result, err := processWithRetry(n, 3, 100*time.Millisecond)
			if err != nil {
				fmt.Printf("Failed to process %d after retries: %v\n", n, err)
				continue // Skip this item
			}
			out <- result
		}
	}()
	return out
}

// processWithRetry attempts to process a value with retries
func processWithRetry(n int, maxRetries int, delay time.Duration) (int, error) {
	var lastErr error
	
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// Attempt to process
		result, err := process(n)
		if err == nil {
			return result, nil // Success
		}
		
		lastErr = err
		fmt.Printf("Attempt %d failed for %d: %v\n", attempt+1, n, err)
		
		// Don't sleep after the last attempt
		if attempt < maxRetries {
			// Exponential backoff
			sleepTime := delay * time.Duration(1<<attempt)
			time.Sleep(sleepTime)
		}
	}
	
	return 0, fmt.Errorf("all retries failed: %w", lastErr)
}

// process simulates a flaky operation
func process(n int) (int, error) {
	// Simulate random failures
	if rand.Float64() < 0.6 {
		return 0, errors.New("random processing error")
	}
	return n * n, nil
}

// circuitBreakerStage implements the circuit breaker pattern
func circuitBreakerStage(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		
		// Circuit breaker state
		var failures int
		var lastFailure time.Time
		const maxFailures = 3
		const resetTimeout = 5 * time.Second
		circuitOpen := false
		
		for n := range in {
			// Check if circuit is open
			if circuitOpen {
				// Check if we should try to reset
				if time.Since(lastFailure) > resetTimeout {
					fmt.Println("Circuit half-open, attempting reset")
					circuitOpen = false
					failures = 0
				} else {
					fmt.Printf("Circuit open, skipping item %d\n", n)
					continue
				}
			}
			
			// Try to process
			result, err := process(n)
			if err != nil {
				failures++
				lastFailure = time.Now()
				fmt.Printf("Processing failed for %d: %v (failures: %d)\n", n, err, failures)
				
				// Check if we should open the circuit
				if failures >= maxFailures {
					fmt.Println("Circuit breaker tripped, opening circuit")
					circuitOpen = true
				}
				continue
			}
			
			// Success, reset failure count
			failures = 0
			out <- result
		}
	}()
	return out
}

func main() {
	// Create input channel
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 20; i++ {
			input <- i
			time.Sleep(200 * time.Millisecond)
		}
	}()
	
	// Create pipeline with retry stage
	retryOutput := retryableStage(input)
	
	// Consume results
	for result := range retryOutput {
		fmt.Printf("Got result: %d\n", result)
	}
}

This example demonstrates two important error recovery patterns:

  1. Retry Pattern: Automatically retry failed operations with exponential backoff
  2. Circuit Breaker Pattern: Prevent cascading failures by temporarily disabling operations after repeated failures (a usage sketch follows this list)
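
Note that the main function above only exercises the retry stage; circuitBreakerStage is defined but never called. Swapping it in is a one-line change. A minimal sketch, reusing process and circuitBreakerStage exactly as defined above (the wider item spacing is an adjustment so the 5-second reset window can elapse while input is still flowing):

func main() {
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 20; i++ {
			input <- i
			// Space items out so the circuit has time to move from
			// open back to half-open before the input runs dry.
			time.Sleep(500 * time.Millisecond)
		}
	}()

	// Run items through the circuit breaker stage instead of the retry stage
	for result := range circuitBreakerStage(input) {
		fmt.Printf("Got result: %d\n", result)
	}
}

With the simulated 60% failure rate, the circuit trips quickly, skips items while open, and then attempts a reset, so all three states are visible in the output.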

Partial Failure Handling

In distributed pipelines, handling partial failures is essential:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// BatchResult represents the result of processing a batch of items
type BatchResult struct {
	Successful []int
	Failed     map[int]error
}

// batchProcessor processes items in batches with partial failure handling
func batchProcessor(ctx context.Context, in <-chan int, batchSize int) <-chan BatchResult {
	out := make(chan BatchResult)
	go func() {
		defer close(out)
		
		batch := make([]int, 0, batchSize)
		
		// Helper function to process and send the current batch
		processBatch := func() {
			if len(batch) == 0 {
				return
			}
			
			result := BatchResult{
				Successful: []int{},
				Failed:     make(map[int]error),
			}
			
			// Process each item in the batch
			for _, item := range batch {
				// Simulate processing with potential failures
				if item%3 == 0 {
					result.Failed[item] = fmt.Errorf("failed to process item %d", item)
				} else {
					result.Successful = append(result.Successful, item)
				}
			}
			
			// Send batch result
			select {
			case out <- result:
				// Result sent successfully
			case <-ctx.Done():
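				// Context cancelled mid-send: abandon this batch; the
				// outer goroutine loop exits via its own ctx.Done case.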
				return
			}
			
			// Clear the batch
			batch = make([]int, 0, batchSize)
		}
		
		for {
			select {
			case item, ok := <-in:
				if !ok {
					// Input channel closed, process remaining items
					processBatch()
					return
				}
				
				// Add item to batch
				batch = append(batch, item)
				
				// Process batch if it's full
				if len(batch) >= batchSize {
					processBatch()
				}
				
			case <-ctx.Done():
				// Context cancelled
				return
			}
		}
	}()
	return out
}

func main() {
	// Create context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create input channel
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 20; i++ {
			input <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()
	
	// Process in batches
	results := batchProcessor(ctx, input, 5)
	
	// Handle batch results
	for result := range results {
		fmt.Printf("Batch processed: %d successful, %d failed\n", 
			len(result.Successful), len(result.Failed))
		
		if len(result.Successful) > 0 {
			fmt.Printf("  Successful items: %v\n", result.Successful)
		}
		
		if len(result.Failed) > 0 {
			fmt.Println("  Failed items:")
			for item, err := range result.Failed {
				fmt.Printf("    Item %d: %v\n", item, err)
			}
		}
	}
}

This pattern enables:

  1. Batch Processing: Process items in groups for efficiency
  2. Partial Success: Continue processing despite some failures
  3. Detailed Error Reporting: Track exactly which items failed and why (a dead-letter sketch for failed items follows this list)
  4. Graceful Degradation: The pipeline continues to make progress even with errors
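
A natural extension, not shown in the example above, is to route failed items into a dead-letter channel so they can be logged, retried, or persisted separately from the happy path. The following is a minimal sketch of that idea; routeFailures is a hypothetical helper, and BatchResult is redeclared so the snippet stands alone:

package main

import "fmt"

// BatchResult mirrors the type defined in the example above.
type BatchResult struct {
	Successful []int
	Failed     map[int]error
}

// routeFailures fans batch results out into a channel of successful items
// and a dead-letter channel of failed items for later inspection or retry.
func routeFailures(results <-chan BatchResult) (<-chan int, <-chan int) {
	ok := make(chan int)
	deadLetter := make(chan int)
	go func() {
		defer close(ok)
		defer close(deadLetter)
		for res := range results {
			for _, item := range res.Successful {
				ok <- item
			}
			for item := range res.Failed {
				deadLetter <- item
			}
		}
	}()
	return ok, deadLetter
}

func main() {
	results := make(chan BatchResult, 1)
	results <- BatchResult{
		Successful: []int{1, 2},
		Failed:     map[int]error{3: fmt.Errorf("failed to process item 3")},
	}
	close(results)

	ok, deadLetter := routeFailures(results)

	// Drain the dead-letter channel concurrently so neither send blocks.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for item := range deadLetter {
			fmt.Printf("Dead-letter: item %d\n", item)
		}
	}()
	for item := range ok {
		fmt.Printf("OK: item %d\n", item)
	}
	<-done
}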

These error handling patterns are essential for building resilient pipeline systems that can recover from failures and continue processing data.