Worker Pool Fundamentals

At its core, a worker pool consists of three main components: a task queue, a pool of worker goroutines, and a coordination mechanism. Workers pull tasks from the queue, process them concurrently, and coordinate their activities to ensure efficient resource utilization.

Basic Worker Pool Implementation

Let’s start with a simple worker pool implementation that processes generic tasks:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Task represents a unit of work to be performed. A non-nil error is logged by
// the worker that ran the task; it does not stop the worker or the pool.
type Task func() error

// WorkerPool manages a pool of workers that process tasks from a shared queue.
type WorkerPool struct {
	tasks    chan Task      // queue of pending tasks; closed by Stop
	wg       sync.WaitGroup // counts the worker goroutines launched by Start
	workers  int            // number of goroutines each Start call launches
	stopOnce sync.Once      // makes Stop safe to call more than once
}

// NewWorkerPool creates a new worker pool with the specified number of workers
// and buffer capacity for tasks.
//
// A workers value below 1 is raised to 1, because a pool without workers would
// never drain its queue and Submit would eventually block forever. A negative
// capacity is treated as 0 (an unbuffered queue), because make panics when
// given a negative channel size.
func NewWorkerPool(workers, capacity int) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	if capacity < 0 {
		capacity = 0
	}
	return &WorkerPool{
		tasks:   make(chan Task, capacity),
		workers: workers,
	}
}

// Start launches the worker goroutines. Every call launches p.workers
// additional goroutines, so it is meant to be called once per pool.
func (p *WorkerPool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()

			// Worker loop: receive until the queue is closed and drained.
			for task := range p.tasks {
				if err := task(); err != nil {
					fmt.Printf("Worker %d encountered error: %v\n", workerID, err)
				}
			}
			fmt.Printf("Worker %d shutting down\n", workerID)
		}(i + 1) // Worker IDs are 1-based and used only for logging
	}
}

// Submit adds a task to the pool. It blocks while the queue is full, and it
// panics if called after Stop because the queue channel is closed by then.
func (p *WorkerPool) Submit(task Task) {
	p.tasks <- task
}

// Stop gracefully shuts down the worker pool: it closes the task queue, waits
// until the workers have processed every task that was already queued, and
// then prints a completion message.
//
// Stop may be called more than once. Only the first call performs the
// shutdown; later calls return after that shutdown has finished instead of
// panicking on a second close of the channel.
func (p *WorkerPool) Stop() {
	p.stopOnce.Do(func() {
		close(p.tasks)
		p.wg.Wait()
		fmt.Println("All workers have completed their tasks")
	})
}

