Production Deployment Strategies

Deploying worker pools in production environments requires careful consideration of resource utilization, graceful shutdown, and integration with monitoring systems.

Graceful Shutdown Worker Pool

Proper shutdown handling is critical for production systems. Let’s implement a worker pool with comprehensive graceful shutdown capabilities:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// GracefulPool is a fixed-size worker pool that drains queued work on shutdown.
//
// Tasks run with a pool-wide context. That context stays live while the pool
// is running and while already-queued tasks are being drained. It is canceled
// only when the drain timeout expires, or once draining has finished, so
// in-flight tasks get a chance to complete normally during shutdown.
type GracefulPool struct {
	workers      int
	tasks        chan func(ctx context.Context) error
	wg           sync.WaitGroup
	ctx          context.Context
	cancel       context.CancelFunc
	shutdownCh   chan struct{}
	drainTimeout time.Duration

	// mu guards closed. Submit holds the read lock around its send and
	// shutdown holds the write lock around close(tasks), so a Submit can
	// never race with the close and panic with "send on closed channel".
	mu     sync.RWMutex
	closed bool

	// shutdownOnce makes Shutdown safe to call more than once (for example
	// from both the signal handler and application code).
	shutdownOnce sync.Once

	// stopSignals is closed when shutdown begins so the signal-handling
	// goroutine stops listening and unregisters its signal channel.
	stopSignals chan struct{}
}

// NewGracefulPool creates a pool with the given number of workers, a task
// queue holding up to queueSize pending tasks (negative values are treated as
// 0, i.e. an unbuffered queue), and the maximum time Shutdown waits for queued
// and in-flight tasks before canceling their context.
func NewGracefulPool(workers, queueSize int, drainTimeout time.Duration) *GracefulPool {
	if queueSize < 0 {
		queueSize = 0 // make(chan, negative) would panic
	}
	ctx, cancel := context.WithCancel(context.Background())

	return &GracefulPool{
		workers:      workers,
		tasks:        make(chan func(ctx context.Context) error, queueSize),
		ctx:          ctx,
		cancel:       cancel,
		shutdownCh:   make(chan struct{}),
		stopSignals:  make(chan struct{}),
		drainTimeout: drainTimeout,
	}
}

// Start launches the worker goroutines and a goroutine that triggers Shutdown
// on SIGINT or SIGTERM.
func (p *GracefulPool) Start() {
	for i := 1; i <= p.workers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}

	// Handle OS signals for graceful shutdown
	go p.handleSignals()
}

// worker runs tasks from the queue until the queue is closed and drained, or
// until the pool context is canceled (drain timeout reached).
func (p *GracefulPool) worker(workerID int) {
	defer p.wg.Done()

	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				// Queue closed and fully drained: normal exit path.
				fmt.Printf("Worker %d shutting down (channel closed)\n", workerID)
				return
			}
			if p.ctx.Err() != nil {
				// The drain deadline has passed; abandon remaining queued work
				// instead of starting tasks with an already-canceled context.
				fmt.Printf("Worker %d shutting down (context canceled)\n", workerID)
				return
			}
			if err := task(p.ctx); err != nil {
				fmt.Printf("Worker %d task error: %v\n", workerID, err)
			}

		case <-p.ctx.Done():
			fmt.Printf("Worker %d shutting down (context canceled)\n", workerID)
			return
		}
	}
}

// Submit enqueues task without blocking. It returns false when task is nil,
// when the queue is full, or when the pool has begun shutting down.
func (p *GracefulPool) Submit(task func(ctx context.Context) error) bool {
	if task == nil {
		return false // a nil task would panic inside a worker
	}

	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return false
	}

	select {
	case p.tasks <- task:
		return true
	default:
		// Queue is full
		return false
	}
}

// handleSignals waits for SIGINT/SIGTERM and triggers Shutdown. It returns
// without shutting down if shutdown was already started by another caller.
func (p *GracefulPool) handleSignals() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	select {
	case sig := <-signals:
		fmt.Printf("Received signal %v, initiating graceful shutdown\n", sig)
		p.Shutdown()
	case <-p.stopSignals:
		// Shutdown was triggered elsewhere; stop listening.
	}
}

// Shutdown stops accepting tasks, lets workers drain the queue for up to
// drainTimeout, then cancels the task context and waits for all workers to
// exit. It is safe to call multiple times and from multiple goroutines; every
// call returns only after shutdown has completed.
//
// A task that ignores its context can still keep Shutdown waiting past
// drainTimeout, because workers are always waited for before returning.
func (p *GracefulPool) Shutdown() {
	p.shutdownOnce.Do(p.shutdown)
}

// shutdown performs the one-time shutdown sequence behind Shutdown.
func (p *GracefulPool) shutdown() {
	fmt.Println("Starting graceful shutdown...")
	close(p.stopSignals)

	// Phase 1: Stop accepting new tasks. Closing the queue under the write
	// lock lets workers finish everything already buffered and then exit.
	fmt.Println("Phase 1: Stopping task acceptance")
	p.mu.Lock()
	p.closed = true
	close(p.tasks)
	p.mu.Unlock()

	// Phase 2: Process remaining tasks with timeout. The task context is still
	// live here, so queued and in-flight tasks can complete normally.
	fmt.Println("Phase 2: Processing remaining tasks")
	workersDone := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(workersDone)
	}()

	drainTimer := time.NewTimer(p.drainTimeout)
	defer drainTimer.Stop()

	select {
	case <-workersDone:
		fmt.Println("All tasks processed successfully")
	case <-drainTimer.C:
		fmt.Println("Drain timeout reached, some tasks may not be processed")
	}

	// Phase 3: Cancel the task context so in-flight tasks abort and workers
	// stop picking up any work still left in the queue.
	fmt.Println("Phase 3: Canceling task context")
	p.cancel()

	// Phase 4: Wait for all workers to exit
	fmt.Println("Phase 4: Waiting for workers to exit")
	<-workersDone

	fmt.Println("Graceful shutdown complete")
	close(p.shutdownCh)
}

// Wait blocks until the pool has shut down
func (p *GracefulPool) Wait() {
	<-p.shutdownCh
}

// main demonstrates the pool: it submits more tasks than the pool can queue,
// lets it run for a while, then triggers a graceful shutdown and waits for it.
func main() {
	// Create a graceful pool with 5 workers, queue size 10, and 5s drain timeout
	pool := NewGracefulPool(5, 10, 5*time.Second)
	pool.Start()

	// Submit some tasks. Submit never blocks: with 20 tasks, 5 workers and a
	// 10-slot queue, some submissions are expected to be rejected, so report
	// them instead of silently discarding Submit's result.
	for i := 0; i < 20; i++ {
		taskID := i
		accepted := pool.Submit(func(ctx context.Context) error {
			fmt.Printf("Processing task %d\n", taskID)

			// Simulate work, but give up as soon as the pool cancels ctx.
			select {
			case <-time.After(500 * time.Millisecond):
				fmt.Printf("Completed task %d\n", taskID)
				return nil
			case <-ctx.Done():
				fmt.Printf("Task %d canceled\n", taskID)
				return ctx.Err()
			}
		})
		if !accepted {
			fmt.Printf("Task %d rejected (queue full or pool shutting down)\n", taskID)
		}
	}

	// Simulate running for a while
	time.Sleep(2 * time.Second)

	// Manually trigger shutdown (in a real app, this would be triggered by OS signals)
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println("Manually triggering shutdown")
		pool.Shutdown()
	}()

	// Wait for shutdown to complete
	pool.Wait()
	fmt.Println("Application exiting")
}

This graceful shutdown implementation provides a robust approach to terminating a worker pool:

  1. Signal handling: Captures OS termination signals to initiate graceful shutdown
  2. Phased shutdown: Implements a multi-phase shutdown process
  3. Drain timeout: Allows specifying a maximum time to process remaining tasks
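  4. Context propagation: Tasks receive a context that is canceled during shutdown, so long-running tasks should select on ctx.Done() and return promptly; a task that ignores its context can stall shutdown.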