Advanced Worker Pool Architectures

While the basic worker pool pattern is powerful, real-world applications often require more sophisticated architectures to handle complex requirements.

Dynamic Scaling Worker Pool

One limitation of the basic worker pool is its fixed size. In production environments, workloads often fluctuate, making a static pool size inefficient. Let’s implement a worker pool that can dynamically adjust its size based on load:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// DynamicPool is a worker pool whose size floats between minWorkers and
// maxWorkers: a background manager adds workers while tasks are queued and
// most workers are busy, and surplus workers retire after sitting idle for
// idleTimeout.
type DynamicPool struct {
	// tasks is the buffered work queue. It is never closed; shutdown is
	// signalled through quit so that a concurrent Submit can never send on a
	// closed channel.
	tasks chan func()
	// workerCount is the number of worker goroutines currently alive.
	workerCount atomic.Int32
	minWorkers  int32
	maxWorkers  int32
	idleTimeout time.Duration
	// mutex serializes worker start-up against Stop so that wg.Add never
	// races with wg.Wait.
	mutex sync.Mutex
	// wg tracks every worker goroutine plus the scaling manager.
	wg sync.WaitGroup
	// quit is closed exactly once by Stop.
	quit chan struct{}
	// metricsUpdate nudges the scaling manager; capacity 1 coalesces bursts.
	metricsUpdate chan struct{}
	// queueDepth counts tasks submitted but not yet picked up by a worker.
	queueDepth atomic.Int32
	// activeWorkers counts workers currently executing a task.
	activeWorkers atomic.Int32
}

// NewDynamicPool creates a pool, starts minWorkers workers and launches the
// scaling manager.
//
// minWorkers is raised to 1 if it is not positive, maxWorkers is raised to
// minWorkers if it is smaller, and a negative queueSize is treated as 0.
// idleTimeout should be positive: it is how long a worker waits without work
// before it considers retiring.
func NewDynamicPool(minWorkers, maxWorkers int32, queueSize int, idleTimeout time.Duration) *DynamicPool {
	if minWorkers <= 0 {
		minWorkers = 1
	}
	if maxWorkers < minWorkers {
		maxWorkers = minWorkers
	}
	if queueSize < 0 {
		// make(chan, n) panics for negative n.
		queueSize = 0
	}

	pool := &DynamicPool{
		tasks:         make(chan func(), queueSize),
		minWorkers:    minWorkers,
		maxWorkers:    maxWorkers,
		idleTimeout:   idleTimeout,
		quit:          make(chan struct{}),
		metricsUpdate: make(chan struct{}, 1),
	}

	// Start the minimum number of workers.
	for i := int32(0); i < minWorkers; i++ {
		pool.startWorker()
	}

	// The manager is tracked by wg so Stop also waits for it to exit.
	pool.wg.Add(1)
	go pool.scalingManager()

	return pool
}

// Submit queues task for execution. It blocks while the queue is full and
// returns without queueing anything once the pool has been stopped.
func (p *DynamicPool) Submit(task func()) {
	// Refuse new work after shutdown. Without this check the select below
	// could still choose the send case while the buffer has room.
	select {
	case <-p.quit:
		return
	default:
	}

	// Count the task before it becomes visible to workers so that a fast
	// worker's decrement can never push queueDepth below zero.
	p.queueDepth.Add(1)
	select {
	case p.tasks <- task:
		p.notifyMetrics()
	case <-p.quit:
		// Pool stopped while we were waiting for queue space.
		p.queueDepth.Add(-1)
	}
}

// notifyMetrics asks the scaling manager to re-evaluate, without blocking if
// a request is already pending.
func (p *DynamicPool) notifyMetrics() {
	select {
	case p.metricsUpdate <- struct{}{}:
	default:
	}
}

// startWorker launches one worker goroutine and reports whether it did so.
// It refuses to start a worker once Stop has begun.
func (p *DynamicPool) startWorker() bool {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	// Stop closes quit under the same mutex, so after this check the
	// wg.Add below is guaranteed to happen before Stop's wg.Wait.
	select {
	case <-p.quit:
		return false
	default:
	}

	p.workerCount.Add(1)
	p.wg.Add(1)
	go p.runWorker()
	return true
}

