Monitoring and Observability
Effective rate limiting requires comprehensive monitoring to tune parameters and detect issues. Let’s implement a rate limiter with built-in metrics:
package main
import (
"fmt"
"sync"
"time"
)
// RateLimiterMetrics tracks statistics about rate limiter behavior
type RateLimiterMetrics struct {
mu sync.Mutex
totalRequests int64
allowedRequests int64
rejectedRequests int64
currentQPS float64
peakQPS float64
lastCalculated time.Time
requestCounts []int
requestCountsIndex int
requestCountsSize int
}
// NewRateLimiterMetrics creates a new metrics tracker
func NewRateLimiterMetrics() *RateLimiterMetrics {
metrics := &RateLimiterMetrics{
lastCalculated: time.Now(),
requestCountsSize: 60, // Track last 60 seconds
}
metrics.requestCounts = make([]int, metrics.requestCountsSize)
// Start a goroutine to periodically calculate QPS
go metrics.calculateQPS()
return metrics
}
// calculateQPS periodically calculates the current QPS
func (m *RateLimiterMetrics) calculateQPS() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
m.mu.Lock()
// Compute QPS as a moving average: total requests over the window, divided by its length
totalRequests := 0
for _, count := range m.requestCounts {
totalRequests += count
}
m.currentQPS = float64(totalRequests) / float64(m.requestCountsSize)
if m.currentQPS > m.peakQPS {
m.peakQPS = m.currentQPS
}
// Advance the ring buffer and clear the slot for the upcoming second;
// the counts for the seconds already in the window are preserved
m.requestCountsIndex = (m.requestCountsIndex + 1) % m.requestCountsSize
m.requestCounts[m.requestCountsIndex] = 0
m.mu.Unlock()
}
}
// RecordRequest records a request and its outcome
func (m *RateLimiterMetrics) RecordRequest(allowed bool) {
m.mu.Lock()
defer m.mu.Unlock()
m.totalRequests++
m.requestCounts[m.requestCountsIndex]++
if allowed {
m.allowedRequests++
} else {
m.rejectedRequests++
}
}
// GetMetrics returns the current metrics
func (m *RateLimiterMetrics) GetMetrics() map[string]interface{} {
m.mu.Lock()
defer m.mu.Unlock()
rejectionRate := float64(0)
if m.totalRequests > 0 {
rejectionRate = float64(m.rejectedRequests) / float64(m.totalRequests)
}
return map[string]interface{}{
"total_requests": m.totalRequests,
"allowed_requests": m.allowedRequests,
"rejected_requests": m.rejectedRequests,
"rejection_rate": rejectionRate,
"current_qps": m.currentQPS,
"peak_qps": m.peakQPS,
}
}
// InstrumentedTokenBucket is a token bucket with metrics
type InstrumentedTokenBucket struct {
tokenBucket *TokenBucket
metrics *RateLimiterMetrics
}
// NewInstrumentedTokenBucket creates a new instrumented token bucket
func NewInstrumentedTokenBucket(maxTokens, refillRate float64) *InstrumentedTokenBucket {
return &InstrumentedTokenBucket{
tokenBucket: NewTokenBucket(maxTokens, refillRate),
metrics: NewRateLimiterMetrics(),
}
}
// Allow checks if a request can proceed and records metrics
func (itb *InstrumentedTokenBucket) Allow() bool {
allowed := itb.tokenBucket.Allow()
itb.metrics.RecordRequest(allowed)
return allowed
}
// GetMetrics returns the current metrics
func (itb *InstrumentedTokenBucket) GetMetrics() map[string]interface{} {
return itb.metrics.GetMetrics()
}
// TokenBucket implements the token bucket algorithm for rate limiting
type TokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefillTime time.Time
}
// NewTokenBucket creates a new token bucket rate limiter
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
return &TokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: time.Now(),
}
}
// refill adds tokens to the bucket based on elapsed time
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime).Seconds()
// Calculate tokens to add based on elapsed time
newTokens := elapsed * tb.refillRate
// Update token count, capped at maxTokens
tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
tb.lastRefillTime = now
}
// Allow checks if a request can proceed and consumes a token if allowed
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
if tb.tokens >= 1.0 {
tb.tokens--
return true
}
return false
}
// min returns the minimum of two float64 values
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func main() {
// Create an instrumented token bucket: 10 max tokens, refill rate of 5 tokens per second
limiter := NewInstrumentedTokenBucket(10, 5)
// Start a goroutine to periodically print metrics
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
metrics := limiter.GetMetrics()
fmt.Printf("\nCurrent Metrics:\n")
fmt.Printf(" Total Requests: %d\n", metrics["total_requests"])
fmt.Printf(" Allowed Requests: %d\n", metrics["allowed_requests"])
fmt.Printf(" Rejected Requests: %d\n", metrics["rejected_requests"])
fmt.Printf(" Rejection Rate: %.2f%%\n", metrics["rejection_rate"].(float64)*100)
fmt.Printf(" Current QPS: %.2f\n", metrics["current_qps"])
fmt.Printf(" Peak QPS: %.2f\n", metrics["peak_qps"])
}
}()
// Simulate normal traffic
fmt.Println("Simulating normal traffic...")
for i := 1; i <= 20; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
time.Sleep(200 * time.Millisecond)
}
// Simulate burst traffic
fmt.Println("\nSimulating burst traffic...")
for i := 21; i <= 40; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
time.Sleep(50 * time.Millisecond)
}
// Wait for metrics to be calculated
time.Sleep(4 * time.Second)
}
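In a real service you would expose these counters instead of printing them. Here is a minimal sketch that serves the metrics map as JSON, reusing the InstrumentedTokenBucket defined above (the /debug/ratelimit path is a hypothetical choice, not part of the original example):
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Assumes InstrumentedTokenBucket and NewInstrumentedTokenBucket from the example above.
func main() {
	limiter := NewInstrumentedTokenBucket(10, 5)

	http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
		if !limiter.Allow() {
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		fmt.Fprintln(w, "ok")
	})

	// encoding/json encodes map[string]interface{} directly, so GetMetrics
	// needs no extra serialization code.
	http.HandleFunc("/debug/ratelimit", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(limiter.GetMetrics()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}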
Prometheus Integration
For production systems, integrating with monitoring systems like Prometheus is essential:
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
// Define Prometheus metrics
requestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rate_limiter_requests_total",
Help: "The total number of requests",
},
[]string{"status"},
)
currentTokens = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rate_limiter_current_tokens",
Help: "The current number of tokens in the bucket",
},
[]string{"limiter_id"},
)
requestLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rate_limiter_request_duration_seconds",
Help: "The latency of rate limiter decisions",
Buckets: prometheus.DefBuckets,
},
[]string{"status"},
)
)
// PrometheusTokenBucket is a token bucket with Prometheus metrics
type PrometheusTokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefillTime time.Time
limiterID string
}
// NewPrometheusTokenBucket creates a new token bucket with Prometheus metrics
func NewPrometheusTokenBucket(limiterID string, maxTokens, refillRate float64) *PrometheusTokenBucket {
tb := &PrometheusTokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: time.Now(),
limiterID: limiterID,
}
// Update token gauge periodically
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
tb.mu.Lock()
tb.refill()
currentTokens.WithLabelValues(tb.limiterID).Set(tb.tokens)
tb.mu.Unlock()
}
}()
return tb
}
// refill adds tokens to the bucket based on elapsed time
func (tb *PrometheusTokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime).Seconds()
// Calculate tokens to add based on elapsed time
newTokens := elapsed * tb.refillRate
// Update token count, capped at maxTokens
tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
tb.lastRefillTime = now
}
// Allow checks if a request can proceed and records metrics. The duration is
// measured manually with time.Since because the status label ("allowed" or
// "rejected") is only known after the decision, which a single
// prometheus.Timer observer cannot express.
func (tb *PrometheusTokenBucket) Allow() bool {
start := time.Now()
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
currentTokens.WithLabelValues(tb.limiterID).Set(tb.tokens)
if tb.tokens >= 1.0 {
tb.tokens--
requestsTotal.WithLabelValues("allowed").Inc()
requestLatency.WithLabelValues("allowed").Observe(time.Since(start).Seconds())
return true
}
requestsTotal.WithLabelValues("rejected").Inc()
requestLatency.WithLabelValues("rejected").Observe(time.Since(start).Seconds())
return false
}
// min returns the minimum of two float64 values
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func main() {
// Create a token bucket with Prometheus metrics
limiter := NewPrometheusTokenBucket("api_limiter", 10, 5)
// Create a simple handler with rate limiting
http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
fmt.Fprintln(w, "API request successful")
})
// Expose Prometheus metrics
http.Handle("/metrics", promhttp.Handler())
// Start the server
fmt.Println("Server starting on :8080...")
fmt.Println("API endpoint: http://localhost:8080/api")
fmt.Println("Metrics endpoint: http://localhost:8080/metrics")
log.Fatal(http.ListenAndServe(":8080", nil))
}
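With the server running, a simple client loop makes the counters move; the following load generator is a sketch for local testing (the URL matches the server above):
package main

import (
	"fmt"
	"net/http"
	"time"
)

// Sends requests faster than the refill rate (10 per second vs. 5 tokens per
// second) so both allowed and rejected outcomes appear in the metrics.
func main() {
	for i := 1; i <= 30; i++ {
		resp, err := http.Get("http://localhost:8080/api")
		if err != nil {
			fmt.Println("request failed:", err)
			continue
		}
		resp.Body.Close()
		fmt.Printf("request %d: %s\n", i, resp.Status)
		time.Sleep(100 * time.Millisecond)
	}
}
From there, dashboards and alerts are a matter of PromQL: rate(rate_limiter_requests_total{status="rejected"}[5m]) gives rejections per second, and dividing it by the same rate across both status labels gives the rejection ratio.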
Time-of-Day Based Rate Limiting
For services with predictable traffic patterns, time-of-day based rate limiting can be effective. The implementation below is a fixed-window counter whose limit changes with the hour of day:
package main
import (
"fmt"
"sync"
"time"
)
// TimeBasedRateLimiter implements a rate limiter with different limits based on time of day
type TimeBasedRateLimiter struct {
mu sync.Mutex
requestCount int
windowSize time.Duration
windowStart time.Time
limitRules []LimitRule
}
// LimitRule defines a rate limit for a specific time range
type LimitRule struct {
StartHour int
EndHour int
Limit int
}
// NewTimeBasedRateLimiter creates a new time-based rate limiter
func NewTimeBasedRateLimiter(windowSize time.Duration, rules []LimitRule) *TimeBasedRateLimiter {
return &TimeBasedRateLimiter{
windowSize: windowSize,
windowStart: time.Now(),
limitRules: rules,
}
}
// getCurrentLimit determines the appropriate limit based on the current time
func (l *TimeBasedRateLimiter) getCurrentLimit() int {
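// Note: Hour() is evaluated in the server's local time zone; if clients span
// regions, pin a zone explicitly (e.g. time.Now().In(loc)) before reading the hour.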
currentHour := time.Now().Hour()
// Find the matching rule for the current hour
for _, rule := range l.limitRules {
if rule.StartHour <= currentHour && currentHour < rule.EndHour {
return rule.Limit
}
}
// Default limit if no rule matches
return 100
}
// Allow checks if a request should be allowed based on the current time-based rate
func (l *TimeBasedRateLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// If the window has expired, reset the counter
if now.Sub(l.windowStart) >= l.windowSize {
l.requestCount = 0
l.windowStart = now
}
// Get the current limit based on time of day
currentLimit := l.getCurrentLimit()
// Check if we've reached the limit
if l.requestCount >= currentLimit {
return false
}
// Increment the counter and allow the request
l.requestCount++
return true
}
func main() {
// Define rules for different times of day
rules := []LimitRule{
{StartHour: 0, EndHour: 6, Limit: 50}, // Midnight to 6 AM: 50 RPS
{StartHour: 6, EndHour: 9, Limit: 100}, // 6 AM to 9 AM: 100 RPS
{StartHour: 9, EndHour: 17, Limit: 200}, // 9 AM to 5 PM: 200 RPS
{StartHour: 17, EndHour: 22, Limit: 150}, // 5 PM to 10 PM: 150 RPS
{StartHour: 22, EndHour: 24, Limit: 80}, // 10 PM to Midnight: 80 RPS
}
// Create a time-based rate limiter with a 1-second window
limiter := NewTimeBasedRateLimiter(time.Second, rules)
// Simulate requests
for i := 1; i <= 10; i++ {
allowed := limiter.Allow()
currentHour := time.Now().Hour()
currentLimit := limiter.getCurrentLimit()
fmt.Printf("Request %d: %v (Hour: %d, Limit: %d)\n", i, allowed, currentHour, currentLimit)
}
}
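One caveat: getCurrentLimit reads the wall clock directly, which makes the hour-dependent behavior awkward to unit-test. A small refactoring, sketched here as a suggestion rather than part of the original design, is to split out a helper that takes the instant as a parameter; getCurrentLimit then simply delegates to it with time.Now():
// getCurrentLimitAt returns the limit for an arbitrary instant, so tests can
// probe any hour of the schedule without waiting for wall-clock time.
func (l *TimeBasedRateLimiter) getCurrentLimitAt(t time.Time) int {
	hour := t.Hour()
	for _, rule := range l.limitRules {
		if rule.StartHour <= hour && hour < rule.EndHour {
			return rule.Limit
		}
	}
	return 100 // same default as getCurrentLimit
}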
Multi-Tier Rate Limiting
In many applications, different request types have very different resource costs, and charging every request a single token undercounts the expensive ones. Multi-tier rate limiting charges each request a cost proportional to its weight: with the refill rate of 5 tokens per second used below, the bucket sustains roughly 5 low-cost, 1 medium-cost, or 0.5 high-cost requests per second:
package main
import (
"fmt"
"sync"
"time"
)
// RequestTier represents the resource cost tier of a request
type RequestTier int
const (
TierLow RequestTier = iota // Low-cost operations (e.g., reads)
TierMedium // Medium-cost operations (e.g., simple writes)
TierHigh // High-cost operations (e.g., complex queries)
)
// TierCost defines the token cost for each tier
var TierCost = map[RequestTier]float64{
TierLow: 1.0,
TierMedium: 5.0,
TierHigh: 10.0,
}
// MultiTierTokenBucket implements a token bucket with different costs per request tier
type MultiTierTokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefillTime time.Time
}
// NewMultiTierTokenBucket creates a new multi-tier token bucket rate limiter
func NewMultiTierTokenBucket(maxTokens, refillRate float64) *MultiTierTokenBucket {
return &MultiTierTokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: time.Now(),
}
}
// refill adds tokens to the bucket based on elapsed time
func (tb *MultiTierTokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime).Seconds()
// Calculate tokens to add based on elapsed time
newTokens := elapsed * tb.refillRate
// Update token count, capped at maxTokens
tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
tb.lastRefillTime = now
}
// Allow checks if a request of the specified tier can proceed
func (tb *MultiTierTokenBucket) Allow(tier RequestTier) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
// Get the cost for this tier; fall back to the highest cost for unknown
// tiers so an unrecognized tier can never pass for free
cost, ok := TierCost[tier]
if !ok {
cost = TierCost[TierHigh]
}
if tb.tokens >= cost {
tb.tokens -= cost
return true
}
return false
}
// min returns the minimum of two float64 values
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func main() {
// Create a multi-tier token bucket: 20 max tokens, refill rate of 5 tokens per second
limiter := NewMultiTierTokenBucket(20, 5)
// Simulate different types of requests
requests := []struct {
ID int
Tier RequestTier
}{
{1, TierLow}, // Cost: 1
{2, TierMedium}, // Cost: 5
{3, TierLow}, // Cost: 1
{4, TierHigh}, // Cost: 10
{5, TierMedium}, // Cost: 5
{6, TierLow}, // Cost: 1
}
for _, req := range requests {
allowed := limiter.Allow(req.Tier)
fmt.Printf("Request %d (Tier: %v, Cost: %.1f): %v\n",
req.ID, req.Tier, TierCost[req.Tier], allowed)
// If this was rejected, wait a bit and try again
if !allowed {
fmt.Println("Waiting for token refill...")
time.Sleep(time.Second)
allowed = limiter.Allow(req.Tier)
fmt.Printf("Retry Request %d: %v\n", req.ID, allowed)
}
}
}
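If you would rather not maintain a custom bucket, the same weighted-cost pattern is available in the widely used golang.org/x/time/rate package, whose AllowN method consumes several tokens at once. A minimal sketch (the tier names and costs here are illustrative):
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 5 tokens per second refill, burst capacity of 20 tokens,
	// mirroring the MultiTierTokenBucket above.
	limiter := rate.NewLimiter(5, 20)

	requests := []struct {
		name string
		cost int
	}{
		{"read", 1}, {"write", 5}, {"report", 10}, {"report", 10},
	}
	for _, req := range requests {
		// AllowN consumes req.cost tokens if that many are available.
		fmt.Printf("%s (cost %d): %v\n", req.name, req.cost, limiter.AllowN(time.Now(), req.cost))
	}
}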
Integration with Web Services
Rate limiting is most commonly applied in web services. Let’s explore how to integrate rate limiting into HTTP servers and middleware.
HTTP Middleware for Rate Limiting
Here’s a complete implementation of rate limiting middleware for standard Go HTTP servers:
package main
import (
"fmt"
"log"
"net"
"net/http"
"strings"
"sync"
"time"
)
// RateLimiterMiddleware provides rate limiting for HTTP handlers
type RateLimiterMiddleware struct {
limiter *IPRateLimiter
}
// NewRateLimiterMiddleware creates a new rate limiter middleware
func NewRateLimiterMiddleware(rps int, burst int) *RateLimiterMiddleware {
return &RateLimiterMiddleware{
limiter: NewIPRateLimiter(rps, burst),
}
}
// Middleware is the HTTP middleware function that applies rate limiting
func (m *RateLimiterMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Get client IP
ip := getClientIP(r)
// Check if the request is allowed
if !m.limiter.Allow(ip) {
// Tell well-behaved clients when it is reasonable to retry
w.Header().Set("Retry-After", "1")
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
// Call the next handler
next.ServeHTTP(w, r)
})
}
// getClientIP extracts the client IP from the request
func getClientIP(r *http.Request) string {
// X-Forwarded-For may hold a comma-separated chain of addresses; the first
// entry is the original client. Only trust these headers when the service
// sits behind a proxy you control.
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
return strings.TrimSpace(strings.Split(xff, ",")[0])
}
if ip := r.Header.Get("X-Real-IP"); ip != "" {
return ip
}
// RemoteAddr has the form "host:port"; strip the port so limits key on the host
if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
return host
}
return r.RemoteAddr
}
// IPRateLimiter manages rate limiters for different IP addresses
type IPRateLimiter struct {
mu sync.Mutex
limiters map[string]*TokenBucket
rps int // Requests per second
burst int // Maximum burst size
cleanupInterval time.Duration
}
// NewIPRateLimiter creates a new IP-based rate limiter
func NewIPRateLimiter(rps, burst int) *IPRateLimiter {
limiter := &IPRateLimiter{
limiters: make(map[string]*TokenBucket),
rps: rps,
burst: burst,
cleanupInterval: 10 * time.Minute,
}
// Start cleanup goroutine
go limiter.cleanup()
return limiter
}
// cleanup periodically removes inactive limiters to prevent memory leaks
func (i *IPRateLimiter) cleanup() {
ticker := time.NewTicker(i.cleanupInterval)
defer ticker.Stop()
for range ticker.C {
i.mu.Lock()
// Remove limiters that haven't been used recently; lastUsed is read under
// the bucket's own mutex to avoid racing with Allow
now := time.Now()
for ip, tb := range i.limiters {
tb.mu.Lock()
idle := now.Sub(tb.lastUsed)
tb.mu.Unlock()
if idle > 30*time.Minute {
delete(i.limiters, ip)
}
}
i.mu.Unlock()
}
}
// Allow checks if a request from the given IP should be allowed
func (i *IPRateLimiter) Allow(ip string) bool {
i.mu.Lock()
// Get or create the limiter for this IP; expiry is handled entirely by the
// cleanup goroutine, so no bookkeeping is needed here
limiter, exists := i.limiters[ip]
if !exists {
limiter = NewTokenBucket(float64(i.burst), float64(i.rps))
i.limiters[ip] = limiter
}
i.mu.Unlock()
return limiter.Allow()
}
// TokenBucket implements the token bucket algorithm for rate limiting
type TokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefillTime time.Time
lastUsed time.Time
}
// NewTokenBucket creates a new token bucket rate limiter
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
now := time.Now()
return &TokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: now,
lastUsed: now,
}
}
// refill adds tokens to the bucket based on elapsed time
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefillTime).Seconds()
// Calculate tokens to add based on elapsed time
newTokens := elapsed * tb.refillRate
// Update token count, capped at maxTokens
if newTokens > 0 {
tb.tokens = min(tb.tokens+newTokens, tb.maxTokens)
tb.lastRefillTime = now
}
}
// Allow checks if a request can proceed and consumes a token if allowed
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
tb.lastUsed = time.Now()
if tb.tokens >= 1.0 {
tb.tokens--
return true
}
return false
}
// min returns the minimum of two float64 values
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func main() {
// Create a rate limiter middleware: 5 requests per second, burst of 10
rateLimiter := NewRateLimiterMiddleware(5, 10)
// Create a simple handler
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, World!")
})
// Apply the middleware
http.Handle("/", rateLimiter.Middleware(handler))
// Start the server
fmt.Println("Server starting on :8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Key Takeaways: Building Effective Rate Limiting Systems
Rate limiting is a critical component of modern distributed systems, providing protection against both malicious attacks and unintentional traffic spikes. Throughout this article, we’ve explored various rate limiting algorithms, implementation strategies, and integration patterns. Let’s summarize the key insights:
1. Algorithm Selection Matters: Different rate limiting algorithms have different trade-offs:
- Token bucket provides good burst handling with smooth rate control
- Leaky bucket enforces a consistent outflow rate
- Sliding window counters provide more accurate limiting across time boundaries
- Fixed window counters are simple but can allow edge-case bursts
2. Distributed Coordination: In multi-node systems, rate limiting requires coordination:
- Redis-based implementations provide a centralized state store
- Lua scripts ensure atomic operations for consistent behavior
- Consider the performance impact of network round-trips
3. Adaptive Strategies: One-size-fits-all rate limits are often suboptimal:
- Load-based adaptation can dynamically adjust limits based on system health
- Time-of-day patterns can anticipate traffic variations
- Multi-tier limiting can differentiate between operation costs
4. Integration Patterns: Rate limiting should be integrated thoughtfully:
- HTTP middleware provides a clean separation of concerns
- Circuit breakers complement rate limiters for enhanced resilience
- Monitoring is essential for tuning and troubleshooting
5. Observability: Effective rate limiting requires comprehensive metrics:
- Track allowed vs. rejected requests
- Monitor current and peak QPS
- Integrate with monitoring systems like Prometheus
By implementing these patterns and practices, you can build robust rate limiting systems that protect your services while providing optimal performance under varying load conditions. Remember that rate limiting is not just about saying “no” to excess traffic; it’s about ensuring fair resource allocation and maintaining system stability for all users.
Rate limiting is a continuous process rather than a one-time implementation. As your system evolves, regularly revisit your rate limiting strategy, analyze metrics, and adjust parameters to ensure it continues to meet your needs. With the patterns and implementations covered in this article, you now have the tools to build sophisticated traffic control systems that can handle the demands of modern high-scale applications.