Advanced Channel Patterns

Building on the fundamentals, we can now explore more sophisticated channel patterns that solve complex concurrency challenges. These patterns provide reusable solutions for common concurrent programming scenarios.

Select Statement Patterns

The select statement is one of Go’s most powerful concurrency primitives, enabling non-blocking operations and coordination between multiple channels:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// timeoutOperation demonstrates using select for timeouts
func timeoutOperation() {
	ch := make(chan string)
	
	go func() {
		// Simulate work that takes random time
		delay := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(delay)
		ch <- fmt.Sprintf("Operation completed in %v", delay)
	}()
	
	// Wait for result or timeout
	select {
	case result := <-ch:
		fmt.Println("Success:", result)
	case <-time.After(300 * time.Millisecond):
		fmt.Println("Operation timed out")
	}
}

// nonBlockingReceive demonstrates non-blocking channel operations
func nonBlockingReceive(ch chan string) {
	select {
	case msg := <-ch:
		fmt.Println("Received message:", msg)
	default:
		fmt.Println("No message available")
	}
}

// nonBlockingSend demonstrates non-blocking send operation
func nonBlockingSend(ch chan string, msg string) {
	select {
	case ch <- msg:
		fmt.Println("Sent message:", msg)
	default:
		fmt.Println("Cannot send message, channel full or no receivers")
	}
}

// prioritySelect demonstrates channel priority using select
func prioritySelect(high, medium, low chan string) {
	for {
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
			return
		default:
			// Continue to nested select
		}
		
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
			return
		case msg := <-medium:
			fmt.Println("Medium priority:", msg)
			return
		default:
			// Continue to final select
		}
		
		select {
		case msg := <-high:
			fmt.Println("High priority:", msg)
		case msg := <-medium:
			fmt.Println("Medium priority:", msg)
		case msg := <-low:
			fmt.Println("Low priority:", msg)
		case <-time.After(500 * time.Millisecond):
			fmt.Println("Timed out waiting for messages")
		}
		return
	}
}

// dynamicSelectCases demonstrates how to handle a dynamic number of channels
func dynamicSelectCases(channels []chan int) {
	cases := make([]reflect.SelectCase, len(channels))
	for i, ch := range channels {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}
	
	// Add timeout case
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(time.After(1 * time.Second)),
	})
	
	// Wait for any channel to receive
	chosen, value, ok := reflect.Select(cases)
	if chosen == len(cases)-1 {
		fmt.Println("Timed out")
		return
	}
	
	if ok {
		fmt.Printf("Received value %v from channel %d\n", value.Int(), chosen)
	} else {
		fmt.Printf("Channel %d was closed\n", chosen)
	}
}

func main() {
	// Demonstrate timeout pattern
	fmt.Println("=== Timeout Pattern ===")
	timeoutOperation()
	
	// Demonstrate non-blocking operations
	fmt.Println("\n=== Non-blocking Operations ===")
	ch := make(chan string, 1)
	nonBlockingReceive(ch)
	nonBlockingSend(ch, "Hello")
	nonBlockingReceive(ch)
	nonBlockingSend(ch, "World") // Channel is now full
	
	// Demonstrate priority selection
	fmt.Println("\n=== Priority Selection ===")
	high := make(chan string, 1)
	medium := make(chan string, 1)
	low := make(chan string, 1)
	
	// Try different scenarios
	medium <- "Medium message"
	prioritySelect(high, medium, low)
	
	// For dynamic select case demo, we need the reflect package
	fmt.Println("\n=== Dynamic Select Cases ===")
	channels := make([]chan int, 3)
	for i := range channels {
		channels[i] = make(chan int)
		i := i // Create new variable to avoid closure problem
		go func() {
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
			channels[i] <- i
		}()
	}
	
	// Import reflect package at the top if running this code
	// dynamicSelectCases(channels)
	fmt.Println("(Dynamic select cases require the reflect package)")
}

Key select patterns:

  1. Timeout pattern: Combine a work channel with time.After() to implement timeouts
  2. Non-blocking operations: Use the default case to make channel operations non-blocking
  3. Priority selection: Nest multiple select statements to implement channel priority
  4. Dynamic cases: Use the reflect package to handle a dynamic number of channels

Fan-Out and Fan-In Patterns

