Coordinating Multiple Services
In microservice architectures, coordinating shutdown across multiple services requires careful orchestration.
Dependency-Aware Shutdown
Services often have dependencies that dictate the order of shutdown. Here’s a pattern for dependency-aware shutdown:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Service is the contract every managed component implements so that an
// Application can start it and stop it in dependency order.
type Service interface {
// Name returns the service's identifier. Application uses it as the map key
// when tracking which services have been started or stopped.
Name() string
// Start brings the service up and reports any startup failure.
Start() error
// Stop shuts the service down; ctx is the shutdown context handed to
// Application.Stop by the caller.
Stop(ctx context.Context) error
// Dependencies lists the services that Application starts before this one
// and stops after it.
Dependencies() []Service
}
// BaseService provides common functionality for services: it stores the name
// and dependency list and is embedded by the concrete service types so they
// inherit Name and Dependencies.
type BaseService struct {
// name is the value returned by Name.
name string
// dependencies is the value returned by Dependencies.
dependencies []Service
}
// Name returns the name the service was constructed with.
func (s *BaseService) Name() string {
return s.name
}
// Dependencies returns the stored dependency slice. The slice is returned as
// is, without copying, so the caller shares its backing array.
func (s *BaseService) Dependencies() []Service {
return s.dependencies
}
// DatabaseService represents a database connection. It embeds BaseService for
// its name and dependency list.
type DatabaseService struct {
BaseService
}
// NewDatabaseService returns a DatabaseService named "database" with an
// empty (non-nil) dependency list.
func NewDatabaseService() *DatabaseService {
	svc := &DatabaseService{}
	svc.name = "database"
	svc.dependencies = []Service{}
	return svc
}
// Start logs that the service is starting and then blocks for one second to
// simulate startup work. It always returns nil.
func (s *DatabaseService) Start() error {
	const startupDelay = time.Second // simulated startup cost

	log.Printf("Starting %s service", s.Name())
	time.Sleep(startupDelay)
	return nil
}
// Stop logs that the service is stopping and simulates a two-second cleanup.
//
// It returns nil once the simulated cleanup has finished, or ctx.Err() if ctx
// is cancelled or its deadline passes first, so that a shutdown timeout is
// not held up by the remaining cleanup time.
func (s *DatabaseService) Stop(ctx context.Context) error {
	log.Printf("Stopping %s service", s.Name())

	// A timer plus select, rather than time.Sleep, lets the wait be abandoned
	// as soon as the caller gives up on the shutdown.
	cleanup := time.NewTimer(2 * time.Second)
	defer cleanup.Stop()

	select {
	case <-cleanup.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// CacheService represents a cache service. It embeds BaseService for its name
// and dependency list.
type CacheService struct {
BaseService
}
// NewCacheService returns a CacheService named "cache" with an empty
// (non-nil) dependency list.
func NewCacheService() *CacheService {
	svc := &CacheService{}
	svc.name = "cache"
	svc.dependencies = []Service{}
	return svc
}
// Start logs that the service is starting and then blocks for 500ms to
// simulate startup work. It always returns nil.
func (s *CacheService) Start() error {
	const startupDelay = 500 * time.Millisecond // simulated startup cost

	log.Printf("Starting %s service", s.Name())
	time.Sleep(startupDelay)
	return nil
}
// Stop logs that the service is stopping and simulates a one-second cleanup.
//
// It returns nil once the simulated cleanup has finished, or ctx.Err() if ctx
// is cancelled or its deadline passes first, so that a shutdown timeout is
// not held up by the remaining cleanup time.
func (s *CacheService) Stop(ctx context.Context) error {
	log.Printf("Stopping %s service", s.Name())

	// A timer plus select, rather than time.Sleep, lets the wait be abandoned
	// as soon as the caller gives up on the shutdown.
	cleanup := time.NewTimer(1 * time.Second)
	defer cleanup.Stop()

	select {
	case <-cleanup.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// APIService represents an API server. It embeds BaseService for its name and
// dependency list.
type APIService struct {
BaseService
}
// NewAPIService returns an APIService named "api" whose dependencies are, in
// order, the given database and cache services.
func NewAPIService(db *DatabaseService, cache *CacheService) *APIService {
	base := BaseService{name: "api"}
	base.dependencies = append(base.dependencies, db, cache)
	return &APIService{BaseService: base}
}
// Start logs that the service is starting and then blocks for one second to
// simulate startup work. It always returns nil.
func (s *APIService) Start() error {
	const startupDelay = time.Second // simulated startup cost

	log.Printf("Starting %s service", s.Name())
	time.Sleep(startupDelay)
	return nil
}
// Stop logs that the service is stopping and simulates a three-second
// cleanup.
//
// It returns nil once the simulated cleanup has finished, or ctx.Err() if ctx
// is cancelled or its deadline passes first, so that a shutdown timeout is
// not held up by the remaining cleanup time.
func (s *APIService) Stop(ctx context.Context) error {
	log.Printf("Stopping %s service", s.Name())

	// A timer plus select, rather than time.Sleep, lets the wait be abandoned
	// as soon as the caller gives up on the shutdown.
	cleanup := time.NewTimer(3 * time.Second)
	defer cleanup.Stop()

	select {
	case <-cleanup.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Application coordinates all services, starting and stopping them as a
// group. It contains a mutex, so it is handled through a pointer (see
// NewApplication) rather than copied.
type Application struct {
// services holds the services passed to NewApplication.
services []Service
// mu is locked by Start and Stop around updates to the bookkeeping maps
// they build while running.
mu sync.Mutex
}
// NewApplication returns an Application that manages the given services.
// The variadic slice is stored as is, without copying.
func NewApplication(services ...Service) *Application {
	app := new(Application)
	app.services = services
	return app
}
// Start brings up every registered service, starting each service's
// dependencies (recursively) before the service itself. Services are
// identified by name, and a name that has already been started is skipped.
//
// Startup is sequential and stops at the first failure; the returned error
// wraps the failing service's error. Services started before the failure are
// left running.
func (a *Application) Start() error {
	done := make(map[string]bool)

	// isStarted and markStarted wrap access to done in the application mutex.
	isStarted := func(name string) bool {
		a.mu.Lock()
		defer a.mu.Unlock()
		return done[name]
	}
	markStarted := func(name string) {
		a.mu.Lock()
		defer a.mu.Unlock()
		done[name] = true
	}

	var bringUp func(svc Service) error
	bringUp = func(svc Service) error {
		if isStarted(svc.Name()) {
			return nil
		}

		// Dependencies come up first, depth-first.
		for _, dep := range svc.Dependencies() {
			err := bringUp(dep)
			if err != nil {
				return fmt.Errorf("failed to start dependency %s: %w", dep.Name(), err)
			}
		}

		err := svc.Start()
		if err != nil {
			return fmt.Errorf("failed to start service %s: %w", svc.Name(), err)
		}

		markStarted(svc.Name())
		return nil
	}

	for i := range a.services {
		err := bringUp(a.services[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// Stop shuts down every service reachable from the registered services in
// reverse dependency order: a service is stopped only after every service
// that depends on it has finished stopping. Services with no ordering
// constraint between them are stopped in parallel. Services are identified
// by name.
//
// A failing Stop does not halt the shutdown: the error is recorded and the
// service's dependencies are still stopped, so one bad service cannot leave
// the services beneath it running.
//
// Stop returns nil when every service stopped cleanly, an error listing all
// failures when some did not, or ctx.Err() if ctx is done before the
// shutdown completes. In the last case, stop goroutines already in flight
// keep running in the background; they receive the same ctx and are expected
// to observe its cancellation.
func (a *Application) Stop(ctx context.Context) error {
	// Build the reverse dependency graph (dependency name -> names of its
	// dependents) by walking from the registered services, so transitive
	// dependencies are tracked even when they were not registered directly.
	dependedOnBy := make(map[string][]string)
	visited := make(map[string]bool)
	var walk func(Service)
	walk = func(s Service) {
		name := s.Name()
		if visited[name] {
			return
		}
		visited[name] = true
		for _, dep := range s.Dependencies() {
			dependedOnBy[dep.Name()] = append(dependedOnBy[dep.Name()], name)
			walk(dep)
		}
	}
	for _, s := range a.services {
		walk(s)
	}

	// Shared bookkeeping, guarded by a.mu once goroutines are running.
	//   claimed:  a stop goroutine has been scheduled for the service.
	//   finished: the service's Stop call has returned.
	// Keeping the two apart is what prevents a dependency from being stopped
	// while one of its dependents is still in the middle of shutting down.
	var (
		wg       sync.WaitGroup
		claimed  = make(map[string]bool)
		finished = make(map[string]bool)
		errs     []error
	)

	// dependentsFinished reports whether every dependent of the named
	// service has finished stopping. Callers must hold a.mu.
	dependentsFinished := func(name string) bool {
		for _, dependent := range dependedOnBy[name] {
			if !finished[dependent] {
				return false
			}
		}
		return true
	}

	var stopService func(Service)
	stopService = func(s Service) {
		defer wg.Done()

		stopErr := s.Stop(ctx)

		a.mu.Lock()
		if stopErr != nil {
			errs = append(errs, fmt.Errorf("failed to stop service %s: %w", s.Name(), stopErr))
		}
		finished[s.Name()] = true

		// Claim, under the lock, every dependency that has just become
		// stoppable so that exactly one goroutine is launched for it.
		var ready []Service
		for _, dep := range s.Dependencies() {
			depName := dep.Name()
			if claimed[depName] || !dependentsFinished(depName) {
				continue
			}
			claimed[depName] = true
			ready = append(ready, dep)
		}
		a.mu.Unlock()

		// wg.Add is safe here: this goroutine's own Done has not run yet,
		// so the counter cannot reach zero in between.
		for _, dep := range ready {
			wg.Add(1)
			go stopService(dep)
		}
	}

	// Shutdown begins with the services nothing depends on. No goroutines
	// are running yet, so the maps can be touched without the lock.
	var initial []Service
	for _, s := range a.services {
		name := s.Name()
		if len(dependedOnBy[name]) == 0 && !claimed[name] {
			claimed[name] = true
			initial = append(initial, s)
		}
	}
	wg.Add(len(initial))
	for _, s := range initial {
		go stopService(s)
	}

	// Wait for all services to stop or for the context to be done.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		a.mu.Lock()
		defer a.mu.Unlock()
		if len(errs) > 0 {
			return fmt.Errorf("errors during shutdown: %v", errs)
		}
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// main wires the database, cache and API services into an Application,
// starts it, waits for SIGINT or SIGTERM, and then gives the application up
// to 30 seconds to shut down. The process exits with status 1 if startup or
// shutdown fails.
func main() {
	// Assemble the service graph: the API depends on the database and cache.
	database := NewDatabaseService()
	cacheSvc := NewCacheService()
	app := NewApplication(NewAPIService(database, cacheSvc), database, cacheSvc)

	err := app.Start()
	if err != nil {
		log.Fatalf("Failed to start application: %v", err)
	}
	log.Println("Application started successfully")

	// Buffered so a signal delivered before the receive below is not lost.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	// Block here until a termination signal arrives.
	received := <-sigCh
	log.Printf("Received signal: %v", received)

	// Bound the whole graceful shutdown.
	const shutdownTimeout = 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err = app.Stop(ctx); err != nil {
		log.Printf("Error during shutdown: %v", err)
		os.Exit(1)
	}
	log.Println("Application shutdown complete")
}
This sophisticated pattern demonstrates:
- Modeling service dependencies explicitly
- Starting services in dependency order
- Stopping services in reverse dependency order
- Parallel shutdown where possible
- Timeout handling for the entire shutdown process
Health Checks and Readiness Probes
Health checks and readiness probes are essential for coordinating with orchestration systems like Kubernetes.