Performance Monitoring and Optimization
Monitoring worker pool performance is essential for identifying bottlenecks and optimizing resource utilization.
Instrumented Worker Pool
Let’s implement a worker pool with comprehensive instrumentation:
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// Metrics collects performance data about the worker pool.
type Metrics struct {
    TasksSubmitted   atomic.Int64
    TasksCompleted   atomic.Int64
    TasksRejected    atomic.Int64
    TasksFailed      atomic.Int64
    QueueTimeNanos   atomic.Int64
    ProcessTimeNanos atomic.Int64
    TaskCount        atomic.Int64 // Current tasks in queue or being processed
}

// AvgQueueTime returns the average time tasks spend in the queue.
func (m *Metrics) AvgQueueTime() time.Duration {
    completed := m.TasksCompleted.Load()
    if completed == 0 {
        return 0
    }
    return time.Duration(m.QueueTimeNanos.Load() / completed)
}

// AvgProcessTime returns the average time taken to process a task.
func (m *Metrics) AvgProcessTime() time.Duration {
    completed := m.TasksCompleted.Load()
    if completed == 0 {
        return 0
    }
    return time.Duration(m.ProcessTimeNanos.Load() / completed)
}

// InstrumentedTask represents a task with timing information.
type InstrumentedTask struct {
    Execute    func() error
    EnqueuedAt time.Time
    StartedAt  time.Time
    FinishedAt time.Time
}

// InstrumentedPool is a worker pool with performance instrumentation.
type InstrumentedPool struct {
    workers    int
    tasks      chan *InstrumentedTask
    wg         sync.WaitGroup
    quit       chan struct{}
    metrics    Metrics
    sampleRate int // Log every Nth completed task (1 = every task, 10 = every 10th)
}

// NewInstrumentedPool creates a new instrumented worker pool.
func NewInstrumentedPool(workers, queueSize, sampleRate int) *InstrumentedPool {
    return &InstrumentedPool{
        workers:    workers,
        tasks:      make(chan *InstrumentedTask, queueSize),
        quit:       make(chan struct{}),
        sampleRate: sampleRate,
    }
}

// Start launches the workers and the metrics reporter.
func (p *InstrumentedPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for {
                select {
                case task, ok := <-p.tasks:
                    if !ok {
                        return // Channel closed: no more work
                    }
                    // Record start time and how long the task waited in the queue.
                    task.StartedAt = time.Now()
                    queueTime := task.StartedAt.Sub(task.EnqueuedAt)
                    p.metrics.QueueTimeNanos.Add(queueTime.Nanoseconds())

                    // Execute the task and record how long it took.
                    err := task.Execute()
                    task.FinishedAt = time.Now()
                    processTime := task.FinishedAt.Sub(task.StartedAt)
                    p.metrics.ProcessTimeNanos.Add(processTime.Nanoseconds())

                    // Update counters.
                    p.metrics.TasksCompleted.Add(1)
                    p.metrics.TaskCount.Add(-1)
                    if err != nil {
                        p.metrics.TasksFailed.Add(1)
                    }

                    // Detailed logging for sampled tasks only, to limit overhead.
                    if p.sampleRate > 0 && p.metrics.TasksCompleted.Load()%int64(p.sampleRate) == 0 {
                        fmt.Printf("Task completed: queue_time=%v process_time=%v error=%v\n",
                            queueTime, processTime, err)
                    }
                case <-p.quit:
                    return
                }
            }
        }(i + 1)
    }

    // Start the periodic metrics reporter.
    go p.reportMetrics()
}

// Submit adds a task to the pool, returning false if the queue is full.
func (p *InstrumentedPool) Submit(execute func() error) bool {
    task := &InstrumentedTask{
        Execute:    execute,
        EnqueuedAt: time.Now(),
    }
    select {
    case p.tasks <- task:
        p.metrics.TasksSubmitted.Add(1)
        p.metrics.TaskCount.Add(1)
        return true
    default:
        p.metrics.TasksRejected.Add(1)
        return false
    }
}

// reportMetrics periodically reports pool metrics.
func (p *InstrumentedPool) reportMetrics() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            p.logMetrics()
        case <-p.quit:
            return
        }
    }
}

// logMetrics logs the current metrics.
func (p *InstrumentedPool) logMetrics() {
    fmt.Printf("\n--- Worker Pool Metrics ---\n")
    fmt.Printf("Tasks Submitted: %d\n", p.metrics.TasksSubmitted.Load())
    fmt.Printf("Tasks Completed: %d\n", p.metrics.TasksCompleted.Load())
    fmt.Printf("Tasks Rejected: %d\n", p.metrics.TasksRejected.Load())
    fmt.Printf("Tasks Failed: %d\n", p.metrics.TasksFailed.Load())
    fmt.Printf("Current Tasks: %d\n", p.metrics.TaskCount.Load())
    fmt.Printf("Avg Queue Time: %v\n", p.metrics.AvgQueueTime())
    fmt.Printf("Avg Process Time: %v\n", p.metrics.AvgProcessTime())
    fmt.Printf("---------------------------\n")
}

// GetMetrics returns a pointer to the pool's metrics. A pointer is returned
// because the atomic counters inside Metrics must not be copied.
func (p *InstrumentedPool) GetMetrics() *Metrics {
    return &p.metrics
}

// Stop gracefully shuts down the pool: it stops accepting new tasks, lets the
// workers drain the queue, and then stops the metrics reporter.
// Note: Submit must not be called after Stop.
func (p *InstrumentedPool) Stop() {
    close(p.tasks) // Workers exit once the queue is drained
    p.wg.Wait()
    close(p.quit) // Stop the metrics reporter

    // Final metrics report
    p.logMetrics()
}

func main() {
    // Create an instrumented pool with 5 workers, queue size 20, sample rate 10.
    pool := NewInstrumentedPool(5, 20, 10)
    pool.Start()

    // Submit tasks with varying processing times.
    for i := 0; i < 100; i++ {
        taskID := i // Capture the loop variable for use inside the closure

        // Simulate varying submission rates.
        if i%10 == 0 {
            time.Sleep(500 * time.Millisecond)
        }

        success := pool.Submit(func() error {
            // Simulate varying processing times.
            processingTime := time.Duration(50+taskID%200) * time.Millisecond
            time.Sleep(processingTime)

            // Simulate occasional errors.
            if taskID%20 == 0 {
                return fmt.Errorf("simulated error in task %d", taskID)
            }
            return nil
        })
        if !success {
            fmt.Printf("Task %d rejected\n", taskID)
        }
    }

    // Give in-flight tasks time to complete before shutting down.
    time.Sleep(10 * time.Second)

    // Stop the pool.
    pool.Stop()
}
This instrumented worker pool provides comprehensive performance metrics:
- Timing measurements: Queue time and processing time for each task
- Throughput metrics: Tasks submitted, completed, rejected, and failed
- Periodic reporting: Regular output of key performance indicators
- Sampling: Detailed logging for a subset of tasks to reduce overhead
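Beyond logging, these counters can drive simple tuning decisions. The following is a minimal sketch, not part of the pool's API: the helpers poolLooksSaturated and rejectionRate are hypothetical names, assumed to live in the same package as the InstrumentedPool above. The idea is that when tasks wait in the queue longer than they take to execute, or a large fraction of submissions is rejected, the pool is likely under-provisioned for the offered load.
// poolLooksSaturated is a hypothetical helper: it flags a pool whose average
// queue wait exceeds its average processing time, a common sign that more
// workers (or a larger queue) are needed.
func poolLooksSaturated(p *InstrumentedPool) bool {
    m := p.GetMetrics()
    return m.AvgQueueTime() > m.AvgProcessTime()
}

// rejectionRate is a hypothetical helper returning the fraction of submission
// attempts that were rejected because the queue was full.
func rejectionRate(p *InstrumentedPool) float64 {
    m := p.GetMetrics()
    total := m.TasksSubmitted.Load() + m.TasksRejected.Load()
    if total == 0 {
        return 0
    }
    return float64(m.TasksRejected.Load()) / float64(total)
}
These checks could run after each reporting interval, for example to log a warning or to feed a decision to resize the pool; the thresholds themselves are workload-specific and would need to be tuned against real traffic.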