Worker Pool and Pipeline Patterns

Worker pools and pipelines are two complementary concurrency patterns for processing data efficiently in distributed systems: a worker pool bounds and adapts how much work runs at once, while a pipeline splits processing into composable stages connected by channels.

Advanced Worker Pool with Adaptive Scaling

This implementation adjusts the number of workers based on load:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job represents a unit of work
type Job struct {
	ID       int
	Payload  interface{}
	Duration time.Duration // Simulated processing time
}

// Result represents the outcome of job processing
type Result struct {
	JobID   int
	Output  interface{}
	Err     error
	Latency time.Duration
}

// AdaptiveWorkerPool implements a worker pool that scales based on load
type AdaptiveWorkerPool struct {
	jobQueue       chan Job
	resultQueue    chan Result
	workerCount    int32
	activeWorkers  int32
	maxWorkers     int32
	minWorkers     int32
	pendingJobs    int32
	mu             sync.Mutex
	stopCh         chan struct{}
	workerWg       sync.WaitGroup
	metrics        *PoolMetrics
	scaleInterval  time.Duration
	scaleThreshold float64 // Threshold for scaling (0-1)
}

// PoolMetrics tracks performance metrics for the worker pool
type PoolMetrics struct {
	totalJobs      int64
	completedJobs  int64
	failedJobs     int64
	totalLatency   int64 // in nanoseconds
	queueHighWater int32
}

// NewAdaptiveWorkerPool creates a new adaptive worker pool
func NewAdaptiveWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *AdaptiveWorkerPool {
	pool := &AdaptiveWorkerPool{
		jobQueue:       make(chan Job, queueSize),
		resultQueue:    make(chan Result, queueSize),
		minWorkers:     int32(minWorkers),
		maxWorkers:     int32(maxWorkers),
		workerCount:    0,
		activeWorkers:  0,
		pendingJobs:    0,
		stopCh:         make(chan struct{}),
		metrics:        &PoolMetrics{},
		scaleInterval:  500 * time.Millisecond,
		scaleThreshold: 0.7, // Scale up when 70% of workers are busy
	}
	
	// Start the minimum number of workers
	for i := 0; i < minWorkers; i++ {
		pool.startWorker(ctx)
	}
	
	// Start the autoscaler
	go pool.autoscaler(ctx)
	
	return pool
}

// startWorker launches a new worker goroutine
func (p *AdaptiveWorkerPool) startWorker(ctx context.Context) {
	p.workerWg.Add(1)
	atomic.AddInt32(&p.workerCount, 1)
	
	go func() {
		defer p.workerWg.Done()
		defer atomic.AddInt32(&p.workerCount, -1)
		
		for {
			select {
			case job, ok := <-p.jobQueue:
				if !ok {
					return // Channel closed
				}
				
				// Mark worker as active
				atomic.AddInt32(&p.activeWorkers, 1)
				atomic.AddInt32(&p.pendingJobs, -1)
				
				// Process the job
				startTime := time.Now()
				var result Result
				
				// Simulate processing with potential failures
				time.Sleep(job.Duration)

				if rand.Intn(10) < 1 { // 10% failure rate
					result = Result{
						JobID:   job.ID,
						Output:  nil,
						Err:     fmt.Errorf("processing error on job %d", job.ID),
						Latency: time.Since(startTime),
					}
					atomic.AddInt64(&p.metrics.failedJobs, 1)
				} else {
					result = Result{
						JobID:   job.ID,
						Output:  fmt.Sprintf("Processed result for job %d", job.ID),
						Err:     nil,
						Latency: time.Since(startTime),
					}
					// Only successful jobs count toward completedJobs and the
					// latency total used for the average-latency metric.
					atomic.AddInt64(&p.metrics.completedJobs, 1)
					atomic.AddInt64(&p.metrics.totalLatency, int64(result.Latency))
				}
				
				// Send the result
				select {
				case p.resultQueue <- result:
				case <-ctx.Done():
					return
				}
				
				// Mark worker as inactive
				atomic.AddInt32(&p.activeWorkers, -1)
				
			case <-p.stopCh:
				return
				
			case <-ctx.Done():
				return
			}
		}
	}()
}

// autoscaler adjusts the number of workers based on load
func (p *AdaptiveWorkerPool) autoscaler(ctx context.Context) {
	ticker := time.NewTicker(p.scaleInterval)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			p.adjustWorkerCount(ctx)
		case <-p.stopCh:
			return
		case <-ctx.Done():
			return
		}
	}
}

// adjustWorkerCount scales the worker pool up or down based on current load
func (p *AdaptiveWorkerPool) adjustWorkerCount(ctx context.Context) {
	currentWorkers := atomic.LoadInt32(&p.workerCount)
	activeWorkers := atomic.LoadInt32(&p.activeWorkers)
	pendingJobs := atomic.LoadInt32(&p.pendingJobs)
	
	// Calculate utilization
	var utilization float64
	if currentWorkers > 0 {
		utilization = float64(activeWorkers) / float64(currentWorkers)
	}
	
	// Scale up if utilization is high and there are pending jobs
	if utilization >= p.scaleThreshold && pendingJobs > 0 && currentWorkers < p.maxWorkers {
		// Calculate how many workers to add (up to 25% more, at least 1)
		toAdd := max(1, int(float64(currentWorkers)*0.25))
		
		// Don't exceed max workers
		if currentWorkers+int32(toAdd) > p.maxWorkers {
			toAdd = int(p.maxWorkers - currentWorkers)
		}
		
		fmt.Printf("Scaling up: Adding %d workers (utilization: %.2f, pending: %d)\n", 
			toAdd, utilization, pendingJobs)
		
		for i := 0; i < toAdd; i++ {
			p.startWorker(ctx)
		}
	}
	
	// Scale down if utilization is low and we have more than minimum workers
	if utilization < p.scaleThreshold*0.5 && currentWorkers > p.minWorkers && pendingJobs == 0 {
		// Calculate how many workers to remove (up to 15% fewer, at least 1)
		toRemove := max(1, int(float64(currentWorkers)*0.15))
		
		// Don't go below min workers
		if currentWorkers-int32(toRemove) < p.minWorkers {
			toRemove = int(currentWorkers - p.minWorkers)
		}
		
		fmt.Printf("Scaling down: Removing %d workers (utilization: %.2f)\n", 
			toRemove, utilization)
		
		// Signal workers to stop. stopCh is unbuffered, so each send is only
		// accepted by a worker that is currently idle in its select. A bare
		// break would only exit the select, so use a labeled break to leave
		// the loop once no worker is ready.
	stopLoop:
		for i := 0; i < toRemove; i++ {
			select {
			case p.stopCh <- struct{}{}:
			default:
				// No idle worker is ready to receive; stop signaling.
				break stopLoop
			}
		}
	}
}

// Submit adds a job to the pool
func (p *AdaptiveWorkerPool) Submit(ctx context.Context, job Job) error {
	select {
	case p.jobQueue <- job:
		atomic.AddInt64(&p.metrics.totalJobs, 1)
		atomic.AddInt32(&p.pendingJobs, 1)
		
		// Update high water mark for queue
		pending := atomic.LoadInt32(&p.pendingJobs)
		for {
			highWater := atomic.LoadInt32(&p.metrics.queueHighWater)
			if pending <= highWater || atomic.CompareAndSwapInt32(&p.metrics.queueHighWater, highWater, pending) {
				break
			}
		}
		
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		return fmt.Errorf("job queue is full")
	}
}

// Results returns the channel for receiving results
func (p *AdaptiveWorkerPool) Results() <-chan Result {
	return p.resultQueue
}

// Shutdown gracefully shuts down the worker pool: queued jobs are drained
// before the result channel is closed. Submit must not be called after Shutdown.
func (p *AdaptiveWorkerPool) Shutdown() {
	close(p.jobQueue)    // stop accepting work; workers drain the remaining jobs
	p.workerWg.Wait()    // wait for in-flight jobs to finish
	close(p.resultQueue) // safe now: no worker can send another result
	close(p.stopCh)      // stop the autoscaler
}

// GetMetrics returns the current pool metrics
func (p *AdaptiveWorkerPool) GetMetrics() PoolMetrics {
	completed := atomic.LoadInt64(&p.metrics.completedJobs)
	metrics := PoolMetrics{
		totalJobs:      atomic.LoadInt64(&p.metrics.totalJobs),
		completedJobs:  completed,
		failedJobs:     atomic.LoadInt64(&p.metrics.failedJobs),
		queueHighWater: atomic.LoadInt32(&p.metrics.queueHighWater),
	}
	
	// Calculate average latency
	if completed > 0 {
		metrics.totalLatency = atomic.LoadInt64(&p.metrics.totalLatency) / completed
	}
	
	return metrics
}

// Helper function for max of two integers (Go 1.21+ provides a built-in max;
// this keeps the example compatible with older toolchains)
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Seed the global random number generator (not needed on Go 1.20+, where
	// the generator is seeded automatically and rand.Seed is deprecated)
	rand.Seed(time.Now().UnixNano())
	
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	
	// Create an adaptive worker pool
	pool := NewAdaptiveWorkerPool(ctx, 5, 20, 100)
	defer pool.Shutdown()
	
	// Start a goroutine to collect results
	go func() {
		for result := range pool.Results() {
			if result.Err != nil {
				fmt.Printf("Job %d failed: %v (took %v)\n", 
					result.JobID, result.Err, result.Latency)
			} else {
				fmt.Printf("Job %d completed: %v (took %v)\n", 
					result.JobID, result.Output, result.Latency)
			}
		}
	}()
	
	// Submit jobs in waves to demonstrate scaling
	for wave := 0; wave < 3; wave++ {
		fmt.Printf("\n--- Starting job wave %d ---\n", wave+1)
		
		// Submit a batch of jobs
		jobCount := 50 + wave*25
		for i := 0; i < jobCount; i++ {
			// Create jobs with variable processing times
			duration := time.Duration(50+rand.Intn(200)) * time.Millisecond
			job := Job{
				ID:       wave*1000 + i,
				Payload:  fmt.Sprintf("Job data %d", i),
				Duration: duration,
			}
			
			if err := pool.Submit(ctx, job); err != nil {
				fmt.Printf("Failed to submit job: %v\n", err)
			}
		}
		
		// Wait between waves
		time.Sleep(2 * time.Second)
		
		// Print current metrics
		metrics := pool.GetMetrics()
		fmt.Printf("\nPool metrics after wave %d:\n", wave+1)
		fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
		fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
		fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
		fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
		fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
		fmt.Printf("- Current workers: %d\n", atomic.LoadInt32(&pool.workerCount))
		fmt.Printf("- Active workers: %d\n", atomic.LoadInt32(&pool.activeWorkers))
	}
	
	// Wait for all jobs to complete
	time.Sleep(1 * time.Second)
	
	// Final metrics
	metrics := pool.GetMetrics()
	fmt.Printf("\nFinal pool metrics:\n")
	fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
	fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
	fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
	fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
	fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
}

This advanced worker pool pattern is ideal for distributed systems because it:

  • Automatically scales the number of workers up and down based on load
  • Bounds resource usage between a configurable minimum and maximum worker count
  • Provides detailed metrics (throughput, failures, latency, queue depth) for monitoring
  • Applies backpressure by rejecting submissions when the queue is full (a retry-based alternative is sketched below)
  • Reports per-job failures through the result channel rather than letting them stop the pool
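
The backpressure behavior above comes from Submit failing fast when the job queue is full. A caller that would rather wait briefly than drop work can wrap Submit in a retry loop. The sketch below is one possible approach, assuming the AdaptiveWorkerPool, Job, and Submit definitions from the listing above; submitWithRetry and its backoff parameters are illustrative helpers, not part of the pool itself.

// submitWithRetry retries Submit with a fixed backoff instead of dropping the
// job when the queue is full. It assumes the AdaptiveWorkerPool, Job, and
// Submit definitions from the listing above; the helper itself is illustrative.
func submitWithRetry(ctx context.Context, pool *AdaptiveWorkerPool, job Job, attempts int, backoff time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = pool.Submit(ctx, job); err == nil {
			return nil
		}
		// Give the workers a chance to drain the queue before trying again.
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("submit failed after %d attempts: %w", attempts, err)
}

For example, the wave loop in main could call submitWithRetry(ctx, pool, job, 3, 50*time.Millisecond) instead of pool.Submit to smooth out bursts rather than logging a submission failure.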

Multi-Stage Pipeline Pattern

Pipelines allow you to process data through multiple stages efficiently:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// DataItem represents a piece of data flowing through the pipeline
type DataItem struct {
	ID       int
	Data     interface{}
	Metadata map[string]interface{}
	Error    error
}

// PipelineStage represents a processing stage in the pipeline
type PipelineStage func(ctx context.Context, in <-chan DataItem) <-chan DataItem

// Pipeline represents a multi-stage data processing pipeline
type Pipeline struct {
	stages []PipelineStage
}

// NewPipeline creates a new data processing pipeline
func NewPipeline(stages ...PipelineStage) *Pipeline {
	return &Pipeline{
		stages: stages,
	}
}

// Execute runs data through the pipeline
func (p *Pipeline) Execute(ctx context.Context, source <-chan DataItem) <-chan DataItem {
	// Start with the source channel
	current := source
	
	// Apply each stage in sequence
	for _, stage := range p.stages {
		current = stage(ctx, current)
	}
	
	return current
}

// Source creates a source channel for the pipeline
func Source(ctx context.Context, items []DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for _, item := range items {
			select {
			case out <- item:
				// Item sent successfully
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Example pipeline stages

// Validate checks if data items are valid
func Validate(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Perform validation
			if item.Data == nil {
				item.Error = fmt.Errorf("invalid data: nil value")
			}
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Transform applies a transformation to data items
func Transform(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Apply transformation
			switch v := item.Data.(type) {
			case string:
				item.Data = fmt.Sprintf("Transformed: %s", v)
			case int:
				item.Data = v * 2
			default:
				item.Error = fmt.Errorf("unsupported data type %T", v)
			}
			
			// Add metadata
			if item.Metadata == nil {
				item.Metadata = make(map[string]interface{})
			}
			item.Metadata["transformed_at"] = time.Now()
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Enrich adds additional data to items
func Enrich(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Add enrichment data
			if item.Metadata == nil {
				item.Metadata = make(map[string]interface{})
			}
			item.Metadata["enriched"] = true
			item.Metadata["enriched_at"] = time.Now()
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// ParallelStage runs a fixed number of workers over the incoming items.
// Because the workers share one input and one output channel, output ordering
// is not guaranteed to match input ordering.
func ParallelStage(workers int, processor func(DataItem) DataItem) PipelineStage {
	return func(ctx context.Context, in <-chan DataItem) <-chan DataItem {
		out := make(chan DataItem)
		
		// Start a fixed number of workers
		var wg sync.WaitGroup
		wg.Add(workers)
		
		for i := 0; i < workers; i++ {
			go func(workerID int) {
				defer wg.Done()
				
				for item := range in {
					// Skip already failed items
					if item.Error != nil {
						select {
						case out <- item:
						case <-ctx.Done():
							return
						}
						continue
					}
					
					// Process the item
					processedItem := processor(item)
					
					// Add worker metadata
					if processedItem.Metadata == nil {
						processedItem.Metadata = make(map[string]interface{})
					}
					processedItem.Metadata["worker_id"] = workerID
					
					// Forward the processed item
					select {
					case out <- processedItem:
					case <-ctx.Done():
						return
					}
				}
			}(i)
		}
		
		// Close the output channel when all workers are done
		go func() {
			wg.Wait()
			close(out)
		}()
		
		return out
	}
}

func main() {
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create sample data
	data := []DataItem{
		{ID: 1, Data: "item 1"},
		{ID: 2, Data: "item 2"},
		{ID: 3, Data: "item 3"},
		{ID: 4, Data: "item 4"},
		{ID: 5, Data: nil}, // This will fail validation
		{ID: 6, Data: "item 6"},
		{ID: 7, Data: 42}, // Different type
		{ID: 8, Data: "item 8"},
	}
	
	// Create a pipeline
	pipeline := NewPipeline(
		Validate,
		Transform,
		ParallelStage(3, func(item DataItem) DataItem {
			// Simulate processing time
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			return item
		}),
		Enrich,
	)
	
	// Create a source channel
	source := Source(ctx, data)
	
	// Execute the pipeline
	results := pipeline.Execute(ctx, source)
	
	// Collect and print results
	for result := range results {
		if result.Error != nil {
			fmt.Printf("Item %d failed: %v\n", result.ID, result.Error)
		} else {
			fmt.Printf("Item %d processed: %v with metadata: %v\n", 
				result.ID, result.Data, result.Metadata)
		}
	}
}

This pipeline pattern is valuable for distributed systems because it:

  • Separates processing logic into discrete, reusable stages
  • Propagates per-item errors through every stage instead of aborting the whole pipeline
  • Enables parallel processing where appropriate (ParallelStage), and parallel branches can be recombined with a fan-in helper like the one sketched below
  • Maintains context cancellation and per-item metadata throughout the processing flow
  • Provides natural backpressure: the stage channels are unbuffered, so each stage blocks until the next stage is ready to receive
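
The stages above are wired strictly in series. Because every stage shares the same chan DataItem contract, it is also straightforward to fan a stream out into several branches and merge the results back into one channel. The Merge helper below is a minimal fan-in sketch built on that contract; it is not part of the pipeline listing above, and it does not preserve ordering across its inputs.

// Merge fans in several DataItem streams into a single output channel. It is
// a minimal sketch using the same conventions as the pipeline stages above;
// ordering across the input channels is not preserved.
func Merge(ctx context.Context, inputs ...<-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	var wg sync.WaitGroup
	wg.Add(len(inputs))

	for _, in := range inputs {
		go func(ch <-chan DataItem) {
			defer wg.Done()
			for item := range ch {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}

	// Close the merged channel only after every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

A caller could, for instance, run two ParallelStage branches over different portions of the source data and pass both outputs to Merge before handing the combined stream to the Enrich stage.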