Adaptive and Dynamic Rate Limiting
In real-world systems, traffic patterns can vary significantly over time. Adaptive rate limiting adjusts limits dynamically based on system conditions, providing better resource utilization and protection.
Load-Based Adaptive Rate Limiter
This implementation adjusts rate limits based on system load:
package main
import (
"fmt"
"math"
"runtime"
"sync"
"time"
)
// AdaptiveRateLimiter implements a rate limiter that adjusts based on system load
type AdaptiveRateLimiter struct {
mu sync.Mutex
baseLimit int
currentLimit int
windowSize time.Duration
requestCount int
windowStart time.Time
loadCheckPeriod time.Duration
lastLoadCheck time.Time
minLimit int
maxLimit int
}
// NewAdaptiveRateLimiter creates a new adaptive rate limiter
func NewAdaptiveRateLimiter(baseLimit, minLimit, maxLimit int, windowSize time.Duration) *AdaptiveRateLimiter {
limiter := &AdaptiveRateLimiter{
baseLimit: baseLimit,
currentLimit: baseLimit,
windowSize: windowSize,
windowStart: time.Now(),
loadCheckPeriod: 5 * time.Second,
lastLoadCheck: time.Now(),
minLimit: minLimit,
maxLimit: maxLimit,
}
// Start a goroutine to periodically adjust the limit based on system load
go limiter.adaptToLoad()
return limiter
}
// adaptToLoad periodically checks system load and adjusts the rate limit
func (l *AdaptiveRateLimiter) adaptToLoad() {
ticker := time.NewTicker(l.loadCheckPeriod)
defer ticker.Stop()
for range ticker.C {
l.adjustLimit()
}
}
// adjustLimit modifies the current limit based on CPU utilization
func (l *AdaptiveRateLimiter) adjustLimit() {
l.mu.Lock()
defer l.mu.Unlock()
// Get current CPU utilization (simplified)
var m runtime.MemStats
runtime.ReadMemStats(&m)
numCPU := float64(runtime.NumCPU())
cpuUtilization := float64(runtime.NumGoroutine()) / numCPU / 10 // Simplified metric
// Adjust limit based on CPU utilization
// - High utilization: reduce limit
// - Low utilization: increase limit
adjustmentFactor := 1.0
if cpuUtilization > 0.7 { // High load
adjustmentFactor = 0.8 // Reduce by 20%
} else if cpuUtilization < 0.3 { // Low load
adjustmentFactor = 1.2 // Increase by 20%
}
// Apply adjustment with bounds
newLimit := int(float64(l.currentLimit) * adjustmentFactor)
l.currentLimit = int(math.Max(float64(l.minLimit), math.Min(float64(l.maxLimit), float64(newLimit))))
fmt.Printf("System load: %.2f, Adjusted limit: %d\n", cpuUtilization, l.currentLimit)
}
// Allow checks if a request should be allowed based on the current adaptive rate
func (l *AdaptiveRateLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// If the window has expired, reset the counter
if now.Sub(l.windowStart) >= l.windowSize {
l.requestCount = 0
l.windowStart = now
}
// Check if we've reached the current limit
if l.requestCount >= l.currentLimit {
return false
}
// Increment the counter and allow the request
l.requestCount++
return true
}
func main() {
// Create an adaptive rate limiter: base 100 RPS, min 50 RPS, max 200 RPS
limiter := NewAdaptiveRateLimiter(100, 50, 200, time.Second)
// Simulate requests with varying load
for i := 1; i <= 20; i++ {
// Create artificial load every 5 iterations
if i%5 == 0 {
fmt.Println("Creating artificial load...")
for j := 0; j < 1000; j++ {
go func() {
time.Sleep(2 * time.Second)
}()
}
}
allowed := limiter.Allow()
fmt.Printf("Request %d: %v (current limit: %d)\n", i, allowed, limiter.currentLimit)
time.Sleep(50 * time.Millisecond)
}
}
Integration with Circuit Breakers
Rate limiting can be combined with circuit breakers for enhanced resilience:
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// CircuitState represents the state of a circuit breaker
type CircuitState int
const (
StateClosed CircuitState = iota // Normal operation, requests allowed
StateOpen // Circuit is open, requests are rejected
StateHalfOpen // Testing if the circuit can be closed again
)
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
mu sync.Mutex
state CircuitState
failureThreshold int
failureCount int
resetTimeout time.Duration
lastStateChange time.Time
halfOpenMaxCalls int
halfOpenCallCount int
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: failureThreshold,
resetTimeout: resetTimeout,
lastStateChange: time.Now(),
halfOpenMaxCalls: 3,
}
}
// Execute runs the given function if the circuit allows it
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
// Check if the circuit is open
if cb.state == StateOpen {
// Check if it's time to try half-open state
if time.Since(cb.lastStateChange) > cb.resetTimeout {
cb.toHalfOpen()
} else {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
}
// If half-open, check if we've reached the call limit
if cb.state == StateHalfOpen && cb.halfOpenCallCount >= cb.halfOpenMaxCalls {
cb.mu.Unlock()
return errors.New("circuit breaker is half-open and at call limit")
}
// Increment call count for half-open state
if cb.state == StateHalfOpen {
cb.halfOpenCallCount++
}
cb.mu.Unlock()
// Execute the function
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
// Handle the result
if err != nil {
// Record failure
cb.failureCount++
// Check if we need to open the circuit
if (cb.state == StateClosed && cb.failureCount >= cb.failureThreshold) ||
cb.state == StateHalfOpen {
cb.toOpen()
}
return err
}
// Success - if we're half-open, close the circuit
if cb.state == StateHalfOpen {
cb.toClosed()
}
// Reset failure count on success in closed state
if cb.state == StateClosed {
cb.failureCount = 0
}
return nil
}
// toOpen changes the circuit state to open
func (cb *CircuitBreaker) toOpen() {
cb.state = StateOpen
cb.lastStateChange = time.Now()
fmt.Println("Circuit breaker state changed to OPEN")
}
// toHalfOpen changes the circuit state to half-open
func (cb *CircuitBreaker) toHalfOpen() {
cb.state = StateHalfOpen
cb.lastStateChange = time.Now()
cb.halfOpenCallCount = 0
fmt.Println("Circuit breaker state changed to HALF-OPEN")
}
// toClosed changes the circuit state to closed
func (cb *CircuitBreaker) toClosed() {
cb.state = StateClosed
cb.lastStateChange = time.Now()
cb.failureCount = 0
fmt.Println("Circuit breaker state changed to CLOSED")
}
// RateLimitedCircuitBreaker combines rate limiting with circuit breaking
type RateLimitedCircuitBreaker struct {
rateLimiter *TokenBucket
circuitBreaker *CircuitBreaker
}
// NewRateLimitedCircuitBreaker creates a new rate-limited circuit breaker
func NewRateLimitedCircuitBreaker(
rps float64,
burst float64,
failureThreshold int,
resetTimeout time.Duration,
) *RateLimitedCircuitBreaker {
return &RateLimitedCircuitBreaker{
rateLimiter: NewTokenBucket(burst, rps),
circuitBreaker: NewCircuitBreaker(failureThreshold, resetTimeout),
}
}
// Execute runs the given function if both the rate limiter and circuit breaker allow it
func (rlcb *RateLimitedCircuitBreaker) Execute(fn func() error) error {
// First check rate limiter
if !rlcb.rateLimiter.Allow() {
return errors.New("rate limit exceeded")
}
// Then check circuit breaker
return rlcb.circuitBreaker.Execute(fn)
}
// TokenBucket implements the token bucket algorithm for rate limiting
type TokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefillTime time.Time
}
// NewTokenBucket creates a new token bucket rate limiter
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
return &TokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: time.Now(),
}
}
// refill adds tokens to the bucket based on elapsed time
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime).Seconds()
// Calculate tokens to add based on elapsed time
newTokens := elapsed * tb.refillRate
// Update token count, capped at maxTokens
tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
tb.lastRefillTime = now
}
// Allow checks if a request can proceed and consumes a token if allowed
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
if tb.tokens >= 1.0 {
tb.tokens--
return true
}
return false
}
// min returns the minimum of two float64 values
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func main() {
// Create a rate-limited circuit breaker:
// - 5 RPS
// - Burst of 10
// - Circuit opens after 3 failures
// - Circuit resets after 5 seconds
rlcb := NewRateLimitedCircuitBreaker(5, 10, 3, 5*time.Second)
// Simulate successful requests
fmt.Println("Simulating successful requests...")
for i := 1; i <= 5; i++ {
err := rlcb.Execute(func() error {
fmt.Printf("Request %d executed successfully\n", i)
return nil
})
if err != nil {
fmt.Printf("Request %d failed: %v\n", i, err)
}
time.Sleep(100 * time.Millisecond)
}
// Simulate failing requests
fmt.Println("\nSimulating failing requests...")
for i := 6; i <= 10; i++ {
err := rlcb.Execute(func() error {
fmt.Printf("Request %d would execute, but returning error\n", i)
return errors.New("simulated error")
})
if err != nil {
fmt.Printf("Request %d failed: %v\n", i, err)
}
time.Sleep(100 * time.Millisecond)
}
// Try more requests (should be rejected due to open circuit)
fmt.Println("\nTrying more requests (should be rejected)...")
for i := 11; i <= 15; i++ {
err := rlcb.Execute(func() error {
fmt.Printf("Request %d executed\n", i)
return nil
})
if err != nil {
fmt.Printf("Request %d failed: %v\n", i, err)
}
time.Sleep(100 * time.Millisecond)
}
// Wait for circuit to reset
fmt.Println("\nWaiting for circuit breaker timeout...")
time.Sleep(6 * time.Second)
// Try successful requests again
fmt.Println("\nTrying successful requests after timeout...")
for i := 16; i <= 20; i++ {
err := rlcb.Execute(func() error {
fmt.Printf("Request %d executed successfully\n", i)
return nil
})
if err != nil {
fmt.Printf("Request %d failed: %v\n", i, err)
}
time.Sleep(100 * time.Millisecond)
}
}