Production Deployment Strategies
Deploying worker pools in production environments requires careful consideration of resource utilization, graceful shutdown, and integration with monitoring systems.
Graceful Shutdown Worker Pool
Proper shutdown handling is critical for production systems. Let’s implement a worker pool with comprehensive graceful shutdown capabilities:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// GracefulPool is a fixed-size worker pool with a phased, graceful shutdown.
//
// On shutdown the pool first stops accepting work, then lets the workers
// drain the tasks that are already queued for at most drainTimeout, and only
// after that cancels the context handed to tasks so stragglers can abort.
type GracefulPool struct {
	workers      int                                  // number of worker goroutines launched by Start
	tasks        chan func(ctx context.Context) error // buffered task queue; closed once by Shutdown
	wg           sync.WaitGroup                       // tracks running workers
	ctx          context.Context                      // passed to every task; canceled after the drain phase
	cancel       context.CancelFunc                   // cancels ctx
	shutdownCh   chan struct{}                        // closed when shutdown has fully completed
	drainTimeout time.Duration                        // upper bound on the drain phase

	// mu guards closed and serializes closing tasks against Submit's send,
	// so a submission can never hit a closed channel.
	mu     sync.RWMutex
	closed bool

	// shutdownOnce makes Shutdown safe to call from several places (signal
	// handler and application code) without closing channels twice.
	shutdownOnce sync.Once
}

// NewGracefulPool creates a pool with the given number of workers, a task
// queue holding up to queueSize pending tasks, and a drainTimeout that bounds
// how long Shutdown waits for queued tasks before canceling their context.
// The pool does nothing until Start is called.
func NewGracefulPool(workers, queueSize int, drainTimeout time.Duration) *GracefulPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &GracefulPool{
		workers:      workers,
		tasks:        make(chan func(ctx context.Context) error, queueSize),
		ctx:          ctx,
		cancel:       cancel,
		shutdownCh:   make(chan struct{}),
		drainTimeout: drainTimeout,
	}
}

// Start launches the worker goroutines and a goroutine that turns SIGINT and
// SIGTERM into a call to Shutdown.
func (p *GracefulPool) Start() {
	for id := 1; id <= p.workers; id++ {
		p.wg.Add(1)
		go p.runWorker(id)
	}
	// Handle OS signals for graceful shutdown
	go p.handleSignals()
}

// runWorker executes queued tasks until the task channel is closed and empty,
// or until the pool context is canceled (drain deadline passed).
func (p *GracefulPool) runWorker(workerID int) {
	defer p.wg.Done()
	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				// Channel closed and fully drained.
				fmt.Printf("Worker %d shutting down (channel closed)\n", workerID)
				return
			}
			if p.ctx.Err() != nil {
				// select picks randomly among ready cases, so a task can be
				// received after cancellation; do not start it past the deadline.
				fmt.Printf("Worker %d shutting down (context canceled)\n", workerID)
				return
			}
			if err := task(p.ctx); err != nil {
				fmt.Printf("Worker %d task error: %v\n", workerID, err)
			}
		case <-p.ctx.Done():
			fmt.Printf("Worker %d shutting down (context canceled)\n", workerID)
			return
		}
	}
}

// Submit tries to enqueue task without blocking. It returns true when the
// task was queued, and false when task is nil, the queue is full, or the pool
// has started shutting down.
func (p *GracefulPool) Submit(task func(ctx context.Context) error) bool {
	if task == nil {
		// A nil task would panic inside a worker.
		return false
	}

	// The read lock keeps Shutdown from closing the channel mid-send.
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return false
	}

	select {
	case p.tasks <- task:
		return true
	default:
		// Queue is full
		return false
	}
}

// handleSignals waits for SIGINT/SIGTERM and triggers Shutdown. It returns
// (and unregisters the signal channel) once the pool has shut down, whether
// or not a signal was ever received.
func (p *GracefulPool) handleSignals() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	select {
	case sig := <-signals:
		fmt.Printf("Received signal %v, initiating graceful shutdown\n", sig)
		p.Shutdown()
	case <-p.shutdownCh:
		// Shutdown was triggered elsewhere; nothing left to listen for.
	}
}

