Performance Optimization and Monitoring

Optimizing and monitoring concurrent code is essential for distributed systems; this section covers two recurring problems, goroutine leaks and lock contention.

Goroutine Leak Detection

Detecting and preventing goroutine leaks is crucial for long-running services:

package main

import (
	"log"
	"runtime"
	"sync"
	"time"
)

// LeakDetector monitors goroutine count and detects potential leaks
type LeakDetector struct {
	interval       time.Duration
	threshold      float64
	baselineCount  int
	previousCount  int
	samples        []int
	sampleSize     int
	stopCh         chan struct{}
	mu             sync.Mutex
	alertThreshold int
	onLeak         func(count int, samples []int)
}

// NewLeakDetector creates a new goroutine leak detector
func NewLeakDetector(interval time.Duration, threshold float64, sampleSize int) *LeakDetector {
	return &LeakDetector{
		interval:       interval,
		threshold:      threshold,
		samples:        make([]int, 0, sampleSize),
		sampleSize:     sampleSize,
		stopCh:         make(chan struct{}),
		alertThreshold: 1000, // Alert if more than 1000 goroutines over baseline
		onLeak: func(count int, samples []int) {
			log.Printf("WARNING: Potential goroutine leak detected! Current count: %d", count)
		},
	}
}

// Start begins monitoring for goroutine leaks
func (ld *LeakDetector) Start() {
	// Capture the baseline after a short warmup
	time.Sleep(100 * time.Millisecond)
	ld.baselineCount = runtime.NumGoroutine()
	ld.previousCount = ld.baselineCount

	log.Printf("Leak detector started with baseline of %d goroutines", ld.baselineCount)

	go func() {
		ticker := time.NewTicker(ld.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				ld.checkForLeaks()
			case <-ld.stopCh:
				return
			}
		}
	}()
}

// Stop stops the leak detector
func (ld *LeakDetector) Stop() {
	close(ld.stopCh)
}

// SetAlertThreshold sets the threshold for leak alerts
func (ld *LeakDetector) SetAlertThreshold(threshold int) {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	ld.alertThreshold = threshold
}

// SetOnLeakDetected sets the callback for leak detection
func (ld *LeakDetector) SetOnLeakDetected(callback func(count int, samples []int)) {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	ld.onLeak = callback
}

// checkForLeaks checks if there's a potential goroutine leak
func (ld *LeakDetector) checkForLeaks() {
	ld.mu.Lock()
	defer ld.mu.Unlock()

	currentCount := runtime.NumGoroutine()
	
	// Add to samples
	ld.samples = append(ld.samples, currentCount)
	if len(ld.samples) > ld.sampleSize {
		ld.samples = ld.samples[1:]
	}
	
	// Check for significant increase
	if currentCount > ld.baselineCount+ld.alertThreshold {
		// Check if the count is consistently increasing
		if currentCount > ld.previousCount {
			// Calculate growth rate
			growthRate := float64(currentCount-ld.previousCount) / float64(ld.previousCount)
			
			if growthRate > ld.threshold {
				if ld.onLeak != nil {
					ld.onLeak(currentCount, ld.samples)
				}
			}
		}
	}
	
	ld.previousCount = currentCount
}

// GetStats returns current goroutine statistics
func (ld *LeakDetector) GetStats() map[string]interface{} {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	
	return map[string]interface{}{
		"current_count":   runtime.NumGoroutine(),
		"baseline_count":  ld.baselineCount,
		"previous_count":  ld.previousCount,
		"samples":         ld.samples,
		"alert_threshold": ld.alertThreshold,
	}
}

// simulateLeakingGoroutines creates goroutines that never terminate
func simulateLeakingGoroutines(count int) {
	for i := 0; i < count; i++ {
		go func(id int) {
			log.Printf("Leaking goroutine %d started", id)
			select {} // This goroutine will never terminate
		}(i)
	}
}

// simulateTemporaryGoroutines creates goroutines that terminate after a delay
func simulateTemporaryGoroutines(count int, duration time.Duration) {
	for i := 0; i < count; i++ {
		go func(id int) {
			log.Printf("Temporary goroutine %d started", id)
			time.Sleep(duration)
			log.Printf("Temporary goroutine %d finished", id)
		}(i)
	}
}

// dumpStacks prints all goroutine stacks for debugging
func dumpStacks() {
	buf := make([]byte, 1<<20)
	stackLen := runtime.Stack(buf, true)
	log.Printf("=== GOROUTINE DUMP ===\n%s\n=== END DUMP ===", buf[:stackLen])
}

func main() {
	// Create a leak detector
	detector := NewLeakDetector(500*time.Millisecond, 0.05, 10)
	// Lower the alert threshold so the small simulated leak below actually triggers an alert
	detector.SetAlertThreshold(120)
	detector.SetOnLeakDetected(func(count int, samples []int) {
		log.Printf("LEAK DETECTED: %d goroutines running (baseline: %d)", 
			count, detector.baselineCount)
		log.Printf("Recent samples: %v", samples)
		dumpStacks() // Dump stacks for debugging
	})
	
	// Start the detector
	detector.Start()
	defer detector.Stop()
	
	// Print initial stats
	log.Printf("Initial goroutine count: %d", runtime.NumGoroutine())
	
	// Simulate normal goroutine usage
	log.Println("Creating temporary goroutines...")
	simulateTemporaryGoroutines(100, 2*time.Second)
	
	// Wait a bit
	time.Sleep(3 * time.Second)
	log.Printf("Goroutine count after temporary spike: %d", runtime.NumGoroutine())
	
	// Simulate a leak
	log.Println("Simulating a goroutine leak...")
	simulateLeakingGoroutines(50)
	
	// Wait for detection
	time.Sleep(2 * time.Second)
	
	// Create more leaks to trigger detection
	log.Println("Creating more leaking goroutines...")
	simulateLeakingGoroutines(100)
	
	// Wait for detection
	time.Sleep(5 * time.Second)
	
	// Print final stats
	stats := detector.GetStats()
	log.Printf("Final stats: %+v", stats)
}
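
The goroutines created by simulateLeakingGoroutines block forever on an empty select, so the only real remedy is prevention: give every long-lived goroutine an explicit way to exit, usually a context. The following is a minimal sketch of that pattern; startWorkers and its ticker-driven work loop are illustrative, not part of the detector above.

package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// startWorkers launches count workers that do periodic work but also watch
// ctx, so they exit cleanly instead of leaking when the caller cancels.
func startWorkers(ctx context.Context, count int) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ticker := time.NewTicker(100 * time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					log.Printf("worker %d exiting: %v", id, ctx.Err())
					return
				case <-ticker.C:
					// Do one unit of work here.
				}
			}
		}(i)
	}
	return &wg
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	wg := startWorkers(ctx, 10)
	wg.Wait() // returns once the context expires and every worker has exited
	log.Println("all workers stopped, no goroutines leaked")
}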

Contention Profiling

Identifying and resolving lock contention is essential for performance:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	_ "net/http/pprof" // Registers profiling endpoints on the default mux
	"runtime"
	"sync"
	"time"
)

// SharedResource simulates a resource with different locking strategies
type SharedResource struct {
	name        string
	value       int
	mutex       sync.Mutex
	rwMutex     sync.RWMutex
	accessLog   []string
	logMutex    sync.Mutex
	accessCount int64
}

// UpdateWithMutex updates the resource using a standard mutex
func (r *SharedResource) UpdateWithMutex(id int, val int) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
	
	r.value += val
	r.logAccess(fmt.Sprintf("Writer %d updated value to %d", id, r.value))
}

// ReadWithMutex reads the resource using a standard mutex
func (r *SharedResource) ReadWithMutex(id int) int {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
	
	r.logAccess(fmt.Sprintf("Reader %d read value %d", id, r.value))
	return r.value
}

// UpdateWithRWMutex updates the resource using a read-write mutex
func (r *SharedResource) UpdateWithRWMutex(id int, val int) {
	r.rwMutex.Lock()
	defer r.rwMutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
	
	r.value += val
	r.logAccess(fmt.Sprintf("Writer %d updated value to %d (RW)", id, r.value))
}

// ReadWithRWMutex reads the resource using a read-write mutex
func (r *SharedResource) ReadWithRWMutex(id int) int {
	r.rwMutex.RLock()
	defer r.rwMutex.RUnlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
	
	r.logAccess(fmt.Sprintf("Reader %d read value %d (RW)", id, r.value))
	return r.value
}

// logAccess logs an access to the resource
func (r *SharedResource) logAccess(msg string) {
	r.logMutex.Lock()
	defer r.logMutex.Unlock()
	
	r.accessLog = append(r.accessLog, msg)
	if len(r.accessLog) > 100 {
		r.accessLog = r.accessLog[1:]
	}
	r.accessCount++
}

// GetStats returns statistics about the resource
func (r *SharedResource) GetStats() map[string]interface{} {
	r.logMutex.Lock()
	defer r.logMutex.Unlock()
	
	return map[string]interface{}{
		"name":         r.name,
		"value":        r.value,
		"access_count": r.accessCount,
		"recent_logs":  r.accessLog[max(0, len(r.accessLog)-5):],
	}
}

// Helper function for max of two ints
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// runContentionTest runs a test with different concurrency levels
func runContentionTest(ctx context.Context, useRWMutex bool, readers, writers int) {
	resource := &SharedResource{
		name: fmt.Sprintf("Resource-%v-%d-%d", useRWMutex, readers, writers),
	}
	
	log.Printf("Starting contention test with %d readers and %d writers (RWMutex: %v)",
		readers, writers, useRWMutex)
	
	// Give each test its own deadline so its goroutines stop when the test ends
	// instead of lingering into the next test
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Start writers
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for {
				select {
				case <-ctx.Done():
					return
				default:
					// Update with a random value
					if useRWMutex {
						resource.UpdateWithRWMutex(id, rand.Intn(10))
					} else {
						resource.UpdateWithMutex(id, rand.Intn(10))
					}
					
					time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
				}
			}
		}(i)
	}
	
	// Start readers
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for {
				select {
				case <-ctx.Done():
					return
				default:
					// Read value
					if useRWMutex {
						_ = resource.ReadWithRWMutex(id)
					} else {
						_ = resource.ReadWithMutex(id)
					}
					
					time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
				}
			}
		}(i)
	}
	
	// Wait for the test deadline to pass and for every goroutine to exit
	<-ctx.Done()
	wg.Wait()
	
	// Print stats
	stats := resource.GetStats()
	log.Printf("Test results for %s:", resource.name)
	log.Printf("- Final value: %d", stats["value"])
	log.Printf("- Access count: %d", stats["access_count"])
	log.Printf("- Recent accesses: %v", stats["recent_logs"])
}

func main() {
	// Start pprof server for profiling
	go func() {
		log.Println("Starting pprof server on :6060")
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	
	// GOMAXPROCS already defaults to the number of CPUs; the explicit call is shown for clarity
	runtime.GOMAXPROCS(runtime.NumCPU())
	log.Printf("Running with GOMAXPROCS=%d", runtime.GOMAXPROCS(0))

	// Enable mutex and block profiling so the pprof endpoints referenced below have data
	runtime.SetMutexProfileFraction(1)
	runtime.SetBlockProfileRate(1)
	
	// Seed the global random generator (unnecessary on Go 1.20+, where it is seeded automatically)
	rand.Seed(time.Now().UnixNano())
	
	// Create a context with cancel
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Run tests with different configurations
	log.Println("=== Starting contention tests ===")
	
	// Test 1: Few readers, few writers with standard mutex
	runContentionTest(ctx, false, 5, 5)
	
	// Test 2: Few readers, few writers with RWMutex
	runContentionTest(ctx, true, 5, 5)
	
	// Test 3: Many readers, few writers with standard mutex
	runContentionTest(ctx, false, 50, 5)
	
	// Test 4: Many readers, few writers with RWMutex
	runContentionTest(ctx, true, 50, 5)
	
	log.Println("=== Contention tests complete ===")
	log.Println("To view profiling data, run: go tool pprof http://localhost:6060/debug/pprof/profile")
	log.Println("To view mutex contention: go tool pprof http://localhost:6060/debug/pprof/mutex")
	
	// Keep running for a while to allow profiling
	log.Println("Press Ctrl+C to exit")
	select {}
}
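
The long-running demo above is convenient for interactive pprof sessions, but for repeatable comparisons the same Mutex-versus-RWMutex question is often easier to answer with a Go benchmark, run as go test -bench=. -mutexprofile=mutex.out. Below is a minimal sketch under assumed conditions: the counter types and the roughly 90% read / 10% write split are illustrative, not taken from the example above.

package contention

import (
	"sync"
	"testing"
)

// mutexCounter is guarded by a plain mutex.
type mutexCounter struct {
	mu sync.Mutex
	n  int
}

func (c *mutexCounter) add()     { c.mu.Lock(); c.n++; c.mu.Unlock() }
func (c *mutexCounter) get() int { c.mu.Lock(); defer c.mu.Unlock(); return c.n }

// rwCounter is guarded by a read-write mutex.
type rwCounter struct {
	mu sync.RWMutex
	n  int
}

func (c *rwCounter) add()     { c.mu.Lock(); c.n++; c.mu.Unlock() }
func (c *rwCounter) get() int { c.mu.RLock(); defer c.mu.RUnlock(); return c.n }

// BenchmarkMutexReadMostly exercises a ~90% read / 10% write workload.
func BenchmarkMutexReadMostly(b *testing.B) {
	var c mutexCounter
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			if i%10 == 0 {
				c.add()
			} else {
				_ = c.get()
			}
			i++
		}
	})
}

// BenchmarkRWMutexReadMostly runs the same workload with an RWMutex for comparison.
func BenchmarkRWMutexReadMostly(b *testing.B) {
	var c rwCounter
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			if i%10 == 0 {
				c.add()
			} else {
				_ = c.get()
			}
			i++
		}
	})
}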

The contention profiling approach shown above is valuable for distributed systems because it:

  • Identifies performance bottlenecks in concurrent code
  • Helps optimize lock usage for better throughput
  • Provides insights into resource contention
  • Enables data-driven decisions about synchronization strategies
  • Integrates with Go’s built-in profiling tools (see the sketch after this list)
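
When no HTTP endpoint is available (batch jobs, tests, short-lived tools), the same mutex profile can be written to a file with the runtime/pprof package and inspected later with go tool pprof. The sketch below shows the idea; the output file name and the deliberately contended counter are illustrative assumptions.

package main

import (
	"log"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
)

func main() {
	// Record every mutex contention event (larger values sample a fraction of events)
	runtime.SetMutexProfileFraction(1)

	// Generate some contention on a single lock (illustrative workload)
	var mu sync.Mutex
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100000; j++ {
				mu.Lock()
				counter++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	log.Printf("final counter: %d", counter)

	// Write the accumulated mutex profile; inspect with: go tool pprof mutex.out
	f, err := os.Create("mutex.out")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := pprof.Lookup("mutex").WriteTo(f, 0); err != nil {
		log.Fatal(err)
	}
}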