Leaky Bucket Algorithm

The leaky bucket algorithm models rate limiting as a bucket with a hole in the bottom. Incoming requests fill the bucket, and the bucket “leaks” at a constant rate; a request that would overflow the bucket is rejected.

package main

import (
	"fmt"
	"sync"
	"time"
)

// LeakyBucket implements the leaky bucket algorithm for rate limiting
type LeakyBucket struct {
	mu           sync.Mutex
	capacity     int           // Maximum bucket capacity
	level        int           // Current number of items in the bucket
	leakRate     float64       // Items per second
	lastLeakTime time.Time
}

// NewLeakyBucket creates a new leaky bucket rate limiter
func NewLeakyBucket(capacity int, leakRate float64) *LeakyBucket {
	return &LeakyBucket{
		capacity:     capacity,
		level:        0,
		leakRate:     leakRate,
		lastLeakTime: time.Now(),
	}
}

// leak removes items from the bucket based on elapsed time
func (lb *LeakyBucket) leak() {
	now := time.Now()
	elapsed := now.Sub(lb.lastLeakTime).Seconds()
	
	// Calculate items to leak based on elapsed time
	leakAmount := int(elapsed * lb.leakRate)
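	// When leakAmount is 0, lastLeakTime is deliberately left unchanged, so
	// fractional elapsed time keeps accumulating instead of being lost to
	// the integer truncation here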
	
	// Update bucket level, ensuring it doesn't go below zero
	if leakAmount > 0 {
		lb.level = max(0, lb.level-leakAmount)
		lb.lastLeakTime = now
	}
}

// Allow checks if a request can be added to the bucket
func (lb *LeakyBucket) Allow() bool {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	
	lb.leak()
	
	// If bucket is full, reject the request
	if lb.level >= lb.capacity {
		return false
	}
	
	// Add the request to the bucket
	lb.level++
	return true
}

// max returns the larger of two integers
// (Go 1.21+ ships a built-in max; this helper keeps the example
// compatible with older toolchains)
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Create a leaky bucket: capacity 5, leak rate 2 per second
	limiter := NewLeakyBucket(5, 2)
	
	// Simulate 7 requests in quick succession
	for i := 1; i <= 7; i++ {
		allowed := limiter.Allow()
		fmt.Printf("Request %d: %v\n", i, allowed)
	}
	
	// Wait for some requests to leak out
	fmt.Println("Waiting for leakage...")
	time.Sleep(2 * time.Second) // Wait for ~4 requests to leak
	
	// Try 5 more requests
	for i := 8; i <= 12; i++ {
		allowed := limiter.Allow()
		fmt.Printf("Request %d: %v\n", i, allowed)
	}
}
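
With a capacity of 5 and a leak rate of 2 per second, the first five requests are allowed and requests 6 and 7 are rejected. During the two-second sleep roughly four slots drain, so about four of the final five requests should succeed.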

The leaky bucket algorithm is particularly useful for:

  1. Traffic Shaping: It enforces a consistent outflow rate
  2. Queue Management: It can be extended to queue requests rather than reject them (see the sketch below)
  3. Network Traffic: It’s well-suited for network packet shaping
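
As a sketch of point 2, the LeakyBucket above can be extended with a blocking Wait method that queues the caller until space leaks free (a hypothetical addition, not part of the implementation above):

// Wait blocks until the request fits in the bucket, queuing the caller
// instead of rejecting it. Hypothetical extension of the LeakyBucket type.
func (lb *LeakyBucket) Wait() {
	for {
		lb.mu.Lock()
		lb.leak()
		if lb.level < lb.capacity {
			lb.level++
			lb.mu.Unlock()
			return
		}
		lb.mu.Unlock()
		// Sleep roughly as long as one item takes to leak, then retry
		time.Sleep(time.Duration(float64(time.Second) / lb.leakRate))
	}
}

Polling like this is simple but imprecise; a production version might use a condition variable or a time.Timer sized to the exact deficit.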

Sliding Window Counter

The sliding window counter algorithm addresses the edge effects of fixed windows by combining the current window's count with a weighted share of the previous window's count, where the weight is the fraction of the previous window that still overlaps the sliding window:

package main

import (
	"fmt"
	"sync"
	"time"
)

// SlidingWindowCounter implements a sliding window rate limiter
type SlidingWindowCounter struct {
	mu              sync.Mutex
	limit           int
	windowSize      time.Duration
	previousCount   int
	currentCount    int
	windowStart     time.Time
}

// NewSlidingWindowCounter creates a new sliding window counter rate limiter
func NewSlidingWindowCounter(limit int, windowSize time.Duration) *SlidingWindowCounter {
	return &SlidingWindowCounter{
		limit:       limit,
		windowSize:  windowSize,
		windowStart: time.Now(),
	}
}

// Allow checks if a request should be allowed based on the sliding window calculation
func (sw *SlidingWindowCounter) Allow() bool {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	
	now := time.Now()
	elapsed := now.Sub(sw.windowStart)
	
	// If the window has expired, shift the window
	if elapsed >= sw.windowSize {
		// Calculate how many complete windows have passed
		completeWindows := int(elapsed / sw.windowSize)
		
		// If more than one window has passed, reset all counts
		if completeWindows > 1 {
			sw.previousCount = 0
			sw.currentCount = 0
		} else {
			// Shift window by one
			sw.previousCount = sw.currentCount
			sw.currentCount = 0
		}
		
		// Update window start time
		sw.windowStart = sw.windowStart.Add(time.Duration(completeWindows) * sw.windowSize)
		elapsed = now.Sub(sw.windowStart)
	}
	
	// Calculate the position within the current window (0.0 to 1.0)
	windowPosition := float64(elapsed) / float64(sw.windowSize)
	
	// Estimate the number of requests currently in the sliding window:
	// previous_count * (1 - position) + current_count
	weightedCount := float64(sw.previousCount)*(1-windowPosition) + float64(sw.currentCount)
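	// For example: with a limit of 10, 8 requests in the previous window,
	// and 2 in the current window at the 50% mark, weightedCount is
	// 8*(1 - 0.5) + 2 = 6, still under the limit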
	
	// Check if we've reached the limit
	if int(weightedCount) >= sw.limit {
		return false
	}
	
	// Increment the counter and allow the request
	sw.currentCount++
	return true
}

func main() {
	// Create a sliding window counter: 10 requests per minute
	limiter := NewSlidingWindowCounter(10, time.Minute)
	
	// Simulate 8 requests in the first half of the window
	for i := 1; i <= 8; i++ {
		allowed := limiter.Allow()
		fmt.Printf("Request %d: %v\n", i, allowed)
	}
	
	// Simulate 90 seconds elapsing, which puts us 50% of the way through
	// the next window: the 8 requests above become the previous window's count
	limiter.windowStart = limiter.windowStart.Add(-90 * time.Second)
	fmt.Println("Time elapsed: 90 seconds (50% through the next window)")
	
	// Try 8 more requests
	// The previous window's 8 requests are weighted by 50% (counting as
	// about 4), so only around 6-7 of these should be allowed before the
	// weighted count reaches the limit of 10
	for i := 9; i <= 16; i++ {
		allowed := limiter.Allow()
		fmt.Printf("Request %d: %v\n", i, allowed)
	}
}
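
In this run the first eight requests are admitted immediately. After the simulated 90 seconds, those eight fall into the previous window at half weight, so the weighted count starts near 4 and the limiter admits roughly six or seven of the final eight requests before it starts rejecting.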

The sliding window counter provides a more accurate rate limiting approach by:

  1. Smoothing Boundaries: Eliminating the edge effects of fixed windows (illustrated below)
  2. Accurate Limiting: Providing a more consistent rate limit across window boundaries
  3. Memory Efficiency: Requiring only two counters per client
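
To make the boundary problem concrete: a fixed window of 10 requests per minute would happily accept 10 requests at 0:59 and 10 more at 1:01, i.e. 20 requests within two seconds. Under the sliding window above, a burst at 1:01 still sees a weighted count of about 10 × (59/60) ≈ 9.8 from the previous window alone, so all but one of the new requests are rejected.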

Fixed Window Counter with Cell-Based Storage

For systems with many clients, memory usage becomes a concern. A cell-based approach stores one small timestamped counter (a cell) per client and lazily evicts stale entries:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Cell represents a time window cell with a request count
type Cell struct {
	timestamp time.Time
	count     int
}

// CellBasedRateLimiter implements a memory-efficient rate limiter
type CellBasedRateLimiter struct {
	mu              sync.Mutex
	clients         map[string]*Cell
	limit           int
	windowSize      time.Duration
	cleanupInterval time.Duration
	lastCleanup     time.Time
}

// NewCellBasedRateLimiter creates a new cell-based rate limiter
func NewCellBasedRateLimiter(limit int, windowSize time.Duration) *CellBasedRateLimiter {
	return &CellBasedRateLimiter{
		clients:         make(map[string]*Cell),
		limit:           limit,
		windowSize:      windowSize,
		cleanupInterval: windowSize * 2, // Clean up every two window periods
		lastCleanup:     time.Now(),
	}
}

// Allow checks if a request from a specific client should be allowed
func (cb *CellBasedRateLimiter) Allow(clientID string) bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	
	now := time.Now()
	
	// Periodically clean up expired client entries
	if now.Sub(cb.lastCleanup) >= cb.cleanupInterval {
		cb.cleanup(now)
		cb.lastCleanup = now
	}
	
	// Get or create client cell
	cell, exists := cb.clients[clientID]
	if !exists || now.Sub(cell.timestamp) >= cb.windowSize {
		// Create new cell or reset expired cell
		cb.clients[clientID] = &Cell{
			timestamp: now,
			count:     1,
		}
		return true
	}
	
	// Check if client has reached the limit
	if cell.count >= cb.limit {
		return false
	}
	
	// Increment the counter and allow the request
	cell.count++
	return true
}

// cleanup removes expired client entries to free memory
func (cb *CellBasedRateLimiter) cleanup(now time.Time) {
	for clientID, cell := range cb.clients {
		if now.Sub(cell.timestamp) >= cb.windowSize {
			delete(cb.clients, clientID)
		}
	}
}

func main() {
	// Create a cell-based rate limiter: 5 requests per minute per client
	limiter := NewCellBasedRateLimiter(5, time.Minute)
	
	// Simulate requests from different clients
	clients := []string{"client1", "client2", "client3"}
	
	for _, client := range clients {
		fmt.Printf("Testing client: %s\n", client)
		
		// Try 7 requests per client
		for i := 1; i <= 7; i++ {
			allowed := limiter.Allow(client)
			fmt.Printf("  Request %d: %v\n", i, allowed)
		}
	}
	
	// Simulate 2 minutes passing by backdating the cleanup clock and every
	// cell's timestamp, so the cells actually look expired
	fmt.Println("\nSimulating time passing (2 minutes)...")
	limiter.lastCleanup = limiter.lastCleanup.Add(-2 * time.Minute)
	for _, cell := range limiter.clients {
		cell.timestamp = cell.timestamp.Add(-2 * time.Minute)
	}
	
	// Try client1 again after cleanup
	fmt.Println("Testing client1 again:")
	for i := 1; i <= 3; i++ {
		allowed := limiter.Allow("client1")
		fmt.Printf("  Request %d: %v\n", i, allowed)
	}
}
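
Each client's first five requests are allowed and the last two rejected, since every client gets its own cell. After the simulated two minutes, the stale cells are evicted on the next call, so client1's three fresh requests should all succeed.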

This implementation is particularly useful for systems with:

  1. Many Clients: Efficiently handles large numbers of distinct clients
  2. Memory Constraints: Automatically cleans up expired entries
  3. High Throughput: The hot path is a single map lookup, although the one global mutex serializes all clients; sharding the map reduces that contention (see the sketch below)
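
A minimal sketch of the sharding idea mentioned in point 3, assuming the standard hash/fnv package is added to the imports (ShardedRateLimiter is a hypothetical name, not part of the code above):

// ShardedRateLimiter spreads clients across several CellBasedRateLimiters,
// each with its own mutex, so goroutines handling different clients rarely
// contend on the same lock. Hypothetical extension of the code above.
type ShardedRateLimiter struct {
	shards []*CellBasedRateLimiter
}

// NewShardedRateLimiter creates numShards independent limiters
func NewShardedRateLimiter(numShards, limit int, windowSize time.Duration) *ShardedRateLimiter {
	s := &ShardedRateLimiter{shards: make([]*CellBasedRateLimiter, numShards)}
	for i := range s.shards {
		s.shards[i] = NewCellBasedRateLimiter(limit, windowSize)
	}
	return s
}

// Allow hashes the client ID to pick a shard, then delegates to it
func (s *ShardedRateLimiter) Allow(clientID string) bool {
	h := fnv.New32a()
	h.Write([]byte(clientID))
	return s.shards[h.Sum32()%uint32(len(s.shards))].Allow(clientID)
}

Because each shard owns an independent lock and client map, contention drops roughly in proportion to the shard count, at the cost of a little extra memory.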