// runWorker is the body of a worker goroutine: it executes tasks until the
// pool is stopped or the worker retires after being idle.
func (p *DynamicPool) runWorker() {
	defer p.wg.Done()

	// tryRetire already decrements workerCount; every other exit path is
	// accounted for here.
	retired := false
	defer func() {
		if !retired {
			p.workerCount.Add(-1)
		}
	}()

	idleTimer := time.NewTimer(p.idleTimeout)
	defer idleTimer.Stop()

	for {
		select {
		case task := <-p.tasks:
			// Pause the idle timer while working, draining a tick that may
			// have fired concurrently so the later Reset starts clean.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}

			p.activeWorkers.Add(1)
			p.queueDepth.Add(-1)
			task()
			p.activeWorkers.Add(-1)

			idleTimer.Reset(p.idleTimeout)
			p.notifyMetrics()

		case <-idleTimer.C:
			if p.tryRetire() {
				retired = true
				return
			}
			// The pool is at its minimum size; stay alive and wait again.
			idleTimer.Reset(p.idleTimeout)

		case <-p.quit:
			return
		}
	}
}

// tryRetire atomically removes the calling worker from workerCount if the
// pool is above minWorkers. The compare-and-swap loop ensures that workers
// timing out simultaneously cannot shrink the pool below the minimum.
func (p *DynamicPool) tryRetire() bool {
	for {
		current := p.workerCount.Load()
		if current <= p.minWorkers {
			return false
		}
		if p.workerCount.CompareAndSwap(current, current-1) {
			return true
		}
	}
}

// scalingManager re-evaluates the pool size every 100ms and whenever a
// metrics update is signalled, until the pool is stopped.
func (p *DynamicPool) scalingManager() {
	defer p.wg.Done()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-p.metricsUpdate:
			p.evaluateScaling()
		case <-ticker.C:
			p.evaluateScaling()
		case <-p.quit:
			return
		}
	}
}

// evaluateScaling adds workers when tasks are queued, at least 80% of the
// workers are busy, and the pool is below maxWorkers. Scaling down is not
// handled here: idle workers retire on their own.
func (p *DynamicPool) evaluateScaling() {
	queueDepth := p.queueDepth.Load()
	workerCount := p.workerCount.Load()
	activeWorkers := p.activeWorkers.Load()

	if queueDepth <= 0 || workerCount >= p.maxWorkers {
		return
	}
	if activeWorkers < workerCount*80/100 {
		return
	}

	// One worker per queued task, capped by the remaining headroom.
	wanted := min(queueDepth, p.maxWorkers-workerCount)
	added := int32(0)
	for i := int32(0); i < wanted; i++ {
		if !p.startWorker() {
			// Pool is shutting down.
			break
		}
		added++
	}
	if added > 0 {
		fmt.Printf("Scaled up to %d workers (added %d)\n", p.workerCount.Load(), added)
	}
}

