Explore sophisticated channel patterns that enable elegant, efficient communication between goroutines in complex concurrent applications.

Channels are Go’s primary tool for goroutine communication. Most developers know the basics - sending and receiving values - but advanced patterns unlock much more powerful concurrent designs.

Beyond Basic Channels

Basic channel operations are straightforward:

ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch

But real systems need more sophisticated coordination:

  • Pipeline Processing: Chain operations together
  • Fan-Out/Fan-In: Distribute work and collect results
  • Rate Limiting: Control operation flow
  • Timeouts: Handle operations that take too long
  • Cancellation: Stop work when it’s no longer needed

Common Channel Pitfalls

Before diving into patterns, understand the traps:

  • Deadlocks: Goroutines waiting forever for each other
  • Goroutine Leaks: Forgetting to close channels or handle cancellation
  • Race Conditions: Channels don’t solve all concurrency issues
  • Blocking Operations: Not handling channel operations that might block

Channel Patterns You’ll Learn

This guide covers practical channel patterns:

  1. Select Patterns: Non-blocking operations and timeouts
  2. Pipeline Patterns: Chaining processing stages
  3. Fan Patterns: Distributing and collecting work
  4. Cancellation Patterns: Stopping work gracefully
  5. Rate Limiting: Controlling operation frequency
  6. Worker Pools: Managing goroutines efficiently

Each pattern includes examples showing when and how to use it effectively.

When to Use Channels vs Sync Primitives

Channels excel at:

  • Passing data between goroutines
  • Coordinating goroutine lifecycles
  • Implementing timeouts and cancellation
  • Building processing pipelines

Use sync primitives (mutexes, etc.) for:

  • Protecting shared state
  • Simple coordination (WaitGroup)
  • Performance-critical sections
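The split above can be sketched with two small functions. This is a minimal illustration, not a benchmark: `countWithMutex` and `sumWithChannel` are hypothetical names chosen for this example. The first guards shared state with a mutex; the second passes data to a single goroutine that owns the total.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithMutex increments a shared counter from n goroutines,
// protecting the shared state with a mutex.
func countWithMutex(n int) int {
	var (
		mu      sync.Mutex
		counter int
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			counter++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return counter
}

// sumWithChannel has n goroutines each pass one value to a single
// receiver, which is the only goroutine that touches the total.
func sumWithChannel(n int) int {
	values := make(chan int)
	for i := 1; i <= n; i++ {
		go func(v int) { values <- v }(i)
	}
	total := 0
	for i := 0; i < n; i++ {
		total += <-values
	}
	return total
}

func main() {
	fmt.Println("mutex counter:", countWithMutex(100)) // mutex counter: 100
	fmt.Println("channel sum:", sumWithChannel(100))   // channel sum: 5050
}
```

Neither version is universally better; the mutex keeps a simple counter cheap, while the channel version makes the flow of data between goroutines explicit.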

Channel Fundamentals and Best Practices

Before diving into advanced patterns, it’s crucial to establish a solid understanding of channel fundamentals and best practices. These core concepts form the foundation upon which more complex patterns are built.

Channel Types and Directionality

Go channels can be bidirectional or unidirectional, with the latter providing important compile-time safety guarantees:

package main

import (
	"fmt"
	"time"
)

// produceValues demonstrates a function that only sends on a channel
func produceValues(ch chan<- int) {
	for i := 0; i < 5; i++ {
		fmt.Printf("Sending: %d\n", i)
		ch <- i
		time.Sleep(100 * time.Millisecond)
	}
	close(ch) // Producer is responsible for closing
}

// consumeValues demonstrates a function that only receives from a channel
func consumeValues(ch <-chan int) {
	// Using range loop automatically handles channel closure
	for val := range ch {
		fmt.Printf("Received: %d\n", val)
	}
}

func main() {
	// Create a bidirectional channel
	ch := make(chan int)
	
	// Start producer and consumer goroutines
	go produceValues(ch) // Channel converted to send-only
	go consumeValues(ch) // Channel converted to receive-only
	
	// Wait for completion
	time.Sleep(1 * time.Second)
	fmt.Println("Done")
}

This example demonstrates several best practices:

  1. Explicit directionality: Functions declare whether they intend to send (chan<-) or receive (<-chan), making the code’s intent clear and preventing accidental misuse.
  2. Producer responsibility: The producer (sender) is responsible for closing the channel when no more values will be sent.
  3. Range loop for consumers: Using range to receive values automatically handles channel closure.
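The example above waits for its goroutines with time.Sleep, which is fine for a demo but fragile in real code. As a sketch of an alternative, the hypothetical helpers `produce` and `collect` below rely on channel closure alone: the consumer returns only once the producer has closed the channel, so no sleep is needed.

```go
package main

import "fmt"

// produce owns the channel: it creates it, sends on it, and closes it.
// Callers only ever see the receive-only end.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// collect drains a receive-only channel into a slice.
func collect(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	// ch <- 1 // would not compile: ch is receive-only (<-chan int)
	return out
}

func main() {
	// collect returns only after produce has closed the channel,
	// so completion is signalled by the channel itself.
	fmt.Println(collect(produce(5))) // [0 1 2 3 4]
}
```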

Buffered vs. Unbuffered Channels

The choice between buffered and unbuffered channels significantly impacts program behavior:

package main

import (
	"fmt"
	"time"
)

func bufferingDemo() {
	fmt.Println("Unbuffered channel demonstration:")
	unbuffered := make(chan int)
	
	go func() {
		fmt.Println("Sender: Attempting to send")
		unbuffered <- 42
		fmt.Println("Sender: Send completed")
	}()
	
	// Give sender time to attempt sending
	time.Sleep(100 * time.Millisecond)
	fmt.Println("Receiver: About to receive")
	value := <-unbuffered
	fmt.Printf("Receiver: Received value %d\n", value)
	
	fmt.Println("\nBuffered channel demonstration:")
	buffered := make(chan int, 2)
	
	go func() {
		for i := 0; i < 3; i++ {
			fmt.Printf("Sender: Sending value %d\n", i)
			buffered <- i
			fmt.Printf("Sender: Sent value %d\n", i)
		}
	}()
	
	// Give sender time to send values
	time.Sleep(100 * time.Millisecond)
	
	for i := 0; i < 3; i++ {
		fmt.Println("Receiver: About to receive")
		value := <-buffered
		fmt.Printf("Receiver: Received value %d\n", value)
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	bufferingDemo()
	time.Sleep(500 * time.Millisecond) // Ensure all output is printed
}

Key differences to understand:

  1. Unbuffered channels (capacity 0) synchronize the sender and receiver—the sender blocks until a receiver is ready to receive the value.
  2. Buffered channels allow senders to proceed without an immediate receiver, up to the buffer’s capacity.
  3. Blocking behavior: Once a buffered channel is full, senders block until space becomes available.
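The capacity rules above can be observed directly with the built-in len and cap functions. The sketch below uses a hypothetical helper, `fill`, that deliberately never sends more than the free buffer space, since one extra send without a receiver would block forever.

```go
package main

import "fmt"

// fill sends up to n values on ch with no receiver present and returns
// how many it sent. It caps n at the free buffer space so that it can
// never block.
func fill(ch chan int, n int) int {
	free := cap(ch) - len(ch)
	if n > free {
		n = free
	}
	for i := 0; i < n; i++ {
		ch <- i
	}
	return n
}

func main() {
	buffered := make(chan int, 3)
	fmt.Println("sent:", fill(buffered, 2))                     // sent: 2
	fmt.Printf("len=%d cap=%d\n", len(buffered), cap(buffered)) // len=2 cap=3

	<-buffered // receiving frees one slot
	fmt.Printf("after one receive: len=%d cap=%d\n", len(buffered), cap(buffered))

	// An unbuffered channel has capacity 0, so nothing can be sent
	// without a receiver waiting.
	unbuffered := make(chan int)
	fmt.Println("unbuffered sent:", fill(unbuffered, 2)) // unbuffered sent: 0
}
```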

Channel Closure and the nil Channel

Understanding channel closure and nil channel behavior is critical for advanced patterns:

package main

import (
	"fmt"
	"time"
)

func channelClosureDemo() {
	ch := make(chan int)
	
	// Sender goroutine
	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
		}
		close(ch)
		fmt.Println("Sender: Channel closed")
	}()
	
	// Receiver loop - continues until channel is closed
	for {
		value, ok := <-ch
		if !ok {
			fmt.Println("Receiver: Channel closed detected")
			break
		}
		fmt.Printf("Receiver: Got value %d\n", value)
	}
	
	// Demonstrate behavior of closed and nil channels
	closedCh := make(chan int)
	close(closedCh)
	
	// Reading from closed channel returns zero value
	val, ok := <-closedCh
	fmt.Printf("Reading from closed channel: value=%d, ok=%v\n", val, ok)
	
	// Writing to closed channel panics
	// closedCh <- 1 // This would panic
	
	// Nil channel operations block forever
	var nilCh chan int // nil channel
	
	// Demonstrate with a timeout so the program does not hang
	select {
	case v := <-nilCh:
		fmt.Println("Unexpected value from nil channel:", v)
	case <-time.After(100 * time.Millisecond):
		fmt.Println("Receive from nil channel blocked; timed out as expected")
	}
}

func main() {
	channelClosureDemo()
	time.Sleep(500 * time.Millisecond) // Ensure all output is printed
}

Important principles:

  1. Closed channel behavior:
    • Reading from a closed channel returns the zero value and ok=false
    • Writing to a closed channel causes a panic
    • Closing an already closed channel causes a panic
  2. Nil channel behavior:
    • Operations on nil channels block forever
    • This property is useful in select statements for disabling cases
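To make the last point concrete, here is a minimal sketch of disabling select cases with nil channels. The helper names `mergeTwo` and `emit` are made up for this example. Without the nil assignment, a closed channel would be ready on every iteration and the loop would spin.

```go
package main

import "fmt"

// mergeTwo receives from a and b until both are closed.
// When an input closes, its variable is set to nil so that its
// select case blocks forever and is effectively disabled.
func mergeTwo(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

// emit sends the given values on a new channel and then closes it.
func emit(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	merged := mergeTwo(emit(1, 2, 3), emit(10, 20))
	// The interleaving between the two inputs is not deterministic,
	// but all five values always arrive.
	fmt.Println(len(merged), "values received:", merged)
}
```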

Channel Ownership Principles

Clear channel ownership is essential for preventing concurrency bugs:

package main

import (
	"fmt"
	"sync"
)

// ChannelOwner demonstrates the channel ownership pattern
type ChannelOwner struct {
	values chan int
	done   chan struct{}
}

// NewChannelOwner creates and returns a new ChannelOwner
// The constructor is the only place where the channels are created
func NewChannelOwner() *ChannelOwner {
	return &ChannelOwner{
		values: make(chan int),
		done:   make(chan struct{}),
	}
}

// Start begins producing values and returns a receive-only channel
// The owner starts the producer goroutine and manages its lifecycle
func (co *ChannelOwner) Start() <-chan int {
	go func() {
		defer close(co.values) // Owner ensures channel is closed
		
		for i := 0; i < 5; i++ {
			select {
			case co.values <- i:
				// Value sent successfully
			case <-co.done:
				fmt.Println("Producer received cancellation signal")
				return
			}
		}
	}()
	
	return co.values // Return receive-only channel to consumers
}

// Stop signals the producer to stop and cleans up resources
func (co *ChannelOwner) Stop() {
	close(co.done)
}

func main() {
	// Create owner and start production
	owner := NewChannelOwner()
	valuesCh := owner.Start()
	
	// Consume values
	var wg sync.WaitGroup
	wg.Add(1)
	
	go func() {
		defer wg.Done()
		received := 0
		for value := range valuesCh {
			fmt.Printf("Received: %d\n", value)
			received++
			if received == 3 {
				// Stop after three values
				// In a real application, this might be triggered by a timeout or user action
				owner.Stop()
			}
		}
		fmt.Println("Consumer finished")
	}()
	
	// Wait for the consumer; it is the only reader of valuesCh
	wg.Wait()
}

Key ownership principles:

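  1. Single writer principle: Ideally only one goroutine writes to a channel; when several must (as in fan-in), a single coordinator closes it after all writers have finished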
  2. Clear ownership: The owner creates, writes to, and closes the channel
  3. Consumers only read: Consumers should only read from channels, never close them
  4. Encapsulation: Hide channel creation and management inside constructors and methods
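One gap in the ownership example is that calling Stop twice would close the done channel twice and panic. A common way to harden this, sketched below under the assumption that a sync.Once fits your design, is to wrap the close. `Stopper` is a hypothetical type invented for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Stopper is a hypothetical helper that owns a done channel and
// guarantees it is closed at most once.
type Stopper struct {
	done chan struct{}
	once sync.Once
}

// NewStopper is the only place the done channel is created.
func NewStopper() *Stopper {
	return &Stopper{done: make(chan struct{})}
}

// Stop closes the done channel; repeated calls are no-ops.
func (s *Stopper) Stop() {
	s.once.Do(func() { close(s.done) })
}

// Done exposes only the receive side, so consumers cannot close it.
func (s *Stopper) Done() <-chan struct{} { return s.done }

// Stopped reports, without blocking, whether Stop has been called.
func (s *Stopper) Stopped() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

func main() {
	s := NewStopper()
	fmt.Println("stopped before Stop:", s.Stopped()) // stopped before Stop: false
	s.Stop()
	s.Stop() // safe: the second call does nothing
	fmt.Println("stopped after Stop:", s.Stopped()) // stopped after Stop: true
}
```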

These fundamentals provide the foundation for the advanced patterns we’ll explore next. By adhering to these principles, you can avoid many common concurrency pitfalls and build more reliable concurrent systems.

Advanced Channel Patterns

Building on the fundamentals, we can now explore more sophisticated channel patterns that solve complex concurrency challenges. These patterns provide reusable solutions for common concurrent programming scenarios.

Select Statement Patterns

The select statement is one of Go’s most powerful concurrency primitives, enabling non-blocking operations and coordination between multiple channels:

package main

import (
	"fmt"
	"math/rand"
	"reflect"
	"time"
)

// timeoutOperation demonstrates using select for timeouts
func timeoutOperation() {
	ch := make(chan string)
	
	go func() {
		// Simulate work that takes random time
		delay := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(delay)
		ch <- fmt.Sprintf("Operation completed in %v", delay)
	}()
	
	// Wait for result or timeout
	select {
	case result := <-ch:
		fmt.Println("Success:", result)
	case <-time.After(300 * time.Millisecond):
		fmt.Println("Operation timed out")
	}
}

// nonBlockingReceive demonstrates non-blocking channel operations
func nonBlockingReceive(ch chan string) {
	select {
	case msg := <-ch:
		fmt.Println("Received message:", msg)
	default:
		fmt.Println("No message available")
	}
}

// nonBlockingSend demonstrates non-blocking send operation
func nonBlockingSend(ch chan string, msg string) {
	select {
	case ch <- msg:
		fmt.Println("Sent message:", msg)
	default:
		fmt.Println("Cannot send message, channel full or no receivers")
	}
}

// prioritySelect demonstrates channel priority using select
func prioritySelect(high, medium, low chan string) {
	for {
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
			return
		default:
			// Continue to nested select
		}
		
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
			return
		case msg := <-medium:
			fmt.Println("Medium priority:", msg)
			return
		default:
			// Continue to final select
		}
		
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
		case msg := <-medium:
			fmt.Println("Medium priority:", msg)
		case msg := <-low:
			fmt.Println("Low priority:", msg)
		case <-time.After(500 * time.Millisecond):
			fmt.Println("Timed out waiting for messages")
		}
		return
	}
}

// dynamicSelectCases demonstrates how to handle a dynamic number of channels
func dynamicSelectCases(channels []chan int) {
	cases := make([]reflect.SelectCase, len(channels))
	for i, ch := range channels {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}
	
	// Add timeout case
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(time.After(1 * time.Second)),
	})
	
	// Wait for any channel to receive
	chosen, value, ok := reflect.Select(cases)
	if chosen == len(cases)-1 {
		fmt.Println("Timed out")
		return
	}
	
	if ok {
		fmt.Printf("Received value %v from channel %d\n", value.Int(), chosen)
	} else {
		fmt.Printf("Channel %d was closed\n", chosen)
	}
}

func main() {
	// Demonstrate timeout pattern
	fmt.Println("=== Timeout Pattern ===")
	timeoutOperation()
	
	// Demonstrate non-blocking operations
	fmt.Println("\n=== Non-blocking Operations ===")
	ch := make(chan string, 1)
	nonBlockingReceive(ch)
	nonBlockingSend(ch, "Hello")
	nonBlockingReceive(ch)
	nonBlockingSend(ch, "World") // Succeeds and fills the buffer; a further send would hit default
	
	// Demonstrate priority selection
	fmt.Println("\n=== Priority Selection ===")
	high := make(chan string, 1)
	medium := make(chan string, 1)
	low := make(chan string, 1)
	
	// Try different scenarios
	medium <- "Medium message"
	prioritySelect(high, medium, low)
	
	// For dynamic select case demo, we need the reflect package
	fmt.Println("\n=== Dynamic Select Cases ===")
	channels := make([]chan int, 3)
	for i := range channels {
		channels[i] = make(chan int)
		i := i // Create new variable to avoid closure problem
		go func() {
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
			channels[i] <- i
		}()
	}
	
	// dynamicSelectCases relies on the reflect package, so "reflect" must
	// appear in the import block at the top of the file
	dynamicSelectCases(channels)
}

Key select patterns:

  1. Timeout pattern: Combine a work channel with time.After() to implement timeouts
  2. Non-blocking operations: Use the default case to make channel operations non-blocking
  3. Priority selection: Nest multiple select statements to implement channel priority
  4. Dynamic cases: Use the reflect package to handle a dynamic number of channels

Fan-Out and Fan-In Patterns

Fan-out/fan-in is a powerful pattern for parallel processing that distributes work across multiple goroutines and then consolidates the results:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// worker processes items from input and sends results to output
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
		results <- job * 2                                           // Send result
	}
}

// fanOut distributes work across multiple workers
func fanOut(jobs <-chan int, numWorkers int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	
	// Start workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	
	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
		fmt.Println("All workers completed")
	}()
	
	return results
}

// fanIn merges multiple channels into a single channel
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	multiplexed := make(chan int)
	
	// Function to forward values from input channel to multiplexed channel
	forward := func(ch <-chan int) {
		defer wg.Done()
		for val := range ch {
			multiplexed <- val
		}
	}
	
	// Start a goroutine for each input channel
	wg.Add(len(channels))
	for _, ch := range channels {
		go forward(ch)
	}
	
	// Close multiplexed channel when all input channels are done
	go func() {
		wg.Wait()
		close(multiplexed)
	}()
	
	return multiplexed
}

// fanInReflect is a variant of fanIn that passes each channel to its goroutine
// as an argument. Despite its name, it does not use the reflect package.
func fanInReflect(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	
	for _, c := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

func main() {
	// Create jobs channel and send jobs
	jobs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs)
	
	// Fan out to 3 workers
	fmt.Println("Starting fan-out with 3 workers")
	results := fanOut(jobs, 3)
	
	// Collect and print results
	for result := range results {
		fmt.Printf("Got result: %d\n", result)
	}
	
	// Demonstrate fan-in with multiple channels
	fmt.Println("\nDemonstrating fan-in pattern")
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)
	
	// Send values on each channel
	go func() {
		for i := 1; i <= 3; i++ {
			ch1 <- i * 10
			time.Sleep(100 * time.Millisecond)
		}
		close(ch1)
	}()
	
	go func() {
		for i := 1; i <= 3; i++ {
			ch2 <- i * 100
			time.Sleep(150 * time.Millisecond)
		}
		close(ch2)
	}()
	
	go func() {
		for i := 1; i <= 3; i++ {
			ch3 <- i * 1000
			time.Sleep(80 * time.Millisecond)
		}
		close(ch3)
	}()
	
	// Fan-in the channels
	merged := fanIn(ch1, ch2, ch3)
	
	// Print merged results
	for result := range merged {
		fmt.Printf("Merged result: %d\n", result)
	}
}

The fan-out/fan-in pattern is particularly useful for:

  1. CPU-bound tasks: Distributing computation across multiple cores
  2. I/O-bound tasks: Managing multiple concurrent I/O operations
  3. Bounded parallelism: Capping the number of goroutines that work at the same time
  4. Batch processing: Processing large datasets in parallel chunks
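Fan-out returns results in completion order, not input order. When order matters, one option is to tag each job with its index. The sketch below shows that idea; `parallelMap` and `indexed` are illustrative names, and the function assumes at least one worker.

```go
package main

import (
	"fmt"
	"sync"
)

// indexed pairs a value with its position in the input slice.
type indexed struct {
	idx, val int
}

// parallelMap applies f to every input using a fixed number of workers
// and returns the results in input order. It assumes workers >= 1.
func parallelMap(inputs []int, workers int, f func(int) int) []int {
	jobs := make(chan indexed)
	results := make(chan indexed)

	// Fan out: each worker reads jobs until the channel is closed.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- indexed{idx: j.idx, val: f(j.val)}
			}
		}()
	}

	// Feed the jobs from a separate goroutine, then close the channel.
	go func() {
		defer close(jobs)
		for i, v := range inputs {
			jobs <- indexed{idx: i, val: v}
		}
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan in: place each result at its original position.
	out := make([]int, len(inputs))
	for r := range results {
		out[r.idx] = r.val
	}
	return out
}

func main() {
	doubled := parallelMap([]int{1, 2, 3, 4, 5}, 3, func(n int) int { return n * 2 })
	fmt.Println(doubled) // [2 4 6 8 10]
}
```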

Pipeline Pattern

Pipelines compose a series of processing stages connected by channels, allowing data to flow through multiple transformations:

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// generator creates a channel that emits the provided integers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square receives integers, squares them, and sends them to a returned channel
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// filter receives integers and sends only those that satisfy the predicate function
func filter(in <-chan int, predicate func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if predicate(n) {
				out <- n
			}
		}
	}()
	return out
}

// merge combines multiple channels into a single channel
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	
	// Start an output goroutine for each input channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	
	// Close the output channel when all input channels are done
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

// batchProcessor demonstrates processing data in batches
func batchProcessor(in <-chan int, batchSize int) <-chan []int {
	out := make(chan []int)
	
	go func() {
		defer close(out)
		
		batch := make([]int, 0, batchSize)
		for n := range in {
			batch = append(batch, n)
			
			// When batch is full, send it and create a new one
			if len(batch) == batchSize {
				out <- batch
				batch = make([]int, 0, batchSize)
			}
		}
		
		// Send any remaining items in the last batch
		if len(batch) > 0 {
			out <- batch
		}
	}()
	
	return out
}

func main() {
	// Create a pipeline: generate -> square -> filter -> print
	fmt.Println("=== Basic Pipeline ===")
	c := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	c = square(c)
	c = filter(c, func(n int) bool {
		return n%2 == 0 // Only even numbers
	})
	
	// Consume the output
	for n := range c {
		fmt.Println(n)
	}
	
	// Demonstrate a more complex pipeline with multiple sources and fan-in
	fmt.Println("\n=== Multi-source Pipeline with Fan-in ===")
	
	// Create multiple sources
	c1 := generator(1, 2, 3)
	c2 := generator(4, 5, 6)
	c3 := generator(7, 8, 9)
	
	// Process each source
	c1 = square(c1)
	c2 = square(c2)
	c3 = square(c3)
	
	// Merge the results
	merged := merge(c1, c2, c3)
	
	// Consume the merged output
	for n := range merged {
		fmt.Println(n)
	}
	
	// Demonstrate batch processing
	fmt.Println("\n=== Batch Processing Pipeline ===")
	
	// Generate 10 random numbers
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 0; i < 10; i++ {
			source <- rand.Intn(100)
		}
	}()
	
	// Process in batches of 3
	batches := batchProcessor(source, 3)
	
	// Consume and process batches
	for batch := range batches {
		fmt.Printf("Processing batch: %v\n", batch)
		sum := 0
		for _, n := range batch {
			sum += n
		}
		fmt.Printf("Batch sum: %d\n", sum)
	}
}

Key pipeline characteristics:

  1. Composability: Each stage performs a specific transformation and can be composed with other stages
  2. Unidirectional flow: Data flows in one direction through the pipeline
  3. Concurrent execution: Each stage runs in its own goroutine
  4. Bounded stages: With unbuffered channels, each stage holds at most one item at a time and blocks until the next stage is ready to receive, which provides natural backpressure
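The composability point can be taken a step further with type parameters, so that stages are not tied to int. The sketch below assumes Go 1.18 or later; `source`, `mapStage`, and `collect` are hypothetical helpers written for this example, not part of any library.

```go
package main

import "fmt"

// source emits the given values and then closes its output channel.
func source[T any](vals ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// mapStage applies f to each value it receives and forwards the result.
// The input and output element types may differ.
func mapStage[T, U any](in <-chan T, f func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	squares := mapStage(source(1, 2, 3), func(n int) int { return n * n })
	labels := mapStage(squares, func(n int) string { return fmt.Sprintf("n=%d", n) })
	fmt.Println(collect(labels)) // [n=1 n=4 n=9]
}
```

Because each stage runs in a single goroutine, values leave the pipeline in the order they entered it.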

Worker Pool Pattern

Worker pools manage a fixed number of goroutines to process tasks from a queue:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a unit of work
type Task struct {
	ID     int
	Data   interface{}
	Result interface{}
	Err    error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
	tasks       chan Task
	results     chan Task
	concurrency int
	wg          sync.WaitGroup
	ctx         context.Context
	cancel      context.CancelFunc
}

// NewWorkerPool creates a new worker pool with the specified concurrency
func NewWorkerPool(concurrency int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &WorkerPool{
		tasks:       make(chan Task),
		results:     make(chan Task),
		concurrency: concurrency,
		ctx:         ctx,
		cancel:      cancel,
	}
}

// Start launches the worker pool
func (p *WorkerPool) Start() {
	// Start workers
	for i := 0; i < p.concurrency; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
	
	// Start result collector
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// worker processes tasks from the task channel
func (p *WorkerPool) worker(id int) {
	defer p.wg.Done()
	
	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				return // Task channel closed
			}
			
			// Process the task
			fmt.Printf("Worker %d processing task %d\n", id, task.ID)
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
			
			// Generate result (in a real application, this would be actual processing)
			task.Result = task.Data.(int) * 2
			
			// Send result
			select {
			case p.results <- task:
				// Result sent successfully
			case <-p.ctx.Done():
				return // Context cancelled
			}
			
		case <-p.ctx.Done():
			return // Context cancelled
		}
	}
}

// Submit adds a task to the worker pool
func (p *WorkerPool) Submit(task Task) {
	select {
	case p.tasks <- task:
		// Task submitted successfully
	case <-p.ctx.Done():
		// Pool is shutting down
	}
}

// Results returns the channel of completed tasks
func (p *WorkerPool) Results() <-chan Task {
	return p.results
}

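// Stop shuts down the worker pool by cancelling its context.
// The task channel is intentionally left open: closing it while a Submit
// call is still pending could panic with a send on a closed channel.
// Results of tasks still in flight when Stop is called may be discarded.
func (p *WorkerPool) Stop() {
	p.cancel() // Signal all workers and pending Submit calls to stop
}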

// ThrottledWorkerPool extends WorkerPool with rate limiting
type ThrottledWorkerPool struct {
	*WorkerPool
	rate  time.Duration // Minimum time between task submissions
	limit chan struct{} // Semaphore for limiting concurrent tasks
}

// NewThrottledWorkerPool creates a new rate-limited worker pool
func NewThrottledWorkerPool(concurrency int, rate time.Duration, maxQueued int) *ThrottledWorkerPool {
	return &ThrottledWorkerPool{
		WorkerPool: NewWorkerPool(concurrency),
		rate:       rate,
		limit:      make(chan struct{}, maxQueued),
	}
}

// Submit adds a task to the throttled worker pool, respecting rate limits
func (p *ThrottledWorkerPool) Submit(task Task) {
	// Add to limit before submitting
	p.limit <- struct{}{}
	
	go func() {
		defer func() { <-p.limit }() // Release limit when done
		
		// Submit to underlying pool
		p.WorkerPool.Submit(task)
		
		// Enforce rate limit
		time.Sleep(p.rate)
	}()
}

func main() {
	// Create a worker pool with 3 workers
	fmt.Println("=== Basic Worker Pool ===")
	pool := NewWorkerPool(3)
	pool.Start()
	
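	// Collect results concurrently: the results channel is unbuffered, so
	// without a reader the workers would block and Submit would deadlock
	const numTasks = 10
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < numTasks; i++ {
			task := <-pool.Results()
			fmt.Printf("Task %d result: %v\n", task.ID, task.Result)
		}
	}()
	
	// Submit 10 tasks
	for i := 0; i < numTasks; i++ {
		pool.Submit(Task{
			ID:   i,
			Data: i,
		})
	}
	
	// Wait for every result before stopping, because Stop cancels the
	// pool's context and results still in flight could be discarded
	<-done
	pool.Stop()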
	
	// Demonstrate throttled worker pool
	fmt.Println("\n=== Throttled Worker Pool ===")
	throttled := NewThrottledWorkerPool(3, 200*time.Millisecond, 5)
	throttled.Start()
	
	// Collect results concurrently so workers never block on the
	// unbuffered results channel
	throttledDone := make(chan struct{})
	go func() {
		defer close(throttledDone)
		for i := 0; i < 10; i++ {
			task := <-throttled.Results()
			fmt.Printf("Throttled task %d result: %v\n", task.ID, task.Result)
		}
	}()
	
	// Submit tasks rapidly, but they'll be rate-limited
	for i := 0; i < 10; i++ {
		throttled.Submit(Task{
			ID:   i,
			Data: i,
		})
		fmt.Printf("Submitted task %d\n", i)
	}
	
	// Wait until every result has arrived, then stop the pool
	<-throttledDone
	throttled.Stop()
}

The worker pool pattern is particularly useful for:

  1. Controlling concurrency: Limiting the number of concurrent operations to prevent resource exhaustion
  2. Load balancing: Distributing work evenly across available resources
  3. Backpressure handling: Managing the flow of work when producers are faster than consumers
  4. Resource management: Efficiently utilizing system resources like CPU cores, network connections, or database connections
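For the backpressure case, one option is to reject work when the queue is full instead of blocking the producer. The sketch below is a simplified illustration: `BoundedQueue`, `TrySubmit`, `Next`, and `ErrQueueFull` are hypothetical names and are not part of the WorkerPool type shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueueFull tells the producer to slow down or retry later.
var ErrQueueFull = errors.New("task queue is full")

// BoundedQueue is a hypothetical wrapper around a buffered task channel.
type BoundedQueue struct {
	tasks chan int
}

// NewBoundedQueue creates a queue that holds at most capacity tasks.
func NewBoundedQueue(capacity int) *BoundedQueue {
	return &BoundedQueue{tasks: make(chan int, capacity)}
}

// TrySubmit enqueues a task without blocking. When the buffer is full
// it returns ErrQueueFull, pushing back on the producer.
func (q *BoundedQueue) TrySubmit(task int) error {
	select {
	case q.tasks <- task:
		return nil
	default:
		return ErrQueueFull
	}
}

// Next removes one task from the queue, as a worker would.
func (q *BoundedQueue) Next() int {
	return <-q.tasks
}

func main() {
	q := NewBoundedQueue(2)
	for i := 1; i <= 3; i++ {
		if err := q.TrySubmit(i); err != nil {
			fmt.Printf("task %d rejected: %v\n", i, err)
			continue
		}
		fmt.Printf("task %d queued\n", i)
	}

	// Once a worker takes a task, a slot frees up and the retry succeeds.
	fmt.Println("worker took task", q.Next())
	fmt.Println("retry task 3:", q.TrySubmit(3))
}
```

Whether to reject, block, or drop under load is a design decision that depends on the producer; this sketch only shows the rejecting variant.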

Channel-Based Semaphores

Channels can be used as semaphores to limit concurrent access to resources:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Semaphore represents a counting semaphore
type Semaphore chan struct{}

// Acquire n resources from the semaphore
func (s Semaphore) Acquire(n int) {
	for i := 0; i < n; i++ {
		s <- struct{}{}
	}
}

// Release n resources back to the semaphore
func (s Semaphore) Release(n int) {
	for i := 0; i < n; i++ {
		<-s
	}
}

// TryAcquire attempts to acquire n resources without blocking
// Returns true if successful, false if would block
func (s Semaphore) TryAcquire(n int) bool {
	select {
	case s <- struct{}{}:
		if n > 1 {
			// For multiple resources, we need to be careful
			// If we can't acquire all, release what we've acquired
			if !s.TryAcquire(n - 1) {
				<-s // Release the one we just acquired
				return false
			}
		}
		return true
	default:
		return false
	}
}

// NewSemaphore creates a new semaphore with the given capacity
func NewSemaphore(capacity int) Semaphore {
	return make(Semaphore, capacity)
}

// simulateResourceUsage demonstrates using a semaphore to limit concurrent access
func simulateResourceUsage(id int, sem Semaphore, wg *sync.WaitGroup) {
	defer wg.Done()
	
	// Try to acquire the resource
	fmt.Printf("Client %d: Attempting to acquire resource\n", id)
	
	// Try non-blocking acquire first
	if sem.TryAcquire(1) {
		fmt.Printf("Client %d: Immediately acquired resource\n", id)
	} else {
		fmt.Printf("Client %d: Waiting for resource...\n", id)
		sem.Acquire(1)
		fmt.Printf("Client %d: Eventually acquired resource\n", id)
	}
	
	// Use the resource
	fmt.Printf("Client %d: Using resource\n", id)
	time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
	
	// Release the resource
	fmt.Printf("Client %d: Releasing resource\n", id)
	sem.Release(1)
}

func main() {
	// Create a semaphore with capacity 3
	sem := NewSemaphore(3)
	
	// Create 10 clients that will try to use the resource
	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go simulateResourceUsage(i, sem, &wg)
		time.Sleep(50 * time.Millisecond) // Stagger client starts
	}
	
	// Wait for all clients to finish
	wg.Wait()
	fmt.Println("All clients finished")
}

Channel-based semaphores are useful for:

  1. Connection pooling: Limiting the number of concurrent connections to a resource
  2. Rate limiting: Controlling the rate of operations
  3. Resource protection: Preventing resource exhaustion by limiting concurrent access
  4. Concurrency control: Implementing more complex synchronization patterns
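A quick way to convince yourself that a channel semaphore really bounds concurrency is to measure it. The sketch below uses a hypothetical helper, `peakConcurrency`, that records the highest number of goroutines inside the guarded section at once. It uses a plain buffered channel rather than the Semaphore type above, to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency starts `tasks` goroutines gated by a channel semaphore
// of size `limit` and returns the highest number that were inside the
// guarded section at the same time.
func peakConcurrency(limit, tasks int) int {
	sem := make(chan struct{}, limit)
	var (
		mu      sync.Mutex
		running int
		peak    int
		wg      sync.WaitGroup
	)

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-sem }() // release the slot on exit

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}

	wg.Wait()
	return peak
}

func main() {
	// The reported peak never exceeds the semaphore capacity.
	fmt.Println("peak concurrency:", peakConcurrency(3, 10))
}
```

Note that this bounds how many operations run at once, not how many start per second; limiting a rate needs a time-based mechanism such as a ticker.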

Channel Orchestration Techniques

Beyond individual patterns, channels can be orchestrated to create sophisticated concurrent systems. These techniques combine multiple patterns to solve complex coordination problems.

Timeout and Cancellation Patterns

Proper timeout and cancellation handling is essential for robust concurrent systems:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// timeoutOperation demonstrates a simple timeout pattern
func timeoutOperation(timeout time.Duration) (result string, err error) {
	// Create a channel for the operation result
	resultCh := make(chan string, 1)
	
	// Start the operation in a goroutine
	go func() {
		// Simulate work
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		resultCh <- "Operation completed successfully"
	}()
	
	// Wait for result or timeout
	select {
	case result = <-resultCh:
		return result, nil
	case <-time.After(timeout):
		return "", fmt.Errorf("operation timed out after %v", timeout)
	}
}

// contextAwareOperation demonstrates using context for cancellation
func contextAwareOperation(ctx context.Context) (string, error) {
	// Create a channel for the operation result
	resultCh := make(chan string, 1)
	
	// Start the operation in a goroutine
	go func() {
		// Simulate work with periodic cancellation checks
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				// Context was cancelled, abort operation
				return
			case <-time.After(200 * time.Millisecond):
				// Continue working
				fmt.Println("Operation in progress...")
			}
		}
		
		resultCh <- "Operation completed successfully"
	}()
	
	// Wait for result or cancellation
	select {
	case result := <-resultCh:
		return result, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// multiStageTimeout demonstrates different timeouts for different stages
func multiStageTimeout() error {
	// Stage 1: Connect (short timeout)
	connectCtx, cancelConnect := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancelConnect()
	
	fmt.Println("Stage 1: Connecting...")
	if err := stageOperation(connectCtx, "connect", 300); err != nil {
		return fmt.Errorf("connect failed: %w", err)
	}
	
	// Stage 2: Process (longer timeout)
	processCtx, cancelProcess := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelProcess()
	
	fmt.Println("Stage 2: Processing...")
	if err := stageOperation(processCtx, "process", 1000); err != nil {
		return fmt.Errorf("process failed: %w", err)
	}
	
	// Stage 3: Finalize (medium timeout)
	finalizeCtx, cancelFinalize := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelFinalize()
	
	fmt.Println("Stage 3: Finalizing...")
	if err := stageOperation(finalizeCtx, "finalize", 500); err != nil {
		return fmt.Errorf("finalize failed: %w", err)
	}
	
	fmt.Println("All stages completed successfully")
	return nil
}

// stageOperation simulates a stage in a multi-stage operation
func stageOperation(ctx context.Context, name string, maxDuration int) error {
	// Simulate work with random duration
	duration := time.Duration(rand.Intn(maxDuration)) * time.Millisecond
	
	select {
	case <-time.After(duration):
		fmt.Printf("Stage '%s' completed in %v\n", name, duration)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// propagatingCancellation demonstrates how cancellation propagates through a pipeline
func propagatingCancellation() {
	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Ensure cancellation in all cases
	
	// Create a pipeline with this context
	nums := generateNumbers(ctx, 1, 100)
	squares := squareNumbers(ctx, nums)
	results := filterNumbers(ctx, squares, func(n int) bool {
		return n%10 == 0 // Only numbers divisible by 10
	})
	
	// Process results for a while, then cancel
	go func() {
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Cancelling pipeline...")
		cancel()
	}()
	
	// Consume results until cancellation
	for result := range results {
		fmt.Printf("Result: %d\n", result)
		time.Sleep(100 * time.Millisecond) // Slow consumer
	}
	
	fmt.Println("Pipeline terminated")
}

// generateNumbers produces a sequence of integers, respecting context cancellation
func generateNumbers(ctx context.Context, start, end int) <-chan int {
	out := make(chan int)
	
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			select {
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			case out <- i:
				// Successfully sent value
			}
		}
	}()
	
	return out
}

// squareNumbers transforms input numbers by squaring them
func squareNumbers(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				fmt.Println("Square operation cancelled")
				return
			case out <- n * n:
				// Successfully sent value
			}
		}
	}()
	
	return out
}

// filterNumbers filters numbers based on a predicate function
func filterNumbers(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
	out := make(chan int)
	
	go func() {
		defer close(out)
		for n := range in {
			if predicate(n) {
				select {
				case <-ctx.Done():
					fmt.Println("Filter operation cancelled")
					return
				case out <- n:
					// Successfully sent value
				}
			}
		}
	}()
	
	return out
}

func main() {
	// Demonstrate simple timeout
	fmt.Println("=== Simple Timeout Pattern ===")
	result, err := timeoutOperation(500 * time.Millisecond)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		fmt.Printf("Result: %s\n", result)
	}
	
	// Demonstrate context cancellation
	fmt.Println("\n=== Context Cancellation Pattern ===")
	ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond)
	defer cancel()
	
	result, err = contextAwareOperation(ctx)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		fmt.Printf("Result: %s\n", result)
	}
	
	// Demonstrate multi-stage timeout
	fmt.Println("\n=== Multi-stage Timeout Pattern ===")
	if err := multiStageTimeout(); err != nil {
		fmt.Printf("Error: %v\n", err)
	}
	
	// Demonstrate cancellation propagation
	fmt.Println("\n=== Cancellation Propagation Pattern ===")
	propagatingCancellation()
}

Key timeout and cancellation patterns:

  1. Simple timeout: Using select with time.After to implement timeouts
  2. Context-based cancellation: Using Go’s context package for cancellation propagation
  3. Multi-stage timeouts: Different timeouts for different stages of an operation
  4. Cancellation propagation: Ensuring cancellation signals flow through all parts of a system

Coordinating Multiple Goroutines

Complex concurrent systems often require sophisticated coordination between multiple goroutines:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Barrier synchronizes multiple goroutines to a common point
type Barrier struct {
	count    int
	mutex    sync.Mutex
	cond     *sync.Cond
	required int
}

// NewBarrier creates a new barrier that waits for the required number of goroutines
func NewBarrier(required int) *Barrier {
	b := &Barrier{required: required}
	b.cond = sync.NewCond(&b.mutex)
	return b
}

// Wait blocks until all goroutines have reached the barrier
func (b *Barrier) Wait() {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	
	b.count++
	if b.count == b.required {
		// Last goroutine to arrive, reset and broadcast
		b.count = 0
		b.cond.Broadcast()
	} else {
		// Wait for all goroutines to arrive
		b.cond.Wait()
	}
}

// WorkGroup coordinates multiple goroutines with phases
type WorkGroup struct {
	size      int
	wg        sync.WaitGroup
	startChan chan struct{}
	doneChan  chan struct{}
	barriers  []*Barrier
}

// NewWorkGroup creates a new work group for coordinating goroutines
func NewWorkGroup(size int, phases int) *WorkGroup {
	wg := &WorkGroup{
		size:      size,
		startChan: make(chan struct{}),
		doneChan:  make(chan struct{}),
		barriers:  make([]*Barrier, phases),
	}
	
	// Create barriers for each phase
	for i := 0; i < phases; i++ {
		wg.barriers[i] = NewBarrier(size)
	}
	
	return wg
}

// Start launches the work group
func (wg *WorkGroup) Start(work func(id, phase int)) {
	// Start all workers
	wg.wg.Add(wg.size)
	for i := 0; i < wg.size; i++ {
		go func(id int) {
			defer wg.wg.Done()
			
			// Wait for start signal
			<-wg.startChan
			
			// Execute work through all phases
			for phase := 0; phase < len(wg.barriers); phase++ {
				work(id, phase)
				wg.barriers[phase].Wait() // Synchronize at barrier
			}
		}(i)
	}
	
	// Signal all goroutines to start
	close(wg.startChan)
	
	// Wait for all to complete in a separate goroutine
	go func() {
		wg.wg.Wait()
		close(wg.doneChan)
	}()
}

// Wait blocks until all work is complete
func (wg *WorkGroup) Wait() {
	<-wg.doneChan
}

// Rendezvous coordinates pairs of goroutines
type Rendezvous struct {
	firstArrived  chan int
	secondArrived chan struct{}
}

// NewRendezvous creates a new rendezvous point
func NewRendezvous() *Rendezvous {
	return &Rendezvous{
		firstArrived:  make(chan int),
		secondArrived: make(chan struct{}),
	}
}

// Arrive waits for a pair of goroutines to meet
// Returns the ID of the first to arrive if this is the second arrival
func (r *Rendezvous) Arrive(id int) (int, bool) {
	select {
	case r.firstArrived <- id:
		// We're first to arrive, wait for second
		<-r.secondArrived
		return 0, false
	case firstID := <-r.firstArrived:
		// We're second to arrive, signal first
		// Send rather than close: closing would panic on the second use of this Rendezvous
		r.secondArrived <- struct{}{}
		return firstID, true
	}
}

func main() {
	// Demonstrate barrier synchronization
	fmt.Println("=== Barrier Synchronization ===")
	barrier := NewBarrier(3)
	
	for i := 0; i < 3; i++ {
		go func(id int) {
			fmt.Printf("Goroutine %d starting\n", id)
			time.Sleep(time.Duration(id*300) * time.Millisecond) // Different work times
			fmt.Printf("Goroutine %d arriving at barrier\n", id)
			barrier.Wait()
			fmt.Printf("Goroutine %d continuing after barrier\n", id)
		}(i)
	}
	
	// Demonstrate phased work group
	fmt.Println("\n=== Phased Work Group ===")
	workGroup := NewWorkGroup(4, 3)
	
	workGroup.Start(func(id, phase int) {
		fmt.Printf("Worker %d executing phase %d\n", id, phase)
		time.Sleep(time.Duration(100*(id+phase)) * time.Millisecond)
		fmt.Printf("Worker %d completed phase %d\n", id, phase)
	})
	
	workGroup.Wait()
	fmt.Println("All workers completed all phases")
	
	// Demonstrate rendezvous pattern
	fmt.Println("\n=== Rendezvous Pattern ===")
	rendezvous := NewRendezvous()
	
	go func() {
		fmt.Println("Goroutine A starting")
		time.Sleep(300 * time.Millisecond)
		fmt.Println("Goroutine A arriving at rendezvous")
		otherID, isSecond := rendezvous.Arrive(1)
		if isSecond {
			fmt.Printf("Goroutine A met with Goroutine %d\n", otherID)
		} else {
			fmt.Println("Goroutine A continuing after rendezvous")
		}
	}()
	
	go func() {
		fmt.Println("Goroutine B starting")
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Goroutine B arriving at rendezvous")
		otherID, isSecond := rendezvous.Arrive(2)
		if isSecond {
			fmt.Printf("Goroutine B met with Goroutine %d\n", otherID)
		} else {
			fmt.Println("Goroutine B continuing after rendezvous")
		}
	}()
	
	// Wait for demonstration to complete
	time.Sleep(1 * time.Second)
}

These coordination techniques enable:

  1. Phased execution: Coordinating multiple goroutines through distinct phases
  2. Barrier synchronization: Ensuring all goroutines reach a common point before proceeding
  3. Rendezvous: Coordinating pairs of goroutines to meet at a common point
  4. Work distribution: Efficiently distributing work across multiple goroutines
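
The Barrier above relies on sync.Cond. As a channel-based alternative, here is a minimal sketch of a reusable barrier in which each "generation" of waiters is released by closing a channel. ChanBarrier, NewChanBarrier, and runPhases are hypothetical names introduced for this illustration, not part of the listing above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ChanBarrier is a hypothetical reusable barrier built on channel close.
type ChanBarrier struct {
	mu      sync.Mutex
	n       int           // goroutines required per generation
	arrived int           // goroutines that have arrived in this generation
	release chan struct{} // closed to release the current generation
}

// NewChanBarrier creates a barrier for n goroutines.
func NewChanBarrier(n int) *ChanBarrier {
	return &ChanBarrier{n: n, release: make(chan struct{})}
}

// Wait blocks until n goroutines have called Wait in this generation.
func (b *ChanBarrier) Wait() {
	b.mu.Lock()
	b.arrived++
	release := b.release
	if b.arrived == b.n {
		// Last arrival: start a new generation, then release the old one.
		b.arrived = 0
		b.release = make(chan struct{})
		b.mu.Unlock()
		close(release)
		return
	}
	b.mu.Unlock()
	<-release
}

// runPhases sends workers through phases and reports whether every worker,
// after passing a barrier, saw that all peers had finished that phase.
func runPhases(workers, phases int) bool {
	b := NewChanBarrier(workers)
	finished := make([]int32, phases)
	var violations int32
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := 0; p < phases; p++ {
				atomic.AddInt32(&finished[p], 1) // the "work" for phase p is done
				b.Wait()
				if atomic.LoadInt32(&finished[p]) != int32(workers) {
					atomic.AddInt32(&violations, 1)
				}
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&violations) == 0
}

func main() {
	fmt.Println("barrier held:", runPhases(4, 3))
}
```

Each waiter captures the release channel of its own generation before unlocking, so a fast goroutine that re-enters Wait for the next phase cannot be released by the previous generation's close.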

Multiplexing and Demultiplexing Channels

Channel multiplexing combines multiple input channels into a single output channel, while demultiplexing does the reverse:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// multiplexChannels combines multiple input channels into a single output channel
func multiplexChannels(inputs ...<-chan int) <-chan int {
	output := make(chan int)
	var wg sync.WaitGroup
	
	// Start a goroutine for each input channel
	for i, ch := range inputs {
		wg.Add(1)
		go func(id int, ch <-chan int) {
			defer wg.Done()
			for val := range ch {
				fmt.Printf("Multiplexer receiving from input %d: %d\n", id, val)
				output <- val
			}
		}(i, ch)
	}
	
	// Close the output channel when all input channels are done
	go func() {
		wg.Wait()
		close(output)
		fmt.Println("All input channels closed, multiplexer shutting down")
	}()
	
	return output
}

// demultiplexChannel splits a single input channel into multiple output channels
func demultiplexChannel(input <-chan int, n int) []<-chan int {
	outputs := make([]chan int, n)
	for i := range outputs {
		outputs[i] = make(chan int)
	}
	
	// Convert to read-only channels for return
	readOnlyOutputs := make([]<-chan int, n)
	for i, ch := range outputs {
		readOnlyOutputs[i] = ch
	}
	
	// Start the demultiplexer
	go func() {
		defer func() {
			for _, ch := range outputs {
				close(ch)
			}
			fmt.Println("Demultiplexer shutting down, all output channels closed")
		}()
		
		// Value-based distribution: each value goes to output val % n.
		// This assumes non-negative values; a negative val would produce
		// a negative index and panic.
		for val := range input {
			outChan := val % n
			fmt.Printf("Demultiplexer sending %d to output %d\n", val, outChan)
			outputs[outChan] <- val
		}
	}()
	
	return readOnlyOutputs
}

// contentBasedDemux routes messages based on their content
func contentBasedDemux(input <-chan int, predicates ...func(int) bool) []<-chan int {
	outputs := make([]chan int, len(predicates))
	for i := range outputs {
		outputs[i] = make(chan int)
	}
	
	// Convert to read-only channels for return
	readOnlyOutputs := make([]<-chan int, len(predicates))
	for i, ch := range outputs {
		readOnlyOutputs[i] = ch
	}
	
	// Start the content-based router
	go func() {
		defer func() {
			for _, ch := range outputs {
				close(ch)
			}
			fmt.Println("Content router shutting down, all output channels closed")
		}()
		
		for val := range input {
			// Send to all matching outputs
			for i, predicate := range predicates {
				if predicate(val) {
					fmt.Printf("Router sending %d to output %d\n", val, i)
					outputs[i] <- val
				}
			}
		}
	}()
	
	return readOnlyOutputs
}

// broadcastChannel sends each input value to all output channels
func broadcastChannel(input <-chan int, n int) []<-chan int {
	outputs := make([]chan int, n)
	for i := range outputs {
		outputs[i] = make(chan int, 5) // Buffer to prevent blocking
	}
	
	// Convert to read-only channels for return
	readOnlyOutputs := make([]<-chan int, n)
	for i, ch := range outputs {
		readOnlyOutputs[i] = ch
	}
	
	// Start the broadcaster
	go func() {
		defer func() {
			for _, ch := range outputs {
				close(ch)
			}
			fmt.Println("Broadcaster shutting down, all output channels closed")
		}()
		
		for val := range input {
			fmt.Printf("Broadcasting value: %d\n", val)
			for i, ch := range outputs {
				select {
				case ch <- val:
					// Value sent successfully
				default:
					// Channel buffer full, log and continue
					fmt.Printf("Warning: Output %d buffer full, dropping value %d\n", i, val)
				}
			}
		}
	}()
	
	return readOnlyOutputs
}

func main() {
	// Create input channels for multiplexing demo
	fmt.Println("=== Channel Multiplexing ===")
	inputs := make([]chan int, 3)
	for i := range inputs {
		inputs[i] = make(chan int)
		i := i // Create new variable to avoid closure problem
		
		// Start producer for each input channel
		go func() {
			defer close(inputs[i])
			for j := 0; j < 3; j++ {
				val := i*10 + j
				fmt.Printf("Producer %d sending: %d\n", i, val)
				inputs[i] <- val
				time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
			}
		}()
	}
	
	// Convert to read-only channels for multiplexing
	readOnlyInputs := make([]<-chan int, len(inputs))
	for i, ch := range inputs {
		readOnlyInputs[i] = ch
	}
	
	// Multiplex the channels
	multiplexed := multiplexChannels(readOnlyInputs...)
	
	// Consume multiplexed output
	go func() {
		for val := range multiplexed {
			fmt.Printf("Consumer received from multiplexed channel: %d\n", val)
		}
	}()
	
	// Wait for multiplexing demo to complete
	time.Sleep(2 * time.Second)
	
	// Demonstrate demultiplexing
	fmt.Println("\n=== Channel Demultiplexing ===")
	input := make(chan int)
	
	// Start producer for input channel
	go func() {
		defer close(input)
		for i := 0; i < 10; i++ {
			fmt.Printf("Demux producer sending: %d\n", i)
			input <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()
	
	// Demultiplex into 3 output channels
	outputs := demultiplexChannel(input, 3)
	
	// Start consumers for each output channel
	var wg sync.WaitGroup
	for i, ch := range outputs {
		wg.Add(1)
		i := i // Create new variable to avoid closure problem
		ch := ch
		
		go func() {
			defer wg.Done()
			for val := range ch {
				fmt.Printf("Demux consumer %d received: %d\n", i, val)
			}
		}()
	}
	
	// Wait for all demux consumers to finish (their channels close when input closes)
	wg.Wait()
	
	// Demonstrate content-based routing
	fmt.Println("\n=== Content-Based Routing ===")
	routerInput := make(chan int)
	
	// Define predicates for routing
	isEven := func(n int) bool { return n%2 == 0 }
	isDivisibleBy3 := func(n int) bool { return n%3 == 0 }
	isGreaterThan5 := func(n int) bool { return n > 5 }
	
	// Create router
	routedOutputs := contentBasedDemux(routerInput, isEven, isDivisibleBy3, isGreaterThan5)
	
	// Start consumers for each routed output
	for i, ch := range routedOutputs {
		wg.Add(1)
		i := i // Create new variable to avoid closure problem
		ch := ch
		
		go func() {
			defer wg.Done()
			for val := range ch {
				var condition string
				switch i {
				case 0:
					condition = "even"
				case 1:
					condition = "divisible by 3"
				case 2:
					condition = "greater than 5"
				}
				fmt.Printf("Router consumer %d received %d (%s)\n", i, val, condition)
			}
		}()
	}
	
	// Send values to the router
	go func() {
		defer close(routerInput)
		for i := 0; i < 10; i++ {
			fmt.Printf("Router producer sending: %d\n", i)
			routerInput <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()
	
	// Wait for all router consumers to finish (their channels close when routerInput closes)
	wg.Wait()
}

These multiplexing and demultiplexing patterns enable:

  1. Channel consolidation: Combining multiple input sources into a single stream
  2. Load distribution: Distributing work across multiple workers
  3. Content-based routing: Directing messages based on their content
  4. Broadcasting: Sending the same message to multiple recipients

Error Handling in Channel Communication

Robust concurrent systems require effective error handling strategies. Go’s channel-based concurrency introduces unique challenges for error propagation and handling.

Error Propagation Patterns

There are several patterns for propagating errors through channel-based systems:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Result represents a computation result or error
type Result struct {
	Value int
	Err   error
}

// errorPropagationBasic demonstrates the basic error propagation pattern
func errorPropagationBasic() {
	// Create a channel for results
	results := make(chan Result)
	
	// Start a worker that might encounter errors
	go func() {
		defer close(results)
		
		// Simulate work with possible errors
		for i := 0; i < 5; i++ {
			// 30% chance of error
			if rand.Float32() < 0.3 {
				results <- Result{Err: fmt.Errorf("error processing item %d", i)}
				continue
			}
			
			// Successful computation
			results <- Result{Value: i * 10}
		}
	}()
	
	// Process results, handling errors
	for result := range results {
		if result.Err != nil {
			fmt.Printf("Error: %v\n", result.Err)
		} else {
			fmt.Printf("Success: %d\n", result.Value)
		}
	}
}

// errorPropagationPipeline demonstrates error propagation through a pipeline
func errorPropagationPipeline() {
	// Create a pipeline with error propagation
	source := generateWithErrors(1, 10)
	processed := processWithErrors(source)
	
	// Consume results
	for result := range processed {
		if result.Err != nil {
			fmt.Printf("Pipeline error: %v\n", result.Err)
		} else {
			fmt.Printf("Pipeline result: %d\n", result.Value)
		}
	}
}

// generateWithErrors produces integers with possible errors
func generateWithErrors(start, end int) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			// 20% chance of error
			if rand.Float32() < 0.2 {
				out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}
				continue
			}
			
			// Simulate work
			time.Sleep(50 * time.Millisecond)
			out <- Result{Value: i}
		}
	}()
	
	return out
}

// processWithErrors transforms values and propagates errors
func processWithErrors(in <-chan Result) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for result := range in {
			// If input already has an error, propagate it
			if result.Err != nil {
				out <- result
				continue
			}
			
			// 10% chance of new error during processing
			if rand.Float32() < 0.1 {
				out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}
				continue
			}
			
			// Successful processing
			time.Sleep(30 * time.Millisecond)
			out <- Result{Value: result.Value * result.Value}
		}
	}()
	
	return out
}

// errorAggregation demonstrates collecting and aggregating errors
func errorAggregation() {
	// Create multiple workers that might produce errors
	numWorkers := 5
	results := make(chan Result)
	var wg sync.WaitGroup
	
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			// 40% chance of error
			if rand.Float32() < 0.4 {
				results <- Result{Err: fmt.Errorf("worker %d failed", id)}
				return
			}
			
			// Successful computation
			results <- Result{Value: id * 10}
		}(i)
	}
	
	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()
	
	// Collect and aggregate results
	var successCount, errorCount int
	var errs []error // named errs to avoid shadowing the standard errors package
	var values []int
	
	for result := range results {
		if result.Err != nil {
			errorCount++
			errs = append(errs, result.Err)
		} else {
			successCount++
			values = append(values, result.Value)
		}
	}
	
	// Report aggregated results
	fmt.Printf("Successful operations: %d\n", successCount)
	fmt.Printf("Failed operations: %d\n", errorCount)
	
	if errorCount > 0 {
		fmt.Println("Errors encountered:")
		for _, err := range errs {
			fmt.Printf("  - %v\n", err)
		}
	}
	
	if successCount > 0 {
		fmt.Printf("Values: %v\n", values)
	}
}

// earlyTerminationOnError demonstrates stopping a pipeline on first error
func earlyTerminationOnError() {
	// Create a context for cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Create a pipeline with early termination
	source := generateWithContext(ctx, 1, 20)
	processed := processWithContext(ctx, source)
	
	// Consume results until first error
	for result := range processed {
		if result.Err != nil {
			fmt.Printf("Pipeline error: %v\n", result.Err)
			fmt.Println("Cancelling pipeline...")
			cancel() // Signal all stages to stop
			break
		}
		
		fmt.Printf("Pipeline result: %d\n", result.Value)
	}
	
	// Wait for pipeline to fully terminate
	time.Sleep(100 * time.Millisecond)
	fmt.Println("Pipeline terminated")
}

// generateWithContext produces integers with possible errors and respects context
func generateWithContext(ctx context.Context, start, end int) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for i := start; i <= end; i++ {
			// Check for cancellation
			select {
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			default:
				// Continue processing
			}
			
			// 10% chance of error
			if rand.Float32() < 0.1 {
				select {
				case out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}:
					// Error sent
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Simulate work
			time.Sleep(50 * time.Millisecond)
			
			// Send result
			select {
			case out <- Result{Value: i}:
				// Value sent
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// processWithContext transforms values and respects context
func processWithContext(ctx context.Context, in <-chan Result) <-chan Result {
	out := make(chan Result)
	
	go func() {
		defer close(out)
		for {
			// Check for cancellation between items
			select {
			case <-ctx.Done():
				fmt.Println("Processor cancelled")
				return
			default:
				// Continue processing
			}
			
			// Try to receive next item
			var result Result
			var ok bool
			select {
			case result, ok = <-in:
				if !ok {
					return // Input channel closed
				}
			case <-ctx.Done():
				return // Context cancelled
			}
			
			// If input already has an error, propagate it
			if result.Err != nil {
				select {
				case out <- result:
					// Error propagated
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// 15% chance of new error during processing
			if rand.Float32() < 0.15 {
				select {
				case out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}:
					// Error sent
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Successful processing
			time.Sleep(30 * time.Millisecond)
			select {
			case out <- Result{Value: result.Value * result.Value}:
				// Result sent
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

func main() {
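	// Seed random number generator. Since Go 1.20 the global source is seeded
	// automatically and rand.Seed is deprecated; the call matters only on older versions.
	rand.Seed(time.Now().UnixNano())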
	
	// Demonstrate basic error propagation
	fmt.Println("=== Basic Error Propagation ===")
	errorPropagationBasic()
	
	// Demonstrate error propagation through a pipeline
	fmt.Println("\n=== Pipeline Error Propagation ===")
	errorPropagationPipeline()
	
	// Demonstrate error aggregation
	fmt.Println("\n=== Error Aggregation ===")
	errorAggregation()
	
	// Demonstrate early termination on error
	fmt.Println("\n=== Early Termination on Error ===")
	earlyTerminationOnError()
}

Key error handling patterns:

  1. Result type pattern: Combining results and errors in a single struct
  2. Error propagation: Passing errors through pipeline stages
  3. Error aggregation: Collecting and summarizing errors from multiple operations
  4. Early termination: Stopping all processing when an error occurs

Performance Optimization and Monitoring

Building high-performance concurrent systems requires careful attention to channel usage patterns and performance characteristics.

Channel Sizing and Buffering Strategies

The size of channel buffers can significantly impact performance:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// benchmarkChannelBuffering measures the performance impact of different buffer sizes
func benchmarkChannelBuffering() {
	// Test parameters
	numItems := 10000
	bufferSizes := []int{0, 1, 10, 100, 1000}
	
	fmt.Println("Testing channel performance with different buffer sizes")
	fmt.Println("Buffer Size | Producer Time | Consumer Time | Total Time")
	fmt.Println("-----------|---------------|---------------|------------")
	
	for _, bufferSize := range bufferSizes {
		// Create channel with specified buffer size
		ch := make(chan int, bufferSize)
		
		var wg sync.WaitGroup
		wg.Add(2) // One for producer, one for consumer
		
		// Track timing
		start := time.Now()
		var producerDone time.Time
		
		// Start producer
		go func() {
			defer wg.Done()
			defer close(ch)
			
			for i := 0; i < numItems; i++ {
				ch <- i
			}
			
			producerDone = time.Now()
		}()
		
		// Start consumer
		go func() {
			defer wg.Done()
			
			count := 0
			for range ch {
				count++
				
				// Simulate variable processing time
				if rand.Intn(100) < 10 {
					time.Sleep(10 * time.Microsecond)
				}
			}
		}()
		
		// Wait for both to finish
		wg.Wait()
		totalTime := time.Since(start)
		producerTime := producerDone.Sub(start)
		// The consumer drains the channel last, so its time equals the total
		consumerTime := totalTime
		
		// Report results
		fmt.Printf("%-11d | %-13s | %-13s | %s\n",
			bufferSize,
			producerTime.String(),
			consumerTime.String(),
			totalTime.String())
	}
}

// demonstrateBackpressure shows how buffered channels provide backpressure
func demonstrateBackpressure() {
	fmt.Println("\nDemonstrating backpressure with buffered channels")
	
	// Create a channel with limited buffer
	bufferSize := 5
	ch := make(chan int, bufferSize)
	
	// Start a slow consumer
	var wg sync.WaitGroup
	wg.Add(1)
	
	go func() {
		defer wg.Done()
		
		for item := range ch {
			fmt.Printf("Consumer processing item %d\n", item)
			time.Sleep(200 * time.Millisecond) // Slow consumer
		}
	}()
	
	// Producer tries to send items faster than consumer can process
	for i := 0; i < 10; i++ {
		fmt.Printf("Producer attempting to send item %d\n", i)
		start := time.Now()
		ch <- i
		elapsed := time.Since(start)
		
		if elapsed > 100*time.Millisecond {
			fmt.Printf("Producer blocked for %s while sending item %d (backpressure in action)\n",
				elapsed, i)
		} else {
			fmt.Printf("Producer sent item %d immediately\n", i)
		}
		
		time.Sleep(100 * time.Millisecond)
	}
	
	close(ch)
	wg.Wait()
}

// channelOverheadComparison compares channels to other synchronization methods
func channelOverheadComparison() {
	fmt.Println("\nComparing channel overhead to other synchronization methods")
	
	iterations := 100000
	
	// Test mutex-based synchronization
	start := time.Now()
	var mutex sync.Mutex
	var counter int
	
	var wg sync.WaitGroup
	wg.Add(2)
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			mutex.Lock()
			counter++
			mutex.Unlock()
		}
	}()
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			mutex.Lock()
			counter++
			mutex.Unlock()
		}
	}()
	
	wg.Wait()
	mutexTime := time.Since(start)
	
	// Test channel-based synchronization
	start = time.Now()
	ch := make(chan int)
	
	wg.Add(2)
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			ch <- 1
		}
	}()
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			<-ch
		}
	}()
	
	wg.Wait()
	channelTime := time.Since(start)
	
	// Test buffered channel
	start = time.Now()
	bufferedCh := make(chan int, 1000)
	
	wg.Add(2)
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			bufferedCh <- 1
		}
	}()
	
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			<-bufferedCh
		}
	}()
	
	wg.Wait()
	bufferedChannelTime := time.Since(start)
	
	// Report results
	fmt.Printf("Mutex:           %s\n", mutexTime)
	fmt.Printf("Unbuffered Chan: %s\n", channelTime)
	fmt.Printf("Buffered Chan:   %s\n", bufferedChannelTime)
}

func main() {
	// Benchmark different buffer sizes
	benchmarkChannelBuffering()
	
	// Demonstrate backpressure
	demonstrateBackpressure()
	
	// Compare channel overhead to other synchronization methods
	channelOverheadComparison()
}

Key performance considerations:

  1. Buffer sizing: Larger buffers can improve throughput but increase memory usage
  2. Backpressure: A full buffer blocks the sender, so producers are throttled to the consumer’s pace
  3. Overhead comparison: A channel operation typically costs more than an uncontended mutex lock, but channels also transfer data and signal completion
  4. Batching: Processing items in batches can reduce channel communication overhead

Monitoring Channel Health

Monitoring channel behavior is essential for identifying bottlenecks and deadlocks:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// ChannelStats tracks statistics about a channel
type ChannelStats struct {
	Name            string
	SendCount       int64
	ReceiveCount    int64
	BlockedSends    int64
	BlockedReceives int64
	LastActivity    time.Time
	mutex           sync.Mutex
}

// NewChannelStats creates a new channel statistics tracker
func NewChannelStats(name string) *ChannelStats {
	return &ChannelStats{
		Name:         name,
		LastActivity: time.Now(),
	}
}

// RecordSend records a send operation
func (cs *ChannelStats) RecordSend(blocked bool) {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()
	
	cs.SendCount++
	if blocked {
		cs.BlockedSends++
	}
	cs.LastActivity = time.Now()
}

// RecordReceive records a receive operation
func (cs *ChannelStats) RecordReceive(blocked bool) {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()
	
	cs.ReceiveCount++
	if blocked {
		cs.BlockedReceives++
	}
	cs.LastActivity = time.Now()
}

// GetStats returns the current statistics
func (cs *ChannelStats) GetStats() map[string]interface{} {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()
	
	return map[string]interface{}{
		"name":             cs.Name,
		"sends":            cs.SendCount,
		"receives":         cs.ReceiveCount,
		"blocked_sends":    cs.BlockedSends,
		"blocked_receives": cs.BlockedReceives,
		"idle_time":        time.Since(cs.LastActivity).String(),
	}
}

// InstrumentedChannel wraps a channel with monitoring
type InstrumentedChannel struct {
	ch    chan int
	stats *ChannelStats
}

// NewInstrumentedChannel creates a new instrumented channel
func NewInstrumentedChannel(name string, buffer int) *InstrumentedChannel {
	return &InstrumentedChannel{
		ch:    make(chan int, buffer),
		stats: NewChannelStats(name),
	}
}

// Send sends a value on the channel with instrumentation
func (ic *InstrumentedChannel) Send(value int) {
	// Try non-blocking send first
	select {
	case ic.ch <- value:
		ic.stats.RecordSend(false)
	default:
		// Blocking send
		ic.stats.RecordSend(true)
		ic.ch <- value
	}
}

// Receive receives a value from the channel with instrumentation
func (ic *InstrumentedChannel) Receive() (int, bool) {
	// Try non-blocking receive first
	select {
	case value, ok := <-ic.ch:
		ic.stats.RecordReceive(false)
		return value, ok
	default:
		// Blocking receive
		ic.stats.RecordReceive(true)
		value, ok := <-ic.ch
		return value, ok
	}
}

// Close closes the underlying channel
func (ic *InstrumentedChannel) Close() {
	close(ic.ch)
}

// GetStats returns the channel statistics
func (ic *InstrumentedChannel) GetStats() map[string]interface{} {
	return ic.stats.GetStats()
}

// monitorChannels periodically reports channel statistics
func monitorChannels(channels []*InstrumentedChannel, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			fmt.Println("\nChannel Statistics:")
			fmt.Println("-------------------")
			
			for _, ch := range channels {
				stats := ch.GetStats()
				fmt.Printf("Channel: %s\n", stats["name"])
				fmt.Printf("  Sends: %d (blocked: %d)\n", stats["sends"], stats["blocked_sends"])
				fmt.Printf("  Receives: %d (blocked: %d)\n", stats["receives"], stats["blocked_receives"])
				fmt.Printf("  Idle time: %s\n", stats["idle_time"])
			}
			
			// Report goroutine count
			fmt.Printf("\nTotal goroutines: %d\n", runtime.NumGoroutine())
			
		case <-done:
			return
		}
	}
}

// simulateChannelWorkload demonstrates channel monitoring
func simulateChannelWorkload() {
	// Create instrumented channels
	fastChannel := NewInstrumentedChannel("fast", 10)
	slowChannel := NewInstrumentedChannel("slow", 5)
	
	// Create done channel for cleanup
	done := make(chan struct{})
	
	// Start monitoring
	go monitorChannels([]*InstrumentedChannel{fastChannel, slowChannel}, 1*time.Second, done)
	
	// Start producer for fast channel
	go func() {
		for i := 0; i < 1000; i++ {
			fastChannel.Send(i)
			time.Sleep(10 * time.Millisecond)
		}
		fastChannel.Close()
	}()
	
	// Start consumer for fast channel
	go func() {
		for {
			_, ok := fastChannel.Receive()
			if !ok {
				break
			}
			time.Sleep(20 * time.Millisecond) // Consumer is slower than producer
		}
	}()
	
	// Start producer for slow channel
	go func() {
		for i := 0; i < 100; i++ {
			slowChannel.Send(i)
			time.Sleep(50 * time.Millisecond)
		}
		slowChannel.Close()
	}()
	
	// Start consumer for slow channel
	go func() {
		for {
			_, ok := slowChannel.Receive()
			if !ok {
				break
			}
			time.Sleep(10 * time.Millisecond) // Consumer is faster than producer
		}
	}()
	
	// Run simulation for 10 seconds
	time.Sleep(10 * time.Second)
	close(done)
}

func main() {
	simulateChannelWorkload()
}

Key monitoring techniques:

  1. Instrumented channels: Wrapping channels with monitoring code
  2. Operation tracking: Recording send and receive operations
  3. Blocked operation detection: Identifying when operations block
  4. Idle time tracking: Detecting potentially deadlocked channels
  5. Goroutine count monitoring: Identifying potential goroutine leaks
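
Blocked operation detection can be sketched in a few lines. The example below is a minimal illustration, not the InstrumentedChannel implementation used above: `countingChan`, `newCountingChan`, and `demo` are hypothetical names, and `atomic.Int64` assumes Go 1.19 or later. It tries a non-blocking send first and counts the operation as blocked only when that attempt fails.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// countingChan is a hypothetical, stripped-down instrumented channel.
type countingChan struct {
	ch           chan int
	sends        atomic.Int64 // completed sends
	blockedSends atomic.Int64 // sends that could not complete immediately
}

func newCountingChan(buffer int) *countingChan {
	return &countingChan{ch: make(chan int, buffer)}
}

// Send tries a non-blocking send first. If the default case runs, the
// channel had no free buffer slot and no waiting receiver, so the send is
// counted as blocked before falling back to an ordinary blocking send.
func (c *countingChan) Send(v int) {
	select {
	case c.ch <- v:
	default:
		c.blockedSends.Add(1)
		c.ch <- v
	}
	c.sends.Add(1)
}

// demo fills a 2-slot channel, attempts a third send with no receiver
// present, and returns the resulting counters.
func demo() (sends, blocked int64) {
	c := newCountingChan(2)
	c.Send(0)
	c.Send(1)

	done := make(chan struct{})
	go func() {
		defer close(done)
		c.Send(2) // buffer is full, so this send has to wait
	}()

	// Receive only after the third send has been recorded as blocked.
	for c.blockedSends.Load() == 0 {
		time.Sleep(time.Millisecond)
	}
	<-c.ch
	<-done
	return c.sends.Load(), c.blockedSends.Load()
}

func main() {
	sends, blocked := demo()
	fmt.Printf("sends=%d blocked=%d\n", sends, blocked)
}
```

The extra select adds a small cost to every send, so this kind of instrumentation is probably best reserved for debugging builds or low-volume channels.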

Channel Memory Management

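A buffered channel allocates its entire buffer when it is created, so buffer size and element type determine how much memory it holds. The program below measures that cost and then shows a channel-backed object pool: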

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// demonstrateChannelMemoryUsage shows memory usage patterns of channels
func demonstrateChannelMemoryUsage() {
	fmt.Println("=== Channel Memory Usage ===")
	
	// Print initial memory stats
	printMemStats("Initial")
	
	// Create many small channels
	fmt.Println("\nCreating 100,000 small channels...")
	smallChannels := make([]chan int, 100000)
	for i := range smallChannels {
		smallChannels[i] = make(chan int, 1)
	}
	
	// Print memory stats after creating small channels
	printMemStats("After small channels")
	
	// Create a few large channels
	fmt.Println("\nCreating 10 large channels (buffer size 10,000)...")
	largeChannels := make([]chan int, 10)
	for i := range largeChannels {
		largeChannels[i] = make(chan int, 10000)
	}
	
	// Print memory stats after creating large channels
	printMemStats("After large channels")
	
	// Fill the large channels
	fmt.Println("\nFilling large channels...")
	for _, ch := range largeChannels {
		for i := 0; i < 10000; i++ {
			ch <- i
		}
	}
	
	// Print memory stats after filling large channels
	printMemStats("After filling large channels")
	
	// Clear references to allow garbage collection
	fmt.Println("\nClearing references...")
	smallChannels = nil
	largeChannels = nil
	
	// Force garbage collection
	runtime.GC()
	
	// Print final memory stats
	printMemStats("After garbage collection")
}

// printMemStats prints current memory statistics
func printMemStats(label string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	
	fmt.Printf("%s:\n", label)
	fmt.Printf("  Alloc: %.2f MB\n", float64(m.Alloc)/1024/1024)
	fmt.Printf("  Sys: %.2f MB\n", float64(m.Sys)/1024/1024)
	fmt.Printf("  NumGC: %d\n", m.NumGC)
}

// objectPool reuses byte slices through a buffered channel to reduce allocations
type objectPool struct {
	pool       chan []byte
	bufferSize int // size of every buffer handed out by get
}

// newObjectPool creates a pool holding size buffers of bufferSize bytes each
func newObjectPool(size int, bufferSize int) *objectPool {
	p := &objectPool{
		pool:       make(chan []byte, size),
		bufferSize: bufferSize,
	}
	
	// Pre-allocate objects
	for i := 0; i < size; i++ {
		p.pool <- make([]byte, bufferSize)
	}
	
	return p
}

// get retrieves an object from the pool or creates a new one if none available
func (p *objectPool) get() []byte {
	select {
	case obj := <-p.pool:
		return obj
	default:
		// Pool is empty, create a new object of the configured size
		return make([]byte, p.bufferSize)
	}
}

// put returns an object to the pool
func (p *objectPool) put(obj []byte) {
	// Clear the buffer for reuse
	for i := range obj {
		obj[i] = 0
	}
	
	select {
	case p.pool <- obj:
		// Object returned to pool
	default:
		// Pool is full, let the object be garbage collected
	}
}

// demonstrateObjectPooling shows how to reduce allocations with object pooling
func demonstrateObjectPooling() {
	fmt.Println("\n=== Object Pooling ===")
	
	// Create a pool of 100 byte slices, each 4KB
	pool := newObjectPool(100, 4096)
	
	// Benchmark without pooling
	printMemStats("Before non-pooled operations")
	
	start := time.Now()
	var wg sync.WaitGroup
	
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			
			// Allocate and use a new buffer each time
			buf := make([]byte, 4096)
			for i := range buf {
				buf[i] = byte(i % 256)
			}
		}()
	}
	
	wg.Wait()
	
	nonPooledTime := time.Since(start)
	printMemStats("After non-pooled operations")
	
	// Force garbage collection
	runtime.GC()
	
	// Benchmark with pooling
	printMemStats("Before pooled operations")
	
	start = time.Now()
	
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			
			// Get buffer from pool
			buf := pool.get()
			
			// Use the buffer
			for i := range buf {
				buf[i] = byte(i % 256)
			}
			
			// Return buffer to pool
			pool.put(buf)
		}()
	}
	
	wg.Wait()
	
	pooledTime := time.Since(start)
	printMemStats("After pooled operations")
	
	// Report timing results
	fmt.Printf("\nNon-pooled time: %s\n", nonPooledTime)
	fmt.Printf("Pooled time: %s\n", pooledTime)
	fmt.Printf("Improvement: %.2f%%\n", 100*(1-float64(pooledTime)/float64(nonPooledTime)))
}

func main() {
	// Demonstrate channel memory usage
	demonstrateChannelMemoryUsage()
	
	// Demonstrate object pooling
	demonstrateObjectPooling()
}

Key memory management techniques:

  1. Buffer sizing: Choosing appropriate buffer sizes to balance memory usage and performance
  2. Object pooling: Reusing objects to reduce allocation and garbage collection overhead
  3. Memory monitoring: Tracking memory usage to identify leaks and inefficiencies
  4. Pre-allocation: Allocating channels and buffers upfront to reduce dynamic allocations
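
The standard library's `sync.Pool` is an alternative to a channel-backed pool. The sketch below is an illustration under stated assumptions: `bufPool`, `bufferSize`, and `sumBytes` are hypothetical names, and `any` assumes Go 1.18 or later. Unlike the channel-based pool, `sync.Pool` has no fixed capacity and the runtime may discard pooled items during garbage collection, so it suits temporary buffers rather than a bounded resource.

```go
package main

import (
	"fmt"
	"sync"
)

// bufferSize is a hypothetical buffer size chosen for this sketch.
const bufferSize = 4096

// bufPool stores *[]byte rather than []byte so that Put does not
// allocate a new interface value for the slice header on every call.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, bufferSize)
		return &b
	},
}

// sumBytes copies data into a pooled scratch buffer and sums the copy.
// Only the first n bytes are read, so stale data left in the buffer by a
// previous user is never observed.
func sumBytes(data []byte) int {
	bp := bufPool.Get().(*[]byte)
	defer bufPool.Put(bp)

	n := copy(*bp, data) // copies at most bufferSize bytes
	total := 0
	for _, b := range (*bp)[:n] {
		total += int(b)
	}
	return total
}

func main() {
	fmt.Println(sumBytes([]byte{1, 2, 3}))
}
```

Because a pooled buffer can come back with old contents, each user should either overwrite the part it reads, as here, or clear the buffer before returning it.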

Production Implementation Strategies

Implementing channel-based concurrency patterns in production systems requires careful consideration of reliability, maintainability, and performance.

Graceful Shutdown Patterns

Proper shutdown handling is essential for production systems:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// Worker represents a long-running worker goroutine
type Worker struct {
	id          int
	jobs        <-chan int
	results     chan<- int
	ctx         context.Context
	cancel      context.CancelFunc
	done        chan struct{} // closed when the processing loop exits
	gracePeriod time.Duration
	wg          *sync.WaitGroup
}

// NewWorker creates a new worker
func NewWorker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) *Worker {
	ctx, cancel := context.WithCancel(context.Background())
	return &Worker{
		id:          id,
		jobs:        jobs,
		results:     results,
		ctx:         ctx,
		cancel:      cancel,
		done:        make(chan struct{}),
		gracePeriod: 2 * time.Second,
		wg:          wg,
	}
}

// Start begins the worker's processing loop
func (w *Worker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		defer close(w.done) // lets Stop observe that the loop has exited
		
		for {
			select {
			case <-w.ctx.Done():
				fmt.Printf("Worker %d shutting down\n", w.id)
				return
				
			case job, ok := <-w.jobs:
				if !ok {
					fmt.Printf("Worker %d: job channel closed\n", w.id)
					return
				}
				
				// Process the job
				fmt.Printf("Worker %d processing job %d\n", w.id, job)
				time.Sleep(500 * time.Millisecond) // Simulate work
				
				// Try to send result, respecting cancellation
				select {
				case w.results <- job * 2:
					// Result sent successfully
				case <-w.ctx.Done():
					fmt.Printf("Worker %d: cancelled while sending result\n", w.id)
					return
				}
			}
		}
	}()
}

// Stop signals the worker to stop and waits for it to exit,
// giving up once the grace period has elapsed
func (w *Worker) Stop() {
	fmt.Printf("Stopping worker %d (grace period: %v)\n", w.id, w.gracePeriod)
	
	// Signal worker to stop
	w.cancel()
	
	// Wait for either the worker to finish or the grace period to expire
	select {
	case <-w.done:
		fmt.Printf("Worker %d stopped\n", w.id)
	case <-time.After(w.gracePeriod):
		fmt.Printf("Grace period expired for worker %d\n", w.id)
	}
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
	workers      []*Worker
	jobs         chan int
	results      chan int
	ctx          context.Context
	cancel       context.CancelFunc
	wg           sync.WaitGroup // tracks workers
	shutdownWg   sync.WaitGroup // tracks the result collector
	submitMu     sync.RWMutex   // keeps SubmitJob from sending on a closed jobs channel
	shutdownOnce sync.Once      // makes Shutdown safe to call more than once
	gracePeriod  time.Duration
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(numWorkers int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &WorkerPool{
		workers:     make([]*Worker, numWorkers),
		jobs:        make(chan int),
		results:     make(chan int),
		ctx:         ctx,
		cancel:      cancel,
		gracePeriod: 5 * time.Second,
	}
}

// Start launches the worker pool
func (wp *WorkerPool) Start() {
	// Start workers
	for i := 0; i < len(wp.workers); i++ {
		wp.workers[i] = NewWorker(i, wp.jobs, wp.results, &wp.wg)
		wp.workers[i].Start()
	}
	
	// Start result collector. It runs until results is closed, so results
	// of jobs that are still in flight during shutdown are not lost.
	wp.shutdownWg.Add(1)
	go func() {
		defer wp.shutdownWg.Done()
		
		for result := range wp.results {
			fmt.Printf("Got result: %d\n", result)
		}
		fmt.Println("Result collector shutting down")
	}()
}

// SubmitJob adds a job to the pool
func (wp *WorkerPool) SubmitJob(job int) error {
	// The read lock guarantees that Shutdown cannot close jobs while a
	// send is in progress.
	wp.submitMu.RLock()
	defer wp.submitMu.RUnlock()
	
	if wp.ctx.Err() != nil {
		return fmt.Errorf("worker pool is shutting down")
	}
	
	select {
	case wp.jobs <- job:
		return nil
	case <-wp.ctx.Done():
		return fmt.Errorf("worker pool is shutting down")
	}
}

// Shutdown gracefully shuts down the worker pool.
// It is safe to call more than once and from several goroutines.
func (wp *WorkerPool) Shutdown() {
	wp.shutdownOnce.Do(func() {
		fmt.Printf("Initiating graceful shutdown (grace period: %v)\n", wp.gracePeriod)
		
		// Reject new submissions and unblock any SubmitJob waiting to send
		wp.cancel()
		
		// Close the jobs channel to signal no more jobs. Taking the write
		// lock waits until every in-progress SubmitJob has returned.
		wp.submitMu.Lock()
		close(wp.jobs)
		wp.submitMu.Unlock()
		
		// Wait for either all workers to finish or the grace period to expire
		doneChannel := make(chan struct{})
		go func() {
			wp.wg.Wait()
			close(doneChannel)
		}()
		
		select {
		case <-doneChannel:
			fmt.Println("All workers completed gracefully")
			
			// Every sender has exited, so closing results is safe
			close(wp.results)
			
			// Wait for the result collector to finish
			wp.shutdownWg.Wait()
		case <-time.After(wp.gracePeriod):
			// results stays open: closing it here would panic any worker
			// that is still trying to send a result
			fmt.Println("Grace period expired, some workers may still be running")
		}
		
		fmt.Println("Worker pool shutdown complete")
	})
}

// setupGracefulShutdown sets up signal handling for graceful shutdown
func setupGracefulShutdown(shutdown func()) {
	// Create channel to receive OS signals
	sigs := make(chan os.Signal, 1)
	
	// Register for SIGINT (Ctrl+C) and SIGTERM
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	
	// Start goroutine to handle signals
	go func() {
		sig := <-sigs
		fmt.Printf("\nReceived signal: %s\n", sig)
		shutdown()
	}()
}

func main() {
	// Create a worker pool with 3 workers
	pool := NewWorkerPool(3)
	
	// Set up graceful shutdown
	setupGracefulShutdown(pool.Shutdown)
	
	// Start the worker pool
	pool.Start()
	
	// Submit some jobs
	fmt.Println("Submitting jobs...")
	for i := 1; i <= 10; i++ {
		if err := pool.SubmitJob(i); err != nil {
			fmt.Printf("Error submitting job: %v\n", err)
		}
	}
	
	// Wait for a bit to let some processing happen
	time.Sleep(3 * time.Second)
	
	// Initiate graceful shutdown
	fmt.Println("Initiating shutdown...")
	pool.Shutdown()
	
	fmt.Println("Main function exiting")
}

Key shutdown patterns:

  1. Context cancellation: Using context to signal shutdown to all components
  2. Grace periods: Allowing components time to finish current work
  3. Signal handling: Responding to OS signals for clean shutdown
  4. Resource cleanup: Ensuring all resources are properly released
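
Context cancellation and signal handling can be combined with `signal.NotifyContext` (available since Go 1.16), which returns a context that is cancelled when one of the listed signals arrives. The following is a minimal sketch rather than a replacement for the pool above; `run` and `runWithSignals` are hypothetical helpers, and `atomic.Int64` assumes Go 1.19 or later.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
)

// run feeds numJobs jobs to numWorkers workers and returns how many jobs
// were processed before ctx was cancelled or the jobs ran out.
func run(ctx context.Context, numJobs, numWorkers int) int64 {
	jobs := make(chan int)

	// The producer is the only goroutine that closes jobs, so no worker
	// can ever send on or close a closed channel.
	go func() {
		defer close(jobs)
		for i := 0; i < numJobs; i++ {
			select {
			case jobs <- i:
			case <-ctx.Done():
				return // stop producing; workers finish what they already took
			}
		}
	}()

	var processed atomic.Int64
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs { // ends when the producer closes jobs
				processed.Add(1)
			}
		}()
	}

	wg.Wait()
	return processed.Load()
}

// runWithSignals ties the run to SIGINT and SIGTERM.
func runWithSignals(numJobs, numWorkers int) int64 {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop() // restore default signal behavior
	return run(ctx, numJobs, numWorkers)
}

func main() {
	fmt.Println("processed:", runWithSignals(5, 3))
}
```

Here shutdown flows in one direction: the signal cancels the context, the producer stops and closes `jobs`, and the workers drain and exit. No grace-period timer is needed in this sketch because each job is instantaneous; longer jobs would still need one.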

Error Handling and Recovery

Robust error handling and recovery are essential for production systems:

package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
)

// RecoverableWorker represents a worker that can recover from panics
type RecoverableWorker struct {
	id           int
	jobs         <-chan int
	results      chan<- Result
	errors       chan<- error
	restartDelay time.Duration
	maxRestarts  int
	wg           *sync.WaitGroup
}

// Result represents a job result or error
type Result struct {
	JobID  int
	Value  int
	Worker int
}

// NewRecoverableWorker creates a new worker that can recover from panics
func NewRecoverableWorker(id int, jobs <-chan int, results chan<- Result, errors chan<- error, wg *sync.WaitGroup) *RecoverableWorker {
	return &RecoverableWorker{
		id:           id,
		jobs:         jobs,
		results:      results,
		errors:       errors,
		restartDelay: 1 * time.Second,
		maxRestarts:  3,
		wg:           wg,
	}
}

// Start begins the worker's processing loop with recovery
func (w *RecoverableWorker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		
		restarts := 0
		for restarts <= w.maxRestarts {
			if restarts > 0 {
				log.Printf("Worker %d: restarting (%d/%d) after delay of %v", 
					w.id, restarts, w.maxRestarts, w.restartDelay)
				time.Sleep(w.restartDelay)
			}
			
			// Run the worker with panic recovery
			if w.runWithRecovery() {
				// Normal exit, no need to restart
				return
			}
			
			restarts++
		}
		
		log.Printf("Worker %d: exceeded maximum restarts (%d)", w.id, w.maxRestarts)
		w.errors <- fmt.Errorf("worker %d exceeded maximum restarts (%d)", w.id, w.maxRestarts)
	}()
}

// runWithRecovery runs the worker's main loop with panic recovery
// Returns true if the worker exited normally, false if it panicked
func (w *RecoverableWorker) runWithRecovery() (normalExit bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Worker %d: recovered from panic: %v", w.id, r)
			normalExit = false
		}
	}()
	
	for job := range w.jobs {
		// Simulate random panics
		if rand.Float32() < 0.1 {
			panic(fmt.Sprintf("simulated panic processing job %d", job))
		}
		
		// Simulate work
		time.Sleep(200 * time.Millisecond)
		
		// Simulate random errors
		if rand.Float32() < 0.2 {
			w.errors <- fmt.Errorf("worker %d: error processing job %d", w.id, job)
			continue
		}
		
		// Send successful result
		w.results <- Result{
			JobID:  job,
			Value:  job * 2,
			Worker: w.id,
		}
	}
	
	return true // Normal exit
}

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	failures        int
	threshold       int
	resetTimeout    time.Duration
	halfOpenTimeout time.Duration // set by the constructor; not consulted by Execute
	lastFailure     time.Time
	state           string // "closed", "open", or "half-open"
	mutex           sync.Mutex
}

// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		threshold:       threshold,
		resetTimeout:    resetTimeout,
		halfOpenTimeout: 5 * time.Second,
		state:           "closed",
	}
}

// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(operation func() error) error {
	cb.mutex.Lock()
	
	// Check if circuit is open
	if cb.state == "open" {
		// Check if reset timeout has elapsed
		if time.Since(cb.lastFailure) > cb.resetTimeout {
			log.Println("Circuit half-open, allowing trial request")
			cb.state = "half-open"
		} else {
			cb.mutex.Unlock()
			return errors.New("circuit breaker is open")
		}
	}
	
	cb.mutex.Unlock()
	
	// Execute the operation
	err := operation()
	
	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	
	if err != nil {
		// Operation failed
		cb.failures++
		cb.lastFailure = time.Now()
		
		if cb.state == "half-open" || cb.failures >= cb.threshold {
			// Open the circuit
			cb.state = "open"
			log.Printf("Circuit opened: %v failures (threshold: %d)", cb.failures, cb.threshold)
		}
		
		return err
	}
	
	// Operation succeeded
	if cb.state == "half-open" {
		// Reset the circuit
		cb.state = "closed"
		cb.failures = 0
		log.Println("Circuit closed: trial request succeeded")
	}
	
	return nil
}

// State returns the current state of the circuit breaker
func (cb *CircuitBreaker) State() string {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	return cb.state
}

// demonstrateCircuitBreaker shows how to use a circuit breaker with channels
func demonstrateCircuitBreaker() {
	// Create a circuit breaker
	cb := NewCircuitBreaker(3, 5*time.Second)
	
	// Create channels
	requests := make(chan int, 10)
	results := make(chan Result)
	errors := make(chan error)
	
	// Start worker
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		
		for req := range requests {
			// Use circuit breaker to protect the operation
			err := cb.Execute(func() error {
				// Simulate an unreliable operation
				if rand.Float32() < 0.7 {
					return fmt.Errorf("simulated error processing request %d", req)
				}
				
				// Successful operation
				results <- Result{JobID: req, Value: req * 10}
				return nil
			})
			
			if err != nil {
				errors <- err
			}
		}
	}()
	
	// Start error and result handlers
	go func() {
		for err := range errors {
			log.Printf("Error: %v", err)
		}
	}()
	
	go func() {
		for result := range results {
			log.Printf("Result: %+v", result)
		}
	}()
	
	// Send requests
	log.Println("Sending requests...")
	for i := 1; i <= 20; i++ {
		requests <- i
		log.Printf("Request %d sent, circuit state: %s", i, cb.State())
		time.Sleep(500 * time.Millisecond)
	}
	
	close(requests)
	wg.Wait()
}

func main() {
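	// Seed random number generator.
	// Since Go 1.20 the global generator is seeded automatically and
	// rand.Seed is deprecated; the call only matters on older toolchains.
	rand.Seed(time.Now().UnixNano())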
	
	// Demonstrate recoverable workers
	log.Println("=== Recoverable Workers ===")
	
	// Create channels
	jobs := make(chan int, 10)
	results := make(chan Result)
	errors := make(chan error)
	
	// Create and start workers
	var wg sync.WaitGroup
	numWorkers := 3
	
	for i := 0; i < numWorkers; i++ {
		worker := NewRecoverableWorker(i, jobs, results, errors, &wg)
		worker.Start()
	}
	
	// Start error handler
	go func() {
		for err := range errors {
			log.Printf("Error: %v", err)
		}
	}()
	
	// Start result handler
	go func() {
		for result := range results {
			log.Printf("Result: Job %d = %d (Worker %d)", 
				result.JobID, result.Value, result.Worker)
		}
	}()
	
	// Send jobs
	for i := 1; i <= 20; i++ {
		jobs <- i
	}
	
	// Close jobs channel and wait for workers to finish
	close(jobs)
	wg.Wait()
	
	// Demonstrate circuit breaker
	log.Println("\n=== Circuit Breaker ===")
	demonstrateCircuitBreaker()
}

Key error handling patterns:

  1. Panic recovery: Recovering from panics inside each worker, because an unrecovered panic in any goroutine terminates the whole program
  2. Worker restart: Automatically restarting failed workers
  3. Circuit breaker: Preventing cascading failures by failing fast
  4. Error propagation: Sending errors through dedicated channels
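
Panic recovery and error propagation can also be combined on a single channel. The sketch below is one possible arrangement, not the approach used by RecoverableWorker: `safely`, `outcome`, and `processAll` are hypothetical names, and the rule that multiples of three panic is made up for the demonstration. Each goroutine converts a panic into an ordinary error and sends exactly one outcome, so the consumer needs only one receive loop.

```go
package main

import (
	"fmt"
	"sync"
)

// safely runs fn and converts a panic into an ordinary error.
func safely(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()
	return fn()
}

// outcome carries either a value or an error for one job.
type outcome struct {
	job   int
	value int
	err   error
}

// processAll runs every job in its own goroutine and counts outcomes.
// Jobs divisible by 3 panic, purely to exercise the recovery path.
func processAll(jobs []int) (succeeded, failed int) {
	out := make(chan outcome)
	var wg sync.WaitGroup

	for _, j := range jobs {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			o := outcome{job: j}
			o.err = safely(func() error {
				if j%3 == 0 {
					panic(fmt.Sprintf("job %d is poisoned", j))
				}
				o.value = j * 2
				return nil
			})
			out <- o // one send per job, success or failure
		}(j)
	}

	// Close out once every sender has finished so the loop below ends.
	go func() {
		wg.Wait()
		close(out)
	}()

	for o := range out {
		if o.err != nil {
			failed++
			continue
		}
		succeeded++
	}
	return succeeded, failed
}

func main() {
	ok, bad := processAll([]int{1, 2, 3, 4, 5, 6})
	fmt.Printf("succeeded=%d failed=%d\n", ok, bad)
}
```

A single outcome channel avoids the situation where a worker blocks on an error channel that nobody is reading while the consumer waits on the result channel.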

Testing Concurrent Code

Testing concurrent code requires specialized techniques:

package main

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"
)

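// Result carries a single pipeline output value.
// It is declared here so that this file compiles on its own.
type Result struct {
	Value int
}

// Pipeline represents a simple data processing pipeline
type Pipeline struct {
	source    func(ctx context.Context) <-chan int
	transform func(ctx context.Context, in <-chan int) <-chan int
	sink      func(ctx context.Context, in <-chan int) <-chan Result
}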

// TestPipeline demonstrates testing a concurrent pipeline
func TestPipeline(t *testing.T) {
	// Create a test pipeline
	p := Pipeline{
		source: func(ctx context.Context) <-chan int {
			out := make(chan int)
			go func() {
				defer close(out)
				for i := 1; i <= 5; i++ {
					select {
					case out <- i:
					case <-ctx.Done():
						return
					}
				}
			}()
			return out
		},
		transform: func(ctx context.Context, in <-chan int) <-chan int {
			out := make(chan int)
			go func() {
				defer close(out)
				for n := range in {
					select {
					case out <- n * 2:
					case <-ctx.Done():
						return
					}
				}
			}()
			return out
		},
		sink: func(ctx context.Context, in <-chan int) <-chan Result {
			out := make(chan Result)
			go func() {
				defer close(out)
				for n := range in {
					select {
					case out <- Result{Value: n}:
					case <-ctx.Done():
						return
					}
				}
			}()
			return out
		},
	}
	
	// Create context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	
	// Run the pipeline
	source := p.source(ctx)
	transformed := p.transform(ctx, source)
	results := p.sink(ctx, transformed)
	
	// Collect and verify results
	var actual []int
	for result := range results {
		actual = append(actual, result.Value)
	}
	
	// Verify results. A length mismatch is fatal so that the loop below
	// never indexes past the end of expected.
	expected := []int{2, 4, 6, 8, 10}
	if len(actual) != len(expected) {
		t.Fatalf("Expected %d results, got %d: %v", len(expected), len(actual), actual)
	}
	
	for i, v := range actual {
		if v != expected[i] {
			t.Errorf("Expected %d at position %d, got %d", expected[i], i, v)
		}
	}
}

// TestRaceConditions demonstrates testing for race conditions
func TestRaceConditions(t *testing.T) {
	// Create a shared counter
	var counter int
	var mutex sync.Mutex
	
	// Create a WaitGroup to synchronize goroutines
	var wg sync.WaitGroup
	
	// Launch multiple goroutines to increment the counter
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			
			// Properly synchronized increment
			mutex.Lock()
			counter++
			mutex.Unlock()
			
			// Incorrectly synchronized increment would be:
			// counter++ // This would cause a race condition
		}()
	}
	
	// Wait for all goroutines to finish
	wg.Wait()
	
	// Verify the counter value
	if counter != 100 {
		t.Errorf("Expected counter to be 100, got %d", counter)
	}
	
	// Note: This test should be run with the -race flag:
	// go test -race
}

// TestDeadlockDetection demonstrates testing for deadlocks
func TestDeadlockDetection(t *testing.T) {
	// Create a channel for communication
	ch := make(chan int)
	
	// Set a timeout to detect deadlocks
	timeout := time.After(500 * time.Millisecond)
	
	// Start a goroutine that sends a value
	go func() {
		ch <- 42
	}()
	
	// Try to receive with a timeout
	select {
	case val := <-ch:
		fmt.Printf("Received: %d\n", val)
	case <-timeout:
		t.Fatal("Deadlock detected: timed out waiting for channel send/receive")
	}
	
	// Example of a potential deadlock (commented out)
	/*
	unbufferedCh := make(chan int) // Unbuffered channel
	
	// This would deadlock if uncommented:
	// unbufferedCh <- 1 // Send without a receiver
	
	// Instead, use a goroutine:
	go func() {
		unbufferedCh <- 1
	}()
	
	// And receive with a timeout:
	select {
	case <-unbufferedCh:
		// Success
	case <-time.After(500 * time.Millisecond):
		t.Fatal("Deadlock detected")
	}
	*/
}

func main() {
	// The functions above are tests. Place them in a file whose name ends
	// in _test.go and run them with 'go test -race'. Calling them directly
	// with a nil *testing.T would panic as soon as a check fails.
	
	fmt.Println("Run these functions as tests with 'go test'")
}

Key testing techniques:

  1. Timeout-based testing: Using timeouts to detect deadlocks and hangs
  2. Race detection: Using Go’s race detector to find race conditions
  3. Context cancellation: Testing proper cancellation handling
  4. Deterministic testing: Creating reproducible tests for concurrent code
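
Cancellation handling can be checked directly: cancel the context, then require the stage's output channel to close within a deadline. The sketch below uses hypothetical names (`generate`, `stopsAfterCancel`, `cancelLimit`) and is written as ordinary functions so it runs on its own; in a real test file the boolean result would feed a `t.Fatal` call instead.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cancelLimit is a hypothetical upper bound on how long a stage may take
// to stop after cancellation.
const cancelLimit = time.Second

// generate emits increasing integers until ctx is cancelled, then closes
// its output channel.
func generate(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// stopsAfterCancel reports whether generate closes its output within
// cancelLimit once the context has been cancelled.
func stopsAfterCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := generate(ctx)
	<-out // confirm the stage is running before cancelling
	cancel()

	deadline := time.After(cancelLimit)
	for {
		select {
		case _, ok := <-out:
			if !ok {
				return true // channel closed: the goroutine has exited
			}
			// A value sent just before the stage noticed cancellation
			// is drained and ignored.
		case <-deadline:
			return false // stage kept running: a likely goroutine leak
		}
	}
}

func main() {
	fmt.Println("stage stops after cancel:", stopsAfterCancel())
}
```

The loop keeps draining values because select chooses at random when both the send and the cancellation are ready, so the stage may emit a few more values before it observes `ctx.Done()`.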