Fan-out/fan-in is a powerful pattern for parallel processing that distributes work across multiple goroutines and then consolidates the results:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// worker processes items from input and sends results to output
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
		results <- job * 2                                           // Send result
	}
}

// fanOut distributes work across multiple workers
func fanOut(jobs <-chan int, numWorkers int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	
	// Start workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	
	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
		fmt.Println("All workers completed")
	}()
	
	return results
}

// fanIn merges multiple channels into a single channel
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	multiplexed := make(chan int)
	
	// Function to forward values from input channel to multiplexed channel
	forward := func(ch <-chan int) {
		defer wg.Done()
		for val := range ch {
			multiplexed <- val
		}
	}
	
	// Start a goroutine for each input channel
	wg.Add(len(channels))
	for _, ch := range channels {
		go forward(ch)
	}
	
	// Close multiplexed channel when all input channels are done
	go func() {
		wg.Wait()
		close(multiplexed)
	}()
	
	return multiplexed
}

// fanInReflect uses reflection to merge channels dynamically
func fanInReflect(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	
	for _, c := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

func main() {
	// Create jobs channel and send jobs
	jobs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs)
	
	// Fan out to 3 workers
	fmt.Println("Starting fan-out with 3 workers")
	results := fanOut(jobs, 3)
	
	// Collect and print results
	for result := range results {
		fmt.Printf("Got result: %d\n", result)
	}
	
	// Demonstrate fan-in with multiple channels
	fmt.Println("\nDemonstrating fan-in pattern")
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)
	
	// Send values on each channel
	go func() {
		for i := 1; i <= 3; i++ {
			ch1 <- i * 10
			time.Sleep(100 * time.Millisecond)
		}
		close(ch1)
	}()
	
	go func() {
		for i := 1; i <= 3; i++ {
			ch2 <- i * 100
			time.Sleep(150 * time.Millisecond)
		}
		close(ch2)
	}()
	
	go func() {
		for i := 1; i <= 3; i++ {
			ch3 <- i * 1000
			time.Sleep(80 * time.Millisecond)
		}
		close(ch3)
	}()
	
	// Fan-in the channels
	merged := fanIn(ch1, ch2, ch3)
	
	// Print merged results
	for result := range merged {
		fmt.Printf("Merged result: %d\n", result)
	}
}

The fan-out/fan-in pattern is particularly useful for:

  1. CPU-bound tasks: Distributing computation across multiple cores
  2. I/O-bound tasks: Managing multiple concurrent I/O operations
  3. Rate limiting: Controlling the degree of parallelism
  4. Batch processing: Processing large datasets in parallel chunks

Pipeline Pattern

Pipelines compose a series of processing stages connected by channels, allowing data to flow through multiple transformations:

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// generator creates a channel that emits the provided integers
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square receives integers, squares them, and sends them to a returned channel
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// filter receives integers and sends only those that satisfy the predicate function
func filter(in <-chan int, predicate func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if predicate(n) {
				out <- n
			}
		}
	}()
	return out
}

// merge combines multiple channels into a single channel
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	
	// Start an output goroutine for each input channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	
	// Close the output channel when all input channels are done
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

// batchProcessor demonstrates processing data in batches
func batchProcessor(in <-chan int, batchSize int) <-chan []int {
	out := make(chan []int)
	
	go func() {
		defer close(out)
		
		batch := make([]int, 0, batchSize)
		for n := range in {
			batch = append(batch, n)
			
			// When batch is full, send it and create a new one
			if len(batch) == batchSize {
				out <- batch
				batch = make([]int, 0, batchSize)
			}
		}
		
		// Send any remaining items in the last batch
		if len(batch) > 0 {
			out <- batch
		}
	}()
	
	return out
}

