Adaptive and Dynamic Rate Limiting

In real-world systems, traffic patterns can vary significantly over time. Adaptive rate limiting adjusts limits dynamically based on system conditions, providing better resource utilization and protection.

Load-Based Adaptive Rate Limiter

This implementation adjusts rate limits based on system load:

package main

import (
	"fmt"
	"math"
	"runtime"
	"sync"
	"time"
)

// AdaptiveRateLimiter implements a rate limiter that adjusts based on system load
type AdaptiveRateLimiter struct {
	mu              sync.Mutex
	baseLimit       int
	currentLimit    int
	windowSize      time.Duration
	requestCount    int
	windowStart     time.Time
	loadCheckPeriod time.Duration
	lastLoadCheck   time.Time
	minLimit        int
	maxLimit        int
}

// NewAdaptiveRateLimiter creates a new adaptive rate limiter
func NewAdaptiveRateLimiter(baseLimit, minLimit, maxLimit int, windowSize time.Duration) *AdaptiveRateLimiter {
	limiter := &AdaptiveRateLimiter{
		baseLimit:       baseLimit,
		currentLimit:    baseLimit,
		windowSize:      windowSize,
		windowStart:     time.Now(),
		loadCheckPeriod: 5 * time.Second,
		lastLoadCheck:   time.Now(),
		minLimit:        minLimit,
		maxLimit:        maxLimit,
	}
	
	// Start a goroutine to periodically adjust the limit based on system load
	go limiter.adaptToLoad()
	
	return limiter
}

// adaptToLoad periodically checks system load and adjusts the rate limit
func (l *AdaptiveRateLimiter) adaptToLoad() {
	ticker := time.NewTicker(l.loadCheckPeriod)
	defer ticker.Stop()
	
	for range ticker.C {
		l.adjustLimit()
	}
}

// adjustLimit modifies the current limit based on CPU utilization
func (l *AdaptiveRateLimiter) adjustLimit() {
	l.mu.Lock()
	defer l.mu.Unlock()
	
	// Get current CPU utilization (simplified)
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	numCPU := float64(runtime.NumCPU())
	cpuUtilization := float64(runtime.NumGoroutine()) / numCPU / 10 // Simplified metric
	
	// Adjust limit based on CPU utilization
	// - High utilization: reduce limit
	// - Low utilization: increase limit
	adjustmentFactor := 1.0
	if cpuUtilization > 0.7 { // High load
		adjustmentFactor = 0.8 // Reduce by 20%
	} else if cpuUtilization < 0.3 { // Low load
		adjustmentFactor = 1.2 // Increase by 20%
	}
	
	// Apply adjustment with bounds
	newLimit := int(float64(l.currentLimit) * adjustmentFactor)
	l.currentLimit = int(math.Max(float64(l.minLimit), math.Min(float64(l.maxLimit), float64(newLimit))))
	
	fmt.Printf("System load: %.2f, Adjusted limit: %d\n", cpuUtilization, l.currentLimit)
}

// Allow checks if a request should be allowed based on the current adaptive rate
func (l *AdaptiveRateLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	
	now := time.Now()
	
	// If the window has expired, reset the counter
	if now.Sub(l.windowStart) >= l.windowSize {
		l.requestCount = 0
		l.windowStart = now
	}
	
	// Check if we've reached the current limit
	if l.requestCount >= l.currentLimit {
		return false
	}
	
	// Increment the counter and allow the request
	l.requestCount++
	return true
}

func main() {
	// Create an adaptive rate limiter: base 100 RPS, min 50 RPS, max 200 RPS
	limiter := NewAdaptiveRateLimiter(100, 50, 200, time.Second)
	
	// Simulate requests with varying load
	for i := 1; i <= 20; i++ {
		// Create artificial load every 5 iterations
		if i%5 == 0 {
			fmt.Println("Creating artificial load...")
			for j := 0; j < 1000; j++ {
				go func() {
					time.Sleep(2 * time.Second)
				}()
			}
		}
		
		allowed := limiter.Allow()
		fmt.Printf("Request %d: %v (current limit: %d)\n", i, allowed, limiter.currentLimit)
		time.Sleep(50 * time.Millisecond)
	}
}

Integration with Circuit Breakers

Rate limiting can be combined with circuit breakers for enhanced resilience:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// CircuitState represents the state of a circuit breaker
type CircuitState int

const (
	StateClosed CircuitState = iota // Normal operation, requests allowed
	StateOpen                       // Circuit is open, requests are rejected
	StateHalfOpen                   // Testing if the circuit can be closed again
)

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	mu                sync.Mutex
	state             CircuitState
	failureThreshold  int
	failureCount      int
	resetTimeout      time.Duration
	lastStateChange   time.Time
	halfOpenMaxCalls  int
	halfOpenCallCount int
}

// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		state:            StateClosed,
		failureThreshold: failureThreshold,
		resetTimeout:     resetTimeout,
		lastStateChange:  time.Now(),
		halfOpenMaxCalls: 3,
	}
}

