Worker Pool and Pipeline Patterns
Worker pools bound concurrency by distributing jobs across a fixed or dynamic set of goroutines, while pipelines chain independent processing stages connected by channels. Together they are two of the most useful patterns for processing data efficiently in distributed systems.
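Before looking at the adaptive variant, it helps to see the basic shape of the worker pool pattern. The following minimal sketch uses a placeholder squaring task standing in for real work: a fixed number of goroutines drain a shared job channel and write to a shared result channel.

package main

import (
    "fmt"
    "sync"
)

func main() {
    jobs := make(chan int, 8)
    results := make(chan int, 8)

    // Start a fixed-size pool of three workers draining the job channel.
    var wg sync.WaitGroup
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                results <- j * j // placeholder "work"
            }
        }()
    }

    // Submit jobs, then close the channel to signal no more work.
    for i := 1; i <= 5; i++ {
        jobs <- i
    }
    close(jobs)

    // Close the result channel once every worker has finished.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        fmt.Println("result:", r)
    }
}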
Advanced Worker Pool with Adaptive Scaling
This implementation adjusts the number of workers based on load:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Job represents a unit of work
type Job struct {
ID int
Payload interface{}
Duration time.Duration // Simulated processing time
}
// Result represents the outcome of job processing
type Result struct {
JobID int
Output interface{}
Err error
Latency time.Duration
}
// AdaptiveWorkerPool implements a worker pool that scales based on load
type AdaptiveWorkerPool struct {
jobQueue chan Job
resultQueue chan Result
workerCount int32
activeWorkers int32
maxWorkers int32
minWorkers int32
pendingJobs int32
stopCh chan struct{}
workerWg sync.WaitGroup
metrics *PoolMetrics
scaleInterval time.Duration
scaleThreshold float64 // Threshold for scaling (0-1)
}
// PoolMetrics tracks performance metrics for the worker pool
type PoolMetrics struct {
totalJobs int64
completedJobs int64
failedJobs int64
totalLatency int64 // in nanoseconds
queueHighWater int32
}
// NewAdaptiveWorkerPool creates a new adaptive worker pool
func NewAdaptiveWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *AdaptiveWorkerPool {
pool := &AdaptiveWorkerPool{
jobQueue: make(chan Job, queueSize),
resultQueue: make(chan Result, queueSize),
minWorkers: int32(minWorkers),
maxWorkers: int32(maxWorkers),
workerCount: 0,
activeWorkers: 0,
pendingJobs: 0,
stopCh: make(chan struct{}),
metrics: &PoolMetrics{},
scaleInterval: 500 * time.Millisecond,
scaleThreshold: 0.7, // Scale up when 70% of workers are busy
}
// Start the minimum number of workers
for i := 0; i < minWorkers; i++ {
pool.startWorker(ctx)
}
// Start the autoscaler
go pool.autoscaler(ctx)
return pool
}
// startWorker launches a new worker goroutine
func (p *AdaptiveWorkerPool) startWorker(ctx context.Context) {
p.workerWg.Add(1)
atomic.AddInt32(&p.workerCount, 1)
go func() {
defer p.workerWg.Done()
defer atomic.AddInt32(&p.workerCount, -1)
for {
select {
case job, ok := <-p.jobQueue:
if !ok {
return // Channel closed
}
// Mark worker as active
atomic.AddInt32(&p.activeWorkers, 1)
atomic.AddInt32(&p.pendingJobs, -1)
// Process the job
startTime := time.Now()
var result Result
// Simulate processing with potential failures
time.Sleep(job.Duration)
if rand.Intn(10) < 1 { // 10% failure rate
result = Result{
JobID: job.ID,
Output: nil,
Err: fmt.Errorf("processing error on job %d", job.ID),
Latency: time.Since(startTime),
}
atomic.AddInt64(&p.metrics.failedJobs, 1)
} else {
result = Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed result for job %d", job.ID),
Err: nil,
Latency: time.Since(startTime),
}
}
// Update metrics (completedJobs counts every processed job, failures included)
atomic.AddInt64(&p.metrics.completedJobs, 1)
atomic.AddInt64(&p.metrics.totalLatency, int64(result.Latency))
// Mark the worker idle before forwarding the result, so the active
// count stays accurate even if the send blocks or the context is cancelled
atomic.AddInt32(&p.activeWorkers, -1)
// Send the result
select {
case p.resultQueue <- result:
case <-ctx.Done():
return
}
case <-p.stopCh:
return
case <-ctx.Done():
return
}
}
}()
}
// autoscaler adjusts the number of workers based on load
func (p *AdaptiveWorkerPool) autoscaler(ctx context.Context) {
ticker := time.NewTicker(p.scaleInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.adjustWorkerCount(ctx)
case <-p.stopCh:
return
case <-ctx.Done():
return
}
}
}
// adjustWorkerCount scales the worker pool up or down based on current load
func (p *AdaptiveWorkerPool) adjustWorkerCount(ctx context.Context) {
currentWorkers := atomic.LoadInt32(&p.workerCount)
activeWorkers := atomic.LoadInt32(&p.activeWorkers)
pendingJobs := atomic.LoadInt32(&p.pendingJobs)
// Calculate utilization
var utilization float64
if currentWorkers > 0 {
utilization = float64(activeWorkers) / float64(currentWorkers)
}
// Scale up if utilization is high and there are pending jobs
if utilization >= p.scaleThreshold && pendingJobs > 0 && currentWorkers < p.maxWorkers {
// Calculate how many workers to add (up to 25% more, at least 1)
toAdd := max(1, int(float64(currentWorkers)*0.25))
// Don't exceed max workers
if currentWorkers+int32(toAdd) > p.maxWorkers {
toAdd = int(p.maxWorkers - currentWorkers)
}
fmt.Printf("Scaling up: Adding %d workers (utilization: %.2f, pending: %d)\n",
toAdd, utilization, pendingJobs)
for i := 0; i < toAdd; i++ {
p.startWorker(ctx)
}
}
// Scale down if utilization is low and we have more than minimum workers
if utilization < p.scaleThreshold*0.5 && currentWorkers > p.minWorkers && pendingJobs == 0 {
// Calculate how many workers to remove (up to 15% fewer, at least 1)
toRemove := max(1, int(float64(currentWorkers)*0.15))
// Don't go below min workers
if currentWorkers-int32(toRemove) < p.minWorkers {
toRemove = int(currentWorkers - p.minWorkers)
}
fmt.Printf("Scaling down: Removing %d workers (utilization: %.2f)\n",
toRemove, utilization)
// Signal idle workers to stop. stopCh is unbuffered, so a send only
// succeeds when a worker is waiting in its select; note that a plain
// break would only exit the select, so a labeled break is required.
signalLoop:
for i := 0; i < toRemove; i++ {
select {
case p.stopCh <- struct{}{}:
default:
// No idle worker is ready to receive right now; stop signaling
break signalLoop
}
}
}
}
// Submit adds a job to the pool
func (p *AdaptiveWorkerPool) Submit(ctx context.Context, job Job) error {
select {
case p.jobQueue <- job:
atomic.AddInt64(&p.metrics.totalJobs, 1)
atomic.AddInt32(&p.pendingJobs, 1)
// Update high water mark for queue
pending := atomic.LoadInt32(&p.pendingJobs)
for {
highWater := atomic.LoadInt32(&p.metrics.queueHighWater)
if pending <= highWater || atomic.CompareAndSwapInt32(&p.metrics.queueHighWater, highWater, pending) {
break
}
}
return nil
case <-ctx.Done():
return ctx.Err()
default:
return fmt.Errorf("job queue is full")
}
}
// Results returns the channel for receiving results
func (p *AdaptiveWorkerPool) Results() <-chan Result {
return p.resultQueue
}
// Shutdown gracefully shuts down the worker pool
func (p *AdaptiveWorkerPool) Shutdown() {
close(p.jobQueue) // let workers drain queued jobs and exit
p.workerWg.Wait() // wait for in-flight jobs to finish
close(p.resultQueue)
close(p.stopCh) // stop the autoscaler
}
// GetMetrics returns the current pool metrics
func (p *AdaptiveWorkerPool) GetMetrics() PoolMetrics {
completed := atomic.LoadInt64(&p.metrics.completedJobs)
metrics := PoolMetrics{
totalJobs: atomic.LoadInt64(&p.metrics.totalJobs),
completedJobs: completed,
failedJobs: atomic.LoadInt64(&p.metrics.failedJobs),
queueHighWater: atomic.LoadInt32(&p.metrics.queueHighWater),
}
// Report the mean per-job latency; totalLatency in the returned copy carries the average in nanoseconds
if completed > 0 {
metrics.totalLatency = atomic.LoadInt64(&p.metrics.totalLatency) / completed
}
return metrics
}
// Helper for the maximum of two ints (Go 1.21 adds a built-in max; this declaration shadows it on newer toolchains)
func max(a, b int) int {
if a > b {
return a
}
return b
}
func main() {
// Seed the global random source (unnecessary and deprecated since Go 1.20, but harmless on older toolchains)
rand.Seed(time.Now().UnixNano())
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create an adaptive worker pool
pool := NewAdaptiveWorkerPool(ctx, 5, 20, 100)
defer pool.Shutdown()
// Start a goroutine to collect results
go func() {
for result := range pool.Results() {
if result.Err != nil {
fmt.Printf("Job %d failed: %v (took %v)\n",
result.JobID, result.Err, result.Latency)
} else {
fmt.Printf("Job %d completed: %v (took %v)\n",
result.JobID, result.Output, result.Latency)
}
}
}()
// Submit jobs in waves to demonstrate scaling
for wave := 0; wave < 3; wave++ {
fmt.Printf("\n--- Starting job wave %d ---\n", wave+1)
// Submit a batch of jobs
jobCount := 50 + wave*25
for i := 0; i < jobCount; i++ {
// Create jobs with variable processing times
duration := time.Duration(50+rand.Intn(200)) * time.Millisecond
job := Job{
ID: wave*1000 + i,
Payload: fmt.Sprintf("Job data %d", i),
Duration: duration,
}
if err := pool.Submit(ctx, job); err != nil {
fmt.Printf("Failed to submit job: %v\n", err)
}
}
// Wait between waves
time.Sleep(2 * time.Second)
// Print current metrics
metrics := pool.GetMetrics()
fmt.Printf("\nPool metrics after wave %d:\n", wave+1)
fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
fmt.Printf("- Current workers: %d\n", atomic.LoadInt32(&pool.workerCount))
fmt.Printf("- Active workers: %d\n", atomic.LoadInt32(&pool.activeWorkers))
}
// Wait for all jobs to complete
time.Sleep(1 * time.Second)
// Final metrics
metrics := pool.GetMetrics()
fmt.Printf("\nFinal pool metrics:\n")
fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
}
This adaptive worker pool pattern works well in distributed systems because it:
- Automatically scales based on workload
- Efficiently manages resources
- Provides detailed metrics for monitoring
- Handles backpressure through a bounded job queue: Submit fails fast when the queue is full (a caller-side retry sketch follows this list)
- Records per-job failures without stalling the rest of the pool
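As a sketch of how a caller might cooperate with that backpressure, the helper below retries a rejected Submit with exponential backoff. It assumes the same package and imports as the worker-pool example above; submitWithRetry is a hypothetical caller-side helper, not part of the pool's API.

// submitWithRetry is a hypothetical caller-side helper (not part of the
// pool's API): it retries a rejected Submit with exponential backoff.
func submitWithRetry(ctx context.Context, pool *AdaptiveWorkerPool, job Job, attempts int) error {
    backoff := 10 * time.Millisecond
    var err error
    for i := 0; i < attempts; i++ {
        if err = pool.Submit(ctx, job); err == nil {
            return nil
        }
        // The queue was full (or the context was cancelled); wait before retrying.
        select {
        case <-time.After(backoff):
            backoff *= 2 // wait a little longer before each retry
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("submit failed after %d attempts: %w", attempts, err)
}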
Multi-Stage Pipeline Pattern
Pipelines push data through a series of independent stages connected by channels, so every stage runs concurrently with the others:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// DataItem represents a piece of data flowing through the pipeline
type DataItem struct {
ID int
Data interface{}
Metadata map[string]interface{}
Error error
}
// PipelineStage represents a processing stage in the pipeline
type PipelineStage func(ctx context.Context, in <-chan DataItem) <-chan DataItem
// Pipeline represents a multi-stage data processing pipeline
type Pipeline struct {
stages []PipelineStage
}
// NewPipeline creates a new data processing pipeline
func NewPipeline(stages ...PipelineStage) *Pipeline {
return &Pipeline{
stages: stages,
}
}
// Execute runs data through the pipeline
func (p *Pipeline) Execute(ctx context.Context, source <-chan DataItem) <-chan DataItem {
// Start with the source channel
current := source
// Apply each stage in sequence
for _, stage := range p.stages {
current = stage(ctx, current)
}
return current
}
// Source creates a source channel for the pipeline
func Source(ctx context.Context, items []DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for _, item := range items {
select {
case out <- item:
// Item sent successfully
case <-ctx.Done():
return
}
}
}()
return out
}
// Example pipeline stages
// Validate checks if data items are valid
func Validate(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Perform validation
if item.Data == nil {
item.Error = fmt.Errorf("invalid data: nil value")
}
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// Transform applies a transformation to data items
func Transform(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Apply transformation
switch v := item.Data.(type) {
case string:
item.Data = fmt.Sprintf("Transformed: %s", v)
case int:
item.Data = v * 2
default:
item.Error = fmt.Errorf("unsupported data type")
}
// Add metadata
if item.Metadata == nil {
item.Metadata = make(map[string]interface{})
}
item.Metadata["transformed_at"] = time.Now()
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// Enrich adds additional data to items
func Enrich(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Add enrichment data
if item.Metadata == nil {
item.Metadata = make(map[string]interface{})
}
item.Metadata["enriched"] = true
item.Metadata["enriched_at"] = time.Now()
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// ParallelStage processes items concurrently with a fixed number of workers; output ordering is not preserved
func ParallelStage(workers int, processor func(DataItem) DataItem) PipelineStage {
return func(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
// Start a fixed number of workers
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(workerID int) {
defer wg.Done()
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Process the item
processedItem := processor(item)
// Add worker metadata
if processedItem.Metadata == nil {
processedItem.Metadata = make(map[string]interface{})
}
processedItem.Metadata["worker_id"] = workerID
// Forward the processed item
select {
case out <- processedItem:
case <-ctx.Done():
return
}
}
}(i)
}
// Close the output channel when all workers are done
go func() {
wg.Wait()
close(out)
}()
return out
}
}
func main() {
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create sample data
data := []DataItem{
{ID: 1, Data: "item 1"},
{ID: 2, Data: "item 2"},
{ID: 3, Data: "item 3"},
{ID: 4, Data: "item 4"},
{ID: 5, Data: nil}, // This will fail validation
{ID: 6, Data: "item 6"},
{ID: 7, Data: 42}, // Different type
{ID: 8, Data: "item 8"},
}
// Create a pipeline
pipeline := NewPipeline(
Validate,
Transform,
ParallelStage(3, func(item DataItem) DataItem {
// Simulate processing time
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return item
}),
Enrich,
)
// Create a source channel
source := Source(ctx, data)
// Execute the pipeline
results := pipeline.Execute(ctx, source)
// Collect and print results
for result := range results {
if result.Error != nil {
fmt.Printf("Item %d failed: %v\n", result.ID, result.Error)
} else {
fmt.Printf("Item %d processed: %v with metadata: %v\n",
result.ID, result.Data, result.Metadata)
}
}
}
This pipeline pattern is valuable for distributed systems because it:
- Separates processing logic into discrete, reusable stages
- Handles errors gracefully at each stage
- Enables parallel processing where appropriate (a fan-in helper for merging parallel outputs is sketched after this list)
- Maintains context and metadata throughout the processing flow
- Provides natural backpressure: the unbuffered stage channels block producers until downstream stages are ready to receive
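One common companion to ParallelStage is a fan-in helper that merges the outputs of several pipeline branches back into a single channel. The sketch below assumes the DataItem type and imports from the pipeline example above; FanIn is an illustrative addition, not part of the code shown earlier.

// FanIn merges several DataItem channels into one output channel,
// closing the output once every input has been drained.
func FanIn(ctx context.Context, inputs ...<-chan DataItem) <-chan DataItem {
    out := make(chan DataItem)
    var wg sync.WaitGroup
    wg.Add(len(inputs))
    for _, in := range inputs {
        // One forwarding goroutine per input channel.
        go func(in <-chan DataItem) {
            defer wg.Done()
            for item := range in {
                select {
                case out <- item:
                case <-ctx.Done():
                    return
                }
            }
        }(in)
    }
    // Close the merged channel when all inputs are exhausted.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Because each input is drained by its own goroutine, FanIn preserves the backpressure of the upstream branches while interleaving their results in arrival order.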