Worker Pool Fundamentals
At its core, a worker pool consists of three main components: a task queue, a pool of worker goroutines, and a coordination mechanism. Workers pull tasks from the queue, process them concurrently, and coordinate their activities to ensure efficient resource utilization.
Basic Worker Pool Implementation
Let’s start with a simple worker pool implementation that processes generic tasks:
package main
import (
"fmt"
"sync"
"time"
)
// Task is one unit of work executed by a WorkerPool. A non-nil error is
// logged by the worker that ran the task; it does not stop the pool.
type Task func() error

// WorkerPool runs submitted Tasks on a fixed number of goroutines that all
// receive from one shared, buffered task channel.
type WorkerPool struct {
	tasks   chan Task      // queue shared by every worker
	wg      sync.WaitGroup // counts live workers so Stop can wait for them
	workers int            // how many goroutines Start launches
}

// NewWorkerPool returns a pool that will run `workers` goroutines once Start
// is called. Up to `capacity` tasks can be queued before Submit blocks.
func NewWorkerPool(workers, capacity int) *WorkerPool {
	pool := &WorkerPool{workers: workers}
	pool.tasks = make(chan Task, capacity)
	return pool
}

// Start launches the workers, numbered from 1. Each worker keeps taking
// tasks until Stop closes the queue and the queue is drained.
func (p *WorkerPool) Start() {
	for id := 1; id <= p.workers; id++ {
		p.wg.Add(1)
		go p.runWorker(id)
	}
}

// runWorker is the body of one worker goroutine: it executes tasks,
// logs their errors, and announces its exit once the queue is closed.
func (p *WorkerPool) runWorker(id int) {
	defer p.wg.Done()
	for task := range p.tasks {
		if err := task(); err != nil {
			fmt.Printf("Worker %d encountered error: %v\n", id, err)
		}
	}
	fmt.Printf("Worker %d shutting down\n", id)
}

// Submit enqueues task, blocking while the queue is full. Calling it after
// Stop panics, because the task channel is closed by then.
func (p *WorkerPool) Submit(task Task) {
	p.tasks <- task
}

// Stop closes the queue and waits until every worker has run the remaining
// tasks and exited. It then prints a completion message. Call Stop at most
// once: a second call would close an already-closed channel and panic.
func (p *WorkerPool) Stop() {
	close(p.tasks)
	p.wg.Wait()
	fmt.Println("All workers have completed their tasks")
}
// main demonstrates the basic pool: three workers process ten simulated
// 100ms tasks, and Stop blocks until every queued task has run.
func main() {
	// 3 workers; the queue holds all 10 tasks, so Submit never blocks here.
	pool := NewWorkerPool(3, 10)
	pool.Start()

	for i := 1; i <= 10; i++ {
		taskID := i // per-iteration copy for the closure (required before Go 1.22)
		pool.Submit(func() error {
			fmt.Printf("Processing task %d\n", taskID)
			time.Sleep(100 * time.Millisecond) // simulate work
			return nil
		})
	}

	// No sleep is needed before shutting down: Stop closes the queue, the
	// workers' range loops still deliver every task already buffered, and
	// Stop returns only after all workers have exited.
	pool.Stop()
}
This basic implementation demonstrates the core components of a worker pool:
- Task Queue: A buffered channel (`tasks`) that holds tasks waiting to be processed
- Worker Goroutines: A fixed number of goroutines that continuously pull tasks from the queue
- Coordination: Use of `sync.WaitGroup` to track when all workers have completed
While this implementation is straightforward, it has several limitations:
- No way to retrieve results from tasks
- Limited error handling
- Only crude backpressure: `Submit` simply blocks when the task queue is full, with no timeout, cancellation, or way to reject work
- No dynamic scaling based on load
Buffered vs. Unbuffered Channels
The choice between buffered and unbuffered channels for the task queue has significant implications for worker pool behavior:
package main
import (
"fmt"
"sync"
"time"
)
// demonstrateChannelTypes contrasts an unbuffered channel, where every send
// waits for a matching receive, with a buffered channel of capacity 5, where
// sends complete without a receiver until the buffer is full. It returns only
// after both producers AND both consumers have finished, so all output is
// printed before the caller can exit.
func demonstrateChannelTypes() {
	// Unbuffered channel - synchronous communication
	unbufferedChan := make(chan int)
	// Buffered channel - asynchronous up to capacity
	bufferedChan := make(chan int, 5)

	// Two producers plus two consumers. The consumers must be counted too.
	// Otherwise Wait returns as soon as the last value is *sent*, and main
	// can exit while values are still queued in bufferedChan.
	var wg sync.WaitGroup
	wg.Add(4)

	// Producer for unbuffered channel
	go func() {
		defer wg.Done()
		for i := 1; i <= 3; i++ {
			fmt.Printf("Sending to unbuffered channel: %d\n", i)
			// Blocks until the consumer is ready to receive.
			unbufferedChan <- i
			fmt.Printf("Sent to unbuffered channel: %d\n", i)
		}
		close(unbufferedChan)
	}()

	// Producer for buffered channel
	go func() {
		defer wg.Done()
		for i := 1; i <= 8; i++ {
			if i <= 5 {
				fmt.Printf("Sending to buffered channel: %d\n", i)
				// While the consumer is still sleeping, items 1-5 fill the
				// buffer without waiting for a receiver.
				bufferedChan <- i
				fmt.Printf("Sent to buffered channel: %d\n", i)
			} else {
				fmt.Printf("Sending to buffered channel: %d (will block)\n", i)
				// The buffer is normally full here, so this send waits until
				// the consumer frees a slot.
				bufferedChan <- i
				fmt.Printf("Sent to buffered channel: %d (unblocked)\n", i)
			}
		}
		close(bufferedChan)
	}()

	// Consumer for unbuffered channel
	go func() {
		defer wg.Done()
		for val := range unbufferedChan {
			fmt.Printf("Received from unbuffered channel: %d\n", val)
			time.Sleep(100 * time.Millisecond) // Simulate processing time
		}
	}()

	// Consumer for buffered channel
	go func() {
		defer wg.Done()
		// Start late so the producer fills the buffer first.
		time.Sleep(200 * time.Millisecond)
		for val := range bufferedChan {
			fmt.Printf("Received from buffered channel: %d\n", val)
			time.Sleep(100 * time.Millisecond) // Simulate processing time
		}
	}()

	wg.Wait()
}
// main is the program entry point; the whole channel demonstration lives
// in demonstrateChannelTypes.
func main() {
	demonstrateChannelTypes()
}
For worker pools, the implications are:
- Unbuffered Task Queue:
  - Provides natural backpressure (submitting a task blocks until a worker is ready to receive it)
  - Guarantees that a submitted task has been handed to a worker by the time `Submit` returns
  - May reduce throughput if task submission is bursty