// main demonstrates the pool end to end: it starts three workers, queues ten
// short tasks, and shuts the pool down once they have all been processed.
func main() {
	// Create a worker pool with 3 workers and a task queue capacity of 10
	pool := NewWorkerPool(3, 10)

	// Start the worker pool
	pool.Start()

	// Submit tasks to the pool
	for i := 1; i <= 10; i++ {
		taskID := i // Capture loop variable (needed before Go 1.22)
		pool.Submit(func() error {
			fmt.Printf("Processing task %d\n", taskID)
			// Simulate work
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	// Gracefully shut down the pool. No sleep is needed beforehand: Stop
	// closes the queue and then waits for the workers, and the workers keep
	// receiving until every task that was already queued has been processed.
	pool.Stop()
}

This basic implementation demonstrates the core components of a worker pool:

  1. Task Queue: A buffered channel (tasks) that holds tasks waiting to be processed
  2. Worker Goroutines: A fixed number of goroutines that continuously pull tasks from the queue
  3. Coordination: Use of sync.WaitGroup to track when all workers have completed

While this implementation is straightforward, it has several limitations:

  • No way to retrieve results from tasks
  • Limited error handling
  • No control over backpressure: Submit simply blocks when the task queue is full, with no timeout, cancellation, or non-blocking alternative
  • No dynamic scaling based on load

Buffered vs. Unbuffered Channels

The choice between buffered and unbuffered channels for the task queue has significant implications for worker pool behavior:

package main

import (
	"fmt"
	"sync"
	"time"
)

// demonstrateChannelTypes contrasts an unbuffered channel with a buffered one
// (capacity 5) by running a producer and a deliberately slow consumer on each
// and printing every send and receive.
//
// The function returns only after both producers and both consumers have
// finished, so every "Received ..." line is printed before the caller
// continues (or, when called from main, before the program exits).
func demonstrateChannelTypes() {
	// Unbuffered channel - synchronous communication
	unbufferedChan := make(chan int)

	// Buffered channel - asynchronous up to capacity
	bufferedChan := make(chan int, 5)

	// Track all four goroutines. Waiting only for the producers would let the
	// function return while the consumers are still working: the buffered
	// producer is done as soon as its last value fits into the buffer, which
	// happens well before the consumer has drained it.
	var wg sync.WaitGroup
	wg.Add(4)

	// Producer for unbuffered channel
	go func() {
		defer wg.Done()
		for i := 1; i <= 3; i++ {
			fmt.Printf("Sending to unbuffered channel: %d\n", i)
			// This will block until a receiver is ready
			unbufferedChan <- i
			fmt.Printf("Sent to unbuffered channel: %d\n", i)
		}
		close(unbufferedChan)
	}()

	// Producer for buffered channel
	go func() {
		defer wg.Done()
		for i := 1; i <= 8; i++ {
			if i <= 5 {
				fmt.Printf("Sending to buffered channel: %d\n", i)
				// The first 5 sends fit into the buffer and do not block
				bufferedChan <- i
				fmt.Printf("Sent to buffered channel: %d\n", i)
			} else {
				fmt.Printf("Sending to buffered channel: %d (will block)\n", i)
				// The buffer is full, so this blocks until the consumer
				// receives a value and frees a slot
				bufferedChan <- i
				fmt.Printf("Sent to buffered channel: %d (unblocked)\n", i)
			}
		}
		close(bufferedChan)
	}()

	// Consumer for unbuffered channel
	go func() {
		defer wg.Done()
		for val := range unbufferedChan {
			fmt.Printf("Received from unbuffered channel: %d\n", val)
			time.Sleep(100 * time.Millisecond) // Simulate processing time
		}
	}()

	// Consumer for buffered channel
	go func() {
		defer wg.Done()

		// Wait a bit to let buffer fill up
		time.Sleep(200 * time.Millisecond)

		for val := range bufferedChan {
			fmt.Printf("Received from buffered channel: %d\n", val)
			time.Sleep(100 * time.Millisecond) // Simulate processing time
		}
	}()

	wg.Wait()
}

// main runs the buffered vs. unbuffered channel demonstration.
func main() {
	demonstrateChannelTypes()
}

For worker pools, the implications are:

  1. Unbuffered Task Queue:

    • Provides natural backpressure (submitting a task blocks until a worker is ready)
    • Ensures tasks are immediately picked up when submitted
    • May reduce throughput if task submission is bursty
  2. Buffered Task Queue:

    • Allows for decoupling of task submission and processing
    • Can handle bursts of tasks up to buffer capacity
    • Provides a queue for tasks when all workers are busy
    • May delay processing of tasks if buffer is large and fills up

The choice depends on your specific requirements. Use unbuffered channels when immediate processing is critical, and buffered channels when you need to handle bursts of tasks or want to decouple submission from processing.

Worker Pool with Results

Most real-world scenarios require capturing the results of task execution. Let’s enhance our worker pool to return results:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Task represents a unit of work that returns a result and possibly an error.
type Task[T any] func() (T, error)

// Result contains the output of a task execution: the value and the error
// exactly as the task returned them.
type Result[T any] struct {
	Value T
	Err   error
}

// WorkerPool manages a pool of workers that process tasks and return results.
type WorkerPool[T any] struct {
	tasks    chan Task[T]   // queue of pending tasks; closed by Stop
	results  chan Result[T] // one Result per executed task; closed once all workers exit
	wg       sync.WaitGroup // counts the worker goroutines launched by Start
	workers  int            // number of goroutines each Start call launches
	stopOnce sync.Once      // makes Stop safe to call more than once
}

// NewWorkerPool creates a new worker pool with the specified number of workers
// and buffer capacities for tasks and results.
//
// A workers value below 1 is raised to 1, because a pool without workers would
// never produce a result. Negative capacities are treated as 0 (unbuffered),
// because make panics when given a negative channel size.
func NewWorkerPool[T any](workers, taskCapacity, resultCapacity int) *WorkerPool[T] {
	if workers < 1 {
		workers = 1
	}
	if taskCapacity < 0 {
		taskCapacity = 0
	}
	if resultCapacity < 0 {
		resultCapacity = 0
	}
	return &WorkerPool[T]{
		tasks:   make(chan Task[T], taskCapacity),
		results: make(chan Result[T], resultCapacity),
		workers: workers,
	}
}

// Start launches the worker goroutines plus one goroutine that closes the
// results channel after every worker has exited. It is meant to be called once
// per pool.
func (p *WorkerPool[T]) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()

			// Worker loop: receive until the task queue is closed and drained.
			for task := range p.tasks {
				value, err := task()
				// Blocks while the results buffer is full, so results must
				// be consumed for the workers to make progress.
				p.results <- Result[T]{Value: value, Err: err}
			}
		}()
	}

	// Close results channel when all workers are done, which ends a range
	// loop over Results().
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// Submit adds a task to the pool. It blocks while the task queue is full, and
// it panics if called after Stop because the queue channel is closed by then.
func (p *WorkerPool[T]) Submit(task Task[T]) {
	p.tasks <- task
}

// Results returns the receive-only channel of results. The channel is closed
// after Stop has been called and all workers have finished.
func (p *WorkerPool[T]) Results() <-chan Result[T] {
	return p.results
}

// Stop closes the task queue so that no further tasks are accepted. It does
// not wait for the workers: tasks that are already queued are still processed,
// and completion is signalled by the Results channel being closed.
//
// Stop may be called more than once; only the first call closes the queue, so
// later calls do not panic.
func (p *WorkerPool[T]) Stop() {
	p.stopOnce.Do(func() { close(p.tasks) })
}

// main squares the numbers 1 through 5 on a three-worker pool and prints each
// result as it arrives.
func main() {
	const taskCount = 5

	// 3 workers, room for 10 queued tasks and 10 pending results. Both
	// buffers exceed taskCount, so submitting everything before reading any
	// result cannot block in this example.
	pool := NewWorkerPool[int](3, 10, 10)
	pool.Start()

	for n := 1; n <= taskCount; n++ {
		n := n // per-iteration copy for the closure
		pool.Submit(func() (int, error) {
			time.Sleep(100 * time.Millisecond) // Simulate work
			return n * n, nil
		})
	}

	// Close the task queue; tasks that are already queued are still executed.
	pool.Stop()

	// Drain results until the pool closes the channel.
	results := pool.Results()
	for res := range results {
		if res.Err != nil {
			fmt.Printf("Task error: %v\n", res.Err)
			continue
		}
		fmt.Printf("Task result: %d\n", res.Value)
	}
}

This implementation leverages Go’s generics (available since Go 1.18) to create a type-safe worker pool that can process tasks returning any type of result. The key enhancements are:

  1. Generic Types: The pool can work with any result type
  2. Result Channel: A dedicated channel for collecting task results
  3. Structured Results: Each result includes both the value and any error that occurred