Performance Monitoring and Optimization

Monitoring worker pool performance is essential for identifying bottlenecks and optimizing resource utilization.

Instrumented Worker Pool

Let’s implement a worker pool with comprehensive instrumentation:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Metrics collects performance data about the worker pool
type Metrics struct {
	TasksSubmitted   atomic.Int64
	TasksCompleted   atomic.Int64
	TasksRejected    atomic.Int64
	TasksFailed      atomic.Int64
	QueueTimeNanos   atomic.Int64
	ProcessTimeNanos atomic.Int64
	TaskCount        atomic.Int64 // Current tasks in queue or being processed
}

// AvgQueueTime returns the average time tasks spend in the queue
func (m *Metrics) AvgQueueTime() time.Duration {
	completed := m.TasksCompleted.Load()
	if completed == 0 {
		return 0
	}
	avgNanos := m.QueueTimeNanos.Load() / completed
	return time.Duration(avgNanos) // avgNanos is already in nanoseconds
}

// AvgProcessTime returns the average time to process tasks
func (m *Metrics) AvgProcessTime() time.Duration {
	completed := m.TasksCompleted.Load()
	if completed == 0 {
		return 0
	}
	avgNanos := m.ProcessTimeNanos.Load() / completed
	return time.Duration(avgNanos) // avgNanos is already in nanoseconds
}

// InstrumentedTask represents a task with timing information
type InstrumentedTask struct {
	Execute    func() error
	EnqueuedAt time.Time
	StartedAt  time.Time
	FinishedAt time.Time
}

// InstrumentedPool represents a worker pool with performance instrumentation
type InstrumentedPool struct {
	workers    int
	tasks      chan *InstrumentedTask
	wg         sync.WaitGroup
	quit       chan struct{}
	metrics    Metrics
	sampleRate int // Log detailed timing for every Nth completed task (1 = every task, 10 = every 10th task)
}

// NewInstrumentedPool creates a new instrumented worker pool
func NewInstrumentedPool(workers, queueSize, sampleRate int) *InstrumentedPool {
	if sampleRate < 1 {
		sampleRate = 1 // avoid a modulo-by-zero panic in the sampling check
	}
	return &InstrumentedPool{
		workers:    workers,
		tasks:      make(chan *InstrumentedTask, queueSize),
		quit:       make(chan struct{}),
		sampleRate: sampleRate,
	}
}

// Start launches the worker pool
func (p *InstrumentedPool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()
			
			for {
				select {
				case task, ok := <-p.tasks:
					if !ok {
						return // Channel closed
					}
					
					// Record start time
					task.StartedAt = time.Now()
					
					// Calculate and record queue time
					queueTime := task.StartedAt.Sub(task.EnqueuedAt)
					p.metrics.QueueTimeNanos.Add(queueTime.Nanoseconds())
					
					// Execute the task
					err := task.Execute()
					
					// Record finish time
					task.FinishedAt = time.Now()
					
					// Calculate and record process time
					processTime := task.FinishedAt.Sub(task.StartedAt)
					p.metrics.ProcessTimeNanos.Add(processTime.Nanoseconds())
					
					// Update metrics
					p.metrics.TasksCompleted.Add(1)
					p.metrics.TaskCount.Add(-1)
					
					if err != nil {
						p.metrics.TasksFailed.Add(1)
					}
					
					// Detailed logging for sampled tasks
					if p.metrics.TasksCompleted.Load()%int64(p.sampleRate) == 0 {
						fmt.Printf("Task completed: queue_time=%v process_time=%v error=%v\n",
							queueTime, processTime, err)
					}
					
				case <-p.quit:
					return
				}
			}
		}(i + 1)
	}
	
	// Start metrics reporter
	go p.reportMetrics()
}

// Submit adds a task to the pool
func (p *InstrumentedPool) Submit(execute func() error) bool {
	task := &InstrumentedTask{
		Execute:    execute,
		EnqueuedAt: time.Now(),
	}
	
	select {
	case p.tasks <- task:
		p.metrics.TasksSubmitted.Add(1)
		p.metrics.TaskCount.Add(1)
		return true
	default:
		p.metrics.TasksRejected.Add(1)
		return false
	}
}

// reportMetrics periodically reports pool metrics
func (p *InstrumentedPool) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			p.logMetrics()
		case <-p.quit:
			return
		}
	}
}

// logMetrics logs the current metrics
func (p *InstrumentedPool) logMetrics() {
	fmt.Printf("\n--- Worker Pool Metrics ---\n")
	fmt.Printf("Tasks Submitted: %d\n", p.metrics.TasksSubmitted.Load())
	fmt.Printf("Tasks Completed: %d\n", p.metrics.TasksCompleted.Load())
	fmt.Printf("Tasks Rejected: %d\n", p.metrics.TasksRejected.Load())
	fmt.Printf("Tasks Failed: %d\n", p.metrics.TasksFailed.Load())
	fmt.Printf("Current Tasks: %d\n", p.metrics.TaskCount.Load())
	fmt.Printf("Avg Queue Time: %v\n", p.metrics.AvgQueueTime())
	fmt.Printf("Avg Process Time: %v\n", p.metrics.AvgProcessTime())
	fmt.Printf("---------------------------\n")
}

// GetMetrics returns the pool's metrics. A pointer is returned because
// copying a struct of atomic values is flagged by go vet and is not safe.
func (p *InstrumentedPool) GetMetrics() *Metrics {
	return &p.metrics
}

// Stop gracefully shuts down the pool
func (p *InstrumentedPool) Stop() {
	close(p.tasks) // stop accepting tasks; workers drain the remaining queue and exit
	p.wg.Wait()    // wait for every worker to finish
	close(p.quit)  // then stop the metrics reporter
	
	// Final metrics report
	p.logMetrics()
}

func main() {
	// Create an instrumented pool with 5 workers, queue size 20, sample rate 10
	pool := NewInstrumentedPool(5, 20, 10)
	pool.Start()
	
	// Submit tasks with varying processing times
	for i := 0; i < 100; i++ {
		taskID := i // capture the loop variable so the closure below sees a stable value
		
		// Simulate varying submission rates
		if i%10 == 0 {
			time.Sleep(500 * time.Millisecond)
		}
		
		success := pool.Submit(func() error {
			// Simulate varying processing times
			processingTime := time.Duration(50+taskID%200) * time.Millisecond
			time.Sleep(processingTime)
			
			// Simulate occasional errors
			if taskID%20 == 0 {
				return fmt.Errorf("simulated error in task %d", taskID)
			}
			
			return nil
		})
		
		if !success {
			fmt.Printf("Task %d rejected\n", taskID)
		}
	}
	
	// Give the pool time to process tasks and emit periodic metric reports
	time.Sleep(10 * time.Second)
	
	// Stop the pool
	pool.Stop()
}

This instrumented worker pool provides comprehensive performance metrics:

  1. Timing measurements: Queue time and processing time for each task
  2. Throughput metrics: Tasks submitted, completed, rejected, and failed
  3. Periodic reporting: Regular output of key performance indicators
  4. Sampling: Detailed logging for a subset of tasks to reduce overhead
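These metrics are most useful when read together. As a rough heuristic, an average queue time well above the average process time means tasks are waiting on workers rather than on work, while a growing rejected count means the queue itself is too small. The sketch below shows one hypothetical way to turn that comparison into a scaling hint; suggestRescale and its thresholds are illustrative assumptions layered on the Metrics type above (in the same package, which already imports fmt), not part of the pool's API.

// suggestRescale is a hypothetical helper that turns the collected metrics
// into a rough scaling hint. The 2x and 10x thresholds are illustrative
// assumptions, not tuned values.
func suggestRescale(m *Metrics, workers int) string {
	queueTime := m.AvgQueueTime()
	processTime := m.AvgProcessTime()

	switch {
	case m.TasksRejected.Load() > 0 && queueTime > 2*processTime:
		// Tasks wait a long time and some are dropped: the pool is saturated.
		return fmt.Sprintf("saturated: consider more than %d workers or a larger queue", workers)
	case queueTime > 2*processTime:
		// Tasks spend far longer waiting than executing: more workers would help.
		return fmt.Sprintf("queue-bound: consider increasing the worker count above %d", workers)
	case processTime > 0 && queueTime < processTime/10:
		// Tasks barely wait at all: the pool may have idle capacity.
		return fmt.Sprintf("idle capacity: %d workers may be more than needed", workers)
	default:
		return "balanced: the current worker count looks reasonable"
	}
}

Calling fmt.Println(suggestRescale(pool.GetMetrics(), 5)) after the workload in main would print one of these hints. The useful part is the comparison itself: sustained queue time well above process time points at too few workers or too small a queue, while a queue time near zero suggests the pool is over-provisioned for the current load.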