Advanced Worker Pool Architectures
While the basic worker pool pattern is powerful, real-world applications often require more sophisticated architectures to handle complex requirements.
Dynamic Scaling Worker Pool
One limitation of the basic worker pool is its fixed size. In production environments, workloads often fluctuate, making a static pool size inefficient. Let’s implement a worker pool that can dynamically adjust its size based on load:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// DynamicPool is a worker pool whose number of goroutines grows with load
// (never beyond maxWorkers) and shrinks back toward minWorkers as surplus
// workers stay idle for idleTimeout.
type DynamicPool struct {
	// Configuration, fixed at construction time.
	minWorkers  int32
	maxWorkers  int32
	idleTimeout time.Duration

	// Work queue and lifecycle signalling.
	tasks         chan func()   // buffered queue of pending tasks
	quit          chan struct{} // closed on shutdown
	metricsUpdate chan struct{} // wake-up hint for the scaling manager

	// Live counters read by the scaling logic and Stats.
	workerCount   atomic.Int32
	queueDepth    atomic.Int32
	activeWorkers atomic.Int32

	// mutex is held while a worker is registered; wg tracks worker goroutines.
	mutex sync.Mutex
	wg    sync.WaitGroup
}
// NewDynamicPool builds a DynamicPool, launches minWorkers workers plus the
// background scaling manager, and returns the running pool.
//
// A non-positive minWorkers is raised to 1, and a maxWorkers smaller than
// minWorkers is raised to minWorkers. queueSize is the capacity of the task
// buffer; idleTimeout is how long a surplus worker may idle before exiting.
func NewDynamicPool(minWorkers, maxWorkers int32, queueSize int, idleTimeout time.Duration) *DynamicPool {
	if minWorkers < 1 {
		minWorkers = 1
	}
	if minWorkers > maxWorkers {
		maxWorkers = minWorkers
	}

	p := new(DynamicPool)
	p.tasks = make(chan func(), queueSize)
	p.minWorkers, p.maxWorkers = minWorkers, maxWorkers
	p.idleTimeout = idleTimeout
	p.quit = make(chan struct{})
	// Capacity 1 lets producers coalesce many "something changed" signals
	// into a single pending wake-up for the scaling manager.
	p.metricsUpdate = make(chan struct{}, 1)

	for remaining := minWorkers; remaining > 0; remaining-- {
		p.startWorker()
	}
	go p.scalingManager()

	return p
}
// Submit enqueues task for execution by the pool.
//
// It blocks while the task buffer is full. If quit is already closed when
// Submit is called, or becomes closed while Submit is waiting for buffer
// space, the task is dropped and Submit returns without queueing it.
func (p *DynamicPool) Submit(task func()) {
	// Refuse work once shutdown has started. This also narrows the window in
	// which the send below could race a shutdown that closes p.tasks.
	select {
	case <-p.quit:
		return
	default:
	}

	// Count the task before it becomes visible to workers. A worker may
	// dequeue it (and decrement queueDepth) before the send statement returns,
	// so incrementing afterwards could let queueDepth dip below zero.
	p.queueDepth.Add(1)
	select {
	case p.tasks <- task:
		// Wake the scaling manager unless a wake-up is already pending.
		select {
		case p.metricsUpdate <- struct{}{}:
		default:
		}
	case <-p.quit:
		// Shutdown began while we were blocked; the task was never queued.
		p.queueDepth.Add(-1)
	}
}
// startWorker registers and launches one worker goroutine, unless quit has
// already been closed, in which case it does nothing.
func (p *DynamicPool) startWorker() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	// Do not register new workers once shutdown has begun.
	select {
	case <-p.quit:
		return
	default:
	}

	p.workerCount.Add(1)
	p.wg.Add(1)
	go p.runWorker()
}

