Leaky Bucket Algorithm
The leaky bucket algorithm models rate limiting as a bucket with a hole in the bottom. Requests fill the bucket, and they “leak” out at a constant rate. If the bucket overflows, new requests are rejected.
package main
import (
"fmt"
"sync"
"time"
)
// LeakyBucket implements the leaky bucket algorithm for rate limiting
type LeakyBucket struct {
mu sync.Mutex
capacity int // Maximum bucket capacity
remaining int // Current bucket level
leakRate float64 // Items per second
lastLeakTime time.Time
}
// NewLeakyBucket creates a new leaky bucket rate limiter
func NewLeakyBucket(capacity int, leakRate float64) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
remaining: 0,
leakRate: leakRate,
lastLeakTime: time.Now(),
}
}
// leak removes items from the bucket based on elapsed time
func (lb *LeakyBucket) leak() {
now := time.Now()
elapsed := now.Sub(lb.lastLeakTime).Seconds()
// Calculate items to leak based on elapsed time
leakAmount := int(elapsed * lb.leakRate)
// Update bucket level, ensuring it doesn't go below zero
if leakAmount > 0 {
lb.remaining = max(0, lb.remaining-leakAmount)
lb.lastLeakTime = now
}
}
// Allow checks if a request can be added to the bucket
func (lb *LeakyBucket) Allow() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.leak()
// If bucket is full, reject the request
if lb.remaining >= lb.capacity {
return false
}
// Add the request to the bucket
lb.remaining++
return true
}
// max returns the maximum of two integers
func max(a, b int) int {
if a > b {
return a
}
return b
}
func main() {
// Create a leaky bucket: capacity 5, leak rate 2 per second
limiter := NewLeakyBucket(5, 2)
// Simulate 7 requests in quick succession
for i := 1; i <= 7; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
}
// Wait for some requests to leak out
fmt.Println("Waiting for leakage...")
time.Sleep(2 * time.Second) // Wait for ~4 requests to leak
// Try 5 more requests
for i := 8; i <= 12; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
}
}
The leaky bucket algorithm is particularly useful for:
- Traffic Shaping: It enforces a consistent outflow rate
- Queue Management: It can be extended to queue requests rather than reject them
- Network Traffic: It’s well-suited for network packet shaping
Sliding Window Counter
The sliding window counter algorithm addresses the edge effects of fixed windows by considering a weighted average of the current and previous windows:
package main
import (
"fmt"
"sync"
"time"
)
// SlidingWindowCounter implements a sliding window rate limiter
type SlidingWindowCounter struct {
mu sync.Mutex
limit int
windowSize time.Duration
previousCount int
currentCount int
windowStart time.Time
}
// NewSlidingWindowCounter creates a new sliding window counter rate limiter
func NewSlidingWindowCounter(limit int, windowSize time.Duration) *SlidingWindowCounter {
return &SlidingWindowCounter{
limit: limit,
windowSize: windowSize,
windowStart: time.Now(),
}
}
// Allow checks if a request should be allowed based on the sliding window calculation
func (sw *SlidingWindowCounter) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
elapsed := now.Sub(sw.windowStart)
// If the window has expired, shift the window
if elapsed >= sw.windowSize {
// Calculate how many complete windows have passed
completeWindows := int(elapsed / sw.windowSize)
// If more than one window has passed, reset all counts
if completeWindows > 1 {
sw.previousCount = 0
sw.currentCount = 0
} else {
// Shift window by one
sw.previousCount = sw.currentCount
sw.currentCount = 0
}
// Update window start time
sw.windowStart = sw.windowStart.Add(time.Duration(completeWindows) * sw.windowSize)
elapsed = now.Sub(sw.windowStart)
}
// Calculate the position within the current window (0.0 to 1.0)
windowPosition := float64(elapsed) / float64(sw.windowSize)
// Calculate the weighted rate:
// (previous_count * (1 - position) + current_count)
weightedCount := float64(sw.previousCount) * (1 - windowPosition) + float64(sw.currentCount)
// Check if we've reached the limit
if int(weightedCount) >= sw.limit {
return false
}
// Increment the counter and allow the request
sw.currentCount++
return true
}
func main() {
// Create a sliding window counter: 10 requests per minute
limiter := NewSlidingWindowCounter(10, time.Minute)
// Simulate 8 requests in the first half of the window
for i := 1; i <= 8; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
}
// Simulate moving to the next window (30 seconds later)
limiter.windowStart = limiter.windowStart.Add(-30 * time.Second)
fmt.Println("Time elapsed: 30 seconds (50% through window)")
// Try 5 more requests
// With sliding window, we should allow only about 2-3 more requests
// because we're counting 50% of the previous window (8 requests)
for i := 9; i <= 13; i++ {
allowed := limiter.Allow()
fmt.Printf("Request %d: %v\n", i, allowed)
}
}
The sliding window counter provides a more accurate rate limiting approach by:
- Smoothing Boundaries: Eliminating the edge effects of fixed windows
- Accurate Limiting: Providing a more consistent rate limit across window boundaries
- Memory Efficiency: Requiring only two counters per client
Fixed Window Counter with Cell-Based Storage
For systems with many clients, memory usage becomes a concern. A cell-based approach can optimize storage:
package main
import (
"fmt"
"sync"
"time"
)
// Cell represents a time window cell with a request count
type Cell struct {
timestamp time.Time
count int
}
// CellBasedRateLimiter implements a memory-efficient rate limiter
type CellBasedRateLimiter struct {
mu sync.Mutex
clients map[string]*Cell
limit int
windowSize time.Duration
cleanupInterval time.Duration
lastCleanup time.Time
}
// NewCellBasedRateLimiter creates a new cell-based rate limiter
func NewCellBasedRateLimiter(limit int, windowSize time.Duration) *CellBasedRateLimiter {
return &CellBasedRateLimiter{
clients: make(map[string]*Cell),
limit: limit,
windowSize: windowSize,
cleanupInterval: windowSize * 2, // Clean up every 2 window periods
lastCleanup: time.Now(),
}
}
// Allow checks if a request from a specific client should be allowed
func (cb *CellBasedRateLimiter) Allow(clientID string) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
now := time.Now()
// Periodically clean up expired client entries
if now.Sub(cb.lastCleanup) >= cb.cleanupInterval {
cb.cleanup(now)
cb.lastCleanup = now
}
// Get or create client cell
cell, exists := cb.clients[clientID]
if !exists || now.Sub(cell.timestamp) >= cb.windowSize {
// Create new cell or reset expired cell
cb.clients[clientID] = &Cell{
timestamp: now,
count: 1,
}
return true
}
// Check if client has reached the limit
if cell.count >= cb.limit {
return false
}
// Increment the counter and allow the request
cell.count++
return true
}
// cleanup removes expired client entries to free memory
func (cb *CellBasedRateLimiter) cleanup(now time.Time) {
for clientID, cell := range cb.clients {
if now.Sub(cell.timestamp) >= cb.windowSize {
delete(cb.clients, clientID)
}
}
}
func main() {
// Create a cell-based rate limiter: 5 requests per minute per client
limiter := NewCellBasedRateLimiter(5, time.Minute)
// Simulate requests from different clients
clients := []string{"client1", "client2", "client3"}
for _, client := range clients {
fmt.Printf("Testing client: %s\n", client)
// Try 7 requests per client
for i := 1; i <= 7; i++ {
allowed := limiter.Allow(client)
fmt.Printf(" Request %d: %v\n", i, allowed)
}
}
// Simulate time passing and cleanup
fmt.Println("\nSimulating time passing (2 minutes)...")
limiter.lastCleanup = limiter.lastCleanup.Add(-2 * time.Minute)
// Try client1 again after cleanup
fmt.Println("Testing client1 again:")
for i := 1; i <= 3; i++ {
allowed := limiter.Allow("client1")
fmt.Printf(" Request %d: %v\n", i, allowed)
}
}
This implementation is particularly useful for systems with:
- Many Clients: Efficiently handles large numbers of distinct clients
- Memory Constraints: Automatically cleans up expired entries
- High Throughput: Minimizes lock contention with efficient data structures