Distributed Rate Limiting Patterns
In distributed systems, rate limiting becomes more complex as requests may be processed across multiple servers. Let’s explore patterns for coordinating rate limits across a distributed environment.
Redis-Based Distributed Rate Limiter
Redis is commonly used for distributed rate limiting due to its atomic operations and high performance. Here’s a Redis-backed implementation of the token bucket algorithm, using optimistic locking (WATCH/MULTI/EXEC) to keep concurrent updates consistent:
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// RedisTokenBucket implements a distributed token bucket algorithm using Redis.
// Each client's bucket is stored under two keys derived from keyPrefix and the
// client ID: one holding the token count and one holding the last refill time.
type RedisTokenBucket struct {
// client is the Redis connection used to read and update bucket state.
client *redis.Client
// keyPrefix namespaces every key this limiter writes.
keyPrefix string
// maxTokens is the bucket capacity; a bucket with no stored state starts full.
maxTokens int
// refillRate is how many tokens are added back per second of elapsed time.
refillRate float64 // tokens per second
// tokenExpiry is the TTL applied to both keys every time they are written.
tokenExpiry time.Duration
}
// NewRedisTokenBucket builds a token bucket limiter backed by the given Redis
// client. keyPrefix namespaces the stored keys, maxTokens is the bucket
// capacity, and refillRate is the number of tokens restored per second.
// Stored bucket state is given a fixed one-hour TTL on every write.
func NewRedisTokenBucket(client *redis.Client, keyPrefix string, maxTokens int, refillRate float64) *RedisTokenBucket {
	bucket := new(RedisTokenBucket)
	bucket.client = client
	bucket.keyPrefix = keyPrefix
	bucket.maxTokens = maxTokens
	bucket.refillRate = refillRate
	// Idle buckets disappear from Redis after an hour without writes.
	bucket.tokenExpiry = time.Hour
	return bucket
}
// Allow reports whether a request from clientID may proceed, consuming one
// token from that client's bucket when it does.
//
// Bucket state is kept in two Redis keys (token count and last refill time in
// Unix nanoseconds). The read-compute-write cycle runs under WATCH on both
// keys, so a concurrent update by another server makes EXEC fail with
// redis.TxFailedErr and the cycle is retried, up to three attempts in total.
//
// It returns (true, nil) when a token was consumed, (false, nil) when the
// bucket holds less than one token, and (false, err) when Redis returns an
// error or every attempt lost the optimistic-locking race.
func (rtb *RedisTokenBucket) Allow(ctx context.Context, clientID string) (bool, error) {
	const maxAttempts = 3

	// Keys for storing the token count and last refill time.
	tokenKey := fmt.Sprintf("%s:%s:tokens", rtb.keyPrefix, clientID)
	timestampKey := fmt.Sprintf("%s:%s:ts", rtb.keyPrefix, clientID)

	// allowed carries the decision out of the transaction function. Watch only
	// reports whether the transaction committed, not whether a token was
	// granted, so the verdict must be tracked separately.
	var allowed bool

	txf := func(tx *redis.Tx) error {
		// Reset on every attempt so a retried transaction never leaks the
		// verdict of an attempt that failed to commit.
		allowed = false

		now := time.Now()
		capacity := float64(rtb.maxTokens)

		// A missing token key means the bucket has never been used (or has
		// expired), so it starts full.
		tokens, err := tx.Get(ctx, tokenKey).Float64()
		switch {
		case err == redis.Nil:
			tokens = capacity
		case err != nil:
			return err
		}

		// A missing timestamp means there is no elapsed time to refill for.
		lastRefill := now
		ts, err := tx.Get(ctx, timestampKey).Int64()
		switch {
		case err == redis.Nil:
			// Keep lastRefill == now.
		case err != nil:
			return err
		default:
			lastRefill = time.Unix(0, ts)
		}

		// The stored timestamp may come from another server whose clock is
		// ahead of ours. Never move time backwards: that would subtract
		// tokens now and over-refill on the next call.
		if now.Before(lastRefill) {
			now = lastRefill
		}

		// Refill for the elapsed time, capped at the bucket capacity.
		available := tokens + now.Sub(lastRefill).Seconds()*rtb.refillRate
		if available > capacity {
			available = capacity
		}

		// Consume one token if a whole token is available.
		grant := available >= 1
		if grant {
			available--
		}

		// Persist the new state whether or not the request is granted, so the
		// fractional refill accumulated so far is not lost.
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, tokenKey, available, rtb.tokenExpiry)
			pipe.Set(ctx, timestampKey, now.UnixNano(), rtb.tokenExpiry)
			return nil
		})
		if err != nil {
			return err
		}

		allowed = grant
		return nil
	}

	// Execute the transaction with optimistic locking.
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err := rtb.client.Watch(ctx, txf, tokenKey, timestampKey)
		if err == nil {
			return allowed, nil
		}
		if err != redis.TxFailedErr {
			return false, err
		}
		// A watched key changed under us; retry with fresh state.
	}
	return false, fmt.Errorf("failed to execute Redis transaction after retries")
}
// main runs a small demonstration against a local Redis instance: a burst of
// twelve requests for one client, a five second pause, then five more
// requests, printing the limiter's answer for each one.
func main() {
	// This is a demonstration - in a real application, you would configure Redis properly
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer rdb.Close()

	const clientID = "user123"
	ctx := context.Background()

	// Bucket of 10 tokens, refilled at 1 token per second.
	limiter := NewRedisTokenBucket(rdb, "ratelimit", 10, 1)

	// burst sends requests numbered first through last and prints each result.
	burst := func(first, last int) {
		for n := first; n <= last; n++ {
			ok, err := limiter.Allow(ctx, clientID)
			if err != nil {
				fmt.Printf("Error: %v\n", err)
				continue
			}
			fmt.Printf("Request %d: %v\n", n, ok)
		}
	}

	// Twelve requests in quick succession.
	burst(1, 12)

	// Give the bucket time to refill before trying again.
	fmt.Println("Waiting for token refill...")
	time.Sleep(5 * time.Second)

	// Five more requests.
	burst(13, 17)
}
This Redis-based implementation provides several advantages for distributed environments:
- Consistency: All servers share the same rate limit state
- Scalability: Redis can handle high throughput and many clients
- Persistence: Rate limit state can survive service restarts
- Low Latency: Redis operations are typically sub-millisecond
Distributed Rate Limiting with Lua Scripts
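For even better performance and atomicity, we can move the rate-limiting logic into a Redis Lua script. The example below implements a sliding-window limiter backed by a sorted set: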
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// RedisScriptLimiter implements rate limiting using Redis Lua scripts.
// Each client gets one Redis key, built from keyPrefix and the client ID,
// which the script uses to track that client's requests.
type RedisScriptLimiter struct {
// client is the Redis connection the script is executed against.
client *redis.Client
// keyPrefix namespaces every key this limiter writes.
keyPrefix string
// windowSize is the length of the rate-limit window; it is passed to the
// script in milliseconds.
windowSize time.Duration
// limit is the maximum number of requests allowed within one window.
limit int
// luaScript is the script that is run on every Allow call.
luaScript *redis.Script
}
// NewRedisScriptLimiter creates a sliding-window rate limiter whose entire
// check-and-record step runs inside a single Redis Lua script.
//
// client is the Redis connection to use, keyPrefix namespaces the per-client
// keys, limit is the maximum number of requests per window, and windowSize is
// the window length.
func NewRedisScriptLimiter(client *redis.Client, keyPrefix string, limit int, windowSize time.Duration) *RedisScriptLimiter {
	// Lua script for sliding window rate limiting.
	// KEYS[1] - The Redis key (a sorted set) to use for this rate limit
	// ARGV[1] - The current timestamp in milliseconds
	// ARGV[2] - The window size in milliseconds
	// ARGV[3] - The maximum number of requests allowed in the window
	//
	// Each allowed request is stored as one sorted-set member scored by its
	// timestamp, so every member must be unique. math.random() alone is not
	// enough: Redis versions that seed the script PRNG identically on every
	// execution return the same value each time, so two requests in the same
	// millisecond would share a member and ZADD would overwrite instead of
	// add. Mixing in the current cardinality keeps members distinct within a
	// millisecond, because each successful ZADD raises the count seen by the
	// next call.
	luaScript := redis.NewScript(`
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

-- Clean up old requests outside the current window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- Count requests in the current window
local count = redis.call('ZCARD', key)

-- If we're under the limit, add the current request and return allowed
if count < limit then
    redis.call('ZADD', key, now, now .. '-' .. count .. '-' .. math.random())
    redis.call('EXPIRE', key, math.ceil(window/1000))
    return 1
end

-- We're over the limit
return 0
`)
	return &RedisScriptLimiter{
		client:     client,
		keyPrefix:  keyPrefix,
		windowSize: windowSize,
		limit:      limit,
		luaScript:  luaScript,
	}
}
// Allow runs the limiter's Lua script for clientID and reports whether the
// request is permitted. The script receives the current wall-clock time and
// the window length, both in milliseconds, plus the configured limit; a
// result of 1 means allowed. Any error from running the script is returned
// with a false verdict.
func (rsl *RedisScriptLimiter) Allow(ctx context.Context, clientID string) (bool, error) {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	windowMs := rsl.windowSize.Milliseconds()
	keys := []string{rsl.keyPrefix + ":" + clientID}

	// The whole check-and-record step happens inside the script.
	verdict, err := rsl.luaScript.Run(ctx, rsl.client, keys, nowMs, windowMs, rsl.limit).Int()
	if err != nil {
		return false, err
	}
	return verdict == 1, nil
}
// main runs a small demonstration against a local Redis instance: seven
// back-to-back requests for one client against a limit of five per minute,
// printing the limiter's answer for each one.
func main() {
	// This is a demonstration - in a real application, you would configure Redis properly
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer rdb.Close()

	const clientID = "user123"
	ctx := context.Background()

	// 5 requests per minute per client.
	limiter := NewRedisScriptLimiter(rdb, "ratelimit", 5, time.Minute)

	// Seven requests in quick succession.
	for attempt := 1; attempt <= 7; attempt++ {
		ok, err := limiter.Allow(ctx, clientID)
		if err == nil {
			fmt.Printf("Request %d: %v\n", attempt, ok)
			continue
		}
		fmt.Printf("Error: %v\n", err)
	}
}
Using Lua scripts provides several benefits:
- Atomicity: The entire rate limiting logic executes in a single atomic operation
- Performance: Reduced network round-trips between application and Redis
- Consistency: Logic runs entirely on the Redis server, eliminating race conditions