func main() {
	// Create a pipeline: generate -> square -> filter -> print
	fmt.Println("=== Basic Pipeline ===")
	c := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	c = square(c)
	c = filter(c, func(n int) bool {
		return n%2 == 0 // Only even numbers
	})
	
	// Consume the output
	for n := range c {
		fmt.Println(n)
	}
	
	// Demonstrate a more complex pipeline with multiple sources and fan-in
	fmt.Println("\n=== Multi-source Pipeline with Fan-in ===")
	
	// Create multiple sources
	c1 := generator(1, 2, 3)
	c2 := generator(4, 5, 6)
	c3 := generator(7, 8, 9)
	
	// Process each source
	c1 = square(c1)
	c2 = square(c2)
	c3 = square(c3)
	
	// Merge the results
	merged := merge(c1, c2, c3)
	
	// Consume the merged output
	for n := range merged {
		fmt.Println(n)
	}
	
	// Demonstrate batch processing
	fmt.Println("\n=== Batch Processing Pipeline ===")
	
	// Generate 10 random numbers
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 0; i < 10; i++ {
			source <- rand.Intn(100)
		}
	}()
	
	// Process in batches of 3
	batches := batchProcessor(source, 3)
	
	// Consume and process batches
	for batch := range batches {
		fmt.Printf("Processing batch: %v\n", batch)
		sum := 0
		for _, n := range batch {
			sum += n
		}
		fmt.Printf("Batch sum: %d\n", sum)
	}
}

Key pipeline characteristics:

  1. Composability: Each stage performs a specific transformation and can be composed with other stages
  2. Unidirectional flow: Data flows in one direction through the pipeline
  3. Concurrent execution: Each stage runs in its own goroutine
  4. Bounded stages: Each stage only processes one item at a time, providing natural backpressure

Worker Pool Pattern

Worker pools manage a fixed number of goroutines to process tasks from a queue:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a unit of work
type Task struct {
	ID     int
	Data   interface{}
	Result interface{}
	Err    error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
	tasks       chan Task
	results     chan Task
	concurrency int
	wg          sync.WaitGroup
	ctx         context.Context
	cancel      context.CancelFunc
}

// NewWorkerPool creates a new worker pool with the specified concurrency
func NewWorkerPool(concurrency int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &WorkerPool{
		tasks:       make(chan Task),
		results:     make(chan Task),
		concurrency: concurrency,
		ctx:         ctx,
		cancel:      cancel,
	}
}

// Start launches the worker pool
func (p *WorkerPool) Start() {
	// Start workers
	for i := 0; i < p.concurrency; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
	
	// Start result collector
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// worker processes tasks from the task channel
func (p *WorkerPool) worker(id int) {
	defer p.wg.Done()
	
	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				return // Task channel closed
			}
			
			// Process the task
			fmt.Printf("Worker %d processing task %d\n", id, task.ID)
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
			
			// Generate result (in a real application, this would be actual processing)
			task.Result = task.Data.(int) * 2
			
			// Send result
			select {
			case p.results <- task:
				// Result sent successfully
			case <-p.ctx.Done():
				return // Context cancelled
			}
			
		case <-p.ctx.Done():
			return // Context cancelled
		}
	}
}

// Submit adds a task to the worker pool
func (p *WorkerPool) Submit(task Task) {
	select {
	case p.tasks <- task:
		// Task submitted successfully
	case <-p.ctx.Done():
		// Pool is shutting down
	}
}

// Results returns the channel of completed tasks
func (p *WorkerPool) Results() <-chan Task {
	return p.results
}

// Stop gracefully shuts down the worker pool
func (p *WorkerPool) Stop() {
	p.cancel()  // Signal all workers to stop
	close(p.tasks) // Close task channel
}

// ThrottledWorkerPool extends WorkerPool with rate limiting
type ThrottledWorkerPool struct {
	*WorkerPool
	rate  time.Duration // Minimum time between task submissions
	limit chan struct{} // Semaphore for limiting concurrent tasks
}

// NewThrottledWorkerPool creates a new rate-limited worker pool
func NewThrottledWorkerPool(concurrency int, rate time.Duration, maxQueued int) *ThrottledWorkerPool {
	return &ThrottledWorkerPool{
		WorkerPool: NewWorkerPool(concurrency),
		rate:       rate,
		limit:      make(chan struct{}, maxQueued),
	}
}

// Submit adds a task to the throttled worker pool, respecting rate limits
func (p *ThrottledWorkerPool) Submit(task Task) {
	// Add to limit before submitting
	p.limit <- struct{}{}
	
	go func() {
		defer func() { <-p.limit }() // Release limit when done
		
		// Submit to underlying pool
		p.WorkerPool.Submit(task)
		
		// Enforce rate limit
		time.Sleep(p.rate)
	}()
}