// min returns the smaller of two int32 values.
func min(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

// Stop shuts the pool down and waits for all workers and the scaling manager
// to exit. Tasks already running are allowed to finish; tasks still waiting
// in the queue may be discarded. Stop is safe to call more than once and
// concurrently with Submit.
func (p *DynamicPool) Stop() {
	p.mutex.Lock()
	select {
	case <-p.quit:
		// Already stopping.
	default:
		close(p.quit)
	}
	p.mutex.Unlock()

	p.wg.Wait()
}

// Stats returns the number of live workers, the number of workers currently
// executing a task, and the number of tasks waiting to be picked up.
func (p *DynamicPool) Stats() (workers, active, queueDepth int32) {
	return p.workerCount.Load(), p.activeWorkers.Load(), p.queueDepth.Load()
}

// main demonstrates the dynamic pool: two bursts of tasks separated by a
// pause, with a background goroutine printing pool statistics.
func main() {
	// Pool bounds: 2 to 10 workers, up to 100 queued tasks, 5s idle timeout.
	pool := NewDynamicPool(2, 10, 100, 5*time.Second)

	// submitBurst queues tasks numbered first..last-1; task n sleeps for
	// baseMs + n*stepMs milliseconds to simulate work of varying length.
	submitBurst := func(first, last, baseMs, stepMs int) {
		for id := first; id < last; id++ {
			id := id
			work := time.Duration(baseMs+id*stepMs) * time.Millisecond
			pool.Submit(func() {
				fmt.Printf("Processing task %d\n", id)
				time.Sleep(work)
			})
		}
	}

	// First burst: 20 tasks.
	submitBurst(0, 20, 100, 10)

	// Print ten stats samples, one every 500ms.
	go func() {
		for samples := 10; samples > 0; samples-- {
			workers, active, queue := pool.Stats()
			fmt.Printf("Stats: workers=%d, active=%d, queue=%d\n", workers, active, queue)
			time.Sleep(500 * time.Millisecond)
		}
	}()

	// Give the first burst time to be processed.
	time.Sleep(3 * time.Second)

	// Second burst: 10 shorter tasks.
	submitBurst(20, 30, 50, 5)

	// Leave time for the work to finish and for idle workers to retire.
	time.Sleep(8 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}

This dynamic worker pool implementation includes several advanced features:

  1. Auto-scaling: Workers are added when the queue depth increases and existing workers are busy
  2. Self-termination: Idle workers terminate themselves after a timeout if they exceed the minimum count
  3. Metrics tracking: The pool tracks queue depth, active workers, and total worker count
  4. Graceful shutdown: The pool can be shut down cleanly; Stop waits for in-progress tasks to complete, but tasks still waiting in the queue when Stop is called may be discarded

Priority-Based Worker Pool

In many applications, not all tasks are equally important. A priority-based worker pool allows more important tasks to be processed before less important ones:

package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

// PriorityTask pairs a unit of work with the priority used to order it.
type PriorityTask struct {
	execute  func()
	priority int
	index    int // position inside the queue, maintained by the heap methods
}

// PriorityQueue is a max-heap of tasks ordered by priority. It satisfies
// heap.Interface and is meant to be driven through the container/heap
// functions.
type PriorityQueue []*PriorityTask

// Len reports how many tasks are queued.
func (pq PriorityQueue) Len() int {
	return len(pq)
}

// Less reports whether the task at i outranks the task at j, so that the
// largest priority value ends up at the top of the heap.
func (pq PriorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	return right.priority < left.priority
}

// Swap exchanges the tasks at i and j and refreshes their stored positions.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index, pq[j].index = i, j
}

// Push appends x, which must be a *PriorityTask, to the end of the queue.
func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*PriorityTask)
	item.index = len(*pq)
	*pq = append(*pq, item)
}

// Pop removes and returns the last element of the underlying slice; used via
// heap.Pop this yields the highest-priority task.
func (pq *PriorityQueue) Pop() interface{} {
	queue := *pq
	last := len(queue) - 1
	item := queue[last]
	queue[last] = nil // drop the reference so the task can be collected
	item.index = -1   // mark the task as no longer queued
	*pq = queue[:last]
	return item
}

// PriorityWorkerPool runs a fixed number of workers that always take the
// highest-priority task currently waiting in the queue.
type PriorityWorkerPool struct {
	workers int
	// taskQueue is a heap of pending tasks, guarded by queueMutex.
	taskQueue  PriorityQueue
	queueMutex sync.Mutex
	// queueCond is signalled when a task is added and broadcast on Stop.
	queueCond *sync.Cond
	wg        sync.WaitGroup
	// quit is closed once by Stop, while holding queueMutex.
	quit chan struct{}
}

// NewPriorityWorkerPool creates a pool that will run the given number of
// workers once Start is called.
func NewPriorityWorkerPool(workers int) *PriorityWorkerPool {
	pool := &PriorityWorkerPool{
		workers:   workers,
		taskQueue: make(PriorityQueue, 0),
		quit:      make(chan struct{}),
	}
	pool.queueCond = sync.NewCond(&pool.queueMutex)
	return pool
}

