Performance Optimization and Monitoring
Optimizing concurrent code and monitoring its behavior in production are essential in distributed systems, where goroutine leaks and lock contention can degrade a service long before they cause an outright failure.
Goroutine Leak Detection
Detecting and preventing goroutine leaks is crucial for long-running services:
package main
import (
"log"
"runtime"
"sync"
"time"
)
// LeakDetector monitors goroutine count and detects potential leaks
type LeakDetector struct {
interval time.Duration
threshold float64
baselineCount int
previousCount int
samples []int
sampleSize int
stopCh chan struct{}
mu sync.Mutex
alertThreshold int
onLeak func(count int, samples []int)
}
// NewLeakDetector creates a new goroutine leak detector
func NewLeakDetector(interval time.Duration, threshold float64, sampleSize int) *LeakDetector {
return &LeakDetector{
interval: interval,
threshold: threshold,
samples: make([]int, 0, sampleSize),
sampleSize: sampleSize,
stopCh: make(chan struct{}),
alertThreshold: 1000, // Alert if more than 1000 goroutines over baseline
onLeak: func(count int, samples []int) {
log.Printf("WARNING: Potential goroutine leak detected! Current count: %d", count)
},
}
}
// Start begins monitoring for goroutine leaks
func (ld *LeakDetector) Start() {
// Capture the baseline after a short warmup
time.Sleep(100 * time.Millisecond)
ld.baselineCount = runtime.NumGoroutine()
ld.previousCount = ld.baselineCount
log.Printf("Leak detector started with baseline of %d goroutines", ld.baselineCount)
go func() {
ticker := time.NewTicker(ld.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ld.checkForLeaks()
case <-ld.stopCh:
return
}
}
}()
}
// Stop stops the leak detector
func (ld *LeakDetector) Stop() {
close(ld.stopCh)
}
// SetAlertThreshold sets the threshold for leak alerts
func (ld *LeakDetector) SetAlertThreshold(threshold int) {
ld.mu.Lock()
defer ld.mu.Unlock()
ld.alertThreshold = threshold
}
// SetOnLeakDetected sets the callback for leak detection
func (ld *LeakDetector) SetOnLeakDetected(callback func(count int, samples []int)) {
ld.mu.Lock()
defer ld.mu.Unlock()
ld.onLeak = callback
}
// checkForLeaks checks if there's a potential goroutine leak
func (ld *LeakDetector) checkForLeaks() {
ld.mu.Lock()
defer ld.mu.Unlock()
currentCount := runtime.NumGoroutine()
// Add to samples
ld.samples = append(ld.samples, currentCount)
if len(ld.samples) > ld.sampleSize {
ld.samples = ld.samples[1:]
}
// Check for significant increase
if currentCount > ld.baselineCount+ld.alertThreshold {
// Check if the count is consistently increasing
if currentCount > ld.previousCount {
// Calculate growth rate
growthRate := float64(currentCount-ld.previousCount) / float64(ld.previousCount)
if growthRate > ld.threshold {
if ld.onLeak != nil {
ld.onLeak(currentCount, ld.samples)
}
}
}
}
ld.previousCount = currentCount
}
// GetStats returns current goroutine statistics
func (ld *LeakDetector) GetStats() map[string]interface{} {
ld.mu.Lock()
defer ld.mu.Unlock()
return map[string]interface{}{
"current_count": runtime.NumGoroutine(),
"baseline_count": ld.baselineCount,
"previous_count": ld.previousCount,
"samples": ld.samples,
"alert_threshold": ld.alertThreshold,
}
}
// simulateLeakingGoroutines creates goroutines that never terminate
func simulateLeakingGoroutines(count int) {
for i := 0; i < count; i++ {
go func(id int) {
log.Printf("Leaking goroutine %d started", id)
select {} // This goroutine will never terminate
}(i)
}
}
// simulateTemporaryGoroutines creates goroutines that terminate after a delay
func simulateTemporaryGoroutines(count int, duration time.Duration) {
for i := 0; i < count; i++ {
go func(id int) {
log.Printf("Temporary goroutine %d started", id)
time.Sleep(duration)
log.Printf("Temporary goroutine %d finished", id)
}(i)
}
}
// dumpStacks prints all goroutine stacks for debugging
func dumpStacks() {
buf := make([]byte, 1<<20)
stackLen := runtime.Stack(buf, true)
log.Printf("=== GOROUTINE DUMP ===\n%s\n=== END DUMP ===", buf[:stackLen])
}
func main() {
// Create a leak detector
detector := NewLeakDetector(500*time.Millisecond, 0.05, 10)
// Lower the alert threshold so the demo's 150 leaked goroutines trigger
// detection while the temporary spike of 100 does not; the default of
// 1000 is tuned for real services
detector.SetAlertThreshold(120)
detector.SetOnLeakDetected(func(count int, samples []int) {
log.Printf("LEAK DETECTED: %d goroutines running (baseline: %d)",
count, detector.baselineCount)
log.Printf("Recent samples: %v", samples)
dumpStacks() // Dump stacks for debugging
})
// Start the detector
detector.Start()
defer detector.Stop()
// Print initial stats
log.Printf("Initial goroutine count: %d", runtime.NumGoroutine())
// Simulate normal goroutine usage
log.Println("Creating temporary goroutines...")
simulateTemporaryGoroutines(100, 2*time.Second)
// Wait a bit
time.Sleep(3 * time.Second)
log.Printf("Goroutine count after temporary spike: %d", runtime.NumGoroutine())
// Simulate a leak
log.Println("Simulating a goroutine leak...")
simulateLeakingGoroutines(50)
// Wait for detection
time.Sleep(2 * time.Second)
// Create more leaks to trigger detection
log.Println("Creating more leaking goroutines...")
simulateLeakingGoroutines(100)
// Wait for detection
time.Sleep(5 * time.Second)
// Print final stats
stats := detector.GetStats()
log.Printf("Final stats: %+v", stats)
}
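The detector above samples runtime.NumGoroutine from inside a running service. In tests, a common complement is Uber's go.uber.org/goleak package, which fails a test if goroutines started during it are still alive when it returns. The following is a minimal sketch; startWorker is a stand-in helper invented for this example, not part of goleak:
package worker

import (
	"testing"
	"time"

	"go.uber.org/goleak"
)

// startWorker launches a goroutine that exits when stop is closed and
// signals completion on the returned channel.
func startWorker(stop <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			case <-time.After(10 * time.Millisecond):
				// Periodic work would go here.
			}
		}
	}()
	return done
}

// TestNoGoroutineLeak fails if any goroutine started during the test
// is still running when the test returns.
func TestNoGoroutineLeak(t *testing.T) {
	defer goleak.VerifyNone(t)

	stop := make(chan struct{})
	done := startWorker(stop)

	close(stop) // shut the worker down before the test ends
	<-done      // wait for it to exit so VerifyNone sees a clean state
}
Because goleak inspects the stacks of leftover goroutines rather than just counting them, its failure message points at the leaking function directly, which pairs well with the dumpStacks approach above.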
Contention Profiling
Identifying and resolving lock contention is essential for performance:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
_ "net/http/pprof" // Import for profiling
"runtime"
"sync"
"time"
)
// SharedResource simulates a resource with different locking strategies
type SharedResource struct {
name string
value int
mutex sync.Mutex
rwMutex sync.RWMutex
accessLog []string
logMutex sync.Mutex
accessCount int64
}
// UpdateWithMutex updates the resource using a standard mutex
func (r *SharedResource) UpdateWithMutex(id int, val int) {
r.mutex.Lock()
defer r.mutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
r.value += val
r.logAccess(fmt.Sprintf("Writer %d updated value to %d", id, r.value))
}
// ReadWithMutex reads the resource using a standard mutex
func (r *SharedResource) ReadWithMutex(id int) int {
r.mutex.Lock()
defer r.mutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
r.logAccess(fmt.Sprintf("Reader %d read value %d", id, r.value))
return r.value
}
// UpdateWithRWMutex updates the resource using a read-write mutex
func (r *SharedResource) UpdateWithRWMutex(id int, val int) {
r.rwMutex.Lock()
defer r.rwMutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
r.value += val
r.logAccess(fmt.Sprintf("Writer %d updated value to %d (RW)", id, r.value))
}
// ReadWithRWMutex reads the resource using a read-write mutex
func (r *SharedResource) ReadWithRWMutex(id int) int {
r.rwMutex.RLock()
defer r.rwMutex.RUnlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
r.logAccess(fmt.Sprintf("Reader %d read value %d (RW)", id, r.value))
return r.value
}
// logAccess logs an access to the resource
func (r *SharedResource) logAccess(msg string) {
r.logMutex.Lock()
defer r.logMutex.Unlock()
r.accessLog = append(r.accessLog, msg)
if len(r.accessLog) > 100 {
r.accessLog = r.accessLog[1:]
}
r.accessCount++
}
// GetStats returns statistics about the resource
func (r *SharedResource) GetStats() map[string]interface{} {
r.logMutex.Lock()
defer r.logMutex.Unlock()
return map[string]interface{}{
"name": r.name,
"value": r.value,
"access_count": r.accessCount,
"recent_logs": r.accessLog[max(0, len(r.accessLog)-5):],
}
}
// Helper function for max of two ints
func max(a, b int) int {
if a > b {
return a
}
return b
}
// runContentionTest runs a test with different concurrency levels
func runContentionTest(ctx context.Context, useRWMutex bool, readers, writers int) {
// Derive a per-test context so this test's goroutines stop when the
// test ends instead of lingering into the next test
testCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
resource := &SharedResource{
name: fmt.Sprintf("Resource-%v-%d-%d", useRWMutex, readers, writers),
}
log.Printf("Starting contention test with %d readers and %d writers (RWMutex: %v)",
readers, writers, useRWMutex)
// Start writers
var wg sync.WaitGroup
for i := 0; i < writers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-testCtx.Done():
return
default:
// Update with a random value
if useRWMutex {
resource.UpdateWithRWMutex(id, rand.Intn(10))
} else {
resource.UpdateWithMutex(id, rand.Intn(10))
}
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
}
}
}(i)
}
// Start readers
for i := 0; i < readers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-testCtx.Done():
return
default:
// Read value
if useRWMutex {
_ = resource.ReadWithRWMutex(id)
} else {
_ = resource.ReadWithMutex(id)
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}
}
}(i)
}
// Wait for the per-test context to expire and all goroutines to exit
wg.Wait()
// Print stats
stats := resource.GetStats()
log.Printf("Test results for %s:", resource.name)
log.Printf("- Final value: %d", stats["value"])
log.Printf("- Access count: %d", stats["access_count"])
log.Printf("- Recent accesses: %v", stats["recent_logs"])
}
func main() {
// Start pprof server for profiling
go func() {
log.Println("Starting pprof server on :6060")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Enable mutex contention profiling; without this call the
// /debug/pprof/mutex endpoint reports no samples (a fraction of 1
// records every contended lock acquisition)
runtime.SetMutexProfileFraction(1)
// Use all CPUs (the default since Go 1.5; kept explicit for clarity)
runtime.GOMAXPROCS(runtime.NumCPU())
log.Printf("Running with GOMAXPROCS=%d", runtime.GOMAXPROCS(0))
// Seed the random number generator (a no-op on Go 1.20+, which self-seeds)
rand.Seed(time.Now().UnixNano())
// Create a context with cancel
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run tests with different configurations
log.Println("=== Starting contention tests ===")
// Test 1: Few readers, few writers with standard mutex
runContentionTest(ctx, false, 5, 5)
// Test 2: Few readers, few writers with RWMutex
runContentionTest(ctx, true, 5, 5)
// Test 3: Many readers, few writers with standard mutex
runContentionTest(ctx, false, 50, 5)
// Test 4: Many readers, few writers with RWMutex
runContentionTest(ctx, true, 50, 5)
log.Println("=== Contention tests complete ===")
log.Println("To view profiling data, run: go tool pprof http://localhost:6060/debug/pprof/profile")
log.Println("To view mutex contention: go tool pprof http://localhost:6060/debug/pprof/mutex")
// Keep running for a while to allow profiling
log.Println("Press Ctrl+C to exit")
select {}
}
This contention profiling approach is valuable for distributed systems because it:
- Identifies performance bottlenecks in concurrent code
- Helps optimize lock usage for better throughput
- Provides insights into resource contention
- Enables data-driven decisions about synchronization strategies (see the benchmark sketch after this list)
- Integrates with Go’s built-in profiling tools
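The choice between sync.Mutex and sync.RWMutex in particular is best settled by measurement rather than intuition. Here is a minimal benchmark sketch under an assumed read-only workload; the counter types are invented for illustration:
package contention

import (
	"sync"
	"testing"
)

// mutexCounter guards its value with a plain mutex.
type mutexCounter struct {
	mu sync.Mutex
	n  int
}

func (c *mutexCounter) read() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// rwCounter guards its value with a read-write mutex.
type rwCounter struct {
	mu sync.RWMutex
	n  int
}

func (c *rwCounter) read() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

// BenchmarkMutexRead measures concurrent reads through sync.Mutex.
func BenchmarkMutexRead(b *testing.B) {
	var c mutexCounter
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = c.read()
		}
	})
}

// BenchmarkRWMutexRead measures concurrent reads through sync.RWMutex.
func BenchmarkRWMutexRead(b *testing.B) {
	var c rwCounter
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = c.read()
		}
	})
}
Running go test -bench=Read -cpu=1,4,16 shows how the two locks scale with parallelism. Note that RWMutex is not automatically faster for reads: RLock still updates shared state, so with very short critical sections the plain mutex can win, which is exactly why measuring beats guessing.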