Dynamic Scaling and Load Balancing

In production environments, workloads rarely remain constant. Systems must adapt to changing demands, scaling resources up during peak loads and down during quiet periods to optimize resource utilization.

Adaptive Worker Pool with Load Balancing

Let’s implement a more sophisticated worker pool that combines dynamic scaling with load balancing across multiple task queues:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Task represents a unit of work
type Task struct {
	ID       int
	Priority int    // used by Submit to pick the initial queue
	Execute  func() // the work to perform
}

// AdaptivePool represents a worker pool that can adapt to changing workloads
type AdaptivePool struct {
	queues        []chan Task   // one buffered task queue per shard
	queueCount    int
	minWorkers    int           // floor for the worker count
	maxWorkers    int           // ceiling for the worker count
	workerCount   atomic.Int32  // current number of worker goroutines
	activeWorkers atomic.Int32  // workers currently executing a task
	idleTimeout   time.Duration // how long a worker may sit idle before retiring
	wg            sync.WaitGroup
	quit          chan struct{}
	metrics       struct {
		tasksProcessed atomic.Int64
		tasksRejected  atomic.Int64
	}
}

// NewAdaptivePool creates a new adaptive worker pool
func NewAdaptivePool(queueCount, queueSize, minWorkers, maxWorkers int, idleTimeout time.Duration) *AdaptivePool {
	if minWorkers <= 0 {
		minWorkers = 1
	}
	if maxWorkers < minWorkers {
		maxWorkers = minWorkers
	}
	if queueCount <= 0 {
		queueCount = 1
	}
	
	pool := &AdaptivePool{
		queues:      make([]chan Task, queueCount),
		queueCount:  queueCount,
		minWorkers:  minWorkers,
		maxWorkers:  maxWorkers,
		idleTimeout: idleTimeout,
		quit:        make(chan struct{}),
	}
	
	// Initialize task queues
	for i := 0; i < queueCount; i++ {
		pool.queues[i] = make(chan Task, queueSize)
	}
	
	return pool
}

// Start launches the worker pool
func (p *AdaptivePool) Start() {
	// Start minimum number of workers
	for i := 0; i < p.minWorkers; i++ {
		p.startWorker()
	}
	
	// Start the scaling manager
	go p.scalingManager()
}

// startWorker launches a new worker goroutine
func (p *AdaptivePool) startWorker() {
	p.workerCount.Add(1)
	p.wg.Add(1)
	
	go func() {
		defer p.wg.Done()
		defer p.workerCount.Add(-1)
		
		// Timer used to detect when this worker has been idle too long
		idleTimer := time.NewTimer(p.idleTimeout)
		defer idleTimer.Stop()
		
		for {
			// Reset the idle timer. On Go versions before 1.23, resetting
			// a timer that has already fired requires draining its channel
			// first; the non-blocking drain handles both cases.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}
			idleTimer.Reset(p.idleTimeout)
			
			// Block on one randomly chosen queue. Randomization spreads
			// workers across the queues, and the idle timer bounds how long
			// a worker can wait on an empty queue while others have work.
			var task Task
			var ok bool
			
			select {
			case task, ok = <-p.queues[rand.Intn(len(p.queues))]:
				if !ok {
					// Queue closed and drained
					return
				}
				
			case <-idleTimer.C:
				// Idle for too long: retire unless that would take the pool
				// below its minimum. The check is best-effort (two idle
				// workers can race past it at the same time); evaluateScaling
				// restores the floor if the count dips below minWorkers.
				if p.workerCount.Load() > int32(p.minWorkers) {
					return
				}
				continue
				
			case <-p.quit:
				// Pool is shutting down
				return
			}
			
			// Process the task
			p.activeWorkers.Add(1)
			task.Execute()
			p.activeWorkers.Add(-1)
			p.metrics.tasksProcessed.Add(1)
		}
	}()
}

// scalingManager periodically checks if we need to scale up or down
func (p *AdaptivePool) scalingManager() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			p.evaluateScaling()
		case <-p.quit:
			return
		}
	}
}

// evaluateScaling decides whether to scale up or down
func (p *AdaptivePool) evaluateScaling() {
	// Calculate queue depths
	totalQueueDepth := 0
	for _, queue := range p.queues {
		totalQueueDepth += len(queue)
	}
	
	workerCount := p.workerCount.Load()
	activeWorkers := p.activeWorkers.Load()
	
	// Scale up when there is a backlog, at least ~80% of the workers are
	// busy, and we are still below the ceiling.
	if totalQueueDepth > 0 && activeWorkers >= workerCount*80/100 && workerCount < int32(p.maxWorkers) {
		// Add one worker per queued task, capped at maxWorkers.
		// (min is the generic built-in introduced in Go 1.21.)
		workersToAdd := min(int32(totalQueueDepth), int32(p.maxWorkers)-workerCount)
		for i := int32(0); i < workersToAdd; i++ {
			p.startWorker()
		}
		fmt.Printf("Scaled up to %d workers (added %d)\n", p.workerCount.Load(), workersToAdd)
	}
	
	// Restore the floor if idle-timeout retirement raced below minWorkers.
	for p.workerCount.Load() < int32(p.minWorkers) {
		p.startWorker()
	}
	
	// Scale-down needs no action here: idle workers retire themselves
	// after idleTimeout, as long as the pool stays at or above minWorkers.
}

// Submit adds a task to the pool. It returns true if the task was
// accepted and false if every queue was full. Submit must not be called
// after Stop: sending on a closed queue panics.
func (p *AdaptivePool) Submit(task Task) bool {
	// Route by priority modulo the queue count. This shards tasks across
	// queues; it does not give higher-priority tasks precedence.
	queueIndex := task.Priority % p.queueCount
	
	// Try to submit to the chosen queue
	select {
	case p.queues[queueIndex] <- task:
		return true
	default:
		// Queue is full, try other queues
		for i := 0; i < p.queueCount; i++ {
			if i == queueIndex {
				continue
			}
			
			select {
			case p.queues[i] <- task:
				return true
			default:
				// This queue is also full
			}
		}
		
		// All queues are full
		p.metrics.tasksRejected.Add(1)
		return false
	}
}

// Stop shuts down the pool and waits for all workers to exit. Tasks
// still buffered in the queues may go unprocessed: a worker that sees
// the quit signal returns immediately instead of draining its queue.
func (p *AdaptivePool) Stop() {
	close(p.quit)
	
	// Close all queues
	for _, queue := range p.queues {
		close(queue)
	}
	
	p.wg.Wait()
}

// Stats returns current pool statistics
func (p *AdaptivePool) Stats() (workers, active int32, processed, rejected int64) {
	return p.workerCount.Load(),
		p.activeWorkers.Load(),
		p.metrics.tasksProcessed.Load(),
		p.metrics.tasksRejected.Load()
}

func main() {
	// Since Go 1.20 the global math/rand source is seeded automatically,
	// so the deprecated rand.Seed call is no longer needed.
	
	// Create an adaptive pool with:
	// - 3 queues with capacity 10 each
	// - 2-10 workers
	// - 5s idle timeout
	pool := NewAdaptivePool(3, 10, 2, 10, 5*time.Second)
	pool.Start()
	
	// Submit tasks with varying priorities
	for i := 0; i < 50; i++ {
		taskID := i
		priority := rand.Intn(5) // Random priority 0-4
		
		success := pool.Submit(Task{
			ID:       taskID,
			Priority: priority,
			Execute: func() {
				fmt.Printf("Processing task %d with priority %d\n", taskID, priority)
				// Simulate varying work durations
				time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
			},
		})
		
		if success {
			fmt.Printf("Task %d accepted\n", taskID)
		} else {
			fmt.Printf("Task %d rejected\n", taskID)
		}
		
		// Submit tasks with varying rates
		time.Sleep(time.Duration(10+rand.Intn(50)) * time.Millisecond)
	}
	
	// Monitor pool stats
	for i := 0; i < 5; i++ {
		workers, active, processed, rejected := pool.Stats()
		fmt.Printf("Stats: workers=%d, active=%d, processed=%d, rejected=%d\n",
			workers, active, processed, rejected)
		time.Sleep(1 * time.Second)
	}
	
	// Stop the pool
	pool.Stop()
	fmt.Println("Pool shut down")
}

This adaptive worker pool combines several advanced features:

  1. Multiple task queues: tasks are sharded across queues by priority modulo the queue count
  2. Load balancing: each worker pulls from a randomly chosen queue, spreading the load across queues (a less blocking variant is sketched below)
  3. Dynamic scaling: the pool adds workers when queues back up and lets idle workers retire down to the minimum
  4. Metrics: the pool tracks the worker count, active workers, and the number of tasks processed and rejected
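
A caveat of the random-queue strategy in the worker loop: a worker that blocks on an empty queue sits there until its idle timer fires, even when another queue has tasks waiting. One common refinement is to sweep every queue with a non-blocking receive before committing to a blocking receive. The helper below is a minimal sketch of that idea; tryReceive is a hypothetical addition, not part of the pool above:

// tryReceive polls every queue once without blocking and returns the
// first task found. Sweeping before blocking makes a worker less likely
// to stall on an empty queue while other queues hold work.
func (p *AdaptivePool) tryReceive() (Task, bool) {
	// Start at a random offset so workers don't all favor queue 0.
	start := rand.Intn(len(p.queues))
	for i := 0; i < len(p.queues); i++ {
		select {
		case task, ok := <-p.queues[(start+i)%len(p.queues)]:
			if ok {
				return task, true
			}
			// Queue closed and drained; try the next one.
		default:
			// Queue currently empty; try the next one.
		}
	}
	return Task{}, false
}

The worker loop would call tryReceive first and fall back to the blocking select only when every queue comes up empty, trading a little extra polling per iteration for lower queueing latency.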