// Start launches the worker goroutines. Worker IDs start at 1.
func (p *PriorityWorkerPool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.runWorker(i + 1)
	}
}

// runWorker executes tasks in priority order until the pool is stopped and
// the queue is empty.
func (p *PriorityWorkerPool) runWorker(workerID int) {
	defer p.wg.Done()

	for {
		task, ok := p.nextTask()
		if !ok {
			return
		}
		fmt.Printf("Worker %d executing task with priority %d\n", workerID, task.priority)
		task.execute()
	}
}

// nextTask blocks until a task is available and returns the highest-priority
// one. It returns false once the pool is stopped and nothing is left to run,
// so tasks accepted before Stop are still drained.
func (p *PriorityWorkerPool) nextTask() (*PriorityTask, bool) {
	p.queueMutex.Lock()
	defer p.queueMutex.Unlock()

	for len(p.taskQueue) == 0 {
		if p.stopped() {
			return nil, false
		}
		// Wait releases queueMutex while sleeping and re-acquires it before
		// returning; the loop re-checks both conditions after every wakeup.
		p.queueCond.Wait()
	}
	return heap.Pop(&p.taskQueue).(*PriorityTask), true
}

// stopped reports whether Stop has been called.
func (p *PriorityWorkerPool) stopped() bool {
	select {
	case <-p.quit:
		return true
	default:
		return false
	}
}

// Submit queues task with the given priority; larger values run first.
// Tasks submitted after Stop are dropped, because no worker may be left to
// run them.
func (p *PriorityWorkerPool) Submit(task func(), priority int) {
	p.queueMutex.Lock()
	defer p.queueMutex.Unlock()

	if p.stopped() {
		return
	}

	heap.Push(&p.taskQueue, &PriorityTask{
		execute:  task,
		priority: priority,
	})

	// Wake one waiting worker for the new task.
	p.queueCond.Signal()
}

// Stop shuts the pool down and waits for the workers to exit. Workers finish
// the tasks already queued before exiting. Stop is safe to call more than
// once.
func (p *PriorityWorkerPool) Stop() {
	// Closing quit and broadcasting under queueMutex prevents a lost wakeup:
	// a worker is either still ahead of its quit check (and will see quit
	// closed) or already parked in Wait (and will receive the broadcast).
	p.queueMutex.Lock()
	if !p.stopped() {
		close(p.quit)
	}
	p.queueCond.Broadcast()
	p.queueMutex.Unlock()

	p.wg.Wait()
}

// main demonstrates the priority pool by submitting a handful of tasks with
// mixed priorities to three workers.
func main() {
	const workerCount = 3
	pool := NewPriorityWorkerPool(workerCount)
	pool.Start()

	// One task per entry; a larger number means a higher priority.
	for id, prio := range []int{1, 5, 2, 10, 3, 7, 1} {
		id, prio := id, prio

		job := func() {
			fmt.Printf("Executing task %d with priority %d\n", id, prio)
			// Simulate work.
			time.Sleep(200 * time.Millisecond)
		}
		pool.Submit(job, prio)
	}

	// Give the workers time to process the tasks.
	time.Sleep(1 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}

This priority-based worker pool uses Go’s container/heap package to maintain a priority queue of tasks. Key features include:

  1. Priority ordering: Among the tasks waiting in the queue, those with higher priority values are processed first
  2. Efficient priority management: The heap data structure ensures efficient insertion and removal of tasks
  3. Conditional waiting: Workers use a condition variable to wait for new tasks without busy-waiting

Rate-Limited Worker Pool

In many scenarios, you need to limit the rate at which tasks are processed, either to prevent overwhelming downstream systems or to comply with API rate limits:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// RateLimitedPool is a fixed-size worker pool in which every task must obtain
// permission from a shared rate limiter before it runs.
type RateLimitedPool struct {
	workers int
	limiter *rate.Limiter
	// tasks is the buffered work queue. It is never closed; shutdown is
	// signalled through ctx so a concurrent Submit cannot send on a closed
	// channel.
	tasks chan func()
	wg    sync.WaitGroup
	// ctx is cancelled by Stop; workers and Submit observe it.
	ctx        context.Context
	cancelFunc context.CancelFunc
}

// NewRateLimitedPool creates a pool with the given number of workers.
// tasksPerSecond and burstSize configure the limiter; queueSize is the
// capacity of the task queue. Call Start to launch the workers.
func NewRateLimitedPool(workers int, tasksPerSecond float64, burstSize int, queueSize int) *RateLimitedPool {
	ctx, cancel := context.WithCancel(context.Background())

	return &RateLimitedPool{
		workers:    workers,
		limiter:    rate.NewLimiter(rate.Limit(tasksPerSecond), burstSize),
		tasks:      make(chan func(), queueSize),
		ctx:        ctx,
		cancelFunc: cancel,
	}
}

// Start launches the worker goroutines. Worker IDs start at 1.
func (p *RateLimitedPool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.runWorker(i + 1)
	}
}