func main() {
	// Create a worker pool with 3 workers
	fmt.Println("=== Basic Worker Pool ===")
	pool := NewWorkerPool(3)
	pool.Start()
	
	// Submit 10 tasks
	for i := 0; i < 10; i++ {
		pool.Submit(Task{
			ID:   i,
			Data: i,
		})
	}
	
	// Close the task channel to signal no more tasks
	pool.Stop()
	
	// Collect results
	for task := range pool.Results() {
		fmt.Printf("Task %d result: %v\n", task.ID, task.Result)
	}
	
	// Demonstrate throttled worker pool
	fmt.Println("\n=== Throttled Worker Pool ===")
	throttled := NewThrottledWorkerPool(3, 200*time.Millisecond, 5)
	throttled.Start()
	
	// Submit tasks rapidly,
	// Submit tasks rapidly, but they'll be rate-limited
	for i := 0; i < 10; i++ {
		throttled.Submit(Task{
			ID:   i,
			Data: i,
		})
		fmt.Printf("Submitted task %d\n", i)
	}
	
	// Wait for a bit to see the rate limiting in action
	time.Sleep(1 * time.Second)
	
	// Stop the pool
	throttled.Stop()
	
	// Collect results
	for task := range throttled.Results() {
		fmt.Printf("Throttled task %d result: %v\n", task.ID, task.Result)
	}
}

The worker pool pattern is particularly useful for:

  1. Controlling concurrency: Limiting the number of concurrent operations to prevent resource exhaustion
  2. Load balancing: Distributing work evenly across available resources
  3. Backpressure handling: Managing the flow of work when producers are faster than consumers
  4. Resource management: Efficiently utilizing system resources like CPU cores, network connections, or database connections

Channel-Based Semaphores

Channels can be used as semaphores to limit concurrent access to resources:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Semaphore represents a counting semaphore
type Semaphore chan struct{}

// Acquire n resources from the semaphore
func (s Semaphore) Acquire(n int) {
	for i := 0; i < n; i++ {
		s <- struct{}{}
	}
}

// Release n resources back to the semaphore
func (s Semaphore) Release(n int) {
	for i := 0; i < n; i++ {
		<-s
	}
}

// TryAcquire attempts to acquire n resources without blocking
// Returns true if successful, false if would block
func (s Semaphore) TryAcquire(n int) bool {
	select {
	case s <- struct{}{}:
		if n > 1 {
			// For multiple resources, we need to be careful
			// If we can't acquire all, release what we've acquired
			if !s.TryAcquire(n - 1) {
				<-s // Release the one we just acquired
				return false
			}
		}
		return true
	default:
		return false
	}
}

// NewSemaphore creates a new semaphore with the given capacity
func NewSemaphore(capacity int) Semaphore {
	return make(Semaphore, capacity)
}

// simulateResourceUsage demonstrates using a semaphore to limit concurrent access
func simulateResourceUsage(id int, sem Semaphore, wg *sync.WaitGroup) {
	defer wg.Done()
	
	// Try to acquire the resource
	fmt.Printf("Client %d: Attempting to acquire resource\n", id)
	
	// Try non-blocking acquire first
	if sem.TryAcquire(1) {
		fmt.Printf("Client %d: Immediately acquired resource\n", id)
	} else {
		fmt.Printf("Client %d: Waiting for resource...\n", id)
		sem.Acquire(1)
		fmt.Printf("Client %d: Eventually acquired resource\n", id)
	}
	
	// Use the resource
	fmt.Printf("Client %d: Using resource\n", id)
	time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
	
	// Release the resource
	fmt.Printf("Client %d: Releasing resource\n", id)
	sem.Release(1)
}

func main() {
	// Create a semaphore with capacity 3
	sem := NewSemaphore(3)
	
	// Create 10 clients that will try to use the resource
	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go simulateResourceUsage(i, sem, &wg)
		time.Sleep(50 * time.Millisecond) // Stagger client starts
	}
	
	// Wait for all clients to finish
	wg.Wait()
	fmt.Println("All clients finished")
}

Channel-based semaphores are useful for:

  1. Connection pooling: Limiting the number of concurrent connections to a resource
  2. Rate limiting: Controlling the rate of operations
  3. Resource protection: Preventing resource exhaustion by limiting concurrent access
  4. Concurrency control: Implementing more complex synchronization patterns