Conditional Update Pattern
Sometimes we need to update a value only if it meets certain conditions:
// conditionalUpdate demonstrates a bounded increment built on a
// compare-and-swap retry loop: the value is advanced by one only while it is
// still below a caller-supplied limit.
func conditionalUpdate() {
	var value atomic.Int64
	value.Store(10)

	// conditionalIncrement adds one to value unless it has already reached
	// max. It reports whether the increment was applied.
	conditionalIncrement := func(max int64) bool {
		for {
			cur := value.Load()
			if cur >= max {
				// Already at or above the limit: leave the value untouched.
				return false
			}
			next := cur + 1
			if !value.CompareAndSwap(cur, next) {
				// The value changed between Load and CompareAndSwap;
				// re-read it and re-evaluate the condition.
				continue
			}
			return true
		}
	}

	fmt.Printf("\nConditional update pattern:\n")
	fmt.Printf(" Initial value: %d\n", value.Load())

	// Ten attempts against a limit of 15, starting from 10.
	for attempt := 1; attempt <= 10; attempt++ {
		ok := conditionalIncrement(15)
		fmt.Printf(" Increment attempt %d: %v, value = %d\n",
			attempt, ok, value.Load())
	}
}
This example demonstrates a conditional update pattern, where a value is updated only if it meets certain conditions. This pattern is useful for implementing bounded counters, rate limiters, and other constrained data structures.
Multiple Field Update Pattern
Updating multiple fields atomically requires careful design:
// multipleFieldUpdate demonstrates updating two related fields as one atomic
// step: each update builds a fresh snapshot struct and publishes it by
// compare-and-swapping an atomic pointer.
func multipleFieldUpdate() {
	// Counter is one snapshot of the two fields that must change together.
	type Counter struct {
		hits  int64
		total int64
	}

	// counterPtr always points at the most recently published snapshot.
	var counterPtr atomic.Pointer[Counter]
	counterPtr.Store(&Counter{})

	// updateCounter adds value to total and, when hit is true, one to hits.
	// It retries until its snapshot replaces the one it was derived from.
	updateCounter := func(hit bool, value int64) {
		for {
			old := counterPtr.Load()

			// Work on a private copy; the published snapshot is never modified.
			next := *old
			next.total += value
			if hit {
				next.hits++
			}

			if counterPtr.CompareAndSwap(old, &next) {
				return
			}
			// Another goroutine published first: start over from its snapshot.
		}
	}

	// Run 100 concurrent updates; every third one counts as a hit.
	const workers = 100
	var wg sync.WaitGroup
	for n := 0; n < workers; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			isHit := n%3 == 0
			updateCounter(isHit, int64(n))
		}(n)
	}
	wg.Wait()

	snapshot := counterPtr.Load()
	fmt.Printf("\nMultiple field update pattern:\n")
	fmt.Printf(" Hits: %d\n", snapshot.hits)
	fmt.Printf(" Total: %d\n", snapshot.total)
}
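This example demonstrates how to atomically update multiple fields using an atomic pointer to a structure. By creating a new structure with the updated values and atomically swapping the pointer, we can ensure that all fields are updated atomically. The pattern is only safe if a structure is never modified after its pointer has been published: readers may still hold the old pointer, so every change must go into a fresh copy. The trade-off is one allocation per update attempt.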
Performance Analysis and Benchmarking
To understand when to use atomic operations versus traditional locks, we need to analyze their performance characteristics under different scenarios.
Atomic vs. Mutex Benchmarks
Let’s benchmark atomic operations against mutex-based synchronization:
package main
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
// Counter implementations
// AtomicCounter is a counter backed by an atomic.Int64; it uses no lock.
// The zero value is ready to use and starts at 0.
type AtomicCounter struct {
	value atomic.Int64
}

// Increment atomically adds one to the counter.
func (c *AtomicCounter) Increment() {
	const step = 1
	c.value.Add(step)
}

// Value atomically loads the current count and returns it.
func (c *AtomicCounter) Value() int64 {
	current := c.value.Load()
	return current
}
// MutexCounter is a counter whose value is guarded by a sync.Mutex.
// The zero value is ready to use and starts at 0.
type MutexCounter struct {
	mu    sync.Mutex // guards value
	value int64
}

// Increment adds one to the counter while holding the mutex.
func (c *MutexCounter) Increment() {
	c.mu.Lock()
	c.value += 1
	c.mu.Unlock()
}

// Value returns the current count, read while holding the mutex.
func (c *MutexCounter) Value() int64 {
	c.mu.Lock()
	current := c.value
	c.mu.Unlock()
	return current
}
// RWMutexCounter is a counter whose value is guarded by a sync.RWMutex:
// increments take the write lock, reads take the read lock.
// The zero value is ready to use and starts at 0.
type RWMutexCounter struct {
	mu    sync.RWMutex // guards value
	value int64
}

// Increment adds one to the counter while holding the write lock.
func (c *RWMutexCounter) Increment() {
	c.mu.Lock()
	c.value += 1
	c.mu.Unlock()
}

// Value returns the current count, read while holding the read lock.
func (c *RWMutexCounter) Value() int64 {
	c.mu.RLock()
	current := c.value
	c.mu.RUnlock()
	return current
}
// Benchmark functions
// BenchmarkAtomicIncrement measures Increment on a single AtomicCounter shared
// by all goroutines started by b.RunParallel.
func BenchmarkAtomicIncrement(b *testing.B) {
	var c AtomicCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkMutexIncrement measures Increment on a single MutexCounter shared
// by all goroutines started by b.RunParallel.
func BenchmarkMutexIncrement(b *testing.B) {
	var c MutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkRWMutexIncrement measures Increment on a single RWMutexCounter
// shared by all goroutines started by b.RunParallel.
func BenchmarkRWMutexIncrement(b *testing.B) {
	var c RWMutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkAtomicRead measures Value on a single AtomicCounter shared by all
// goroutines started by b.RunParallel. The counter is never written.
func BenchmarkAtomicRead(b *testing.B) {
	var c AtomicCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			_ = c.Value()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkMutexRead measures Value on a single MutexCounter shared by all
// goroutines started by b.RunParallel. The counter is never written.
func BenchmarkMutexRead(b *testing.B) {
	var c MutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			_ = c.Value()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkRWMutexRead measures Value on a single RWMutexCounter shared by all
// goroutines started by b.RunParallel. The counter is never written.
func BenchmarkRWMutexRead(b *testing.B) {
	var c RWMutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			_ = c.Value()
		}
	}
	b.RunParallel(worker)
}
// Mixed read/write benchmarks (95% reads, 5% writes)
// BenchmarkAtomicMixed measures a mixed workload on a single shared
// AtomicCounter: an iteration increments when fastrand()%100 is below 5 and
// reads otherwise.
func BenchmarkAtomicMixed(b *testing.B) {
	var c AtomicCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			if fastrand()%100 >= 5 {
				_ = c.Value()
				continue
			}
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkMutexMixed measures a mixed workload on a single shared
// MutexCounter: an iteration increments when fastrand()%100 is below 5 and
// reads otherwise.
func BenchmarkMutexMixed(b *testing.B) {
	var c MutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			if fastrand()%100 >= 5 {
				_ = c.Value()
				continue
			}
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// BenchmarkRWMutexMixed measures a mixed workload on a single shared
// RWMutexCounter: an iteration increments when fastrand()%100 is below 5 and
// reads otherwise.
func BenchmarkRWMutexMixed(b *testing.B) {
	var c RWMutexCounter
	worker := func(pb *testing.PB) {
		for pb.Next() {
			if fastrand()%100 >= 5 {
				_ = c.Value()
				continue
			}
			c.Increment()
		}
	}
	b.RunParallel(worker)
}
// Simple fast random number generator for benchmark.
//
// randState is the xorshift32 state shared by every caller of fastrand. It is
// only ever read and written through sync/atomic inside fastrand, because the
// mixed benchmarks call fastrand from all b.RunParallel goroutines at once.
var randState uint32 = 1

// fastrand returns the next value of a xorshift32 sequence (shifts 13, 17, 5)
// and is safe for concurrent use.
//
// The state is advanced with a load/compare-and-swap loop, so each call
// consumes exactly one step of the sequence even under contention. A plain
// read-modify-write of randState here would be a data race and could hand the
// same value to several goroutines.
func fastrand() uint32 {
	for {
		old := atomic.LoadUint32(&randState)
		x := old
		x ^= x << 13
		x ^= x >> 17
		x ^= x << 5
		// Publish only if nobody advanced the state in the meantime;
		// otherwise recompute from the newer state.
		if atomic.CompareAndSwapUint32(&randState, old, x) {
			return x
		}
	}
}
// main runs every benchmark through testing.Benchmark, prints a results table
// and atomic-versus-lock speed-up ratios, then runs the contention analysis.
func main() {
	fmt.Println("Running benchmarks...")

	// Kept as a slice so the table is printed in a fixed order.
	benchmarks := []struct {
		name string
		fn   func(*testing.B)
	}{
		{"AtomicIncrement", BenchmarkAtomicIncrement},
		{"MutexIncrement", BenchmarkMutexIncrement},
		{"RWMutexIncrement", BenchmarkRWMutexIncrement},
		{"AtomicRead", BenchmarkAtomicRead},
		{"MutexRead", BenchmarkMutexRead},
		{"RWMutexRead", BenchmarkRWMutexRead},
		{"AtomicMixed", BenchmarkAtomicMixed},
		{"MutexMixed", BenchmarkMutexMixed},
		{"RWMutexMixed", BenchmarkRWMutexMixed},
	}

	// Run each benchmark and index the results by name.
	results := make(map[string]testing.BenchmarkResult, len(benchmarks))
	for _, bm := range benchmarks {
		results[bm.name] = testing.Benchmark(bm.fn)
	}

	// nsPerOp reports the time per operation in fractional nanoseconds.
	// Integer division would truncate sub-nanosecond results to 0, which
	// turns the ratios below into +Inf or NaN, and would panic when N is 0.
	// A result with N <= 0 is reported as 0.
	nsPerOp := func(name string) float64 {
		r := results[name]
		if r.N <= 0 {
			return 0
		}
		return float64(r.T.Nanoseconds()) / float64(r.N)
	}

	// speedup reports how many times longer one op of slow takes than one op
	// of fast. It is +Inf or NaN only when fast has no valid result.
	speedup := func(slow, fast string) float64 {
		return nsPerOp(slow) / nsPerOp(fast)
	}

	// Print results in a table.
	fmt.Println("\nBenchmark Results:")
	fmt.Println("-----------------------------------------------------------")
	fmt.Printf("%-20s %-15s %-15s %-15s\n", "Benchmark", "Operations", "Time/Op", "Allocs/Op")
	fmt.Println("-----------------------------------------------------------")
	for _, bm := range benchmarks {
		result := results[bm.name]
		fmt.Printf("%-20s %-15d %-15s %-15d\n",
			bm.name,
			result.N,
			fmt.Sprintf("%.2fns", nsPerOp(bm.name)),
			result.AllocsPerOp())
	}
	fmt.Println("-----------------------------------------------------------")
	fmt.Println("Note: Lower Time/Op is better")

	// Print comparative analysis.
	fmt.Println("\nComparative Analysis:")

	// Compare increment operations.
	fmt.Printf("Increment: Atomic is %.2fx faster than Mutex\n", speedup("MutexIncrement", "AtomicIncrement"))
	fmt.Printf("Increment: Atomic is %.2fx faster than RWMutex\n", speedup("RWMutexIncrement", "AtomicIncrement"))

	// Compare read operations.
	fmt.Printf("Read: Atomic is %.2fx faster than Mutex\n", speedup("MutexRead", "AtomicRead"))
	fmt.Printf("Read: Atomic is %.2fx faster than RWMutex\n", speedup("RWMutexRead", "AtomicRead"))

	// Compare mixed operations.
	fmt.Printf("Mixed (95%% reads): Atomic is %.2fx faster than Mutex\n", speedup("MutexMixed", "AtomicMixed"))
	fmt.Printf("Mixed (95%% reads): Atomic is %.2fx faster than RWMutex\n", speedup("RWMutexMixed", "AtomicMixed"))

	// Analyze contention scenarios.
	analyzeContention()
}
// analyzeContention prints a table of operations per second for each counter
// implementation, one row per goroutine count from 1 to 64 in powers of two.
// Each cell is one benchmarkContention run on a freshly created counter.
func analyzeContention() {
	fmt.Println("\nContention Analysis:")
	fmt.Println("Operations per second (higher is better):")
	fmt.Printf("%-10s %-15s %-15s %-15s\n", "Goroutines", "Atomic", "Mutex", "RWMutex")
	fmt.Println("--------------------------------------------------")

	for _, workers := range []int{1, 2, 4, 8, 16, 32, 64} {
		atomicOps := benchmarkContention(&AtomicCounter{}, workers)
		mutexOps := benchmarkContention(&MutexCounter{}, workers)
		rwMutexOps := benchmarkContention(&RWMutexCounter{}, workers)
		fmt.Printf("%-10d %-15d %-15d %-15d\n", workers, atomicOps, mutexOps, rwMutexOps)
	}
}
// Counter is the interface benchmarkContention uses to drive a counter.
// analyzeContention passes pointers to AtomicCounter, MutexCounter and
// RWMutexCounter through it.
type Counter interface {
// Increment advances the counter; the implementations in this file add one.
Increment()
// Value returns the current count.
Value() int64
}
// benchmarkContention calls counter.Increment in a loop from the given number
// of goroutines for about 100ms and returns the combined throughput in
// operations per second.
//
// Each goroutine counts its own iterations and adds them to the shared total
// once, when it finishes, so the bookkeeping does not add contention to the
// measured loop. The elapsed time is measured after all goroutines have exited.
func benchmarkContention(counter Counter, goroutines int) int {
	const duration = 100 * time.Millisecond

	var (
		total atomic.Int64
		wg    sync.WaitGroup
	)

	start := time.Now()
	deadline := start.Add(duration)

	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			done := int64(0)
			for time.Now().Before(deadline) {
				counter.Increment()
				done++
			}
			total.Add(done)
		}()
	}
	wg.Wait()

	seconds := time.Since(start).Seconds()
	return int(float64(total.Load()) / seconds)
}
The benchmark results typically show that atomic operations are significantly faster than mutex-based synchronization, especially under high contention. For example:
- Increment Operations: Atomic increments are often 5-10x faster than mutex-protected increments.
- Read Operations: Atomic reads are typically 3-5x faster than mutex-protected reads, and 2-3x faster than RWMutex-protected reads.
- Mixed Workloads: For workloads with a mix of reads and writes, atomic operations maintain their performance advantage, though the gap narrows as the read percentage increases.
- Scaling with Contention: As the number of goroutines increases, the performance gap between atomic operations and mutex-based synchronization widens, demonstrating the superior scalability of atomic operations.