Benchmarking and Performance Analysis
To make informed decisions about worker pool configurations, you need to benchmark candidate implementations under representative workloads. Let's create a simple benchmarking framework to compare worker pool performance:
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// WorkerPoolBenchmark represents a benchmark for worker pools
type WorkerPoolBenchmark struct {
name string
workers int
queueSize int
taskCount int
taskDuration time.Duration
taskDurationSD time.Duration // Standard deviation for task duration
results BenchmarkResults
}
// BenchmarkResults contains the results of a benchmark run
type BenchmarkResults struct {
totalTime time.Duration
throughput float64 // Tasks per second
avgLatency time.Duration
p95Latency time.Duration
p99Latency time.Duration
rejectionRate float64
cpuUtilization float64
}
// SimplePool is a minimal worker pool for benchmarking
type SimplePool struct {
tasks chan func()
wg sync.WaitGroup
workers int
}
// NewSimplePool creates a new simple worker pool
func NewSimplePool(workers, queueSize int) *SimplePool {
return &SimplePool{
tasks: make(chan func(), queueSize),
workers: workers,
}
}
// Start launches the worker pool
func (p *SimplePool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.tasks {
task()
}
}()
}
}
// Submit adds a task to the pool
func (p *SimplePool) Submit(task func()) bool {
select {
case p.tasks <- task:
return true
default:
return false
}
}
// Stop gracefully shuts down the pool
func (p *SimplePool) Stop() {
close(p.tasks)
p.wg.Wait()
}
// RunBenchmark runs a benchmark on the specified worker pool configuration
func RunBenchmark(config WorkerPoolBenchmark) BenchmarkResults {
fmt.Printf("Running benchmark: %s (workers=%d, queue=%d, tasks=%d)\n",
config.name, config.workers, config.queueSize, config.taskCount)
// Create the pool
pool := NewSimplePool(config.workers, config.queueSize)
pool.Start()
// Prepare for benchmark
var wg sync.WaitGroup
var completed atomic.Int32
var rejected atomic.Int32
latencies := make([]time.Duration, 0, config.taskCount)
var latencyMutex sync.Mutex
// Start timing
startTime := time.Now()
// Submit tasks
for i := 0; i < config.taskCount; i++ {
wg.Add(1)
// Record submission time
submitTime := time.Now()
success := pool.Submit(func() {
// Calculate task duration with some variability
duration := config.taskDuration
if config.taskDurationSD > 0 {
// Add some randomness to task duration
variation := time.Duration(rand.NormFloat64() * float64(config.taskDurationSD))
duration += variation
if duration < 0 {
duration = 1 * time.Millisecond // Ensure positive duration
}
}
// Simulate work
time.Sleep(duration)
// Record completion and latency
completionTime := time.Now()
latency := completionTime.Sub(submitTime)
latencyMutex.Lock()
latencies = append(latencies, latency)
latencyMutex.Unlock()
completed.Add(1)
wg.Done()
})
if !success {
rejected.Add(1)
wg.Done()
}
// Control submission rate if needed
if i%100 == 0 {
time.Sleep(1 * time.Millisecond)
}
}
// Wait for all tasks to complete or be rejected
wg.Wait()
// Stop timing
endTime := time.Now()
totalTime := endTime.Sub(startTime)
// Calculate results
completedCount := completed.Load()
rejectedCount := rejected.Load()
// Sort latencies for percentile calculation
latencyMutex.Lock()
sortDurations(latencies)
// Calculate average latency
var totalLatency time.Duration
for _, l := range latencies {
totalLatency += l
}
var avgLatency time.Duration
if len(latencies) > 0 {
avgLatency = totalLatency / time.Duration(len(latencies))
}
// Calculate percentile latencies
var p95Latency, p99Latency time.Duration
if len(latencies) > 0 {
p95Index := int(float64(len(latencies)) * 0.95)
p99Index := int(float64(len(latencies)) * 0.99)
if p95Index >= len(latencies) {
p95Index = len(latencies) - 1
}
if p99Index >= len(latencies) {
p99Index = len(latencies) - 1
}
p95Latency = latencies[p95Index]
p99Latency = latencies[p99Index]
}
latencyMutex.Unlock()
// Calculate throughput and rejection rate
throughput := float64(completedCount) / totalTime.Seconds()
rejectionRate := float64(rejectedCount) / float64(config.taskCount)
// Create and return results
results := BenchmarkResults{
totalTime: totalTime,
throughput: throughput,
avgLatency: avgLatency,
p95Latency: p95Latency,
p99Latency: p99Latency,
rejectionRate: rejectionRate,
// CPU utilization would require OS-specific measurements
}
// Print results
fmt.Printf("Results for %s:\n", config.name)
fmt.Printf(" Total time: %v\n", results.totalTime)
fmt.Printf(" Throughput: %.2f tasks/sec\n", results.throughput)
fmt.Printf(" Avg latency: %v\n", results.avgLatency)
fmt.Printf(" P95 latency: %v\n", results.p95Latency)
fmt.Printf(" P99 latency: %v\n", results.p99Latency)
fmt.Printf(" Rejection rate: %.2f%%\n", results.rejectionRate*100)
fmt.Println()
// Clean up
pool.Stop()
return results
}
// sortDurations sorts a slice of durations in ascending order
func sortDurations(durations []time.Duration) {
// Simple insertion sort for demonstration
for i := 1; i < len(durations); i++ {
key := durations[i]
j := i - 1
for j >= 0 && durations[j] > key {
durations[j+1] = durations[j]
j--
}
durations[j+1] = key
}
}
func main() {
// Seed the global random source (not needed on Go 1.20+, where it is seeded automatically)
rand.Seed(time.Now().UnixNano())
// Define benchmark configurations
benchmarks := []WorkerPoolBenchmark{
{
name: "Small pool, short tasks",
workers: 4,
queueSize: 10,
taskCount: 1000,
taskDuration: 10 * time.Millisecond,
taskDurationSD: 5 * time.Millisecond,
},
{
name: "Medium pool, mixed tasks",
workers: 16,
queueSize: 100,
taskCount: 5000,
taskDuration: 20 * time.Millisecond,
taskDurationSD: 10 * time.Millisecond,
},
{
name: "Large pool, long tasks",
workers: 32,
queueSize: 500,
taskCount: 10000,
taskDuration: 50 * time.Millisecond,
taskDurationSD: 25 * time.Millisecond,
},
}
// Run benchmarks
for i := range benchmarks {
benchmarks[i].results = RunBenchmark(benchmarks[i])
}
// Compare results
fmt.Println("Benchmark Comparison:")
fmt.Println("---------------------")
for _, b := range benchmarks {
fmt.Printf("%s: %.2f tasks/sec, %.2f%% rejection, avg latency %v\n",
b.name, b.results.throughput, b.results.rejectionRate*100, b.results.avgLatency)
}
}
This benchmarking framework lets you compare worker pool configurations side by side and identify the setup best suited to your workload's characteristics.
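If you want repeatable, statistically sized runs, you can also drive the same SimplePool through Go's built-in testing framework and let it choose the iteration count. The sketch below is a minimal example under a few assumptions: it lives in a _test.go file (for instance pool_bench_test.go) in the same package as the listing above so that SimplePool is visible, the 100-microsecond sleep stands in for real work, and the worker counts are illustrative, not recommendations.

package main

import (
	"sync"
	"testing"
	"time"
)

// benchmarkPool measures SimplePool throughput for a given worker count.
// The queue is sized to b.N so no submissions are rejected during the run.
func benchmarkPool(b *testing.B, workers int) {
	pool := NewSimplePool(workers, b.N)
	pool.Start()

	var wg sync.WaitGroup
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		pool.Submit(func() {
			time.Sleep(100 * time.Microsecond) // placeholder for real work
			wg.Done()
		})
	}
	wg.Wait()
	b.StopTimer()
	pool.Stop()
}

func BenchmarkPool4(b *testing.B)  { benchmarkPool(b, 4) }
func BenchmarkPool16(b *testing.B) { benchmarkPool(b, 16) }
func BenchmarkPool32(b *testing.B) { benchmarkPool(b, 32) }

Running go test -bench=. then reports ns/op for each worker count, which is a quick way to cross-check the throughput figures produced by the framework above.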