Error Handling in Channel Communication

Robust concurrent systems require effective error handling strategies. Go’s channel-based concurrency introduces unique challenges for error propagation and handling.

Error Propagation Patterns

There are several patterns for propagating errors through channel-based systems:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Result represents a computation result or error
type Result struct {
	Value int
	Err   error
}

// errorPropagationBasic demonstrates the basic error propagation pattern
func errorPropagationBasic() {
	// Create a channel for results
	results := make(chan Result)
	
	// Start a worker that might encounter errors
	go func() {
		defer close(results)
		
		// Simulate work with possible errors
		for i := 0; i < 5; i++ {
			// 30% chance of error
			if rand.Float32() < 0.3 {
				results <- Result{Err: fmt.Errorf("error processing item %d", i)}
				continue
			}
			
			// Successful computation
			results <- Result{Value: i * 10}
		}
	}()
	
	// Process results, handling errors
	for result := range results {
		if result.Err != nil {
			fmt.Printf("Error: %v\n", result.Err)
		} else {
			fmt.Printf("Success: %d\n", result.Value)
		}
	}
}

// errorPropagationPipeline demonstrates error propagation through a pipeline
func errorPropagationPipeline() {
	// Create a pipeline with error propagation
	source := generateWithErrors(1, 10)
	processed := processWithErrors(source)
	
	// Consume results
	for result := range processed {
		if result.Err != nil {
			fmt.Printf("Pipeline error: %v\n", result.Err)
		} else {
			fmt.Printf("Pipeline result: %d\n", result.Value)
		}
	}
}

// generateWithErrors produces integers with possible errors
func generateWithErrors(start, end int) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			// 20% chance of error
			if rand.Float32() < 0.2 {
				out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}
				continue
			}
			
			// Simulate work
			time.Sleep(50 * time.Millisecond)
			out <- Result{Value: i}
		}
	}()
	
	return out
}

// processWithErrors transforms values and propagates errors
func processWithErrors(in <-chan Result) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for result := range in {
			// If input already has an error, propagate it
			if result.Err != nil {
				out <- result
				continue
			}
			
			// 10% chance of new error during processing
			if rand.Float32() < 0.1 {
				out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}
				continue
			}
			
			// Successful processing
			time.Sleep(30 * time.Millisecond)
			out <- Result{Value: result.Value * result.Value}
		}
	}()
	
	return out
}

// errorAggregation demonstrates collecting and aggregating errors
func errorAggregation() {
	// Create multiple workers that might produce errors
	numWorkers := 5
	results := make(chan Result)
	var wg sync.WaitGroup
	
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			// 40% chance of error
			if rand.Float32() < 0.4 {
				results <- Result{Err: fmt.Errorf("worker %d failed", id)}
				return
			}
			
			// Successful computation
			results <- Result{Value: id * 10}
		}(i)
	}
	
	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()
	
	// Collect and aggregate results
	var successCount, errorCount int
	var errors []error
	var values []int
	
	for result := range results {
		if result.Err != nil {
			errorCount++
			errors = append(errors, result.Err)
		} else {
			successCount++
			values = append(values, result.Value)
		}
	}
	
	// Report aggregated results
	fmt.Printf("Successful operations: %d\n", successCount)
	fmt.Printf("Failed operations: %d\n", errorCount)
	
	if errorCount > 0 {
		fmt.Println("Errors encountered:")
		for _, err := range errors {
			fmt.Printf("  - %v\n", err)
		}
	}
	
	if successCount > 0 {
		fmt.Printf("Values: %v\n", values)
	}
}

// earlyTerminationOnError demonstrates stopping a pipeline on first error
func earlyTerminationOnError() {
	// Create a context for cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Create a pipeline with early termination
	source := generateWithContext(ctx, 1, 20)
	processed := processWithContext(ctx, source)
	
	// Consume results until first error
	for result := range processed {
		if result.Err != nil {
			fmt.Printf("Pipeline error: %v\n", result.Err)
			fmt.Println("Cancelling pipeline...")
			cancel() // Signal all stages to stop
			break
		}
		
		fmt.Printf("Pipeline result: %d\n", result.Value)
	}
	
	// Wait for pipeline to fully terminate
	time.Sleep(100 * time.Millisecond)
	fmt.Println("Pipeline terminated")
}

// generateWithContext produces integers with possible errors and respects context
func generateWithContext(ctx context.Context, start, end int) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			// Check for cancellation
			select {
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			default:
				// Continue processing
			}
			
			// 10% chance of error
			if rand.Float32() < 0.1 {
				select {
				case out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}:
					// Error sent
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Simulate work
			time.Sleep(50 * time.Millisecond)
			
			// Send result
			select {
			case out <- Result{Value: i}:
				// Value sent
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// processWithContext transforms values and respects context
func processWithContext(ctx context.Context, in <-chan Result) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for {
			// Check for cancellation between items
			select {
			case <-ctx.Done():
				fmt.Println("Processor cancelled")
				return
			default:
				// Continue processing
			}
			
			// Try to receive next item
			var result Result
			var ok bool
			select {
			case result, ok = <-in:
				if !ok {
					return // Input channel closed
				}
			case <-ctx.Done():
				return // Context cancelled
			}
			
			// If input already has an error, propagate it
			if result.Err != nil {
				select {
				case out <- result:
					// Error propagated
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// 15% chance of new error during processing
			if rand.Float32() < 0.15 {
				select {
				case out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}:
					// Error sent
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Successful processing
			time.Sleep(30 * time.Millisecond)
			select {
			case out <- Result{Value: result.Value * result.Value}:
				// Result sent
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Demonstrate basic error propagation
	fmt.Println("=== Basic Error Propagation ===")
	errorPropagationBasic()
	
	// Demonstrate error propagation through a pipeline
	fmt.Println("\n=== Pipeline Error Propagation ===")
	errorPropagationPipeline()
	
	// Demonstrate error aggregation
	fmt.Println("\n=== Error Aggregation ===")
	errorAggregation()
	
	// Demonstrate early termination on error
	fmt.Println("\n=== Early Termination on Error ===")
	earlyTerminationOnError()
}

Key error handling patterns:

  1. Result type pattern: Combining results and errors in a single struct
  2. Error propagation: Passing errors through pipeline stages
  3. Error aggregation: Collecting and summarizing errors from multiple operations
  4. Early termination: Stopping all processing when an error occurs