// runWorker takes tasks off the queue, waits for the limiter before each one,
// and exits when the pool is stopped.
func (p *RateLimitedPool) runWorker(workerID int) {
	defer p.wg.Done()

	for {
		select {
		case task := <-p.tasks:
			// Block until the limiter grants permission. Wait returns an
			// error when ctx is cancelled or when the limiter can never
			// satisfy the request; either way the worker stops and the
			// task it was holding is not run.
			if err := p.limiter.Wait(p.ctx); err != nil {
				return
			}

			startTime := time.Now()
			task()
			duration := time.Since(startTime)
			fmt.Printf("Worker %d completed task in %v\n", workerID, duration)

		case <-p.ctx.Done():
			return
		}
	}
}

// Submit queues task and reports whether it was accepted. It blocks while
// the queue is full and returns false once the pool has been stopped.
func (p *RateLimitedPool) Submit(task func()) bool {
	// Check for shutdown first; otherwise the select below could still
	// choose the send case while the buffer has room.
	if p.ctx.Err() != nil {
		return false
	}

	select {
	case p.tasks <- task:
		return true
	case <-p.ctx.Done():
		return false
	}
}

// Stop cancels the pool's context and waits for all workers to exit. A task
// that is already running finishes first; tasks still queued or waiting on
// the limiter are discarded. Stop is safe to call more than once and
// concurrently with Submit.
func (p *RateLimitedPool) Stop() {
	p.cancelFunc()
	p.wg.Wait()
}

// main demonstrates the rate-limited pool by pushing 30 short tasks through
// a limiter of 10 tasks per second.
func main() {
	// 5 workers, 10 tasks per second, burst of 3, queue capacity 100.
	pool := NewRateLimitedPool(5, 10, 3, 100)
	pool.Start()

	for id := 0; id < 30; id++ {
		id := id
		accepted := pool.Submit(func() {
			fmt.Printf("Processing task %d\n", id)
			// Simulate work.
			time.Sleep(50 * time.Millisecond)
		})
		if !accepted {
			fmt.Printf("Failed to submit task %d\n", id)
		}
	}

	// At 10 tasks per second the 30 tasks need roughly 3 seconds.
	time.Sleep(4 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}

This rate-limited worker pool uses Go’s golang.org/x/time/rate package to control the rate at which tasks are processed. Key features include:

  1. Configurable rate limiting: Control tasks per second and burst size
  2. Context-based cancellation: Clean shutdown using context cancellation
  3. Cancellable submission: Submit returns immediately while the queue has room; when the queue is full it blocks until space frees up or the pool is stopped

Worker Pool with Backpressure

Backpressure is a critical mechanism for maintaining system stability under load. It allows a system to signal upstream components when it’s approaching capacity limits:

package main

import (
	"fmt"
	"sync"
	"time"
)

// BackpressurePool is a fixed-size worker pool with a bounded queue that
// rejects work instead of blocking when the queue is full.
type BackpressurePool struct {
	workers int
	// tasks is the bounded work queue. It is never closed; shutdown is
	// signalled through quit so a late Submit cannot send on a closed
	// channel.
	tasks chan func()
	wg    sync.WaitGroup
	// quit is closed once by Stop.
	quit chan struct{}
	// stopOnce makes Stop idempotent.
	stopOnce      sync.Once
	maxQueueDepth int
}

// NewBackpressurePool creates a pool with the given number of workers and a
// task queue that holds at most maxQueueDepth pending tasks. Call Start to
// launch the workers.
func NewBackpressurePool(workers int, maxQueueDepth int) *BackpressurePool {
	return &BackpressurePool{
		workers:       workers,
		tasks:         make(chan func(), maxQueueDepth),
		quit:          make(chan struct{}),
		maxQueueDepth: maxQueueDepth,
	}
}

// Start launches the worker goroutines.
func (p *BackpressurePool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.runWorker()
	}
}