// runWorker is the body of a single worker goroutine.
//
// It runs tasks from p.tasks until that channel is closed or quit is closed.
// After idleTimeout without a task it retires, but only if the pool still has
// more than minWorkers workers (see tryRetire).
func (p *DynamicPool) runWorker() {
	defer p.wg.Done()

	// retired is set when tryRetire has already removed this worker from
	// workerCount, so the deferred decrement must be skipped.
	retired := false
	defer func() {
		if !retired {
			p.workerCount.Add(-1)
		}
	}()

	idleTimer := time.NewTimer(p.idleTimeout)
	defer idleTimer.Stop()

	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				// Channel closed, worker should exit.
				return
			}
			// Pause the idle timer while busy, draining a fire that raced in.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}

			p.activeWorkers.Add(1)
			p.queueDepth.Add(-1)
			task()
			p.activeWorkers.Add(-1)

			idleTimer.Reset(p.idleTimeout)

			// Wake the scaling manager unless a wake-up is already pending.
			select {
			case p.metricsUpdate <- struct{}{}:
			default:
			}
		case <-idleTimer.C:
			if p.tryRetire() {
				retired = true
				return
			}
			// At the minimum worker count: stay alive and keep waiting.
			idleTimer.Reset(p.idleTimeout)
		case <-p.quit:
			return
		}
	}
}

// tryRetire atomically decrements workerCount if and only if it is above
// minWorkers, and reports whether it did. The compare-and-swap ensures that
// several workers timing out at once cannot take the pool below minWorkers,
// which a separate Load-then-exit check would allow.
func (p *DynamicPool) tryRetire() bool {
	for {
		current := p.workerCount.Load()
		if current <= p.minWorkers {
			return false
		}
		if p.workerCount.CompareAndSwap(current, current-1) {
			return true
		}
	}
}
// scalingManager re-evaluates the worker count whenever a metrics signal
// arrives and, as a fallback, every 100ms. It returns once quit is closed.
func (p *DynamicPool) scalingManager() {
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-p.quit:
			return
		case <-p.metricsUpdate:
		case <-tick.C:
		}
		p.evaluateScaling()
	}
}
// evaluateScaling adds workers when all of these hold: there is queued work,
// at least 80% of the current workers are busy, and the pool is below
// maxWorkers. It starts one worker per queued task, capped at the remaining
// headroom. There is no explicit scale-down here; surplus workers retire on
// their own after idleTimeout.
func (p *DynamicPool) evaluateScaling() {
	var (
		pending = p.queueDepth.Load()
		total   = p.workerCount.Load()
		busy    = p.activeWorkers.Load()
	)

	headroom := p.maxWorkers - total
	saturated := busy >= total*80/100
	if pending <= 0 || !saturated || headroom <= 0 {
		return
	}

	added := min(pending, headroom)
	for n := int32(0); n < added; n++ {
		p.startWorker()
	}
	fmt.Printf("Scaled up to %d workers (added %d)\n", p.workerCount.Load(), added)
}
// min reports the smaller of the two int32 values a and b.
func min(a, b int32) int32 {
	if b < a {
		return b
	}
	return a
}
// Stop shuts the pool down and blocks until every worker goroutine has
// returned. Tasks already running finish. Tasks still waiting in the queue may
// be discarded, because workers return as soon as they observe quit closed.
// Calling Stop more than once is safe.
//
// The task channel is deliberately left open. Workers exit on quit, and
// closing p.tasks would make any Submit racing with (or following) Stop panic
// with "send on closed channel".
func (p *DynamicPool) Stop() {
	p.mutex.Lock()
	select {
	case <-p.quit:
		// Already stopped; closing again would panic.
	default:
		close(p.quit)
	}
	p.mutex.Unlock()

	p.wg.Wait()
}
// Stats reports a snapshot of the pool: the number of live workers, how many
// of them are running a task right now, and the queue-depth counter. The three
// values are loaded independently, so they may not be mutually consistent.
func (p *DynamicPool) Stats() (workers, active, queueDepth int32) {
	workers = p.workerCount.Load()
	active = p.activeWorkers.Load()
	queueDepth = p.queueDepth.Load()
	return workers, active, queueDepth
}
// main demonstrates the dynamic pool through four phases:
//   - a burst of 20 tasks;
//   - a monitoring goroutine that prints Stats every 500ms;
//   - a second burst of 10 shorter tasks;
//   - a long wait so idle workers can retire before Stop.
func main() {
	// 2..10 workers, a 100-slot queue, and a 5s idle timeout.
	pool := NewDynamicPool(2, 10, 100, 5*time.Second)

	// First burst: durations grow from 100ms by 10ms per task ID.
	for id := 0; id < 20; id++ {
		id := id
		pool.Submit(func() {
			fmt.Printf("Processing task %d\n", id)
			time.Sleep(time.Duration(100+id*10) * time.Millisecond)
		})
	}

	// Report pool metrics: ten samples, 500ms apart.
	go func() {
		for sample := 0; sample < 10; sample++ {
			w, a, q := pool.Stats()
			fmt.Printf("Stats: workers=%d, active=%d, queue=%d\n", w, a, q)
			time.Sleep(500 * time.Millisecond)
		}
	}()

	time.Sleep(3 * time.Second)

	// Second burst: 50ms plus 5ms per task ID.
	for id := 20; id < 30; id++ {
		id := id
		pool.Submit(func() {
			fmt.Printf("Processing task %d\n", id)
			time.Sleep(time.Duration(50+id*5) * time.Millisecond)
		})
	}

	// Give the queue time to drain and surplus workers time to idle out.
	time.Sleep(8 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}
This dynamic worker pool implementation includes several advanced features:
- Auto-scaling: Workers are added when the queue depth increases and existing workers are busy
- Self-termination: Idle workers terminate themselves after a timeout if they exceed the minimum count
- Metrics tracking: The pool tracks queue depth, active workers, and total worker count
- Graceful shutdown: Stop waits for in-progress tasks to complete before returning; tasks still waiting in the queue may be discarded
Priority-Based Worker Pool
In many applications, not all tasks are equally important. A priority-based worker pool allows more important tasks to be processed before less important ones:
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// PriorityTask is a unit of work paired with its scheduling priority.
// Larger priority values are dequeued first.
type PriorityTask struct {
	execute  func()
	priority int
	index    int // position in the heap slice; set to -1 once popped
}

// PriorityQueue is a max-heap of *PriorityTask ordered by priority. It
// implements heap.Interface and is meant to be driven through container/heap
// (heap.Push / heap.Pop) rather than by calling Push and Pop directly.
type PriorityQueue []*PriorityTask

// Len reports how many tasks are in the queue.
func (pq PriorityQueue) Len() int {
	return len(pq)
}

// Less makes the task with the larger priority sort first.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[j].priority < pq[i].priority
}

