Resource Cleanup and Management
Proper resource management is critical during shutdown. Let’s explore patterns for cleaning up various types of resources.
Database Connection Cleanup
Ensuring database connections are properly closed prevents connection leaks and allows transactions to complete:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/go-sql-driver/mysql"
)
type App struct {
db *sql.DB
}
func NewApp() (*App, error) {
// Open database connection
db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/dbname")
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
// Configure connection pool
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
// Verify connection
if err := db.Ping(); err != nil {
db.Close() // Close on error
return nil, fmt.Errorf("failed to ping database: %w", err)
}
return &App{db: db}, nil
}
func (a *App) Shutdown(ctx context.Context) error {
log.Println("Closing database connections...")
// Create a timeout for DB shutdown if not already set
dbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// Use a channel to signal completion or timeout
done := make(chan struct{})
var err error
go func() {
// Close the database connection
err = a.db.Close()
close(done)
}()
// Wait for completion or timeout
select {
case <-done:
if err != nil {
return fmt.Errorf("error closing database: %w", err)
}
log.Println("Database connections closed successfully")
return nil
case <-dbCtx.Done():
return fmt.Errorf("database shutdown timed out: %w", dbCtx.Err())
}
}
func main() {
// Initialize application
app, err := NewApp()
if err != nil {
log.Fatalf("Failed to initialize app: %v", err)
}
// Channel to listen for interrupt signals
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
// Block until we receive a signal
sig := <-shutdown
log.Printf("Received signal: %v", sig)
// Create a deadline for graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Perform application shutdown
if err := app.Shutdown(ctx); err != nil {
log.Printf("Error during shutdown: %v", err)
os.Exit(1)
}
log.Println("Application shutdown complete")
}
This pattern demonstrates:
- Proper database connection pool configuration
- Graceful shutdown with timeout handling
- Error handling during shutdown
Worker Pool Graceful Shutdown
Worker pools are common in Go applications. Here’s a pattern for gracefully shutting down a worker pool:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Job represents a unit of work
type Job struct {
ID int
}
// WorkerPool manages a pool of workers
type WorkerPool struct {
jobs chan Job
results chan Result
workerCount int
shutdown chan struct{}
wg sync.WaitGroup
}
// Result represents the outcome of a job
type Result struct {
JobID int
Output string
Error error
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
return &WorkerPool{
jobs: make(chan Job, queueSize),
results: make(chan Result, queueSize),
workerCount: workerCount,
shutdown: make(chan struct{}),
}
}
// Start launches the worker pool
func (p *WorkerPool) Start() {
// Start workers
for i := 1; i <= p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i)
}
log.Printf("Started worker pool with %d workers", p.workerCount)
}
// worker processes jobs
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
log.Printf("Worker %d starting", id)
for {
select {
case job, ok := <-p.jobs:
if !ok {
log.Printf("Worker %d shutting down: job channel closed", id)
return
}
// Process job
log.Printf("Worker %d processing job %d", id, job.ID)
// Simulate work
time.Sleep(time.Duration(job.ID%3+1) * time.Second)
// Send result
p.results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Result for job %d", job.ID),
}
case <-p.shutdown:
log.Printf("Worker %d received shutdown signal", id)
return
}
}
}
// Submit adds a job to the pool
func (p *WorkerPool) Submit(job Job) {
p.jobs <- job
}
// Results returns the results channel
func (p *WorkerPool) Results() <-chan Result {
return p.results
}
// Shutdown gracefully shuts down the worker pool
func (p *WorkerPool) Shutdown(ctx context.Context) {
log.Println("Worker pool shutting down...")
// Signal all workers to stop
close(p.shutdown)
// Close the jobs channel to prevent new jobs
close(p.jobs)
// Create a channel to signal when workers are done
done := make(chan struct{})
go func() {
// Wait for all workers to finish
p.wg.Wait()
close(done)
}()
// Wait for workers to finish or timeout
select {
case <-done:
log.Println("All workers have stopped")
case <-ctx.Done():
log.Printf("Worker pool shutdown timed out: %v", ctx.Err())
}
// Close the results channel
close(p.results)
}
func main() {
// Create a worker pool with 5 workers and a queue size of 10
pool := NewWorkerPool(5, 10)
pool.Start()
// Start a goroutine to process results
go func() {
for result := range pool.Results() {
log.Printf("Got result: %s (error: %v)", result.Output, result.Error)
}
log.Println("Results channel closed")
}()
// Submit some jobs
for i := 1; i <= 10; i++ {
pool.Submit(Job{ID: i})
}
// Channel to listen for interrupt signals
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
// Block until we receive a signal
sig := <-shutdown
log.Printf("Received signal: %v", sig)
// Create a deadline for graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Shutdown the worker pool
pool.Shutdown(ctx)
log.Println("Application shutdown complete")
}
This pattern demonstrates:
- Creating a worker pool with controlled concurrency
- Signaling workers to stop processing
- Waiting for in-progress work to complete
- Handling shutdown timeouts