Error Handling and Recovery in Pipelines
Robust error handling is critical for production-grade pipeline systems. Let’s explore three patterns: propagating errors through stages, recovering from them, and handling partial failures.
Error Propagation Patterns
There are several approaches to propagating errors through a pipeline. The most common is to wrap each value and its error in a single result struct that flows through every stage:
package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// Result wraps a value and an error
type Result struct {
    Value interface{}
    Err   error
}

// source generates integers with occasional errors
func source() <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for i := 0; i < 10; i++ {
            // Randomly generate errors
            if rand.Float64() < 0.3 {
                out <- Result{Err: fmt.Errorf("error generating value %d", i)}
                continue
            }
            out <- Result{Value: i, Err: nil}
            time.Sleep(100 * time.Millisecond)
        }
    }()
    return out
}

// transform applies a transformation, propagating any errors
func transform(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for res := range in {
            // If we received an error, propagate it
            if res.Err != nil {
                out <- res
                continue
            }
            // Try to process the value
            val, ok := res.Value.(int)
            if !ok {
                out <- Result{Err: errors.New("expected integer value")}
                continue
            }
            // Randomly fail during processing
            if rand.Float64() < 0.2 {
                out <- Result{Err: fmt.Errorf("error processing value %d", val)}
                continue
            }
            // Success case
            out <- Result{Value: val * val, Err: nil}
        }
    }()
    return out
}

// sink consumes results and handles errors
func sink(in <-chan Result) {
    for res := range in {
        if res.Err != nil {
            fmt.Printf("Error: %v\n", res.Err)
        } else {
            fmt.Printf("Result: %v\n", res.Value)
        }
    }
}

func main() {
    // Seed random number generator
    rand.Seed(time.Now().UnixNano())

    // Build and run the pipeline
    results := transform(source())
    sink(results)
}
This pattern demonstrates:
- Error Wrapping: Each value is wrapped with potential error information
- Error Propagation: Errors from upstream stages are passed downstream
- Error Generation: Stages can generate their own errors
- Error Handling: The final stage decides how to handle errors
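A note on the Result wrapper: because Value is an interface{}, every stage must type-assert before using the value. On Go 1.18 and later, a generic variant removes both the assertion and its runtime failure mode. Below is a minimal, self-contained sketch; the Result[T] and square names are illustrative, not taken from the example above:

package main

import "fmt"

// Result wraps a typed value and an error (requires Go 1.18+ generics).
type Result[T any] struct {
    Value T
    Err   error
}

// square squares integers, propagating upstream errors without the
// type assertion the interface{} version needs.
func square(in <-chan Result[int]) <-chan Result[int] {
    out := make(chan Result[int])
    go func() {
        defer close(out)
        for res := range in {
            if res.Err != nil {
                out <- res // pass upstream errors through unchanged
                continue
            }
            out <- Result[int]{Value: res.Value * res.Value}
        }
    }()
    return out
}

func main() {
    in := make(chan Result[int])
    go func() {
        defer close(in)
        for i := 1; i <= 3; i++ {
            in <- Result[int]{Value: i}
        }
        in <- Result[int]{Err: fmt.Errorf("example upstream failure")}
    }()
    for res := range square(in) {
        if res.Err != nil {
            fmt.Println("Error:", res.Err)
        } else {
            fmt.Println("Result:", res.Value)
        }
    }
}

The trade-off is that each stage is now tied to a concrete element type, which is usually what you want anyway in a typed pipeline.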
Error Recovery Strategies
For long-running pipelines, recovering from errors is often preferable to failing outright:
package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// retryableStage attempts to process items with retries
func retryableStage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            // Try to process with retries
            result, err := processWithRetry(n, 3, 100*time.Millisecond)
            if err != nil {
                fmt.Printf("Failed to process %d after retries: %v\n", n, err)
                continue // Skip this item
            }
            out <- result
        }
    }()
    return out
}

// processWithRetry attempts to process a value with retries
func processWithRetry(n int, maxRetries int, delay time.Duration) (int, error) {
    var lastErr error
    for attempt := 0; attempt <= maxRetries; attempt++ {
        // Attempt to process
        result, err := process(n)
        if err == nil {
            return result, nil // Success
        }
        lastErr = err
        fmt.Printf("Attempt %d failed for %d: %v\n", attempt+1, n, err)
        // Don't sleep after the last attempt
        if attempt < maxRetries {
            // Exponential backoff
            sleepTime := delay * time.Duration(1<<attempt)
            time.Sleep(sleepTime)
        }
    }
    return 0, fmt.Errorf("all retries failed: %w", lastErr)
}

// process simulates a flaky operation
func process(n int) (int, error) {
    // Simulate random failures
    if rand.Float64() < 0.6 {
        return 0, errors.New("random processing error")
    }
    return n * n, nil
}

// circuitBreakerStage implements the circuit breaker pattern
func circuitBreakerStage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        // Circuit breaker state
        var failures int
        var lastFailure time.Time
        const maxFailures = 3
        const resetTimeout = 5 * time.Second
        circuitOpen := false

        for n := range in {
            // Check if circuit is open
            if circuitOpen {
                // Check if we should try to reset
                if time.Since(lastFailure) > resetTimeout {
                    fmt.Println("Circuit half-open, attempting reset")
                    circuitOpen = false
                    failures = 0
                } else {
                    fmt.Printf("Circuit open, skipping item %d\n", n)
                    continue
                }
            }
            // Try to process
            result, err := process(n)
            if err != nil {
                failures++
                lastFailure = time.Now()
                fmt.Printf("Processing failed for %d: %v (failures: %d)\n", n, err, failures)
                // Check if we should open the circuit
                if failures >= maxFailures {
                    fmt.Println("Circuit breaker tripped, opening circuit")
                    circuitOpen = true
                }
                continue
            }
            // Success, reset failure count
            failures = 0
            out <- result
        }
    }()
    return out
}

func main() {
    // Seed random number generator
    rand.Seed(time.Now().UnixNano())

    // Create input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
            time.Sleep(200 * time.Millisecond)
        }
    }()

    // Create pipeline with the retry stage; swap in circuitBreakerStage(input)
    // to exercise the circuit breaker instead
    retryOutput := retryableStage(input)

    // Consume results
    for result := range retryOutput {
        fmt.Printf("Got result: %d\n", result)
    }
}
This example implements two important error recovery patterns (main wires in the retry stage; the circuit breaker stage can be swapped in the same way):
- Retry Pattern: Automatically retry failed operations with exponential backoff
- Circuit Breaker Pattern: Prevent cascading failures by temporarily disabling operations after repeated failures
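One caveat with processWithRetry as written: time.Sleep cannot be interrupted, so a cancelled pipeline still waits out the full backoff before noticing. A context-aware variant is a common refinement. The sketch below is illustrative rather than part of the original example; it reuses the process function above and requires "context" added to the imports:

// processWithRetryCtx is like processWithRetry, but aborts the backoff
// as soon as the context is cancelled.
func processWithRetryCtx(ctx context.Context, n, maxRetries int, delay time.Duration) (int, error) {
    var lastErr error
    for attempt := 0; attempt <= maxRetries; attempt++ {
        result, err := process(n)
        if err == nil {
            return result, nil
        }
        lastErr = err
        if attempt < maxRetries {
            // Exponential backoff, but give up early on cancellation
            timer := time.NewTimer(delay * time.Duration(1<<attempt))
            select {
            case <-timer.C:
            case <-ctx.Done():
                timer.Stop()
                return 0, ctx.Err()
            }
        }
    }
    return 0, fmt.Errorf("all retries failed: %w", lastErr)
}

Replacing the sleep with a timer inside a select is the standard way to make any delay in a pipeline stage cancellable.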
Partial Failure Handling
In distributed pipelines, where a batch may succeed for some items and fail for others, handling partial failures is essential:
package main

import (
    "context"
    "fmt"
    "time"
)

// BatchResult represents the result of processing a batch of items
type BatchResult struct {
    Successful []int
    Failed     map[int]error
}

// batchProcessor processes items in batches with partial failure handling
func batchProcessor(ctx context.Context, in <-chan int, batchSize int) <-chan BatchResult {
    out := make(chan BatchResult)
    go func() {
        defer close(out)
        batch := make([]int, 0, batchSize)

        // Helper function to process and send the current batch
        processBatch := func() {
            if len(batch) == 0 {
                return
            }
            result := BatchResult{
                Successful: []int{},
                Failed:     make(map[int]error),
            }
            // Process each item in the batch
            for _, item := range batch {
                // Simulate processing with potential failures
                if item%3 == 0 {
                    result.Failed[item] = fmt.Errorf("failed to process item %d", item)
                } else {
                    result.Successful = append(result.Successful, item)
                }
            }
            // Send batch result
            select {
            case out <- result:
                // Result sent successfully
            case <-ctx.Done():
                return
            }
            // Clear the batch
            batch = make([]int, 0, batchSize)
        }

        for {
            select {
            case item, ok := <-in:
                if !ok {
                    // Input channel closed, process remaining items
                    processBatch()
                    return
                }
                // Add item to batch
                batch = append(batch, item)
                // Process batch if it's full
                if len(batch) >= batchSize {
                    processBatch()
                }
            case <-ctx.Done():
                // Context cancelled
                return
            }
        }
    }()
    return out
}

func main() {
    // Create context with cancellation
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Create input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Process in batches
    results := batchProcessor(ctx, input, 5)

    // Handle batch results
    for result := range results {
        fmt.Printf("Batch processed: %d successful, %d failed\n",
            len(result.Successful), len(result.Failed))
        if len(result.Successful) > 0 {
            fmt.Printf("  Successful items: %v\n", result.Successful)
        }
        if len(result.Failed) > 0 {
            fmt.Println("  Failed items:")
            for item, err := range result.Failed {
                fmt.Printf("    Item %d: %v\n", item, err)
            }
        }
    }
}
This pattern enables:
- Batch Processing: Process items in groups for efficiency
- Partial Success: Continue processing despite some failures
- Detailed Error Reporting: Track exactly which items failed and why
- Graceful Degradation: The pipeline continues to make progress even with errors
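A common extension of this pattern is to route failed items to a dead-letter channel, so a separate consumer can log them, raise alerts, or re-enqueue them for another pass. The sketch below builds on the BatchResult type above; the FailedItem type and splitResults stage are illustrative additions, not part of the original example:

// FailedItem pairs an item with the error that caused its failure.
type FailedItem struct {
    Item int
    Err  error
}

// splitResults forwards successes downstream and failures to a
// dead-letter channel for separate handling.
func splitResults(in <-chan BatchResult) (<-chan int, <-chan FailedItem) {
    ok := make(chan int)
    deadLetter := make(chan FailedItem)
    go func() {
        defer close(ok)
        defer close(deadLetter)
        for batch := range in {
            for _, item := range batch.Successful {
                ok <- item
            }
            for item, err := range batch.Failed {
                deadLetter <- FailedItem{Item: item, Err: err}
            }
        }
    }()
    return ok, deadLetter
}

Note that both returned channels must be drained, typically by separate goroutines; otherwise the stage blocks on whichever channel has no reader.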
These error handling patterns are essential for building resilient pipeline systems that can recover from failures and continue processing data.