Production Best Practices

When deploying concurrent Go code in production distributed systems, following these best practices can help ensure reliability and performance.

Context Propagation

Always propagate context through your entire call chain to ensure proper cancellation and timeout handling:

package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// Service represents a component in a distributed system
type Service struct {
	name           string
	dependencies   []*Service
	processingTime time.Duration
}

// NewService creates a new service with dependencies
func NewService(name string, processingTime time.Duration, dependencies ...*Service) *Service {
	return &Service{
		name:           name,
		dependencies:   dependencies,
		processingTime: processingTime,
	}
}

// Process handles a request, propagating context to dependencies
func (s *Service) Process(ctx context.Context, requestID string) (string, error) {
	// Check if context is already canceled
	if ctx.Err() != nil {
		return "", ctx.Err()
	}
	
	log.Printf("[%s] Processing request %s", s.name, requestID)
	
	// Call dependencies first
	for _, dep := range s.dependencies {
		// Create a child context with timeout for the dependency call.
		// Don't defer cancel() inside a loop: the deferred calls would pile up
		// until Process returns. Cancel as soon as the call completes instead.
		depCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)

		result, err := dep.Process(depCtx, requestID)
		cancel() // Release the child context's resources promptly
		if err != nil {
			log.Printf("[%s] Dependency %s failed: %v", s.name, dep.name, err)
			return "", fmt.Errorf("dependency %s failed: %w", dep.name, err)
		}
		
		log.Printf("[%s] Dependency %s returned: %s", s.name, dep.name, result)
	}
	
	// Simulate processing time
	select {
	case <-time.After(s.processingTime):
		// Processing completed successfully
	case <-ctx.Done():
		// Context was canceled
		return "", ctx.Err()
	}
	
	response := fmt.Sprintf("Response from %s for request %s", s.name, requestID)
	log.Printf("[%s] Completed request %s", s.name, requestID)
	return response, nil
}

func main() {
	// Create a service dependency graph
	serviceD := NewService("ServiceD", 100*time.Millisecond)
	serviceE := NewService("ServiceE", 150*time.Millisecond)
	serviceB := NewService("ServiceB", 200*time.Millisecond, serviceD, serviceE)
	serviceC := NewService("ServiceC", 150*time.Millisecond, serviceE)
	serviceA := NewService("ServiceA", 100*time.Millisecond, serviceB, serviceC)
	
	// Create a root context with a timeout generous enough for the whole
	// call chain (the simulated work above adds up to roughly 850ms)
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	
	// Generate a request ID for this call chain
	requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())
	
	// Process the request
	log.Printf("Starting request %s with 800ms timeout", requestID)
	result, err := serviceA.Process(ctx, requestID)
	
	if err != nil {
		log.Printf("Request failed: %v", err)
	} else {
		log.Printf("Request succeeded: %s", result)
	}
	
	// Try another request with insufficient timeout
	log.Println("\nStarting another request with insufficient timeout")
	ctx2, cancel2 := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel2()
	
	requestID2 := fmt.Sprintf("req-%d", time.Now().UnixNano())
	result2, err2 := serviceA.Process(ctx2, requestID2)
	
	if err2 != nil {
		log.Printf("Request failed as expected: %v", err2)
	} else {
		log.Printf("Request succeeded unexpectedly: %s", result2)
	}
}
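
In the example above, the request ID is passed as an explicit parameter. If you also want it to travel implicitly with the context (for logging or tracing in deeper layers), the standard library's context.WithValue supports this. A minimal sketch, using an unexported key type to avoid collisions; the WithRequestID and RequestIDFrom helpers are illustrative and not part of the example above:

// requestIDKey is an unexported type so other packages cannot collide with this key.
type requestIDKey struct{}

// WithRequestID returns a child context carrying the request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// RequestIDFrom extracts the request ID from the context, if present.
func RequestIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(requestIDKey{}).(string)
	return id, ok
}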

Bounded Concurrency

Always limit concurrency to prevent resource exhaustion:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
)

// BoundedExecutor limits the number of concurrent operations
type BoundedExecutor struct {
	semaphore chan struct{}
	timeout   time.Duration
}

// NewBoundedExecutor creates a new bounded executor
func NewBoundedExecutor(maxConcurrent int, timeout time.Duration) *BoundedExecutor {
	return &BoundedExecutor{
		semaphore: make(chan struct{}, maxConcurrent),
		timeout:   timeout,
	}
}

// Execute runs the given function with bounded concurrency
func (e *BoundedExecutor) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
	// Bound how long we are willing to wait for a free slot
	execCtx, cancel := context.WithTimeout(ctx, e.timeout)
	defer cancel()
	
	// Try to acquire a semaphore slot
	select {
	case e.semaphore <- struct{}{}:
		// Acquired a slot
		defer func() { <-e.semaphore }()
	case <-execCtx.Done():
		// Couldn't acquire a slot within timeout
		return nil, fmt.Errorf("operation rejected: %w", execCtx.Err())
	}
	
	// Execute the function. Note that the timeout above only bounds slot
	// acquisition; pass a context into fn if the work itself must be cancellable.
	return fn()
}

// simulateOperation simulates a remote operation with variable latency
func simulateOperation(id int) (interface{}, error) {
	// Simulate random latency
	latency := 50 + rand.Intn(200)
	time.Sleep(time.Duration(latency) * time.Millisecond)
	
	// Simulate occasional failures
	if rand.Intn(10) == 0 {
		return nil, fmt.Errorf("operation %d failed", id)
	}
	
	return fmt.Sprintf("Result from operation %d", id), nil
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a bounded executor
	executor := NewBoundedExecutor(5, 500*time.Millisecond)
	
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Track metrics
	var (
		totalOps      int
		successfulOps int
		rejectedOps   int
		failedOps     int
		mu            sync.Mutex
		wg            sync.WaitGroup
	)
	
	// Launch a bunch of operations
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(opID int) {
			defer wg.Done()
			
			mu.Lock()
			totalOps++
			mu.Unlock()
			
			// Execute with bounded concurrency
			result, err := executor.Execute(ctx, func() (interface{}, error) {
				return simulateOperation(opID)
			})
			
			mu.Lock()
			defer mu.Unlock()
			
			if err != nil {
				if errors.Is(err, context.DeadlineExceeded) {
					rejectedOps++
					log.Printf("Operation %d rejected: %v", opID, err)
				} else {
					failedOps++
					log.Printf("Operation %d failed: %v", opID, err)
				}
			} else {
				successfulOps++
				log.Printf("Operation %d succeeded: %v", opID, result)
			}
		}(i)
		
		// Add some delay between operations
		time.Sleep(50 * time.Millisecond)
	}
	
	// Wait for all operations to complete
	wg.Wait()
	
	// Print summary
	log.Printf("\nSummary:")
	log.Printf("- Total operations: %d", totalOps)
	log.Printf("- Successful: %d", successfulOps)
	log.Printf("- Rejected: %d", rejectedOps)
	log.Printf("- Failed: %d", failedOps)
}
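
The channel-based semaphore above needs no external dependencies. If you prefer weighted limits or context-aware acquisition out of the box, golang.org/x/sync/semaphore implements the same pattern. A brief sketch, assuming the x/sync module is available:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(5) // at most 5 operations in flight
	ctx := context.Background()

	for i := 0; i < 20; i++ {
		// Acquire blocks until a slot is free or ctx is canceled
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Println("acquire failed:", err)
			continue
		}
		go func(id int) {
			defer sem.Release(1)
			time.Sleep(100 * time.Millisecond) // simulated work
			fmt.Println("finished operation", id)
		}(i)
	}

	// Acquiring the full weight waits for all in-flight operations to finish
	_ = sem.Acquire(ctx, 5)
}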

Graceful Degradation

Design systems to degrade gracefully under load:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// ServiceLevel represents different service quality levels
type ServiceLevel int

const (
	FullService ServiceLevel = iota
	ReducedService
	MinimalService
	EmergencyService
)
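
// String makes service levels readable in logs, so %v prints a name
// instead of a bare integer.
func (sl ServiceLevel) String() string {
	switch sl {
	case FullService:
		return "FullService"
	case ReducedService:
		return "ReducedService"
	case MinimalService:
		return "MinimalService"
	case EmergencyService:
		return "EmergencyService"
	default:
		return "Unknown"
	}
}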

// LoadManager monitors system load and adjusts service levels
type LoadManager struct {
	currentLevel     ServiceLevel
	cpuLoad          int64
	memoryUsage      int64
	requestRate      int64
	errorRate        int64
	thresholds       map[ServiceLevel]map[string]int64
	mu               sync.RWMutex
	onLevelChange    func(from, to ServiceLevel)
	degradationRules map[ServiceLevel]func()
}

// NewLoadManager creates a new load manager
func NewLoadManager() *LoadManager {
	lm := &LoadManager{
		currentLevel: FullService,
		thresholds: map[ServiceLevel]map[string]int64{
			ReducedService: {
				"cpu":         70,  // 70% CPU
				"memory":      80,  // 80% memory
				"requestRate": 1000, // 1000 req/sec
				"errorRate":   5,   // 5% errors
			},
			MinimalService: {
				"cpu":         85,
				"memory":      90,
				"requestRate": 2000,
				"errorRate":   10,
			},
			EmergencyService: {
				"cpu":         95,
				"memory":      95,
				"requestRate": 3000,
				"errorRate":   20,
			},
		},
		degradationRules: make(map[ServiceLevel]func()),
	}
	
	// Set default level change handler
	lm.onLevelChange = func(from, to ServiceLevel) {
		log.Printf("Service level changed from %v to %v", from, to)
	}
	
	return lm
}

// SetOnLevelChange sets the callback for service level changes
func (lm *LoadManager) SetOnLevelChange(callback func(from, to ServiceLevel)) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	lm.onLevelChange = callback
}

// SetDegradationRule sets the function to call when degrading to a specific level
func (lm *LoadManager) SetDegradationRule(level ServiceLevel, rule func()) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	lm.degradationRules[level] = rule
}

// UpdateMetrics updates the load metrics
func (lm *LoadManager) UpdateMetrics(cpu, memory, requestRate, errorRate int64) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	
	lm.cpuLoad = cpu
	lm.memoryUsage = memory
	lm.requestRate = requestRate
	lm.errorRate = errorRate
	
	// Check if we need to change service level
	lm.adjustServiceLevel()
}

// adjustServiceLevel changes the service level based on current metrics
func (lm *LoadManager) adjustServiceLevel() {
	// Determine appropriate service level
	var newLevel ServiceLevel
	
	if lm.cpuLoad >= lm.thresholds[EmergencyService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[EmergencyService]["memory"] ||
		lm.requestRate >= lm.thresholds[EmergencyService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[EmergencyService]["errorRate"] {
		newLevel = EmergencyService
	} else if lm.cpuLoad >= lm.thresholds[MinimalService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[MinimalService]["memory"] ||
		lm.requestRate >= lm.thresholds[MinimalService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[MinimalService]["errorRate"] {
		newLevel = MinimalService
	} else if lm.cpuLoad >= lm.thresholds[ReducedService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[ReducedService]["memory"] ||
		lm.requestRate >= lm.thresholds[ReducedService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[ReducedService]["errorRate"] {
		newLevel = ReducedService
	} else {
		newLevel = FullService
	}
	
	// If level changed, notify and apply degradation rules
	if newLevel != lm.currentLevel {
		oldLevel := lm.currentLevel
		lm.currentLevel = newLevel
		
		// Notify about level change
		if lm.onLevelChange != nil {
			lm.onLevelChange(oldLevel, newLevel)
		}
		
		// Apply degradation rule if available
		if rule, exists := lm.degradationRules[newLevel]; exists && rule != nil {
			rule()
		}
	}
}

// GetCurrentLevel returns the current service level
func (lm *LoadManager) GetCurrentLevel() ServiceLevel {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.currentLevel
}

// GetMetrics returns the current metrics
func (lm *LoadManager) GetMetrics() map[string]int64 {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	
	return map[string]int64{
		"cpu":         lm.cpuLoad,
		"memory":      lm.memoryUsage,
		"requestRate": lm.requestRate,
		"errorRate":   lm.errorRate,
	}
}

// DegradableService demonstrates graceful degradation. It reuses the
// BoundedExecutor type from the bounded-concurrency example above.
// Note: workerPool, featureFlags, and retryEnabled are mutated by the
// degradation rules while HandleRequest reads them concurrently; a production
// implementation should guard them with a mutex or publish an immutable
// configuration snapshot (see the sketch after this example).
type DegradableService struct {
	loadManager    *LoadManager
	workerPool     *BoundedExecutor
	cacheEnabled   bool
	retryEnabled   bool
	featureFlags   map[string]bool
	requestCounter int64
	errorCounter   int64
}

// NewDegradableService creates a new service with degradation capabilities
func NewDegradableService() *DegradableService {
	loadManager := NewLoadManager()
	service := &DegradableService{
		loadManager:  loadManager,
		workerPool:   NewBoundedExecutor(20, 1*time.Second),
		cacheEnabled: true,
		retryEnabled: true,
		featureFlags: map[string]bool{
			"analytics":     true,
			"notifications": true,
			"recommendations": true,
			"fullHistory":   true,
		},
	}
	
	// Configure degradation rules
	loadManager.SetDegradationRule(ReducedService, func() {
		log.Println("Applying REDUCED service level:")
		log.Println("- Disabling analytics")
		log.Println("- Reducing worker pool to 15")
		
		service.featureFlags["analytics"] = false
		service.workerPool = NewBoundedExecutor(15, 800*time.Millisecond)
	})
	
	loadManager.SetDegradationRule(MinimalService, func() {
		log.Println("Applying MINIMAL service level:")
		log.Println("- Disabling recommendations")
		log.Println("- Disabling notifications")
		log.Println("- Reducing worker pool to 10")
		log.Println("- Shortening timeouts")
		
		service.featureFlags["recommendations"] = false
		service.featureFlags["notifications"] = false
		service.workerPool = NewBoundedExecutor(10, 500*time.Millisecond)
	})
	
	loadManager.SetDegradationRule(EmergencyService, func() {
		log.Println("Applying EMERGENCY service level:")
		log.Println("- Disabling full history")
		log.Println("- Disabling retries")
		log.Println("- Reducing worker pool to 5")
		log.Println("- Shortening timeouts further")
		
		service.featureFlags["fullHistory"] = false
		service.retryEnabled = false
		service.workerPool = NewBoundedExecutor(5, 300*time.Millisecond)
	})
	
	return service
}

// HandleRequest processes a request with graceful degradation
func (s *DegradableService) HandleRequest(ctx context.Context, requestID string) (string, error) {
	// Increment request counter
	atomic.AddInt64(&s.requestCounter, 1)
	
	// Get current service level
	level := s.loadManager.GetCurrentLevel()
	
	// Execute with bounded concurrency
	result, err := s.workerPool.Execute(ctx, func() (interface{}, error) {
		// Simulate processing based on service level
		switch level {
		case FullService:
			// Full processing
			time.Sleep(100 * time.Millisecond)
			if s.featureFlags["analytics"] {
				// Do analytics processing
				time.Sleep(50 * time.Millisecond)
			}
			if s.featureFlags["recommendations"] {
				// Generate recommendations
				time.Sleep(50 * time.Millisecond)
			}
		case ReducedService:
			// Skip some processing
			time.Sleep(80 * time.Millisecond)
		case MinimalService:
			// Minimal processing
			time.Sleep(50 * time.Millisecond)
		case EmergencyService:
			// Critical path only
			time.Sleep(30 * time.Millisecond)
		}
		
		// Simulate occasional failures
		if rand.Intn(100) < 5 {
			atomic.AddInt64(&s.errorCounter, 1)
			return nil, fmt.Errorf("processing error")
		}
		
		return fmt.Sprintf("Response for %s (service level: %v)", requestID, level), nil
	})
	
	if err != nil {
		// Handle error based on service level
		if s.retryEnabled && level < MinimalService {
			// Retry once for less severe degradation levels
			log.Printf("Retrying request %s after error: %v", requestID, err)
			
			result, err = s.workerPool.Execute(ctx, func() (interface{}, error) {
				time.Sleep(50 * time.Millisecond)
				if rand.Intn(100) < 5 {
					atomic.AddInt64(&s.errorCounter, 1)
					return nil, fmt.Errorf("retry failed")
				}
				return fmt.Sprintf("Retry response for %s (service level: %v)", requestID, level), nil
			})
		}
		
		if err != nil {
			atomic.AddInt64(&s.errorCounter, 1)
			return "", err
		}
	}
	
	return result.(string), nil
}

// SimulateLoad generates synthetic load for the service
func (s *DegradableService) SimulateLoad(ctx context.Context, duration time.Duration) {
	// Reset counters
	atomic.StoreInt64(&s.requestCounter, 0)
	atomic.StoreInt64(&s.errorCounter, 0)
	
	// Start time
	startTime := time.Now()
	endTime := startTime.Add(duration)
	
	// Launch load generator
	var wg sync.WaitGroup
	
	// Start with low load
	go func() {
		rate := int64(50) // requests per second
		
		for time.Now().Before(endTime) {
			// Calculate how many requests to send this second
			currentRate := atomic.LoadInt64(&rate)
			interval := time.Second / time.Duration(currentRate)
			
			for i := 0; i < int(currentRate); i++ {
				wg.Add(1)
				go func(reqID string) {
					defer wg.Done()
					
					reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
					defer cancel()
					
					_, _ = s.HandleRequest(reqCtx, reqID)
				}(fmt.Sprintf("req-%d", time.Now().UnixNano()))
				
				time.Sleep(interval)
			}
			
			// Increase load over time
			elapsed := time.Since(startTime)
			progress := float64(elapsed) / float64(duration)
			
			// Simulate a load curve that peaks in the middle
			if progress < 0.5 {
				// Ramp up to peak
				newRate := 50 + int64(progress*2*1950) // Max 2000 req/sec at peak
				atomic.StoreInt64(&rate, newRate)
			} else {
				// Ramp down from peak
				newRate := 50 + int64((1-progress)*2*1950)
				atomic.StoreInt64(&rate, newRate)
			}
			
			// Update load metrics every second
			cpuLoad := 30 + int64(progress*70) // Simulate CPU increasing with load
			if progress > 0.5 {
				cpuLoad = 30 + int64((1-progress)*140)
			}
			
			memoryUsage := 40 + int64(progress*55) // Memory grows and stays high
			
			// Calculate request rate (requests per second)
			requestRate := atomic.LoadInt64(&s.requestCounter)
			atomic.StoreInt64(&s.requestCounter, 0)
			
			// Calculate error rate (percentage)
			errorCount := atomic.LoadInt64(&s.errorCounter)
			atomic.StoreInt64(&s.errorCounter, 0)
			
			var errorRate int64
			if requestRate > 0 {
				errorRate = (errorCount * 100) / requestRate
			}
			
			// Update load manager
			s.loadManager.UpdateMetrics(cpuLoad, memoryUsage, requestRate, errorRate)
			
			// Log current status
			level := s.loadManager.GetCurrentLevel()
			metrics := s.loadManager.GetMetrics()
			log.Printf("Load: CPU %d%%, Mem %d%%, Rate %d req/s, Errors %d%%, Level %v",
				metrics["cpu"], metrics["memory"], metrics["requestRate"], 
				metrics["errorRate"], level)
			
			// The send loop above already paced roughly one second of traffic,
			// so the next cycle can begin immediately.
		}
	}()
	
	// Wait for load test to complete
	time.Sleep(duration)
	wg.Wait()
	
	log.Println("Load test completed")
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a degradable service
	service := NewDegradableService()
	
	// Create a context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Run a load test
	log.Println("Starting load test with graceful degradation...")
	service.SimulateLoad(ctx, 30*time.Second)
}
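
As noted on the DegradableService struct, the degradation rules replace the worker pool and flip feature flags while request handlers read them concurrently. One way to make those reads race-free without fine-grained locking is to publish an immutable configuration snapshot through sync/atomic's atomic.Value; readers always observe a complete, consistent snapshot. A minimal sketch, with a hypothetical serviceConfig type:

package main

import (
	"fmt"
	"sync/atomic"
)

// serviceConfig is an immutable snapshot of tunable settings.
type serviceConfig struct {
	maxConcurrent int
	featureFlags  map[string]bool
}

func main() {
	var current atomic.Value
	current.Store(&serviceConfig{
		maxConcurrent: 20,
		featureFlags:  map[string]bool{"analytics": true},
	})

	// A degradation rule builds a brand-new snapshot and publishes it atomically.
	degrade := func() {
		current.Store(&serviceConfig{
			maxConcurrent: 10,
			featureFlags:  map[string]bool{"analytics": false},
		})
	}

	// Request handlers load the snapshot once per request and never mutate it.
	cfg := current.Load().(*serviceConfig)
	fmt.Println("before:", cfg.maxConcurrent, cfg.featureFlags["analytics"])

	degrade()

	cfg = current.Load().(*serviceConfig)
	fmt.Println("after:", cfg.maxConcurrent, cfg.featureFlags["analytics"])
}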

Observability Integration

Always instrument your concurrent code for proper observability:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics represents a collection of Prometheus metrics
type Metrics struct {
	requestCounter   *prometheus.CounterVec
	requestDuration  *prometheus.HistogramVec
	goroutineGauge   prometheus.Gauge
	workerPoolSize   *prometheus.GaugeVec
	queueDepth       *prometheus.GaugeVec
	errorCounter     *prometheus.CounterVec
	inFlightRequests *prometheus.GaugeVec
}

// NewMetrics creates and registers Prometheus metrics
func NewMetrics(reg prometheus.Registerer) *Metrics {
	m := &Metrics{
		requestCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "requests_total",
				Help: "Total number of requests processed",
			},
			[]string{"service", "endpoint", "status"},
		),
		requestDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "request_duration_seconds",
				Help:    "Request duration in seconds",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~1s
			},
			[]string{"service", "endpoint"},
		),
		goroutineGauge: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "goroutines_total",
				Help: "Current number of goroutines",
			},
		),
		workerPoolSize: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "worker_pool_size",
				Help: "Current size of worker pools",
			},
			[]string{"pool"},
		),
		queueDepth: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "queue_depth",
				Help: "Current depth of work queues",
			},
			[]string{"queue"},
		),
		errorCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "errors_total",
				Help: "Total number of errors",
			},
			[]string{"service", "type"},
		),
		inFlightRequests: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "in_flight_requests",
				Help: "Current number of in-flight requests",
			},
			[]string{"service"},
		),
	}

	// Register all metrics
	reg.MustRegister(
		m.requestCounter,
		m.requestDuration,
		m.goroutineGauge,
		m.workerPoolSize,
		m.queueDepth,
		m.errorCounter,
		m.inFlightRequests,
	)

	// Start goroutine collector
	go func() {
		for {
			m.goroutineGauge.Set(float64(runtime.NumGoroutine()))
			time.Sleep(1 * time.Second)
		}
	}()

	return m
}

// InstrumentedWorkerPool is a worker pool with metrics
type InstrumentedWorkerPool struct {
	name       string
	workers    int
	queue      chan Job
	metrics    *Metrics
	wg         sync.WaitGroup
	shutdown   chan struct{}
	processing int32
}

// Job represents a unit of work
type Job struct {
	ID       string
	Handler  func(ctx context.Context) (interface{}, error)
	Priority int
}

// NewInstrumentedWorkerPool creates a new worker pool with metrics
func NewInstrumentedWorkerPool(name string, workers, queueSize int, metrics *Metrics) *InstrumentedWorkerPool {
	pool := &InstrumentedWorkerPool{
		name:     name,
		workers:  workers,
		queue:    make(chan Job, queueSize),
		metrics:  metrics,
		shutdown: make(chan struct{}),
	}

	// Update initial metrics
	metrics.workerPoolSize.WithLabelValues(name).Set(float64(workers))
	metrics.queueDepth.WithLabelValues(name).Set(0)

	// Start workers
	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go pool.worker()
	}

	return pool
}

// worker processes jobs from the queue
func (p *InstrumentedWorkerPool) worker() {
	defer p.wg.Done()

	for {
		select {
		case job, ok := <-p.queue:
			if !ok {
				return // Channel closed
			}

			// Update metrics
			atomic.AddInt32(&p.processing, 1)
			p.metrics.inFlightRequests.WithLabelValues(p.name).Inc()

			// Process the job with tracing
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			startTime := time.Now()

			result, err := job.Handler(ctx)
			duration := time.Since(startTime)

			// Update metrics based on the result. Note: job.ID is used as the
			// "endpoint" label for demonstration; in production, prefer a bounded
			// label (such as a job type) over unique per-job IDs, which would
			// explode Prometheus label cardinality.
			p.metrics.requestDuration.WithLabelValues(p.name, job.ID).Observe(duration.Seconds())

			if err != nil {
				p.metrics.errorCounter.WithLabelValues(p.name, "job_error").Inc()
				p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "error").Inc()
				log.Printf("[%s] Job %s failed: %v", p.name, job.ID, err)
			} else {
				p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "success").Inc()
				log.Printf("[%s] Job %s succeeded: %v", p.name, job.ID, result)
			}

			// Update in-flight metrics
			atomic.AddInt32(&p.processing, -1)
			p.metrics.inFlightRequests.WithLabelValues(p.name).Dec()

			// Update queue depth metric
			p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))

			cancel() // Always cancel the context

		case <-p.shutdown:
			return
		}
	}
}

// Submit adds a job to the worker pool
func (p *InstrumentedWorkerPool) Submit(job Job) error {
	select {
	case p.queue <- job:
		// Update queue depth metric
		p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
		return nil
	default:
		p.metrics.errorCounter.WithLabelValues(p.name, "queue_full").Inc()
		return fmt.Errorf("queue is full")
	}
}

// Shutdown stops the worker pool
func (p *InstrumentedWorkerPool) Shutdown() {
	close(p.shutdown)
	close(p.queue)
	p.wg.Wait()

	// Update metrics
	p.metrics.workerPoolSize.WithLabelValues(p.name).Set(0)
	p.metrics.queueDepth.WithLabelValues(p.name).Set(0)
	p.metrics.inFlightRequests.WithLabelValues(p.name).Set(0)
}

// GetMetrics returns current metrics for the pool
func (p *InstrumentedWorkerPool) GetMetrics() map[string]interface{} {
	return map[string]interface{}{
		"workers":    p.workers,
		"queue_size": len(p.queue),
		"processing": atomic.LoadInt32(&p.processing),
	}
}

func main() {
	// Create a Prometheus registry
	reg := prometheus.NewRegistry()

	// Create metrics
	metrics := NewMetrics(reg)

	// Start HTTP server for Prometheus metrics
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	go func() {
		log.Println("Starting metrics server on :8080")
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Printf("Metrics server error: %v", err)
		}
	}()

	// Create worker pools
	highPriorityPool := NewInstrumentedWorkerPool("high-priority", 5, 10, metrics)
	lowPriorityPool := NewInstrumentedWorkerPool("low-priority", 3, 20, metrics)

	// Create a context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Submit jobs to the pools
	for i := 0; i < 20; i++ {
		// Create jobs with different priorities
		highPriorityJob := Job{
			ID: fmt.Sprintf("high-%d", i),
			Handler: func(ctx context.Context) (interface{}, error) {
				// Simulate work
				time.Sleep(100 * time.Millisecond)
				
				// Simulate occasional errors
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("high priority job failed")
				}
				
				return "high priority result", nil
			},
			Priority: 2,
		}
		
		lowPriorityJob := Job{
			ID: fmt.Sprintf("low-%d", i),
			Handler: func(ctx context.Context) (interface{}, error) {
				// Simulate work
				time.Sleep(200 * time.Millisecond)
				
				// Simulate occasional errors
				if rand.Intn(5) == 0 {
					return nil, fmt.Errorf("low priority job failed")
				}
				
				return "low priority result", nil
			},
			Priority: 1,
		}
		
		// Submit jobs
		if err := highPriorityPool.Submit(highPriorityJob); err != nil {
			log.Printf("Failed to submit high priority job: %v", err)
		}
		
		if err := lowPriorityPool.Submit(lowPriorityJob); err != nil {
			log.Printf("Failed to submit low priority job: %v", err)
		}
		
		// Add some delay between submissions
		time.Sleep(50 * time.Millisecond)
	}
	
	// Wait for jobs to complete
	time.Sleep(5 * time.Second)
	
	// Print metrics
	log.Printf("High priority pool metrics: %v", highPriorityPool.GetMetrics())
	log.Printf("Low priority pool metrics: %v", lowPriorityPool.GetMetrics())
	
	// Shutdown pools
	highPriorityPool.Shutdown()
	lowPriorityPool.Shutdown()
	
	log.Println("Worker pools shut down")
}

This observability integration approach is essential for distributed systems because it:

  • Provides real-time visibility into system behavior
  • Enables detection of performance bottlenecks
  • Facilitates capacity planning and scaling decisions
  • Helps identify and diagnose issues quickly
  • Supports data-driven optimization of concurrent code
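
Metrics are only half of the picture: Go's net/http/pprof package can expose runtime profiles (goroutines, heap, blocking, mutex contention) on the same HTTP server that serves the Prometheus endpoint, since both register handlers on the default mux when it is used, as in the example above. A minimal sketch:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Profile endpoints such as goroutine and heap become available at
	// http://localhost:8080/debug/pprof/ alongside any other handlers
	// registered on the default mux (such as a /metrics endpoint).
	log.Println("serving pprof on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}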