// runWorker executes queued tasks until the pool is stopped.
func (p *BackpressurePool) runWorker() {
	defer p.wg.Done()

	for {
		select {
		case task := <-p.tasks:
			task()
		case <-p.quit:
			return
		}
	}
}

// stopped reports whether Stop has been called.
func (p *BackpressurePool) stopped() bool {
	select {
	case <-p.quit:
		return true
	default:
		return false
	}
}

// Submit tries to queue task without blocking. It returns true if the task
// was accepted, and false if the queue is full (backpressure) or the pool
// has been stopped.
func (p *BackpressurePool) Submit(task func()) bool {
	if p.stopped() {
		return false
	}

	select {
	case p.tasks <- task:
		return true
	default:
		// Queue is full: apply backpressure by rejecting the task.
		return false
	}
}

// SubmitWithTimeout tries to queue task, waiting up to timeout for queue
// space. It returns true if the task was accepted, and false if the timeout
// elapsed or the pool was stopped first.
func (p *BackpressurePool) SubmitWithTimeout(task func(), timeout time.Duration) bool {
	if p.stopped() {
		return false
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case p.tasks <- task:
		return true
	case <-timer.C:
		return false
	case <-p.quit:
		// Pool stopped while waiting for space.
		return false
	}
}

// QueueDepth returns the number of tasks currently waiting in the queue.
func (p *BackpressurePool) QueueDepth() int {
	return len(p.tasks)
}

// Stop shuts the pool down and waits for the workers to exit. Tasks already
// running finish first; tasks still waiting in the queue may be discarded.
// Stop is safe to call more than once and concurrently with the Submit
// methods.
func (p *BackpressurePool) Stop() {
	p.stopOnce.Do(func() {
		close(p.quit)
	})
	p.wg.Wait()
}

// main demonstrates backpressure: tasks are offered faster than three
// workers can process them, and rejected tasks are retried after a pause.
func main() {
	// 3 workers, at most 5 tasks waiting in the queue.
	pool := NewBackpressurePool(3, 5)
	pool.Start()

	for id := 0; id < 20; id++ {
		id := id
		job := func() {
			fmt.Printf("Processing task %d\n", id)
			// Simulate work.
			time.Sleep(200 * time.Millisecond)
		}

		// Keep offering the task until the pool accepts it. A real system
		// might instead:
		// 1. Retry after a delay
		// 2. Store the task for later processing
		// 3. Drop the task if it's no longer relevant
		// 4. Alert monitoring systems about rejection rate
		for !pool.Submit(job) {
			fmt.Printf("Task %d rejected due to backpressure\n", id)
			time.Sleep(100 * time.Millisecond)
		}
		fmt.Printf("Task %d accepted\n", id)
	}

	// Give the queued tasks time to complete.
	time.Sleep(2 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}

This backpressure pool implementation provides mechanisms to handle overload situations:

  1. Non-blocking submission: The Submit method returns immediately, indicating whether the task was accepted
  2. Timeout-based submission: The SubmitWithTimeout method allows waiting for a specified duration for queue space
  3. Queue depth monitoring: The QueueDepth method allows monitoring the current queue size
  4. Rejection handling: The caller can implement appropriate strategies when tasks are rejected