// Swap exchanges two tasks and keeps their stored indexes in sync.
func (pq PriorityQueue) Swap(i, j int) {
	a, b := pq[i], pq[j]
	pq[i], pq[j] = b, a
	b.index, a.index = i, j
}

// Push appends x, which must be a *PriorityTask, recording its slice index.
func (pq *PriorityQueue) Push(x interface{}) {
	t := x.(*PriorityTask)
	t.index = len(*pq)
	*pq = append(*pq, t)
}

// Pop detaches and returns the last element of the slice; heap.Pop moves the
// highest-priority task there before calling it.
func (pq *PriorityQueue) Pop() interface{} {
	items := *pq
	last := len(items) - 1
	t := items[last]
	items[last] = nil // drop the slice's reference so the task can be collected
	t.index = -1      // mark as no longer in the heap
	*pq = items[:last]
	return t
}
// PriorityWorkerPool runs a fixed number of workers that always take the
// highest-priority pending task next.
type PriorityWorkerPool struct {
	workers int // number of worker goroutines launched by Start

	// taskQueue is guarded by queueMutex; queueCond (bound to queueMutex)
	// wakes workers when a task is submitted or the pool is stopped.
	taskQueue  PriorityQueue
	queueMutex sync.Mutex
	queueCond  *sync.Cond

	wg   sync.WaitGroup // tracks running workers
	quit chan struct{}  // closed by Stop
}
// NewPriorityWorkerPool returns a pool configured for the given number of
// workers, with an empty queue. No goroutines run until Start is called.
func NewPriorityWorkerPool(workers int) *PriorityWorkerPool {
	p := &PriorityWorkerPool{workers: workers}
	p.taskQueue = PriorityQueue{}
	p.quit = make(chan struct{})
	// The condition variable must share the mutex that guards taskQueue.
	p.queueCond = sync.NewCond(&p.queueMutex)
	return p
}
// Start launches p.workers worker goroutines, numbered from 1. Each worker
// repeatedly pops the highest-priority task and runs it, sleeping on
// queueCond while the queue is empty.
func (p *PriorityWorkerPool) Start() {
	for id := 1; id <= p.workers; id++ {
		p.wg.Add(1)
		go p.work(id)
	}
}