- Buffered Task Queue:
  - Decouples task submission from processing
  - Absorbs bursts of tasks up to the buffer capacity
  - Queues tasks while all workers are busy
  - Delays backpressure until the buffer is full, so with a large buffer tasks can wait a long time before a worker picks them up
The choice depends on your specific requirements. Use unbuffered channels when immediate processing is critical, and buffered channels when you need to handle bursts of tasks or want to decouple submission from processing.
Worker Pool with Results
Most real-world scenarios require capturing the results of task execution. Let’s enhance our worker pool to return results:
package main
import (
"fmt"
"sync"
"time"
)
// Task is a unit of work that produces a value of type T or an error.
type Task[T any] func() (T, error)

// Result carries what one Task returned: its value and its error.
type Result[T any] struct {
	Value T
	Err   error
}

// WorkerPool runs Task[T]s on a fixed set of goroutines and publishes one
// Result[T] per task on a buffered results channel.
//
// Workers block while the results buffer is full, so callers must drain
// Results(). If the same goroutine submits and later reads, submitting more
// than taskCapacity+resultCapacity+workers tasks before reading deadlocks.
type WorkerPool[T any] struct {
	tasks   chan Task[T]   // pending work, shared by all workers
	results chan Result[T] // one entry per executed task
	wg      sync.WaitGroup // live workers; results is closed when it hits zero
	workers int            // number of goroutines Start launches
}

// NewWorkerPool builds a pool with `workers` goroutines (started by Start).
// The task queue buffers taskCapacity tasks and the results channel buffers
// resultCapacity results.
func NewWorkerPool[T any](workers, taskCapacity, resultCapacity int) *WorkerPool[T] {
	p := new(WorkerPool[T])
	p.workers = workers
	p.tasks = make(chan Task[T], taskCapacity)
	p.results = make(chan Result[T], resultCapacity)
	return p
}

// Start launches the workers plus one goroutine that closes the results
// channel once every worker has exited.
func (p *WorkerPool[T]) Start() {
	for n := 0; n < p.workers; n++ {
		p.wg.Add(1)
		go p.work()
	}
	go p.closeResultsWhenIdle()
}

// work runs tasks from the queue until it is closed and drained, forwarding
// each outcome to the results channel.
func (p *WorkerPool[T]) work() {
	defer p.wg.Done()
	for task := range p.tasks {
		v, err := task()
		p.results <- Result[T]{Value: v, Err: err}
	}
}

// closeResultsWhenIdle waits for all workers, then closes results so that
// ranging over Results() terminates.
func (p *WorkerPool[T]) closeResultsWhenIdle() {
	p.wg.Wait()
	close(p.results)
}

// Submit enqueues task, blocking while the task queue is full. Calling it
// after Stop panics (send on a closed channel).
func (p *WorkerPool[T]) Submit(task Task[T]) {
	p.tasks <- task
}

// Results exposes the receive-only side of the results channel. It is closed
// after Stop has been called and every queued task has produced its result.
func (p *WorkerPool[T]) Results() <-chan Result[T] {
	return p.results
}

// Stop closes the task queue so the workers exit after draining it. It does
// not wait; read Results() to completion to observe shutdown. Calling Stop
// twice panics (close of a closed channel).
func (p *WorkerPool[T]) Stop() {
	close(p.tasks)
}
// main squares 1..5 on a three-worker pool and prints each result as it
// arrives; arrival order depends on goroutine scheduling.
func main() {
	// 3 workers; task and result buffers of 10 each.
	pool := NewWorkerPool[int](3, 10, 10)
	pool.Start()

	for n := 1; n <= 5; n++ {
		id := n // per-iteration copy for the closure (required before Go 1.22)
		pool.Submit(func() (int, error) {
			time.Sleep(100 * time.Millisecond) // simulate work
			return id * id, nil
		})
	}

	// Closing the task queue lets the workers exit once it is drained; the
	// pool then closes the results channel, which ends the loop below.
	pool.Stop()

	for result := range pool.Results() {
		switch {
		case result.Err != nil:
			fmt.Printf("Task error: %v\n", result.Err)
		default:
			fmt.Printf("Task result: %d\n", result.Value)
		}
	}
}
This implementation leverages Go’s generics to create a type-safe worker pool that can process tasks returning any type of result. The key enhancements are:
- Generic Types: The pool can work with any result type
- Result Channel: A dedicated channel for collecting task results
- Structured Results: Each result includes both the value and any error that occurred