Advanced Circuit Breaker Strategies

Now let’s explore more sophisticated circuit breaker implementations and strategies.

Adaptive Circuit Breaker

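An adaptive circuit breaker tunes its failure threshold and reset timeout from the outcomes it observes — a run of successes makes it more tolerant and quicker to retry, while a run of failures makes it stricter and slower to retry: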

package circuitbreaker

import (
    "math"
    "sync"
    "time"
)

// AdaptiveCircuitBreaker is a circuit breaker whose failure threshold and
// reset timeout move between configured bounds as results are recorded.
type AdaptiveCircuitBreaker struct {
    // mutex guards every field below.
    mutex sync.RWMutex

    // Configured starting value and clamp bounds for the failure threshold.
    baseFailureThreshold, minFailureThreshold, maxFailureThreshold int

    // Configured starting value and clamp bounds for the reset timeout.
    baseResetTimeout, minResetTimeout, maxResetTimeout time.Duration

    // Live, adapted values: the failure count that opens a closed circuit
    // and the time an open circuit waits before moving to half-open.
    currentFailureThreshold int
    currentResetTimeout     time.Duration

    // Lengths of the current success/failure runs; they drive adaptation
    // and are reset whenever an adjustment is applied.
    consecutiveSuccesses, consecutiveFailures int

    // Circuit state, the time it last changed, and the number of failures
    // recorded in the closed state since the last success.
    state           State
    lastStateChange time.Time
    failureCount    int
}

// NewAdaptiveCircuitBreaker builds a closed AdaptiveCircuitBreaker.
//
// The base values seed the live failure threshold and reset timeout; the
// min/max values are the bounds the live values are clamped to when they are
// later adjusted. The arguments are stored as given, without validation.
func NewAdaptiveCircuitBreaker(
    baseFailureThreshold int,
    minFailureThreshold int,
    maxFailureThreshold int,
    baseResetTimeout time.Duration,
    minResetTimeout time.Duration,
    maxResetTimeout time.Duration,
) *AdaptiveCircuitBreaker {
    cb := new(AdaptiveCircuitBreaker)

    // Static configuration.
    cb.baseFailureThreshold = baseFailureThreshold
    cb.minFailureThreshold = minFailureThreshold
    cb.maxFailureThreshold = maxFailureThreshold
    cb.baseResetTimeout = baseResetTimeout
    cb.minResetTimeout = minResetTimeout
    cb.maxResetTimeout = maxResetTimeout

    // Live values start from the base configuration.
    cb.currentFailureThreshold = baseFailureThreshold
    cb.currentResetTimeout = baseResetTimeout
    cb.state = Closed

    return cb
}

// AllowRequest reports whether a request should be allowed through.
//
// Closed and half-open circuits allow requests. An open circuit rejects them
// until currentResetTimeout has elapsed since the last state change; the first
// call after that moves the circuit to half-open and is allowed.
func (cb *AdaptiveCircuitBreaker) AllowRequest() bool {
    // Fast path: take a snapshot under the read lock and release it
    // explicitly. sync.RWMutex cannot be upgraded in place, and pairing a
    // deferred RUnlock with a later manual RUnlock would unlock twice and
    // crash the program.
    cb.mutex.RLock()
    state := cb.state
    timedOut := state == Open && time.Since(cb.lastStateChange) > cb.currentResetTimeout
    cb.mutex.RUnlock()

    if !timedOut {
        return state == Closed || state == HalfOpen
    }

    // Slow path: the open circuit looked ready for a probe. Re-evaluate
    // everything under the write lock, because another goroutine may have
    // changed the state between the two critical sections.
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    switch cb.state {
    case Open:
        if time.Since(cb.lastStateChange) <= cb.currentResetTimeout {
            // The circuit was re-opened in the meantime; keep rejecting.
            return false
        }
        cb.state = HalfOpen
        cb.lastStateChange = time.Now()
        return true
    case Closed, HalfOpen:
        return true
    default:
        return false
    }
}

// RecordResult records the outcome of one request, adapts the thresholds and
// applies any state transition the outcome triggers.
//
// Every 5 consecutive successes raise the failure threshold by one (capped at
// maxFailureThreshold) and shrink the reset timeout by 10% (floored at
// minResetTimeout). Every 3 consecutive failures lower the failure threshold
// by one (floored at minFailureThreshold) and grow the reset timeout by 10%
// (capped at maxResetTimeout).
func (cb *AdaptiveCircuitBreaker) RecordResult(success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    stamp := time.Now()

    if !success {
        cb.consecutiveSuccesses = 0
        cb.consecutiveFailures++

        // A run of failures makes the breaker stricter and slower to retry.
        if cb.consecutiveFailures >= 3 {
            stricter := math.Max(
                float64(cb.minFailureThreshold),
                float64(cb.currentFailureThreshold-1),
            )
            cb.currentFailureThreshold = int(stricter)

            longer := math.Min(
                float64(cb.maxResetTimeout),
                float64(cb.currentResetTimeout)*1.1,
            )
            cb.currentResetTimeout = time.Duration(longer)

            cb.consecutiveFailures = 0
        }

        // A half-open circuit re-opens on any failure; a closed circuit
        // opens once its failure count reaches the current threshold.
        trip := cb.state == HalfOpen
        if cb.state == Closed {
            cb.failureCount++
            trip = cb.failureCount >= cb.currentFailureThreshold
        }
        if trip {
            cb.state = Open
            cb.lastStateChange = stamp
        }
        return
    }

    cb.consecutiveFailures = 0
    cb.consecutiveSuccesses++

    // A run of successes makes the breaker more tolerant and faster to retry.
    if cb.consecutiveSuccesses >= 5 {
        looser := math.Min(
            float64(cb.maxFailureThreshold),
            float64(cb.currentFailureThreshold+1),
        )
        cb.currentFailureThreshold = int(looser)

        shorter := math.Max(
            float64(cb.minResetTimeout),
            float64(cb.currentResetTimeout)*0.9,
        )
        cb.currentResetTimeout = time.Duration(shorter)

        cb.consecutiveSuccesses = 0
    }

    // A success clears the failure count in the closed and half-open states,
    // and additionally closes a half-open circuit.
    if cb.state == HalfOpen || cb.state == Closed {
        cb.failureCount = 0
    }
    if cb.state == HalfOpen {
        cb.state = Closed
        cb.lastStateChange = stamp
    }
}

// Execute calls fn if the breaker currently allows requests and records
// whether it succeeded. It returns ErrCircuitOpen without calling fn when the
// request is rejected; otherwise it returns fn's error unchanged.
func (cb *AdaptiveCircuitBreaker) Execute(fn func() error) error {
    if allowed := cb.AllowRequest(); !allowed {
        return ErrCircuitOpen
    }

    result := fn()
    succeeded := result == nil
    cb.RecordResult(succeeded)

    return result
}

// State returns the breaker's current state, read under the read lock.
func (cb *AdaptiveCircuitBreaker) State() State {
    cb.mutex.RLock()
    current := cb.state
    cb.mutex.RUnlock()

    return current
}

// CurrentThresholds returns the adapted failure threshold and reset timeout
// currently in effect, read together under the read lock.
func (cb *AdaptiveCircuitBreaker) CurrentThresholds() (int, time.Duration) {
    cb.mutex.RLock()
    threshold := cb.currentFailureThreshold
    timeout := cb.currentResetTimeout
    cb.mutex.RUnlock()

    return threshold, timeout
}

Composite Circuit Breaker

For complex systems, we can create a composite circuit breaker that manages a separate, named circuit breaker for each downstream dependency:

package circuitbreaker

import (
    "errors"
    "sync"
)

// CompositeCircuitBreaker is a registry of circuit breakers keyed by
// dependency name.
type CompositeCircuitBreaker struct {
    // mutex guards breakers.
    mutex sync.RWMutex
    // breakers maps a dependency name to the breaker protecting it.
    breakers map[string]CircuitBreaker
}

// CircuitBreaker is the set of methods a breaker must provide to be
// registered with a CompositeCircuitBreaker.
type CircuitBreaker interface {
    // Execute runs fn under the breaker's protection and returns an error.
    Execute(fn func() error) error
    // AllowRequest reports whether a request may proceed.
    AllowRequest() bool
    // RecordResult feeds the outcome of a request back to the breaker.
    RecordResult(success bool)
    // State returns the breaker's current state.
    State() State
}

// NewCompositeCircuitBreaker returns a CompositeCircuitBreaker with an empty
// registry, ready for AddBreaker.
func NewCompositeCircuitBreaker() *CompositeCircuitBreaker {
    registry := map[string]CircuitBreaker{}
    return &CompositeCircuitBreaker{breakers: registry}
}

// AddBreaker registers breaker under name, replacing any breaker previously
// registered under the same name.
func (c *CompositeCircuitBreaker) AddBreaker(name string, breaker CircuitBreaker) {
    c.mutex.Lock()
    defer c.mutex.Unlock()

    // Writing to a nil map panics, so create the registry on first use. This
    // keeps a zero-value CompositeCircuitBreaker (one not built through the
    // constructor) usable; lookups on a nil map are already safe.
    if c.breakers == nil {
        c.breakers = make(map[string]CircuitBreaker)
    }
    c.breakers[name] = breaker
}

// Execute runs fn through the breaker registered under name and returns that
// breaker's result. If no breaker is registered under name, fn is not called
// and an error naming the dependency is returned.
func (c *CompositeCircuitBreaker) Execute(name string, fn func() error) error {
    // Only the lookup happens under the lock; the breaker call runs outside it.
    c.mutex.RLock()
    target, found := c.breakers[name]
    c.mutex.RUnlock()

    if found {
        return target.Execute(fn)
    }

    return errors.New("circuit breaker not found for: " + name)
}

// AllowRequest reports whether a request to the named dependency should be
// allowed. It returns false when no breaker is registered under name.
func (c *CompositeCircuitBreaker) AllowRequest(name string) bool {
    // Hold the registry lock only for the lookup, matching Execute and
    // RecordResult. Calling into the breaker while holding the read lock
    // would let a slow breaker delay AddBreaker, and a waiting writer in turn
    // blocks new readers of the registry.
    c.mutex.RLock()
    breaker, exists := c.breakers[name]
    c.mutex.RUnlock()

    if !exists {
        return false
    }

    return breaker.AllowRequest()
}

// RecordResult forwards the outcome of a request to the breaker registered
// under name. It does nothing when no breaker is registered under name.
func (c *CompositeCircuitBreaker) RecordResult(name string, success bool) {
    // Only the lookup happens under the lock; the breaker call runs outside it.
    c.mutex.RLock()
    target, found := c.breakers[name]
    c.mutex.RUnlock()

    if !found {
        return
    }

    target.RecordResult(success)
}

// State returns the state of the breaker registered under name. The boolean
// is false, and the returned state is Closed, when no breaker is registered
// under name.
func (c *CompositeCircuitBreaker) State(name string) (State, bool) {
    // Hold the registry lock only for the lookup, matching Execute and
    // RecordResult, so the breaker's own locking never runs while the
    // registry lock is held.
    c.mutex.RLock()
    breaker, exists := c.breakers[name]
    c.mutex.RUnlock()

    if !exists {
        return Closed, false
    }

    return breaker.State(), true
}

// States returns a newly allocated map from dependency name to the current
// state of its breaker. The registry read lock is held for the whole
// iteration.
func (c *CompositeCircuitBreaker) States() map[string]State {
    c.mutex.RLock()
    defer c.mutex.RUnlock()

    snapshot := make(map[string]State, len(c.breakers))
    for key := range c.breakers {
        snapshot[key] = c.breakers[key].State()
    }

    return snapshot
}

Bulkhead Pattern Implementation

The bulkhead pattern complements circuit breakers by limiting concurrent requests:

package bulkhead

import (
    "errors"
    "sync"
)

// ErrBulkheadFull is returned by Execute when every slot is already in use.
var ErrBulkheadFull = errors.New("bulkhead is full")

// Bulkhead caps the number of requests that may be in flight at once.
type Bulkhead struct {
    // mutex guards current.
    mutex sync.Mutex
    // current is the number of slots held right now.
    current int
    // maxConcurrent is the slot limit fixed at construction.
    maxConcurrent int
}

// NewBulkhead returns a Bulkhead with no slots in use and room for
// maxConcurrent concurrent holders.
func NewBulkhead(maxConcurrent int) *Bulkhead {
    bh := new(Bulkhead)
    bh.maxConcurrent = maxConcurrent
    return bh
}

// Acquire takes a slot without blocking. It reports whether a slot was
// obtained; false means the bulkhead is at its limit.
func (bh *Bulkhead) Acquire() bool {
    bh.mutex.Lock()
    defer bh.mutex.Unlock()

    hasRoom := bh.current < bh.maxConcurrent
    if hasRoom {
        bh.current++
    }
    return hasRoom
}

// Release gives back one slot. Releasing when no slot is held is a no-op, so
// the count never drops below zero.
func (bh *Bulkhead) Release() {
    bh.mutex.Lock()
    defer bh.mutex.Unlock()

    if bh.current <= 0 {
        return
    }
    bh.current--
}

// Execute runs fn while holding a slot and returns fn's error. If no slot is
// free it returns ErrBulkheadFull without calling fn. The slot is released
// when fn returns.
func (bh *Bulkhead) Execute(fn func() error) error {
    if acquired := bh.Acquire(); !acquired {
        return ErrBulkheadFull
    }
    defer bh.Release()

    return fn()
}

// CurrentLoad returns the number of slots currently held.
func (bh *Bulkhead) CurrentLoad() int {
    bh.mutex.Lock()
    load := bh.current
    bh.mutex.Unlock()

    return load
}