Production Best Practices
When deploying concurrent Go code to production in a distributed system, the following practices help keep it reliable and performant.
Context Propagation
Always propagate context through your entire call chain to ensure proper cancellation and timeout handling:
package main
import (
"context"
"fmt"
"log"
"time"
)
// Service represents a component in a distributed system
type Service struct {
name string
dependencies []*Service
processingTime time.Duration
}
// NewService creates a new service with dependencies
func NewService(name string, processingTime time.Duration, dependencies ...*Service) *Service {
return &Service{
name: name,
dependencies: dependencies,
processingTime: processingTime,
}
}
// Process handles a request, propagating context to dependencies
func (s *Service) Process(ctx context.Context, requestID string) (string, error) {
// Check if context is already canceled
if ctx.Err() != nil {
return "", ctx.Err()
}
log.Printf("[%s] Processing request %s", s.name, requestID)
// Call dependencies first
for _, dep := range s.dependencies {
// Create a child context with timeout for the dependency call
depCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
result, err := dep.Process(depCtx, requestID)
cancel() // Release the child context immediately; a defer inside the loop would not run until Process returns
if err != nil {
log.Printf("[%s] Dependency %s failed: %v", s.name, dep.name, err)
return "", fmt.Errorf("dependency %s failed: %w", dep.name, err)
}
log.Printf("[%s] Dependency %s returned: %s", s.name, dep.name, result)
}
// Simulate processing time
select {
case <-time.After(s.processingTime):
// Processing completed successfully
case <-ctx.Done():
// Context was canceled
return "", ctx.Err()
}
response := fmt.Sprintf("Response from %s for request %s", s.name, requestID)
log.Printf("[%s] Completed request %s", s.name, requestID)
return response, nil
}
func main() {
// Create a service dependency graph
serviceD := NewService("ServiceD", 100*time.Millisecond)
serviceE := NewService("ServiceE", 150*time.Millisecond)
serviceB := NewService("ServiceB", 200*time.Millisecond, serviceD, serviceE)
serviceC := NewService("ServiceC", 150*time.Millisecond, serviceE)
serviceA := NewService("ServiceA", 100*time.Millisecond, serviceB, serviceC)
// Create a root context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel()
// Create a unique request ID for this call chain
requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())
// Process the request
log.Printf("Starting request %s with 800ms timeout", requestID)
result, err := serviceA.Process(ctx, requestID)
if err != nil {
log.Printf("Request failed: %v", err)
} else {
log.Printf("Request succeeded: %s", result)
}
// Try another request with insufficient timeout
log.Println("\nStarting another request with insufficient timeout")
ctx2, cancel2 := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel2()
requestID2 := fmt.Sprintf("req-%d", time.Now().UnixNano())
result2, err2 := serviceA.Process(ctx2, requestID2)
if err2 != nil {
log.Printf("Request failed as expected: %v", err2)
} else {
log.Printf("Request succeeded unexpectedly: %s", result2)
}
}
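The service above calls its dependencies one at a time. When the dependencies are independent, they can be called concurrently while still honoring the parent context; the sketch below uses golang.org/x/sync/errgroup for this (the fetchDependency helper and the timings are made up for illustration):
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "golang.org/x/sync/errgroup"
)

// fetchDependency stands in for a real dependency call; it waits for the
// given duration or returns early if the context is canceled.
func fetchDependency(ctx context.Context, name string, d time.Duration) (string, error) {
    select {
    case <-time.After(d):
        return fmt.Sprintf("result from %s", name), nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // Root context with an overall deadline, as in the example above
    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
    defer cancel()

    // errgroup.WithContext cancels the derived context as soon as any call
    // returns an error, so sibling calls stop early instead of running on
    g, ctx := errgroup.WithContext(ctx)

    names := []string{"ServiceB", "ServiceC", "ServiceD"}
    delays := []time.Duration{100 * time.Millisecond, 150 * time.Millisecond, 80 * time.Millisecond}
    results := make([]string, len(names))

    for i, name := range names {
        i, name, d := i, name, delays[i] // capture loop variables for the goroutine
        g.Go(func() error {
            res, err := fetchDependency(ctx, name, d)
            if err != nil {
                return fmt.Errorf("dependency %s failed: %w", name, err)
            }
            results[i] = res // each goroutine writes only its own slot
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        log.Fatalf("request failed: %v", err)
    }
    log.Printf("all dependencies succeeded: %v", results)
}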
Bounded Concurrency
Always limit concurrency to prevent resource exhaustion:
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// BoundedExecutor limits the number of concurrent operations
type BoundedExecutor struct {
semaphore chan struct{}
timeout time.Duration
}
// NewBoundedExecutor creates a new bounded executor
func NewBoundedExecutor(maxConcurrent int, timeout time.Duration) *BoundedExecutor {
return &BoundedExecutor{
semaphore: make(chan struct{}, maxConcurrent),
timeout: timeout,
}
}
// Execute runs the given function once a concurrency slot is available.
// The executor's timeout bounds how long a caller waits for a slot, not the work itself.
func (e *BoundedExecutor) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
// Create a context whose timeout limits how long we wait for a semaphore slot
execCtx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
// Try to acquire a semaphore slot
select {
case e.semaphore <- struct{}{}:
// Acquired a slot
defer func() { <-e.semaphore }()
case <-execCtx.Done():
// Couldn't acquire a slot within timeout
return nil, fmt.Errorf("operation rejected: %w", execCtx.Err())
}
// Execute the function
return fn()
}
// simulateOperation simulates a remote operation with variable latency
func simulateOperation(id int) (interface{}, error) {
// Simulate random latency
latency := 50 + rand.Intn(200)
time.Sleep(time.Duration(latency) * time.Millisecond)
// Simulate occasional failures
if rand.Intn(10) == 0 {
return nil, fmt.Errorf("operation %d failed", id)
}
return fmt.Sprintf("Result from operation %d", id), nil
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a bounded executor
executor := NewBoundedExecutor(5, 500*time.Millisecond)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Track metrics
var (
totalOps int
successfulOps int
rejectedOps int
failedOps int
mu sync.Mutex
wg sync.WaitGroup
)
// Launch a bunch of operations
for i := 0; i < 50; i++ {
wg.Add(1)
go func(opID int) {
defer wg.Done()
mu.Lock()
totalOps++
mu.Unlock()
// Execute with bounded concurrency
result, err := executor.Execute(ctx, func() (interface{}, error) {
return simulateOperation(opID)
})
mu.Lock()
defer mu.Unlock()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
rejectedOps++
log.Printf("Operation %d rejected: %v", opID, err)
} else {
failedOps++
log.Printf("Operation %d failed: %v", opID, err)
}
} else {
successfulOps++
log.Printf("Operation %d succeeded: %v", opID, result)
}
}(i)
// Add some delay between operations
time.Sleep(50 * time.Millisecond)
}
// Wait for all operations to complete
wg.Wait()
// Print summary
log.Printf("\nSummary:")
log.Printf("- Total operations: %d", totalOps)
log.Printf("- Successful: %d", successfulOps)
log.Printf("- Rejected: %d", rejectedOps)
log.Printf("- Failed: %d", failedOps)
}
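The channel-based semaphore above is simple and easy to audit. If you prefer a library primitive, golang.org/x/sync/semaphore offers a weighted semaphore whose Acquire call is context-aware, so an admission timeout falls out naturally; a minimal sketch under that assumption:
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    const maxConcurrent = 5
    sem := semaphore.NewWeighted(maxConcurrent)
    ctx := context.Background()

    for i := 0; i < 20; i++ {
        i := i
        // Acquire blocks until a slot is free or the context is done, so a
        // short timeout here acts as an admission deadline
        acquireCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
        err := sem.Acquire(acquireCtx, 1)
        cancel()
        if err != nil {
            log.Printf("operation %d rejected: %v", i, err)
            continue
        }

        go func() {
            defer sem.Release(1)
            time.Sleep(100 * time.Millisecond) // simulated work
            fmt.Printf("operation %d done\n", i)
        }()
    }

    // Draining: acquiring the full weight waits for all outstanding work
    if err := sem.Acquire(ctx, maxConcurrent); err != nil {
        log.Fatalf("drain failed: %v", err)
    }
    log.Println("all operations complete")
}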
Graceful Degradation
Design systems to degrade gracefully under load:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// ServiceLevel represents different service quality levels
type ServiceLevel int
const (
FullService ServiceLevel = iota
ReducedService
MinimalService
EmergencyService
)
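// String makes service levels readable in log output (e.g. "Reduced" instead of "1")
func (l ServiceLevel) String() string {
switch l {
case FullService:
return "Full"
case ReducedService:
return "Reduced"
case MinimalService:
return "Minimal"
case EmergencyService:
return "Emergency"
default:
return "Unknown"
}
}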
// LoadManager monitors system load and adjusts service levels
type LoadManager struct {
currentLevel ServiceLevel
cpuLoad int64
memoryUsage int64
requestRate int64
errorRate int64
thresholds map[ServiceLevel]map[string]int64
mu sync.RWMutex
onLevelChange func(from, to ServiceLevel)
degradationRules map[ServiceLevel]func()
}
// NewLoadManager creates a new load manager
func NewLoadManager() *LoadManager {
lm := &LoadManager{
currentLevel: FullService,
thresholds: map[ServiceLevel]map[string]int64{
ReducedService: {
"cpu": 70, // 70% CPU
"memory": 80, // 80% memory
"requestRate": 1000, // 1000 req/sec
"errorRate": 5, // 5% errors
},
MinimalService: {
"cpu": 85,
"memory": 90,
"requestRate": 2000,
"errorRate": 10,
},
EmergencyService: {
"cpu": 95,
"memory": 95,
"requestRate": 3000,
"errorRate": 20,
},
},
degradationRules: make(map[ServiceLevel]func()),
}
// Set default level change handler
lm.onLevelChange = func(from, to ServiceLevel) {
log.Printf("Service level changed from %v to %v", from, to)
}
return lm
}
// SetOnLevelChange sets the callback for service level changes
func (lm *LoadManager) SetOnLevelChange(callback func(from, to ServiceLevel)) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.onLevelChange = callback
}
// SetDegradationRule sets the function to call when degrading to a specific level
func (lm *LoadManager) SetDegradationRule(level ServiceLevel, rule func()) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.degradationRules[level] = rule
}
// UpdateMetrics updates the load metrics
func (lm *LoadManager) UpdateMetrics(cpu, memory, requestRate, errorRate int64) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.cpuLoad = cpu
lm.memoryUsage = memory
lm.requestRate = requestRate
lm.errorRate = errorRate
// Check if we need to change service level
lm.adjustServiceLevel()
}
// adjustServiceLevel changes the service level based on current metrics
func (lm *LoadManager) adjustServiceLevel() {
// Determine appropriate service level
var newLevel ServiceLevel
if lm.cpuLoad >= lm.thresholds[EmergencyService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[EmergencyService]["memory"] ||
lm.requestRate >= lm.thresholds[EmergencyService]["requestRate"] ||
lm.errorRate >= lm.thresholds[EmergencyService]["errorRate"] {
newLevel = EmergencyService
} else if lm.cpuLoad >= lm.thresholds[MinimalService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[MinimalService]["memory"] ||
lm.requestRate >= lm.thresholds[MinimalService]["requestRate"] ||
lm.errorRate >= lm.thresholds[MinimalService]["errorRate"] {
newLevel = MinimalService
} else if lm.cpuLoad >= lm.thresholds[ReducedService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[ReducedService]["memory"] ||
lm.requestRate >= lm.thresholds[ReducedService]["requestRate"] ||
lm.errorRate >= lm.thresholds[ReducedService]["errorRate"] {
newLevel = ReducedService
} else {
newLevel = FullService
}
// If level changed, notify and apply degradation rules
if newLevel != lm.currentLevel {
oldLevel := lm.currentLevel
lm.currentLevel = newLevel
// Notify about level change
if lm.onLevelChange != nil {
lm.onLevelChange(oldLevel, newLevel)
}
// Apply degradation rule if available
if rule, exists := lm.degradationRules[newLevel]; exists && rule != nil {
rule()
}
}
}
// GetCurrentLevel returns the current service level
func (lm *LoadManager) GetCurrentLevel() ServiceLevel {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.currentLevel
}
// GetMetrics returns the current metrics
func (lm *LoadManager) GetMetrics() map[string]int64 {
lm.mu.RLock()
defer lm.mu.RUnlock()
return map[string]int64{
"cpu": lm.cpuLoad,
"memory": lm.memoryUsage,
"requestRate": lm.requestRate,
"errorRate": lm.errorRate,
}
}
// DegradableService demonstrates graceful degradation.
// It reuses the BoundedExecutor type from the Bounded Concurrency example above.
type DegradableService struct {
loadManager *LoadManager
workerPool *BoundedExecutor
cacheEnabled bool
retryEnabled bool
featureFlags map[string]bool
requestCounter int64
errorCounter int64
}
// NewDegradableService creates a new service with degradation capabilities
func NewDegradableService() *DegradableService {
loadManager := NewLoadManager()
service := &DegradableService{
loadManager: loadManager,
workerPool: NewBoundedExecutor(20, 1*time.Second),
cacheEnabled: true,
retryEnabled: true,
featureFlags: map[string]bool{
"analytics": true,
"notifications": true,
"recommendations": true,
"fullHistory": true,
},
}
// Configure degradation rules.
// Note: these rules swap workerPool and mutate featureFlags while requests may be
// reading them concurrently; production code should guard those writes (for example
// with a mutex or atomic.Pointer).
loadManager.SetDegradationRule(ReducedService, func() {
log.Println("Applying REDUCED service level:")
log.Println("- Disabling analytics")
log.Println("- Reducing worker pool to 15")
service.featureFlags["analytics"] = false
service.workerPool = NewBoundedExecutor(15, 800*time.Millisecond)
})
loadManager.SetDegradationRule(MinimalService, func() {
log.Println("Applying MINIMAL service level:")
log.Println("- Disabling recommendations")
log.Println("- Disabling notifications")
log.Println("- Reducing worker pool to 10")
log.Println("- Shortening timeouts")
service.featureFlags["recommendations"] = false
service.featureFlags["notifications"] = false
service.workerPool = NewBoundedExecutor(10, 500*time.Millisecond)
})
loadManager.SetDegradationRule(EmergencyService, func() {
log.Println("Applying EMERGENCY service level:")
log.Println("- Disabling full history")
log.Println("- Disabling retries")
log.Println("- Reducing worker pool to 5")
log.Println("- Shortening timeouts further")
service.featureFlags["fullHistory"] = false
service.retryEnabled = false
service.workerPool = NewBoundedExecutor(5, 300*time.Millisecond)
})
return service
}
// HandleRequest processes a request with graceful degradation
func (s *DegradableService) HandleRequest(ctx context.Context, requestID string) (string, error) {
// Increment request counter
atomic.AddInt64(&s.requestCounter, 1)
// Get current service level
level := s.loadManager.GetCurrentLevel()
// Execute with bounded concurrency
result, err := s.workerPool.Execute(ctx, func() (interface{}, error) {
// Simulate processing based on service level
switch level {
case FullService:
// Full processing
time.Sleep(100 * time.Millisecond)
if s.featureFlags["analytics"] {
// Do analytics processing
time.Sleep(50 * time.Millisecond)
}
if s.featureFlags["recommendations"] {
// Generate recommendations
time.Sleep(50 * time.Millisecond)
}
case ReducedService:
// Skip some processing
time.Sleep(80 * time.Millisecond)
case MinimalService:
// Minimal processing
time.Sleep(50 * time.Millisecond)
case EmergencyService:
// Critical path only
time.Sleep(30 * time.Millisecond)
}
// Simulate occasional failures
if rand.Intn(100) < 5 {
atomic.AddInt64(&s.errorCounter, 1)
return nil, fmt.Errorf("processing error")
}
return fmt.Sprintf("Response for %s (service level: %v)", requestID, level), nil
})
if err != nil {
// Handle error based on service level
if s.retryEnabled && level < MinimalService {
// Retry once for less severe degradation levels
log.Printf("Retrying request %s after error: %v", requestID, err)
result, err = s.workerPool.Execute(ctx, func() (interface{}, error) {
time.Sleep(50 * time.Millisecond)
if rand.Intn(100) < 5 {
atomic.AddInt64(&s.errorCounter, 1)
return nil, fmt.Errorf("retry failed")
}
return fmt.Sprintf("Retry response for %s (service level: %v)", requestID, level), nil
})
}
if err != nil {
atomic.AddInt64(&s.errorCounter, 1)
return "", err
}
}
return result.(string), nil
}
// SimulateLoad generates synthetic load for the service
func (s *DegradableService) SimulateLoad(ctx context.Context, duration time.Duration) {
// Reset counters
atomic.StoreInt64(&s.requestCounter, 0)
atomic.StoreInt64(&s.errorCounter, 0)
// Start time
startTime := time.Now()
endTime := startTime.Add(duration)
// Launch load generator
var wg sync.WaitGroup
// Start with low load
go func() {
rate := int64(50) // requests per second
for time.Now().Before(endTime) {
// Calculate how many requests to send this second
currentRate := atomic.LoadInt64(&rate)
interval := time.Second / time.Duration(currentRate)
for i := 0; i < int(currentRate); i++ {
wg.Add(1)
go func(reqID string) {
defer wg.Done()
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
_, _ = s.HandleRequest(reqCtx, reqID)
}(fmt.Sprintf("req-%d", time.Now().UnixNano()))
time.Sleep(interval)
}
// Increase load over time
elapsed := time.Since(startTime)
progress := float64(elapsed) / float64(duration)
// Simulate a load curve that peaks in the middle
if progress < 0.5 {
// Ramp up to peak
newRate := 50 + int64(progress*2*1950) // Max 2000 req/sec at peak
atomic.StoreInt64(&rate, newRate)
} else {
// Ramp down from peak
newRate := 50 + int64((1-progress)*2*1950)
atomic.StoreInt64(&rate, newRate)
}
// Update load metrics every second
cpuLoad := 30 + int64(progress*140) // Simulate CPU ramping up toward the mid-test peak
if progress > 0.5 {
cpuLoad = 30 + int64((1-progress)*140) // ...and back down after the peak
}
memoryUsage := 40 + int64(progress*55) // Memory grows and stays high
// Calculate request rate (requests per second)
requestRate := atomic.LoadInt64(&s.requestCounter)
atomic.StoreInt64(&s.requestCounter, 0)
// Calculate error rate (percentage)
errorCount := atomic.LoadInt64(&s.errorCounter)
atomic.StoreInt64(&s.errorCounter, 0)
var errorRate int64
if requestRate > 0 {
errorRate = (errorCount * 100) / requestRate
}
// Update load manager
s.loadManager.UpdateMetrics(cpuLoad, memoryUsage, requestRate, errorRate)
// Log current status
level := s.loadManager.GetCurrentLevel()
metrics := s.loadManager.GetMetrics()
log.Printf("Load: CPU %d%%, Mem %d%%, Rate %d req/s, Errors %d%%, Level %v",
metrics["cpu"], metrics["memory"], metrics["requestRate"],
metrics["errorRate"], level)
// The paced inner loop above already spreads its requests over roughly one
// second, so continue straight into the next iteration
}
}()
// Wait for load test to complete
time.Sleep(duration)
wg.Wait()
log.Println("Load test completed")
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a degradable service
service := NewDegradableService()
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run a load test
log.Println("Starting load test with graceful degradation...")
service.SimulateLoad(ctx, 30*time.Second)
}
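Degradation decisions also need an enforcement point. A common one is the HTTP boundary: shed optional endpoints with 503 while keeping critical ones alive. The sketch below is illustrative; it assumes the load manager publishes its level into a shared atomic, and the endpoint names and threshold are invented for the example:
package main

import (
    "log"
    "net/http"
    "sync/atomic"
)

// currentLevel mirrors the load manager's service level; 0 means full service,
// higher values mean heavier degradation. In a real service the load manager's
// level-change callback would call currentLevel.Store.
var currentLevel atomic.Int32

// shedWhenDegraded rejects non-critical traffic once the service has degraded
// to or past the given level, returning 503 so clients can back off and retry.
func shedWhenDegraded(threshold int32, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if currentLevel.Load() >= threshold {
            w.Header().Set("Retry-After", "1")
            http.Error(w, "temporarily unavailable", http.StatusServiceUnavailable)
            return
        }
        next.ServeHTTP(w, r)
    })
}

func main() {
    mux := http.NewServeMux()

    // Critical endpoint: never shed
    mux.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("ok"))
    })

    // Optional endpoint: shed as soon as the service leaves full service
    mux.Handle("/recommendations", shedWhenDegraded(1, http.HandlerFunc(
        func(w http.ResponseWriter, r *http.Request) {
            w.Write([]byte("recommendations"))
        })))

    log.Println("listening on :8081")
    log.Fatal(http.ListenAndServe(":8081", mux))
}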
Observability Integration
Always instrument your concurrent code for proper observability:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Metrics represents a collection of Prometheus metrics
type Metrics struct {
requestCounter *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
goroutineGauge prometheus.Gauge
workerPoolSize *prometheus.GaugeVec
queueDepth *prometheus.GaugeVec
errorCounter *prometheus.CounterVec
inFlightRequests *prometheus.GaugeVec
}
// NewMetrics creates and registers Prometheus metrics
func NewMetrics(reg prometheus.Registerer) *Metrics {
m := &Metrics{
requestCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests_total",
Help: "Total number of requests processed",
},
[]string{"service", "endpoint", "status"},
),
requestDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~500ms
},
[]string{"service", "endpoint"},
),
goroutineGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "goroutines_total",
Help: "Current number of goroutines",
},
),
workerPoolSize: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "worker_pool_size",
Help: "Current size of worker pools",
},
[]string{"pool"},
),
queueDepth: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "queue_depth",
Help: "Current depth of work queues",
},
[]string{"queue"},
),
errorCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "errors_total",
Help: "Total number of errors",
},
[]string{"service", "type"},
),
inFlightRequests: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_flight_requests",
Help: "Current number of in-flight requests",
},
[]string{"service"},
),
}
// Register all metrics
reg.MustRegister(
m.requestCounter,
m.requestDuration,
m.goroutineGauge,
m.workerPoolSize,
m.queueDepth,
m.errorCounter,
m.inFlightRequests,
)
// Start goroutine collector
go func() {
for {
m.goroutineGauge.Set(float64(runtime.NumGoroutine()))
time.Sleep(1 * time.Second)
}
}()
return m
}
// InstrumentedWorkerPool is a worker pool with metrics
type InstrumentedWorkerPool struct {
name string
workers int
queue chan Job
metrics *Metrics
wg sync.WaitGroup
shutdown chan struct{}
processing int32
}
// Job represents a unit of work
type Job struct {
ID string
Handler func(ctx context.Context) (interface{}, error)
Priority int
}
// NewInstrumentedWorkerPool creates a new worker pool with metrics
func NewInstrumentedWorkerPool(name string, workers, queueSize int, metrics *Metrics) *InstrumentedWorkerPool {
pool := &InstrumentedWorkerPool{
name: name,
workers: workers,
queue: make(chan Job, queueSize),
metrics: metrics,
shutdown: make(chan struct{}),
}
// Update initial metrics
metrics.workerPoolSize.WithLabelValues(name).Set(float64(workers))
metrics.queueDepth.WithLabelValues(name).Set(0)
// Start workers
for i := 0; i < workers; i++ {
pool.wg.Add(1)
go pool.worker()
}
return pool
}
// worker processes jobs from the queue
func (p *InstrumentedWorkerPool) worker() {
defer p.wg.Done()
for {
select {
case job, ok := <-p.queue:
if !ok {
return // Channel closed
}
// Update metrics
atomic.AddInt32(&p.processing, 1)
p.metrics.inFlightRequests.WithLabelValues(p.name).Inc()
// Process the job with a per-job timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
startTime := time.Now()
result, err := job.Handler(ctx)
duration := time.Since(startTime)
// Update metrics based on result. Use a bounded label value ("job") rather than
// the unique job ID, so label cardinality stays fixed in Prometheus
p.metrics.requestDuration.WithLabelValues(p.name, "job").Observe(duration.Seconds())
if err != nil {
p.metrics.errorCounter.WithLabelValues(p.name, "job_error").Inc()
p.metrics.requestCounter.WithLabelValues(p.name, "job", "error").Inc()
log.Printf("[%s] Job %s failed: %v", p.name, job.ID, err)
} else {
p.metrics.requestCounter.WithLabelValues(p.name, "job", "success").Inc()
log.Printf("[%s] Job %s succeeded: %v", p.name, job.ID, result)
}
// Update in-flight metrics
atomic.AddInt32(&p.processing, -1)
p.metrics.inFlightRequests.WithLabelValues(p.name).Dec()
// Update queue depth metric
p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
cancel() // Always cancel the context
case <-p.shutdown:
return
}
}
}
// Submit adds a job to the worker pool
func (p *InstrumentedWorkerPool) Submit(job Job) error {
select {
case p.queue <- job:
// Update queue depth metric
p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
return nil
default:
p.metrics.errorCounter.WithLabelValues(p.name, "queue_full").Inc()
return fmt.Errorf("queue is full")
}
}
// Shutdown stops the worker pool
func (p *InstrumentedWorkerPool) Shutdown() {
close(p.shutdown)
close(p.queue)
p.wg.Wait()
// Update metrics
p.metrics.workerPoolSize.WithLabelValues(p.name).Set(0)
p.metrics.queueDepth.WithLabelValues(p.name).Set(0)
p.metrics.inFlightRequests.WithLabelValues(p.name).Set(0)
}
// GetMetrics returns current metrics for the pool
func (p *InstrumentedWorkerPool) GetMetrics() map[string]interface{} {
return map[string]interface{}{
"workers": p.workers,
"queue_size": len(p.queue),
"processing": atomic.LoadInt32(&p.processing),
}
}
func main() {
// Create a Prometheus registry
reg := prometheus.NewRegistry()
// Create metrics
metrics := NewMetrics(reg)
// Start HTTP server for Prometheus metrics
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
go func() {
log.Println("Starting metrics server on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Printf("Metrics server error: %v", err)
}
}()
// Create worker pools
highPriorityPool := NewInstrumentedWorkerPool("high-priority", 5, 10, metrics)
lowPriorityPool := NewInstrumentedWorkerPool("low-priority", 3, 20, metrics)
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Submit jobs to the pools
for i := 0; i < 20; i++ {
// Create jobs with different priorities
highPriorityJob := Job{
ID: fmt.Sprintf("high-%d", i),
Handler: func(ctx context.Context) (interface{}, error) {
// Simulate work
time.Sleep(100 * time.Millisecond)
// Simulate occasional errors
if rand.Intn(10) == 0 {
return nil, fmt.Errorf("high priority job failed")
}
return "high priority result", nil
},
Priority: 2,
}
lowPriorityJob := Job{
ID: fmt.Sprintf("low-%d", i),
Handler: func(ctx context.Context) (interface{}, error) {
// Simulate work
time.Sleep(200 * time.Millisecond)
// Simulate occasional errors
if rand.Intn(5) == 0 {
return nil, fmt.Errorf("low priority job failed")
}
return "low priority result", nil
},
Priority: 1,
}
// Submit jobs
if err := highPriorityPool.Submit(highPriorityJob); err != nil {
log.Printf("Failed to submit high priority job: %v", err)
}
if err := lowPriorityPool.Submit(lowPriorityJob); err != nil {
log.Printf("Failed to submit low priority job: %v", err)
}
// Add some delay between submissions
time.Sleep(50 * time.Millisecond)
}
// Wait for jobs to complete
time.Sleep(5 * time.Second)
// Print metrics
log.Printf("High priority pool metrics: %v", highPriorityPool.GetMetrics())
log.Printf("Low priority pool metrics: %v", lowPriorityPool.GetMetrics())
// Shutdown pools
highPriorityPool.Shutdown()
lowPriorityPool.Shutdown()
log.Println("Worker pools shut down")
}
This observability integration approach is essential for distributed systems because it:
- Provides real-time visibility into system behavior
- Enables detection of performance bottlenecks
- Facilitates capacity planning and scaling decisions
- Helps identify and diagnose issues quickly
- Supports data-driven optimization of concurrent code
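Metrics tell you that something is slow; Go's built-in pprof endpoints help explain why, showing where goroutines are blocked and where CPU and memory go. A small addition, assuming the standard net/http/pprof package, serves profiling data alongside the Prometheus endpoint (port and address are illustrative):
package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
    // The metrics example above already serves http.DefaultServeMux, so there the
    // blank import alone is enough; this standalone sketch starts its own server.
    go func() {
        log.Println("pprof available at http://localhost:6060/debug/pprof/")
        log.Fatal(http.ListenAndServe("localhost:6060", nil))
    }()

    // ... application work would run here; block so the server stays up ...
    select {}
}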