Error Handling in Channel Communication
Robust concurrent systems require effective error handling strategies. Go’s channel-based concurrency introduces unique challenges for error propagation and handling.
Error Propagation Patterns
There are several patterns for propagating errors through channel-based systems:
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// Result represents a computation result or error
type Result struct {
Value int
Err error
}
// errorPropagationBasic demonstrates the basic error propagation pattern
func errorPropagationBasic() {
// Create a channel for results
results := make(chan Result)
// Start a worker that might encounter errors
go func() {
defer close(results)
// Simulate work with possible errors
for i := 0; i < 5; i++ {
// 30% chance of error
if rand.Float32() < 0.3 {
results <- Result{Err: fmt.Errorf("error processing item %d", i)}
continue
}
// Successful computation
results <- Result{Value: i * 10}
}
}()
// Process results, handling errors
for result := range results {
if result.Err != nil {
fmt.Printf("Error: %v\n", result.Err)
} else {
fmt.Printf("Success: %d\n", result.Value)
}
}
}
// errorPropagationPipeline demonstrates error propagation through a pipeline
func errorPropagationPipeline() {
// Create a pipeline with error propagation
source := generateWithErrors(1, 10)
processed := processWithErrors(source)
// Consume results
for result := range processed {
if result.Err != nil {
fmt.Printf("Pipeline error: %v\n", result.Err)
} else {
fmt.Printf("Pipeline result: %d\n", result.Value)
}
}
}
// generateWithErrors produces integers with possible errors
func generateWithErrors(start, end int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for i := start; i <= end; i++ {
// 20% chance of error
if rand.Float32() < 0.2 {
out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}
continue
}
// Simulate work
time.Sleep(50 * time.Millisecond)
out <- Result{Value: i}
}
}()
return out
}
// processWithErrors transforms values and propagates errors
func processWithErrors(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for result := range in {
// If input already has an error, propagate it
if result.Err != nil {
out <- result
continue
}
// 10% chance of new error during processing
if rand.Float32() < 0.1 {
out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}
continue
}
// Successful processing
time.Sleep(30 * time.Millisecond)
out <- Result{Value: result.Value * result.Value}
}
}()
return out
}
// errorAggregation demonstrates collecting and aggregating errors
func errorAggregation() {
// Create multiple workers that might produce errors
numWorkers := 5
results := make(chan Result)
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 40% chance of error
if rand.Float32() < 0.4 {
results <- Result{Err: fmt.Errorf("worker %d failed", id)}
return
}
// Successful computation
results <- Result{Value: id * 10}
}(i)
}
// Close results channel when all workers are done
go func() {
wg.Wait()
close(results)
}()
// Collect and aggregate results
var successCount, errorCount int
var errors []error
var values []int
for result := range results {
if result.Err != nil {
errorCount++
errors = append(errors, result.Err)
} else {
successCount++
values = append(values, result.Value)
}
}
// Report aggregated results
fmt.Printf("Successful operations: %d\n", successCount)
fmt.Printf("Failed operations: %d\n", errorCount)
if errorCount > 0 {
fmt.Println("Errors encountered:")
for _, err := range errors {
fmt.Printf(" - %v\n", err)
}
}
if successCount > 0 {
fmt.Printf("Values: %v\n", values)
}
}
// earlyTerminationOnError demonstrates stopping a pipeline on first error
func earlyTerminationOnError() {
// Create a context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a pipeline with early termination
source := generateWithContext(ctx, 1, 20)
processed := processWithContext(ctx, source)
// Consume results until first error
for result := range processed {
if result.Err != nil {
fmt.Printf("Pipeline error: %v\n", result.Err)
fmt.Println("Cancelling pipeline...")
cancel() // Signal all stages to stop
break
}
fmt.Printf("Pipeline result: %d\n", result.Value)
}
// Wait for pipeline to fully terminate
time.Sleep(100 * time.Millisecond)
fmt.Println("Pipeline terminated")
}
// generateWithContext produces integers with possible errors and respects context
func generateWithContext(ctx context.Context, start, end int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for i := start; i <= end; i++ {
// Check for cancellation
select {
case <-ctx.Done():
fmt.Println("Generator cancelled")
return
default:
// Continue processing
}
// 10% chance of error
if rand.Float32() < 0.1 {
select {
case out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}:
// Error sent
case <-ctx.Done():
return
}
continue
}
// Simulate work
time.Sleep(50 * time.Millisecond)
// Send result
select {
case out <- Result{Value: i}:
// Value sent
case <-ctx.Done():
return
}
}
}()
return out
}
// processWithContext transforms values and respects context
func processWithContext(ctx context.Context, in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for {
// Check for cancellation between items
select {
case <-ctx.Done():
fmt.Println("Processor cancelled")
return
default:
// Continue processing
}
// Try to receive next item
var result Result
var ok bool
select {
case result, ok = <-in:
if !ok {
return // Input channel closed
}
case <-ctx.Done():
return // Context cancelled
}
// If input already has an error, propagate it
if result.Err != nil {
select {
case out <- result:
// Error propagated
case <-ctx.Done():
return
}
continue
}
// 15% chance of new error during processing
if rand.Float32() < 0.15 {
select {
case out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}:
// Error sent
case <-ctx.Done():
return
}
continue
}
// Successful processing
time.Sleep(30 * time.Millisecond)
select {
case out <- Result{Value: result.Value * result.Value}:
// Result sent
case <-ctx.Done():
return
}
}
}()
return out
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Demonstrate basic error propagation
fmt.Println("=== Basic Error Propagation ===")
errorPropagationBasic()
// Demonstrate error propagation through a pipeline
fmt.Println("\n=== Pipeline Error Propagation ===")
errorPropagationPipeline()
// Demonstrate error aggregation
fmt.Println("\n=== Error Aggregation ===")
errorAggregation()
// Demonstrate early termination on error
fmt.Println("\n=== Early Termination on Error ===")
earlyTerminationOnError()
}
Key error handling patterns:
- Result type pattern: Combining results and errors in a single struct
- Error propagation: Passing errors through pipeline stages
- Error aggregation: Collecting and summarizing errors from multiple operations
- Early termination: Stopping all processing when an error occurs