// Execute runs the given function if the circuit allows it
func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mu.Lock()
	
	// Check if the circuit is open
	if cb.state == StateOpen {
		// Check if it's time to try half-open state
		if time.Since(cb.lastStateChange) > cb.resetTimeout {
			cb.toHalfOpen()
		} else {
			cb.mu.Unlock()
			return errors.New("circuit breaker is open")
		}
	}
	
	// If half-open, check if we've reached the call limit
	if cb.state == StateHalfOpen && cb.halfOpenCallCount >= cb.halfOpenMaxCalls {
		cb.mu.Unlock()
		return errors.New("circuit breaker is half-open and at call limit")
	}
	
	// Increment call count for half-open state
	if cb.state == StateHalfOpen {
		cb.halfOpenCallCount++
	}
	
	cb.mu.Unlock()
	
	// Execute the function
	err := fn()
	
	cb.mu.Lock()
	defer cb.mu.Unlock()
	
	// Handle the result
	if err != nil {
		// Record failure
		cb.failureCount++
		
		// Check if we need to open the circuit
		if (cb.state == StateClosed && cb.failureCount >= cb.failureThreshold) ||
			cb.state == StateHalfOpen {
			cb.toOpen()
		}
		
		return err
	}
	
	// Success - if we're half-open, close the circuit
	if cb.state == StateHalfOpen {
		cb.toClosed()
	}
	
	// Reset failure count on success in closed state
	if cb.state == StateClosed {
		cb.failureCount = 0
	}
	
	return nil
}

// toOpen changes the circuit state to open
func (cb *CircuitBreaker) toOpen() {
	cb.state = StateOpen
	cb.lastStateChange = time.Now()
	fmt.Println("Circuit breaker state changed to OPEN")
}

// toHalfOpen changes the circuit state to half-open
func (cb *CircuitBreaker) toHalfOpen() {
	cb.state = StateHalfOpen
	cb.lastStateChange = time.Now()
	cb.halfOpenCallCount = 0
	fmt.Println("Circuit breaker state changed to HALF-OPEN")
}

// toClosed changes the circuit state to closed
func (cb *CircuitBreaker) toClosed() {
	cb.state = StateClosed
	cb.lastStateChange = time.Now()
	cb.failureCount = 0
	fmt.Println("Circuit breaker state changed to CLOSED")
}

// RateLimitedCircuitBreaker combines rate limiting with circuit breaking
type RateLimitedCircuitBreaker struct {
	rateLimiter   *TokenBucket
	circuitBreaker *CircuitBreaker
}

// NewRateLimitedCircuitBreaker creates a new rate-limited circuit breaker
func NewRateLimitedCircuitBreaker(
	rps float64,
	burst float64,
	failureThreshold int,
	resetTimeout time.Duration,
) *RateLimitedCircuitBreaker {
	return &RateLimitedCircuitBreaker{
		rateLimiter:    NewTokenBucket(burst, rps),
		circuitBreaker: NewCircuitBreaker(failureThreshold, resetTimeout),
	}
}

// Execute runs the given function if both the rate limiter and circuit breaker allow it
func (rlcb *RateLimitedCircuitBreaker) Execute(fn func() error) error {
	// First check rate limiter
	if !rlcb.rateLimiter.Allow() {
		return errors.New("rate limit exceeded")
	}
	
	// Then check circuit breaker
	return rlcb.circuitBreaker.Execute(fn)
}

// TokenBucket implements the token bucket algorithm for rate limiting
type TokenBucket struct {
	mu            sync.Mutex
	tokens        float64
	maxTokens     float64
	refillRate    float64 // tokens per second
	lastRefillTime time.Time
}

// NewTokenBucket creates a new token bucket rate limiter
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
	return &TokenBucket{
		tokens:        maxTokens,
		maxTokens:     maxTokens,
		refillRate:    refillRate,
		lastRefillTime: time.Now(),
	}
}

// refill adds tokens to the bucket based on elapsed time
func (tb *TokenBucket) refill() {
	now := time.Now()
	elapsed := now.Sub(tb.lastRefillTime).Seconds()
	
	// Calculate tokens to add based on elapsed time
	newTokens := elapsed * tb.refillRate
	
	// Update token count, capped at maxTokens
	tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
	tb.lastRefillTime = now
}

// Allow checks if a request can proceed and consumes a token if allowed
func (tb *TokenBucket) Allow() bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	
	tb.refill()
	
	if tb.tokens >= 1.0 {
		tb.tokens--
		return true
	}
	
	return false
}

// min returns the minimum of two float64 values
func min(a, b float64) float64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Create a rate-limited circuit breaker:
	// - 5 RPS
	// - Burst of 10
	// - Circuit opens after 3 failures
	// - Circuit resets after 5 seconds
	rlcb := NewRateLimitedCircuitBreaker(5, 10, 3, 5*time.Second)
	
	// Simulate successful requests
	fmt.Println("Simulating successful requests...")
	for i := 1; i <= 5; i++ {
		err := rlcb.Execute(func() error {
			fmt.Printf("Request %d executed successfully\n", i)
			return nil
		})
		
		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		}
		
		time.Sleep(100 * time.Millisecond)
	}
	
	// Simulate failing requests
	fmt.Println("\nSimulating failing requests...")
	for i := 6; i <= 10; i++ {
		err := rlcb.Execute(func() error {
			fmt.Printf("Request %d would execute, but returning error\n", i)
			return errors.New("simulated error")
		})
		
		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		}
		
		time.Sleep(100 * time.Millisecond)
	}
	
	// Try more requests (should be rejected due to open circuit)
	fmt.Println("\nTrying more requests (should be rejected)...")
	for i := 11; i <= 15; i++ {
		err := rlcb.Execute(func() error {
			fmt.Printf("Request %d executed\n", i)
			return nil
		})
		
		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		}
		
		time.Sleep(100 * time.Millisecond)
	}
	
	// Wait for circuit to reset
	fmt.Println("\nWaiting for circuit breaker timeout...")
	time.Sleep(6 * time.Second)
	
	// Try successful requests again
	fmt.Println("\nTrying successful requests after timeout...")
	for i := 16; i <= 20; i++ {
		err := rlcb.Execute(func() error {
			fmt.Printf("Request %d executed successfully\n", i)
			return nil
		})
		
		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		}
		
		time.Sleep(100 * time.Millisecond)
	}
}