// work is the loop run by one worker goroutine.
func (p *PriorityWorkerPool) work(workerID int) {
	defer p.wg.Done()
	for {
		task, ok := p.take()
		if !ok {
			return
		}
		fmt.Printf("Worker %d executing task with priority %d\n", workerID, task.priority)
		task.execute()
	}
}

// take blocks until a task is available and pops it with ok == true. It
// returns ok == false when the queue is empty and quit is closed, or when the
// worker wakes from waiting and finds quit closed.
func (p *PriorityWorkerPool) take() (task *PriorityTask, ok bool) {
	p.queueMutex.Lock()
	defer p.queueMutex.Unlock()

	for p.taskQueue.Len() == 0 {
		if p.stopping() {
			return nil, false
		}
		p.queueCond.Wait()
		if p.stopping() {
			return nil, false
		}
	}
	return heap.Pop(&p.taskQueue).(*PriorityTask), true
}

// stopping reports, without blocking, whether quit has been closed.
func (p *PriorityWorkerPool) stopping() bool {
	select {
	case <-p.quit:
		return true
	default:
		return false
	}
}
// Submit queues task with the given priority (a larger value runs sooner)
// and signals one waiting worker.
func (p *PriorityWorkerPool) Submit(task func(), priority int) {
	item := &PriorityTask{priority: priority, execute: task}

	p.queueMutex.Lock()
	heap.Push(&p.taskQueue, item)
	p.queueCond.Signal()
	p.queueMutex.Unlock()
}
// Stop closes quit, wakes every waiting worker, and waits for all workers to
// exit. A worker that still finds tasks in the queue keeps running them.
// However, a worker woken from waiting after quit is closed exits without
// re-checking the queue, so tasks submitted just before Stop are not
// guaranteed to run. Stop must be called at most once.
//
// quit is closed and the broadcast issued while holding queueMutex. Workers
// check quit and call queueCond.Wait under that same mutex. Without the lock,
// a worker could see quit still open and then miss the broadcast before
// reaching Wait. It would then sleep forever and leave Stop blocked in
// wg.Wait.
func (p *PriorityWorkerPool) Stop() {
	p.queueMutex.Lock()
	close(p.quit)
	p.queueCond.Broadcast()
	p.queueMutex.Unlock()

	p.wg.Wait()
}
// main runs seven tasks of mixed priority (higher number = higher priority)
// on three workers, allows a second for processing, then stops the pool.
func main() {
	pool := NewPriorityWorkerPool(3)
	pool.Start()

	for id, prio := range []int{1, 5, 2, 10, 3, 7, 1} {
		id, prio := id, prio
		pool.Submit(func() {
			fmt.Printf("Executing task %d with priority %d\n", id, prio)
			time.Sleep(200 * time.Millisecond) // simulated work
		}, prio)
	}

	time.Sleep(1 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}
This priority-based worker pool uses Go’s `container/heap` package to maintain a priority queue of tasks. Key features include:
- Priority ordering: Tasks with higher priority values are processed first
- Efficient priority management: The heap data structure ensures efficient insertion and removal of tasks
- Conditional waiting: Workers use a condition variable to wait for new tasks without busy-waiting
Rate-Limited Worker Pool
In many scenarios, you need to limit the rate at which tasks are processed, either to prevent overwhelming downstream systems or to comply with API rate limits:
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
// RateLimitedPool is a fixed-size worker pool in which every task must first
// obtain permission from a shared rate.Limiter before it runs.
type RateLimitedPool struct {
	workers int           // number of worker goroutines launched by Start
	limiter *rate.Limiter // shared by all workers
	tasks   chan func()   // buffered task queue

	// ctx is cancelled through cancelFunc; workers and Submit watch it.
	ctx        context.Context
	cancelFunc context.CancelFunc

	wg sync.WaitGroup // tracks running workers
}
// NewRateLimitedPool creates a pool with the given number of workers.
// Their combined throughput is capped at tasksPerSecond, with bursts of up to
// burstSize tasks. The queue holds up to queueSize pending tasks.
// No goroutines run until Start is called.
func NewRateLimitedPool(workers int, tasksPerSecond float64, burstSize int, queueSize int) *RateLimitedPool {
	lim := rate.NewLimiter(rate.Limit(tasksPerSecond), burstSize)
	queue := make(chan func(), queueSize)
	ctx, cancel := context.WithCancel(context.Background())

	return &RateLimitedPool{
		workers:    workers,
		limiter:    lim,
		tasks:      queue,
		ctx:        ctx,
		cancelFunc: cancel,
	}
}
// Start launches p.workers worker goroutines, numbered from 1. Each worker
// takes a task from the queue, waits for the limiter, runs the task, and
// prints how long it took.
func (p *RateLimitedPool) Start() {
	for n := 1; n <= p.workers; n++ {
		p.wg.Add(1)
		go p.run(n)
	}
}

// run is the loop executed by one worker goroutine. It returns when ctx is
// done, when the task channel is closed, or when the limiter's Wait fails.
func (p *RateLimitedPool) run(workerID int) {
	defer p.wg.Done()
	for {
		var task func()
		select {
		case <-p.ctx.Done():
			return
		case t, ok := <-p.tasks:
			if !ok {
				return
			}
			task = t
		}

		// Block until the limiter grants permission; any error (for example a
		// cancelled ctx) ends this worker without running the task.
		if err := p.limiter.Wait(p.ctx); err != nil {
			return
		}

		began := time.Now()
		task()
		fmt.Printf("Worker %d completed task in %v\n", workerID, time.Since(began))
	}
}
// Submit queues task for execution and reports whether it was accepted.
//
// Submit blocks while the queue is full. It returns false without queueing
// if the pool's context has already been cancelled when Submit is called, or
// if it is cancelled while Submit is waiting for queue space.
func (p *RateLimitedPool) Submit(task func()) bool {
	// Check for cancellation first. If ctx is done and the send is also
	// possible, a single select picks between the two cases at random. A
	// stopped pool could then accept a task no worker will run, or send on a
	// channel that shutdown has closed, which panics.
	if p.ctx.Err() != nil {
		return false
	}
	select {
	case p.tasks <- task:
		return true
	case <-p.ctx.Done():
		return false
	}
}
// Stop cancels the pool's context and waits for all workers to exit. A task
// that is already running finishes. Queued tasks, and workers waiting on the
// rate limiter, are abandoned. Stop is safe to call more than once.
//
// The task channel is intentionally not closed. Workers already exit on
// ctx.Done, and closing the channel would make any Submit that races with Stop
// panic with "send on closed channel".
func (p *RateLimitedPool) Stop() {
	p.cancelFunc() // idempotent
	p.wg.Wait()
}
// main pushes 30 short tasks through a pool of 5 workers limited to 10 tasks
// per second (burst 3, queue 100), waits for them to drain, then stops it.
func main() {
	const (
		workers   = 5
		perSecond = 10
		burst     = 3
		queueSize = 100
	)
	pool := NewRateLimitedPool(workers, perSecond, burst, queueSize)
	pool.Start()

	for id := 0; id < 30; id++ {
		id := id
		accepted := pool.Submit(func() {
			fmt.Printf("Processing task %d\n", id)
			time.Sleep(50 * time.Millisecond) // simulated work
		})
		if !accepted {
			fmt.Printf("Failed to submit task %d\n", id)
		}
	}

	// At 10 tasks/s the 30 tasks need roughly 3s; allow some slack.
	time.Sleep(4 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}
This rate-limited worker pool uses the `golang.org/x/time/rate` package to control the rate at which tasks are processed. Key features include:
- Configurable rate limiting: Control tasks per second and burst size
- Context-based cancellation: Clean shutdown using context cancellation
- Bounded queue: `Submit` blocks while the queue is full, so producers are naturally slowed to the pool’s processing rate
Worker Pool with Backpressure
Backpressure is a critical mechanism for maintaining system stability under load. It allows a system to signal upstream components when it’s approaching capacity limits:
package main
import (
"fmt"
"sync"
"time"
)
// BackpressurePool is a fixed-size worker pool with a bounded queue. When the
// queue is full, submissions are rejected (or time out) instead of piling up,
// which pushes the overload back to the caller.
type BackpressurePool struct {
	workers       int           // number of worker goroutines launched by Start
	tasks         chan func()   // bounded task queue (capacity maxQueueDepth)
	wg            sync.WaitGroup
	quit          chan struct{} // closed on shutdown
	maxQueueDepth int
}

// NewBackpressurePool creates a pool with the given number of workers and a
// task queue holding at most maxQueueDepth pending tasks. A negative
// maxQueueDepth is treated as 0, which gives an unbuffered queue: a submission
// succeeds only when a worker is ready to receive it. Call Start to launch
// the workers.
func NewBackpressurePool(workers, maxQueueDepth int) *BackpressurePool {
	if maxQueueDepth < 0 {
		// make panics on a negative channel capacity.
		maxQueueDepth = 0
	}
	return &BackpressurePool{
		workers:       workers,
		tasks:         make(chan func(), maxQueueDepth),
		quit:          make(chan struct{}),
		maxQueueDepth: maxQueueDepth,
	}
}
// Start launches p.workers worker goroutines. Each one runs queued tasks
// until the task channel is closed or quit is closed.
func (p *BackpressurePool) Start() {
	for n := 0; n < p.workers; n++ {
		p.wg.Add(1)
		go p.loop()
	}
}

// loop is the body of one worker goroutine.
func (p *BackpressurePool) loop() {
	defer p.wg.Done()
	for {
		select {
		case <-p.quit:
			return
		case task, ok := <-p.tasks:
			if !ok {
				return // channel closed
			}
			task()
		}
	}
}
// Submit offers task to the pool without blocking. It returns true if the
// task was queued. It returns false if the task was rejected, either because
// the queue is full (backpressure) or because quit has been closed.
func (p *BackpressurePool) Submit(task func()) bool {
	// Reject once shutdown has begun. Otherwise the task could land in the
	// buffer with no worker left to run it, or the send could hit a channel
	// that shutdown closed, which panics.
	select {
	case <-p.quit:
		return false
	default:
	}

	select {
	case p.tasks <- task:
		return true
	default:
		// Queue is full: apply backpressure by rejecting the task.
		return false
	}
}
// SubmitWithTimeout waits up to timeout for queue space and reports whether
// task was queued. It returns false if the timeout elapses first, or if quit
// is closed before or during the wait.
func (p *BackpressurePool) SubmitWithTimeout(task func(), timeout time.Duration) bool {
	// Reject immediately if shutdown has already begun.
	select {
	case <-p.quit:
		return false
	default:
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case p.tasks <- task:
		return true
	case <-timer.C:
		// Timeout occurred.
		return false
	case <-p.quit:
		// Shutdown began while we were waiting for queue space.
		return false
	}
}
// QueueDepth reports how many tasks are buffered and waiting for a worker.
// The value is a snapshot and may be stale as soon as it is returned.
func (p *BackpressurePool) QueueDepth() int {
	depth := len(p.tasks)
	return depth
}
// Stop tells the workers to exit and waits for them. Running tasks finish.
// Tasks still in the queue may be discarded, because workers return as soon
// as they observe quit closed. Stop must be called at most once, since closing
// quit twice panics.
//
// p.tasks is intentionally left open. Workers exit on quit, and closing the
// channel would make any later or concurrent Submit/SubmitWithTimeout panic
// with "send on closed channel".
func (p *BackpressurePool) Stop() {
	close(p.quit)
	p.wg.Wait()
}
// main floods a 3-worker pool (queue depth 5) with 20 tasks of 200ms each.
// A rejected submission is retried after a 100ms pause until it is accepted.
func main() {
	pool := NewBackpressurePool(3, 5)
	pool.Start()

	for id := 0; id < 20; {
		taskID := id
		work := func() {
			fmt.Printf("Processing task %d\n", taskID)
			time.Sleep(200 * time.Millisecond) // simulated work
		}

		if !pool.Submit(work) {
			fmt.Printf("Task %d rejected due to backpressure\n", taskID)
			// A real system might instead retry with backoff, persist the task
			// for later, drop it if no longer relevant, or alert monitoring on
			// the rejection rate. Here we simply pause and retry.
			time.Sleep(100 * time.Millisecond)
			continue // retry the same task ID
		}

		fmt.Printf("Task %d accepted\n", taskID)
		id++
	}

	time.Sleep(2 * time.Second)

	pool.Stop()
	fmt.Println("Pool shut down")
}
This backpressure pool implementation provides mechanisms to handle overload situations:
- Non-blocking submission: The `Submit` method returns immediately, indicating whether the task was accepted
- Timeout-based submission: The `SubmitWithTimeout` method allows waiting for a specified duration for queue space
- Queue depth monitoring: The `QueueDepth` method allows monitoring the current queue size
- Rejection handling: The caller can implement appropriate strategies when tasks are rejected