Dynamic Scaling and Load Balancing
In production environments, workloads rarely remain constant. Systems must adapt to changing demands, scaling resources up during peak loads and down during quiet periods to optimize resource utilization.
Adaptive Worker Pool with Load Balancing
Let’s implement a more sophisticated worker pool that combines dynamic scaling with load balancing across multiple task queues:
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Task represents a unit of work
type Task struct {
ID int
Priority int
Execute func()
}
// AdaptivePool represents a worker pool that can adapt to changing workloads
type AdaptivePool struct {
queues []chan Task
queueCount int
minWorkers int
maxWorkers int
workerCount atomic.Int32
activeWorkers atomic.Int32
idleTimeout time.Duration
wg sync.WaitGroup
quit chan struct{}
metrics struct {
tasksProcessed atomic.Int64
tasksRejected atomic.Int64
}
}
// NewAdaptivePool creates a new adaptive worker pool
func NewAdaptivePool(queueCount, queueSize, minWorkers, maxWorkers int, idleTimeout time.Duration) *AdaptivePool {
if minWorkers <= 0 {
minWorkers = 1
}
if maxWorkers < minWorkers {
maxWorkers = minWorkers
}
if queueCount <= 0 {
queueCount = 1
}
pool := &AdaptivePool{
queues: make([]chan Task, queueCount),
queueCount: queueCount,
minWorkers: minWorkers,
maxWorkers: maxWorkers,
idleTimeout: idleTimeout,
quit: make(chan struct{}),
}
// Initialize task queues
for i := 0; i < queueCount; i++ {
pool.queues[i] = make(chan Task, queueSize)
}
return pool
}
// Start launches the worker pool
func (p *AdaptivePool) Start() {
// Start minimum number of workers
for i := 0; i < p.minWorkers; i++ {
p.startWorker()
}
// Start the scaling manager
go p.scalingManager()
}
// startWorker launches a new worker goroutine
func (p *AdaptivePool) startWorker() {
p.workerCount.Add(1)
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer p.workerCount.Add(-1)
// Create a timer for idle timeout
idleTimer := time.NewTimer(p.idleTimeout)
defer idleTimer.Stop()
// Create cases for select
cases := make([]chan Task, len(p.queues))
for i := range p.queues {
cases[i] = p.queues[i]
}
for {
// Reset idle timer
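// (Stop reports false if the timer already fired or was stopped; the
// non-blocking drain below discards any pending expiry so Reset starts clean.)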
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
}
idleTimer.Reset(p.idleTimeout)
// Try to receive a task from one randomly chosen queue; if that queue stays
// empty, the idle timer or the quit signal unblocks the select
var task Task
var ok bool
select {
case task, ok = <-cases[rand.Intn(len(cases))]:
if !ok {
// Queue closed
return
}
case <-idleTimer.C:
// Worker has been idle for the full timeout
if p.workerCount.Load() > int32(p.minWorkers) {
// Let this worker exit to shrink the pool; the check races with other
// idle workers, so the count can briefly dip below minWorkers
return
}
continue
case <-p.quit:
// Pool is shutting down
return
}
// Process the task
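// (A panic inside task.Execute is not recovered here; an unrecovered panic
// in any goroutine terminates the whole program.)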
p.activeWorkers.Add(1)
task.Execute()
p.activeWorkers.Add(-1)
p.metrics.tasksProcessed.Add(1)
}
}()
}
// scalingManager periodically checks if we need to scale up or down
func (p *AdaptivePool) scalingManager() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.evaluateScaling()
case <-p.quit:
return
}
}
}
// evaluateScaling decides whether to scale up or down
func (p *AdaptivePool) evaluateScaling() {
// Calculate queue depths
totalQueueDepth := 0
for _, queue := range p.queues {
totalQueueDepth += len(queue)
}
workerCount := p.workerCount.Load()
activeWorkers := p.activeWorkers.Load()
// Scale up if:
// 1. Tasks are waiting in the queues
// 2. Most workers (at least 80%) are busy
// 3. We're below the maximum worker count
if totalQueueDepth > 0 && activeWorkers >= workerCount*80/100 && workerCount < int32(p.maxWorkers) {
// Start new workers based on queue depth
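// (min is Go's built-in generic min, available since Go 1.21; older
// toolchains would need a small helper function instead.)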
workersToAdd := min(int32(totalQueueDepth), int32(p.maxWorkers)-workerCount)
for i := int32(0); i < workersToAdd; i++ {
p.startWorker()
}
fmt.Printf("Scaled up to %d workers (added %d)\n", p.workerCount.Load(), workersToAdd)
}
// Note: Workers self-terminate after idle timeout
}
// Submit adds a task to the pool
// Returns true if the task was accepted, false if rejected
func (p *AdaptivePool) Submit(task Task) bool {
// Choose a queue based on task priority
queueIndex := task.Priority % p.queueCount
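// (Priorities are assumed non-negative; a negative priority would produce a
// negative index and panic on the slice access below.)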
// Try to submit to the chosen queue
select {
case p.queues[queueIndex] <- task:
return true
default:
// Queue is full, try other queues
for i := 0; i < p.queueCount; i++ {
if i == queueIndex {
continue
}
select {
case p.queues[i] <- task:
return true
default:
// This queue is also full
}
}
// All queues are full
p.metrics.tasksRejected.Add(1)
return false
}
}
// Stop gracefully shuts down the pool. Submit must not be called after Stop,
// because the task queues are closed here; tasks still buffered when a worker
// observes the quit signal may go unprocessed.
func (p *AdaptivePool) Stop() {
close(p.quit)
// Close all queues
for _, queue := range p.queues {
close(queue)
}
p.wg.Wait()
}
// Stats returns current pool statistics
func (p *AdaptivePool) Stats() (workers, active int32, processed, rejected int64) {
return p.workerCount.Load(),
p.activeWorkers.Load(),
p.metrics.tasksProcessed.Load(),
p.metrics.tasksRejected.Load()
}
func main() {
// Since Go 1.20 the global rand source is seeded automatically, so no
// explicit rand.Seed call is needed here.
// Create an adaptive pool with:
// - 3 queues with capacity 10 each
// - 2-10 workers
// - 5s idle timeout
pool := NewAdaptivePool(3, 10, 2, 10, 5*time.Second)
pool.Start()
// Submit tasks with varying priorities
for i := 0; i < 50; i++ {
taskID := i
priority := rand.Intn(5) // Random priority 0-4
success := pool.Submit(Task{
ID: taskID,
Priority: priority,
Execute: func() {
fmt.Printf("Processing task %d with priority %d\n", taskID, priority)
// Simulate varying work durations
time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
},
})
if success {
fmt.Printf("Task %d accepted\n", taskID)
} else {
fmt.Printf("Task %d rejected\n", taskID)
}
// Submit tasks with varying rates
time.Sleep(time.Duration(10+rand.Intn(50)) * time.Millisecond)
}
// Monitor pool stats
for i := 0; i < 5; i++ {
workers, active, processed, rejected := pool.Stats()
fmt.Printf("Stats: workers=%d, active=%d, processed=%d, rejected=%d\n",
workers, active, processed, rejected)
time.Sleep(1 * time.Second)
}
// Stop the pool
pool.Stop()
fmt.Println("Pool shut down")
}
This adaptive worker pool combines several advanced features:
- Multiple task queues: Tasks are routed to a queue by priority, with spill-over to the other queues when the preferred one is full
- Load balancing: Each worker pulls from a randomly chosen queue on every iteration, spreading work across the queues (a fairer multi-queue receive is sketched below)
- Dynamic scaling: A scaling manager adds workers when tasks are queued and most workers are busy, while idle workers above the minimum terminate themselves
- Comprehensive metrics: The pool tracks tasks processed, tasks rejected, and total versus active worker counts
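The random-queue select above keeps each worker cheap, but a worker that happens to pick an empty queue waits out the idle timeout even while other queues hold tasks. If fairer draining matters more than raw select speed, Go's reflect package can build a select over a runtime-sized set of channels. The following is a minimal, standalone sketch of that idea, not part of the pool above; the fairReceive helper and its int payload type are illustrative only:
package main

import (
	"fmt"
	"reflect"
)

// fairReceive blocks until any of the given queues delivers a value, no matter
// which queue it arrives on. ok is false if the chosen queue was closed.
func fairReceive(queues []chan int) (val int, ok bool) {
	cases := make([]reflect.SelectCase, len(queues))
	for i, q := range queues {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(q),
		}
	}
	_, v, recvOK := reflect.Select(cases)
	if !recvOK {
		return 0, false
	}
	return int(v.Int()), true
}

func main() {
	queues := []chan int{make(chan int, 1), make(chan int, 1)}
	queues[1] <- 42 // only the second queue has work
	if v, ok := fairReceive(queues); ok {
		fmt.Println("received", v)
	}
}
Reflection-based selects carry noticeable per-call overhead, so the random-queue approach in the pool is a reasonable trade-off when occasional extra latency on a lightly used queue is acceptable.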