// Shutdown gracefully stops the pool and blocks until it has fully stopped.
// It is safe to call more than once and from multiple goroutines; every call
// returns only after the shutdown has completed.
//
// Shutdown must not be called from inside a task: it waits for all workers,
// including the one running that task.
func (p *GracefulPool) Shutdown() {
	p.shutdownOnce.Do(p.shutdown)
}

// shutdown performs the actual phased shutdown; it runs exactly once.
func (p *GracefulPool) shutdown() {
	fmt.Println("Starting graceful shutdown...")

	// Phase 1: stop accepting new tasks. Closing the channel under the write
	// lock guarantees no Submit is in the middle of a send.
	fmt.Println("Phase 1: Stopping task acceptance")
	p.mu.Lock()
	p.closed = true
	close(p.tasks)
	p.mu.Unlock()

	// Phase 2: let the workers drain what is already queued, bounded by
	// drainTimeout. The task context is still live, so tasks run normally.
	fmt.Println("Phase 2: Processing remaining tasks")
	workersDone := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(workersDone)
	}()

	drainTimer := time.NewTimer(p.drainTimeout)
	defer drainTimer.Stop()
	select {
	case <-workersDone:
		fmt.Println("All tasks processed successfully")
	case <-drainTimer.C:
		fmt.Println("Drain timeout reached, some tasks may not be processed")
	}

	// Phase 3: cancel the context so in-flight, context-aware tasks abort and
	// workers stop picking up anything still queued.
	fmt.Println("Phase 3: Canceling task context")
	p.cancel()

	// Phase 4: wait for every worker to return. A task that ignores its
	// context can still delay this indefinitely.
	fmt.Println("Phase 4: Waiting for workers to exit")
	<-workersDone

	fmt.Println("Graceful shutdown complete")
	close(p.shutdownCh)
}

// Wait blocks until a Shutdown has run to completion.
func (p *GracefulPool) Wait() {
	<-p.shutdownCh
}
// main demonstrates the pool: it starts 5 workers with a queue of 10 and a
// 5s drain timeout, submits 20 context-aware tasks, lets them run for a
// while, then triggers a shutdown and waits for it to finish.
func main() {
	pool := NewGracefulPool(5, 10, 5*time.Second)
	pool.Start()

	// Submit some tasks. Submit does not block and reports false when the
	// task was not queued, so rejections are logged instead of silently lost.
	for i := 0; i < 20; i++ {
		taskID := i
		accepted := pool.Submit(func(ctx context.Context) error {
			fmt.Printf("Processing task %d\n", taskID)
			// Simulate work with context awareness
			select {
			case <-time.After(500 * time.Millisecond):
				fmt.Printf("Completed task %d\n", taskID)
				return nil
			case <-ctx.Done():
				fmt.Printf("Task %d canceled\n", taskID)
				return ctx.Err()
			}
		})
		if !accepted {
			fmt.Printf("Task %d rejected\n", taskID)
		}
	}

	// Simulate running for a while
	time.Sleep(2 * time.Second)

	// Manually trigger shutdown (in a real app, this would be triggered by OS signals)
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println("Manually triggering shutdown")
		pool.Shutdown()
	}()

	// Wait for shutdown to complete
	pool.Wait()
	fmt.Println("Application exiting")
}
This graceful shutdown implementation provides a robust approach to terminating a worker pool:
- Signal handling: Captures OS termination signals to initiate graceful shutdown
- Phased shutdown: Proceeds in distinct phases — stop accepting new tasks, drain the queued tasks, then wait for the workers to exit
- Drain timeout: Allows specifying a maximum time to process remaining tasks
- Context propagation: Tasks receive a context that is canceled during shutdown