Benchmarking and Performance Analysis

To make informed decisions about worker pool configurations, it’s essential to benchmark different implementations under various workloads. Let’s create a simple benchmarking framework to compare worker pool performance:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// WorkerPoolBenchmark represents a benchmark for worker pools
type WorkerPoolBenchmark struct {
	name           string
	workers        int
	queueSize      int
	taskCount      int
	taskDuration   time.Duration
	taskDurationSD time.Duration // Standard deviation for task duration
	results        BenchmarkResults
}

// BenchmarkResults contains the results of a benchmark run
type BenchmarkResults struct {
	totalTime      time.Duration
	throughput     float64 // Tasks per second
	avgLatency     time.Duration
	p95Latency     time.Duration
	p99Latency     time.Duration
	rejectionRate  float64
	cpuUtilization float64
}

// SimplePool is a minimal worker pool for benchmarking
type SimplePool struct {
	tasks   chan func()
	wg      sync.WaitGroup
	workers int
}

// NewSimplePool creates a new simple worker pool
func NewSimplePool(workers, queueSize int) *SimplePool {
	return &SimplePool{
		tasks:   make(chan func(), queueSize),
		workers: workers,
	}
}

// Start launches the worker pool
func (p *SimplePool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
}

// Submit attempts to enqueue a task without blocking; it returns false if the queue is full
func (p *SimplePool) Submit(task func()) bool {
	select {
	case p.tasks <- task:
		return true
	default:
		return false
	}
}

// Stop gracefully shuts down the pool
func (p *SimplePool) Stop() {
	close(p.tasks)
	p.wg.Wait()
}

// RunBenchmark runs a benchmark on the specified worker pool configuration
func RunBenchmark(config WorkerPoolBenchmark) BenchmarkResults {
	fmt.Printf("Running benchmark: %s (workers=%d, queue=%d, tasks=%d)\n",
		config.name, config.workers, config.queueSize, config.taskCount)
	
	// Create the pool
	pool := NewSimplePool(config.workers, config.queueSize)
	pool.Start()
	
	// Prepare for benchmark
	var wg sync.WaitGroup
	var completed atomic.Int32
	var rejected atomic.Int32
	latencies := make([]time.Duration, 0, config.taskCount)
	var latencyMutex sync.Mutex
	
	// Start timing
	startTime := time.Now()
	
	// Submit tasks
	for i := 0; i < config.taskCount; i++ {
		wg.Add(1)
		
		// Record submission time
		submitTime := time.Now()
		
		success := pool.Submit(func() {
			// Calculate task duration with some variability
			duration := config.taskDuration
			if config.taskDurationSD > 0 {
				// Add some randomness to task duration
				variation := time.Duration(rand.NormFloat64() * float64(config.taskDurationSD))
				duration += variation
				if duration < 0 {
					duration = 1 * time.Millisecond // Ensure positive duration
				}
			}
			
			// Simulate work
			time.Sleep(duration)
			
			// Record completion and latency
			completionTime := time.Now()
			latency := completionTime.Sub(submitTime)
			
			latencyMutex.Lock()
			latencies = append(latencies, latency)
			latencyMutex.Unlock()
			
			completed.Add(1)
			wg.Done()
		})
		
		if !success {
			rejected.Add(1)
			wg.Done()
		}
		
		// Throttle submissions: pause briefly every 100 tasks to smooth the arrival rate
		if i%100 == 0 {
			time.Sleep(1 * time.Millisecond)
		}
	}
	
	// Wait for all tasks to complete or be rejected
	wg.Wait()
	
	// Stop timing
	endTime := time.Now()
	totalTime := endTime.Sub(startTime)
	
	// Calculate results
	completedCount := completed.Load()
	rejectedCount := rejected.Load()
	
	// Sort latencies for percentile calculation
	latencyMutex.Lock()
	sortDurations(latencies)
	
	// Calculate average latency
	var totalLatency time.Duration
	for _, l := range latencies {
		totalLatency += l
	}
	
	var avgLatency time.Duration
	if len(latencies) > 0 {
		avgLatency = totalLatency / time.Duration(len(latencies))
	}
	
	// Calculate percentile latencies
	var p95Latency, p99Latency time.Duration
	if len(latencies) > 0 {
		p95Index := int(float64(len(latencies)) * 0.95)
		p99Index := int(float64(len(latencies)) * 0.99)
		if p95Index >= len(latencies) {
			p95Index = len(latencies) - 1
		}
		if p99Index >= len(latencies) {
			p99Index = len(latencies) - 1
		}
		p95Latency = latencies[p95Index]
		p99Latency = latencies[p99Index]
	}
	latencyMutex.Unlock()
	
	// Calculate throughput and rejection rate
	throughput := float64(completedCount) / totalTime.Seconds()
	rejectionRate := float64(rejectedCount) / float64(config.taskCount)
	
	// Create and return results
	results := BenchmarkResults{
		totalTime:     totalTime,
		throughput:    throughput,
		avgLatency:    avgLatency,
		p95Latency:    p95Latency,
		p99Latency:    p99Latency,
		rejectionRate: rejectionRate,
		// CPU utilization would require OS-specific measurement;
		// see the sketch after this listing
	}
	
	// Print results
	fmt.Printf("Results for %s:\n", config.name)
	fmt.Printf("  Total time: %v\n", results.totalTime)
	fmt.Printf("  Throughput: %.2f tasks/sec\n", results.throughput)
	fmt.Printf("  Avg latency: %v\n", results.avgLatency)
	fmt.Printf("  P95 latency: %v\n", results.p95Latency)
	fmt.Printf("  P99 latency: %v\n", results.p99Latency)
	fmt.Printf("  Rejection rate: %.2f%%\n", results.rejectionRate*100)
	fmt.Println()
	
	// Clean up
	pool.Stop()
	
	return results
}

// sortDurations sorts a slice of durations in ascending order
func sortDurations(durations []time.Duration) {
	// Simple insertion sort for demonstration; sort.Slice would be the idiomatic choice in real code
	for i := 1; i < len(durations); i++ {
		key := durations[i]
		j := i - 1
		for j >= 0 && durations[j] > key {
			durations[j+1] = durations[j]
			j--
		}
		durations[j+1] = key
	}
}

func main() {
	// Seed the global random number generator
	// (not needed on Go 1.20+, where it is seeded automatically)
	rand.Seed(time.Now().UnixNano())
	
	// Define benchmark configurations
	benchmarks := []WorkerPoolBenchmark{
		{
			name:           "Small pool, short tasks",
			workers:        4,
			queueSize:      10,
			taskCount:      1000,
			taskDuration:   10 * time.Millisecond,
			taskDurationSD: 5 * time.Millisecond,
		},
		{
			name:           "Medium pool, mixed tasks",
			workers:        16,
			queueSize:      100,
			taskCount:      5000,
			taskDuration:   20 * time.Millisecond,
			taskDurationSD: 10 * time.Millisecond,
		},
		{
			name:           "Large pool, long tasks",
			workers:        32,
			queueSize:      500,
			taskCount:      10000,
			taskDuration:   50 * time.Millisecond,
			taskDurationSD: 25 * time.Millisecond,
		},
	}
	
	// Run benchmarks
	for i := range benchmarks {
		benchmarks[i].results = RunBenchmark(benchmarks[i])
	}
	
	// Compare results
	fmt.Println("Benchmark Comparison:")
	fmt.Println("---------------------")
	for _, b := range benchmarks {
		fmt.Printf("%s: %.2f tasks/sec, %.2f%% rejection, avg latency %v\n",
			b.name, b.results.throughput, b.results.rejectionRate*100, b.results.avgLatency)
	}
}
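
The cpuUtilization field is left unpopulated above because the standard library offers no portable way to read process CPU time. On Unix-like systems you can approximate it with getrusage: the CPU time the process consumed (user plus system) divided by wall-clock time across all logical CPUs. The helper below is a rough, Unix-only sketch rather than part of the original framework; it assumes the additional "runtime" and "syscall" imports.

// measureCPUUtilization approximates CPU utilization as process CPU time
// (user + system) divided by wall-clock time across all logical CPUs.
// Unix-only sketch; requires the "runtime" and "syscall" imports.
func measureCPUUtilization(before, after syscall.Rusage, wall time.Duration) float64 {
	toDuration := func(tv syscall.Timeval) time.Duration {
		return time.Duration(tv.Sec)*time.Second + time.Duration(tv.Usec)*time.Microsecond
	}
	cpuTime := (toDuration(after.Utime) - toDuration(before.Utime)) +
		(toDuration(after.Stime) - toDuration(before.Stime))
	return cpuTime.Seconds() / (wall.Seconds() * float64(runtime.NumCPU()))
}

To wire it in, call syscall.Getrusage(syscall.RUSAGE_SELF, &before) just before startTime in RunBenchmark, call it again after wg.Wait(), and set results.cpuUtilization from the helper.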

This benchmarking framework lets you compare worker pool configurations and identify a suitable setup for your workload. Bear in mind that the simulated tasks are sleep-based, so the numbers are most representative of I/O-bound work; CPU-bound tasks contend for cores in ways the simulation does not capture, so validate promising configurations against production-like workloads before committing to them.
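
For repeatable, apples-to-apples measurements it is also worth driving the pool from Go's built-in benchmark harness (go test -bench), which manages iteration counts, warm-up, and reporting for you. The sketch below is illustrative rather than part of the framework above: it assumes it lives in a _test.go file next to the code, and the worker count, queue size, and simulated task duration are arbitrary placeholders.

package main // e.g. in pool_bench_test.go alongside the code above

import (
	"sync"
	"testing"
	"time"
)

// BenchmarkSimplePool measures end-to-end task throughput of SimplePool.
// Run with: go test -bench=SimplePool -benchmem
func BenchmarkSimplePool(b *testing.B) {
	pool := NewSimplePool(8, 64) // arbitrary worker and queue sizes
	pool.Start()
	defer pool.Stop()

	var wg sync.WaitGroup
	b.ResetTimer() // exclude pool construction from the measurement

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		task := func() {
			time.Sleep(100 * time.Microsecond) // stand-in for real work
			wg.Done()
		}
		// Retry until the task is accepted so every iteration performs one unit of work.
		for !pool.Submit(task) {
			time.Sleep(time.Millisecond)
		}
	}
	wg.Wait()
}

Running with -benchmem adds allocation counts, and -cpuprofile / -memprofile produce pprof profiles, which is usually a more reliable way to investigate CPU behavior than hand-rolled utilization estimates.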