Advanced Circuit Breaker Strategies
Now let’s explore more sophisticated circuit breaker implementations and strategies.
Adaptive Circuit Breaker
An adaptive circuit breaker adjusts its failure threshold and reset timeout at runtime, based on the successes and failures it observes:
package circuitbreaker
import (
"math"
"sync"
"time"
)
// AdaptiveCircuitBreaker is a circuit breaker whose failure threshold and
// reset timeout move between configured bounds as results are recorded.
type AdaptiveCircuitBreaker struct {
	// mutex is held by the methods while they read or write the fields below.
	mutex sync.RWMutex

	// Breaker state machine.
	state           State
	lastStateChange time.Time // set whenever state is changed by a method
	failureCount    int       // failures counted while Closed; cleared on success

	// Streak counters that trigger threshold adjustments.
	consecutiveSuccesses int
	consecutiveFailures  int

	// Adaptive values currently in effect.
	currentFailureThreshold int
	currentResetTimeout     time.Duration

	// Failure-threshold configuration: starting value and clamp bounds.
	baseFailureThreshold int
	minFailureThreshold  int
	maxFailureThreshold  int

	// Reset-timeout configuration: starting value and clamp bounds.
	baseResetTimeout time.Duration
	minResetTimeout  time.Duration
	maxResetTimeout  time.Duration
}
// NewAdaptiveCircuitBreaker returns a breaker in the Closed state whose
// current failure threshold and reset timeout start at the supplied base
// values. The min/max arguments are the bounds later adjustments are clamped
// to. Arguments are stored as given; no validation is performed here.
func NewAdaptiveCircuitBreaker(
	baseFailureThreshold int,
	minFailureThreshold int,
	maxFailureThreshold int,
	baseResetTimeout time.Duration,
	minResetTimeout time.Duration,
	maxResetTimeout time.Duration,
) *AdaptiveCircuitBreaker {
	cb := new(AdaptiveCircuitBreaker)

	// Failure-threshold configuration; the live value starts at the base.
	cb.baseFailureThreshold = baseFailureThreshold
	cb.minFailureThreshold = minFailureThreshold
	cb.maxFailureThreshold = maxFailureThreshold
	cb.currentFailureThreshold = baseFailureThreshold

	// Reset-timeout configuration; the live value starts at the base.
	cb.baseResetTimeout = baseResetTimeout
	cb.minResetTimeout = minResetTimeout
	cb.maxResetTimeout = maxResetTimeout
	cb.currentResetTimeout = baseResetTimeout

	cb.state = Closed
	return cb
}
// AllowRequest reports whether a request may proceed.
//
// Closed and HalfOpen always admit the request. Open admits it only once more
// than currentResetTimeout has elapsed since the last state change; in that
// case the breaker is moved to HalfOpen first. Any other state rejects.
func (cb *AdaptiveCircuitBreaker) AllowRequest() bool {
	// Fast path: take a snapshot under the read lock. The read lock is
	// released explicitly (not deferred) because sync.RWMutex cannot be
	// upgraded in place, and a second RUnlock after switching to the write
	// lock is a fatal runtime error.
	cb.mutex.RLock()
	state := cb.state
	timedOut := state == Open && time.Since(cb.lastStateChange) > cb.currentResetTimeout
	cb.mutex.RUnlock()

	switch state {
	case Closed, HalfOpen:
		return true
	case Open:
		if !timedOut {
			return false
		}
	default:
		return false
	}

	// Slow path: the reset timeout looked expired. Another goroutine may have
	// changed the state between the two locks, so decide again from scratch
	// under the write lock.
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	switch cb.state {
	case Closed, HalfOpen:
		return true
	case Open:
		if time.Since(cb.lastStateChange) > cb.currentResetTimeout {
			cb.state = HalfOpen
			cb.lastStateChange = time.Now()
			return true
		}
		return false
	default:
		return false
	}
}
// RecordResult feeds one request outcome into the breaker.
//
// It updates the success/failure streak counters, nudges the adaptive
// failure threshold and reset timeout when a streak reaches its trigger
// (5 successes or 3 failures), and then applies the state transition for the
// current state. Outcomes recorded while the breaker is Open still update the
// streaks and thresholds but cause no state change.
func (cb *AdaptiveCircuitBreaker) RecordResult(success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	recordedAt := time.Now()

	if !success {
		cb.consecutiveSuccesses = 0
		cb.consecutiveFailures++

		// A failure streak makes the breaker stricter: lower threshold,
		// longer reset timeout, both clamped to their configured bounds.
		if cb.consecutiveFailures >= 3 {
			stricter := float64(cb.currentFailureThreshold - 1)
			cb.currentFailureThreshold = int(math.Max(float64(cb.minFailureThreshold), stricter))

			longer := float64(cb.currentResetTimeout) * 1.1
			cb.currentResetTimeout = time.Duration(math.Min(float64(cb.maxResetTimeout), longer))

			cb.consecutiveFailures = 0
		}

		// Decide whether this failure opens the circuit.
		trip := false
		if cb.state == HalfOpen {
			// Any failure while probing reopens immediately.
			trip = true
		} else if cb.state == Closed {
			cb.failureCount++
			trip = cb.failureCount >= cb.currentFailureThreshold
		}
		if trip {
			cb.state = Open
			cb.lastStateChange = recordedAt
		}
		return
	}

	cb.consecutiveFailures = 0
	cb.consecutiveSuccesses++

	// A success streak makes the breaker more lenient: higher threshold,
	// shorter reset timeout, both clamped to their configured bounds.
	if cb.consecutiveSuccesses >= 5 {
		looser := float64(cb.currentFailureThreshold + 1)
		cb.currentFailureThreshold = int(math.Min(float64(cb.maxFailureThreshold), looser))

		shorter := float64(cb.currentResetTimeout) * 0.9
		cb.currentResetTimeout = time.Duration(math.Max(float64(cb.minResetTimeout), shorter))

		cb.consecutiveSuccesses = 0
	}

	if cb.state == HalfOpen {
		// The probe succeeded: clear the failure count and close the circuit.
		cb.failureCount = 0
		cb.state = Closed
		cb.lastStateChange = recordedAt
	} else if cb.state == Closed {
		// A success while Closed wipes the accumulated failures.
		cb.failureCount = 0
	}
}
// Execute calls fn only if AllowRequest admits the request. The outcome is
// reported to RecordResult (a nil error counts as success) and fn's error is
// returned unchanged. When the request is not admitted, fn is not called and
// ErrCircuitOpen is returned.
func (cb *AdaptiveCircuitBreaker) Execute(fn func() error) error {
	if cb.AllowRequest() {
		result := fn()
		succeeded := result == nil
		cb.RecordResult(succeeded)
		return result
	}
	return ErrCircuitOpen
}
// State returns the breaker's state as read under the read lock.
func (cb *AdaptiveCircuitBreaker) State() State {
	cb.mutex.RLock()
	current := cb.state
	cb.mutex.RUnlock()
	return current
}
// CurrentThresholds returns the failure threshold and reset timeout currently
// in effect, in that order, read together under the read lock.
func (cb *AdaptiveCircuitBreaker) CurrentThresholds() (int, time.Duration) {
	cb.mutex.RLock()
	threshold, timeout := cb.currentFailureThreshold, cb.currentResetTimeout
	cb.mutex.RUnlock()
	return threshold, timeout
}
Composite Circuit Breaker
For complex systems, we can create a composite circuit breaker that manages a separate, named circuit breaker for each downstream dependency:
package circuitbreaker
import (
"errors"
"sync"
)
// CompositeCircuitBreaker keeps one CircuitBreaker per dependency name and
// routes calls to the breaker registered under that name.
type CompositeCircuitBreaker struct {
	// mutex is held by the methods while they read or write breakers.
	mutex sync.RWMutex

	// breakers maps a dependency name to its circuit breaker.
	breakers map[string]CircuitBreaker
}
// CircuitBreaker is the set of methods a breaker must provide to be managed
// by a CompositeCircuitBreaker.
type CircuitBreaker interface {
	// Execute runs fn under the breaker's control and returns an error.
	Execute(fn func() error) error
	// AllowRequest reports whether a request should be let through.
	AllowRequest() bool
	// RecordResult tells the breaker whether a request succeeded.
	RecordResult(success bool)
	// State returns the breaker's current State.
	State() State
}
// NewCompositeCircuitBreaker returns a composite with an empty, ready-to-use
// breaker registry.
func NewCompositeCircuitBreaker() *CompositeCircuitBreaker {
	registry := map[string]CircuitBreaker{}
	composite := &CompositeCircuitBreaker{breakers: registry}
	return composite
}
// AddBreaker registers breaker under name, replacing any breaker already
// registered with that name.
//
// The registry map is created on first use, so AddBreaker also works on a
// zero-value CompositeCircuitBreaker instead of panicking on a nil map.
func (c *CompositeCircuitBreaker) AddBreaker(name string, breaker CircuitBreaker) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.breakers == nil {
		c.breakers = make(map[string]CircuitBreaker)
	}
	c.breakers[name] = breaker
}
// Execute runs fn through the breaker registered under name and returns that
// breaker's result. If no breaker is registered under name, fn is not called
// and an error naming the missing dependency is returned.
func (c *CompositeCircuitBreaker) Execute(name string, fn func() error) error {
	// Only the lookup happens under the lock; the breaker call runs outside it.
	c.mutex.RLock()
	target, found := c.breakers[name]
	c.mutex.RUnlock()

	if found {
		return target.Execute(fn)
	}
	return errors.New("circuit breaker not found for: " + name)
}
// AllowRequest reports whether the breaker registered under name would admit
// a request. It returns false when no breaker is registered under name.
//
// Only the registry lookup happens under the read lock; the breaker itself is
// consulted after the lock is released, matching Execute and RecordResult, so
// a breaker call can never block or deadlock against registry updates.
func (c *CompositeCircuitBreaker) AllowRequest(name string) bool {
	c.mutex.RLock()
	breaker, exists := c.breakers[name]
	c.mutex.RUnlock()

	if !exists {
		return false
	}
	return breaker.AllowRequest()
}
// RecordResult forwards a request outcome to the breaker registered under
// name. Outcomes for names with no registered breaker are dropped silently.
func (c *CompositeCircuitBreaker) RecordResult(name string, success bool) {
	// Only the lookup happens under the lock; the breaker call runs outside it.
	c.mutex.RLock()
	target, found := c.breakers[name]
	c.mutex.RUnlock()

	if !found {
		return
	}
	target.RecordResult(success)
}
// State returns the state of the breaker registered under name. The boolean
// is false, and the returned state is Closed, when no breaker is registered
// under name.
//
// Only the registry lookup happens under the read lock; the breaker is
// queried after the lock is released, matching Execute and RecordResult, so
// a breaker call can never block or deadlock against registry updates.
func (c *CompositeCircuitBreaker) State(name string) (State, bool) {
	c.mutex.RLock()
	breaker, exists := c.breakers[name]
	c.mutex.RUnlock()

	if !exists {
		return Closed, false
	}
	return breaker.State(), true
}
// States returns a newly allocated map from each registered dependency name
// to the state its breaker reports. The read lock is held for the whole
// iteration over the registry.
func (c *CompositeCircuitBreaker) States() map[string]State {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	snapshot := make(map[string]State, len(c.breakers))
	for dependency := range c.breakers {
		snapshot[dependency] = c.breakers[dependency].State()
	}
	return snapshot
}
Bulkhead Pattern Implementation
The bulkhead pattern complements circuit breakers by capping the number of requests that may run concurrently:
package bulkhead
import (
"errors"
"sync"
)
// ErrBulkheadFull is returned by Bulkhead.Execute when no slot can be acquired.
var ErrBulkheadFull = errors.New("bulkhead is full")
// Bulkhead caps how many callers may hold a slot at the same time.
type Bulkhead struct {
	// mutex is held by the methods while they read or write the fields below.
	mutex sync.Mutex

	// current is the number of slots presently held.
	current int
	// maxConcurrent is the slot count at which Acquire starts refusing.
	maxConcurrent int
}
// NewBulkhead returns a Bulkhead with no slots held and a limit of
// maxConcurrent. The limit is stored as given, without validation.
func NewBulkhead(maxConcurrent int) *Bulkhead {
	b := new(Bulkhead)
	b.maxConcurrent = maxConcurrent
	return b
}
// Acquire takes one slot if fewer than maxConcurrent are held and reports
// whether it did. It never blocks waiting for a slot to free up.
func (b *Bulkhead) Acquire() bool {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	hasRoom := b.current < b.maxConcurrent
	if hasRoom {
		b.current++
	}
	return hasRoom
}
// Release gives back one slot. Calling it when no slot is held leaves the
// count unchanged; the count is never driven below zero by this method.
func (b *Bulkhead) Release() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.current <= 0 {
		return // nothing is held
	}
	b.current--
}
// Execute runs fn while holding one slot and returns fn's error. The slot is
// released when Execute returns. If no slot can be acquired, fn is not called
// and ErrBulkheadFull is returned.
func (b *Bulkhead) Execute(fn func() error) error {
	if b.Acquire() {
		// Deferred so the slot is given back however fn finishes.
		defer b.Release()
		return fn()
	}
	return ErrBulkheadFull
}
// CurrentLoad returns the number of slots held at the moment of the call,
// read under the lock.
func (b *Bulkhead) CurrentLoad() int {
	b.mutex.Lock()
	load := b.current
	b.mutex.Unlock()
	return load
}