Atomic Counters and Accumulators
Atomic operations are commonly used to implement high-performance counters and accumulators:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// AtomicCounter is a counter that is safe for concurrent use by multiple
// goroutines without any additional locking.
//
// The zero value is a ready-to-use counter starting at 0. The count is stored
// in an atomic.Int64 rather than a bare int64 so that every access is forced
// through an atomic operation and the field is always 64-bit aligned, even on
// 32-bit platforms or when the counter is embedded in a larger struct.
// An AtomicCounter must not be copied after first use.
type AtomicCounter struct {
	value atomic.Int64
}

// Increment atomically adds 1 to the counter.
func (c *AtomicCounter) Increment() {
	c.value.Add(1)
}

// Add atomically adds n to the counter. A negative n decrements it.
func (c *AtomicCounter) Add(n int64) {
	c.value.Add(n)
}

// Value atomically loads and returns the current count.
func (c *AtomicCounter) Value() int64 {
	return c.value.Load()
}
// AtomicAccumulator keeps running statistics (sum, sample count, minimum and
// maximum) over int64 samples and is safe for concurrent use by multiple
// goroutines without locks.
//
// Each field is updated by its own atomic operation, so every individual
// statistic is always well-formed, but the four are not updated as one unit: a
// reader racing with Add may, for example, observe a sum that already includes
// a sample whose count increment has not landed yet. The values are mutually
// consistent once all writers have finished.
//
// Create instances with NewAtomicAccumulator. The zero value starts with min
// and max at 0 instead of the sentinels, so it would misreport the minimum of
// all-positive samples and the maximum of all-negative samples.
// An AtomicAccumulator must not be copied after first use.
type AtomicAccumulator struct {
	sum   atomic.Int64
	count atomic.Int64
	min   atomic.Int64
	max   atomic.Int64
}

// NewAtomicAccumulator returns an empty accumulator. Until the first sample is
// added, min holds the largest int64 and max holds the smallest int64, so that
// the first sample always replaces both.
func NewAtomicAccumulator() *AtomicAccumulator {
	const (
		maxInt64 = 1<<63 - 1
		minInt64 = -1 << 63
	)
	a := &AtomicAccumulator{}
	a.min.Store(maxInt64)
	a.max.Store(minInt64)
	return a
}

// Add records one sample: it adds value to the sum, bumps the sample count,
// and lowers the minimum / raises the maximum if value extends the range.
// The sum is a plain int64 and wraps around on overflow.
func (a *AtomicAccumulator) Add(value int64) {
	a.sum.Add(value)
	a.count.Add(1)
	atomicStoreMin(&a.min, value)
	atomicStoreMax(&a.max, value)
}

// atomicStoreMin lowers *addr to value unless it already holds something less
// than or equal to value. A failed compare-and-swap means another goroutine
// changed *addr in between, so the comparison is repeated against the new
// contents.
func atomicStoreMin(addr *atomic.Int64, value int64) {
	for {
		cur := addr.Load()
		if value >= cur || addr.CompareAndSwap(cur, value) {
			return
		}
	}
}

// atomicStoreMax raises *addr to value unless it already holds something
// greater than or equal to value, retrying whenever a concurrent update makes
// the compare-and-swap fail.
func atomicStoreMax(addr *atomic.Int64, value int64) {
	for {
		cur := addr.Load()
		if value <= cur || addr.CompareAndSwap(cur, value) {
			return
		}
	}
}

// Stats returns the current sum, sample count, minimum and maximum.
//
// The four values are loaded one after another (in that order), not as a
// single snapshot. On an accumulator that has received no samples, min is the
// largest int64 and max is the smallest int64.
func (a *AtomicAccumulator) Stats() (sum, count, min, max int64) {
	return a.sum.Load(), a.count.Load(), a.min.Load(), a.max.Load()
}

// Average returns sum/count as a float64, or 0 when no samples have been
// recorded. Sum and count are loaded separately, so the result is approximate
// while Add is running concurrently.
func (a *AtomicAccumulator) Average() float64 {
	total := a.sum.Load()
	samples := a.count.Load()
	if samples == 0 {
		return 0
	}
	return float64(total) / float64(samples)
}
// main exercises both types from several goroutines at once: first 100
// goroutines each bump a shared AtomicCounter 1000 times, then 10 goroutines
// each feed 100 pseudo-random samples into a shared AtomicAccumulator. The
// final counter value and the accumulator statistics are printed.
func main() {
	const (
		counterWorkers = 100
		incrementsEach = 1000
		accWorkers     = 10
		samplesEach    = 100
	)

	var (
		wg      sync.WaitGroup
		counter AtomicCounter
	)

	// Phase 1: hammer the counter.
	wg.Add(counterWorkers)
	for w := 0; w < counterWorkers; w++ {
		go func() {
			defer wg.Done()
			for n := 0; n < incrementsEach; n++ {
				counter.Increment()
			}
		}()
	}
	wg.Wait()
	fmt.Printf("Counter value: %d\n", counter.Value())

	// Phase 2: feed the accumulator from several goroutines.
	accumulator := NewAtomicAccumulator()

	// feed pushes samplesEach values derived from seed through a small linear
	// congruential step, so each goroutine produces its own sequence in the
	// range [0, 10000). The short sleep between samples lets the goroutines
	// interleave.
	feed := func(seed int64) {
		defer wg.Done()
		sample := seed
		for n := 0; n < samplesEach; n++ {
			sample = (sample*1103515245 + 12345) % 10000
			accumulator.Add(sample)
			time.Sleep(time.Microsecond)
		}
	}

	wg.Add(accWorkers)
	for seed := int64(1); seed <= accWorkers; seed++ {
		go feed(seed)
	}
	wg.Wait()

	total, samples, lowest, highest := accumulator.Stats()
	mean := accumulator.Average()

	fmt.Printf("Accumulator stats:\n")
	fmt.Printf(" Count: %d\n", samples)
	fmt.Printf(" Sum: %d\n", total)
	fmt.Printf(" Min: %d\n", lowest)
	fmt.Printf(" Max: %d\n", highest)
	fmt.Printf(" Avg: %.2f\n", mean)
}
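This example demonstrates how to implement thread-safe counters and accumulators using atomic operations. The AtomicAccumulator is particularly interesting because it tracks minimum and maximum values with compare-and-swap retry loops, so no concurrent update is ever lost.
Note that each field is updated by its own atomic operation: a reader that runs concurrently with Add may observe a sum and a count that are momentarily out of step, and the statistics are only guaranteed to agree with each other once all writers have finished.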
Advanced Lock-Free Data Structures
Building on the atomic primitives, we can implement sophisticated lock-free data structures that provide high-performance concurrent access without traditional locks.
Lock-Free Stack
A lock-free stack is a fundamental data structure that allows multiple goroutines to push and pop elements concurrently:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// node is a single element of the stack's singly linked list. next is written
// only before the node is published through a successful compare-and-swap on
// the stack head.
type node[T any] struct {
	value T
	next  *node[T]
}

// LockFreeStack is a LIFO stack that multiple goroutines may push to and pop
// from concurrently. All coordination goes through one atomic head pointer;
// no mutex is involved.
type LockFreeStack[T any] struct {
	head atomic.Pointer[node[T]]
}

// NewLockFreeStack returns an empty stack.
func NewLockFreeStack[T any]() *LockFreeStack[T] {
	return new(LockFreeStack[T])
}

// Push places value on top of the stack.
func (s *LockFreeStack[T]) Push(value T) {
	fresh := &node[T]{value: value}
	for {
		// Link the new node in front of whatever is on top right now, then
		// publish it only if the top has not changed in the meantime.
		fresh.next = s.head.Load()
		if s.head.CompareAndSwap(fresh.next, fresh) {
			return
		}
		// Lost the race against another goroutine: re-read the top and retry.
	}
}

// Pop removes the top element and returns it together with true. When the
// stack is empty it returns the zero value of T and false.
func (s *LockFreeStack[T]) Pop() (T, bool) {
	for top := s.head.Load(); top != nil; top = s.head.Load() {
		// Swing the head to the second element; a failure means the top
		// changed under us, so the loop re-reads it and tries again.
		if s.head.CompareAndSwap(top, top.next) {
			return top.value, true
		}
	}
	var none T
	return none, false
}

// Peek reports the top element without removing it. When the stack is empty
// it returns the zero value of T and false.
func (s *LockFreeStack[T]) Peek() (T, bool) {
	if top := s.head.Load(); top != nil {
		return top.value, true
	}
	var none T
	return none, false
}

// IsEmpty reports whether the stack held no elements at the moment the head
// pointer was read.
func (s *LockFreeStack[T]) IsEmpty() bool {
	top := s.head.Load()
	return top == nil
}
// main walks through the basic stack operations on a single goroutine and
// then runs the concurrent producer/consumer test.
func main() {
	stack := NewLockFreeStack[int]()

	for _, v := range []int{1, 2, 3} {
		stack.Push(v)
	}

	top, found := stack.Peek()
	fmt.Printf("Peek: %d, %v\n", top, found)

	// Four pops against three elements: the last one shows the empty-stack
	// result (zero value and false).
	for attempt := 0; attempt < 4; attempt++ {
		popped, ok := stack.Pop()
		fmt.Printf("Pop: %d, %v\n", popped, ok)
	}

	testConcurrentStack()
}
// testConcurrentStack runs 10 producers and 10 consumers against one stack.
// Each producer pushes 1000 distinct integers (together covering 0..n-1) and
// each consumer keeps popping until it has removed 1000 values. The sum of
// everything popped is printed next to the closed-form expected sum.
func testConcurrentStack() {
	const (
		goroutines = 10   // producers, and also consumers
		ops        = 1000 // pushes per producer / successful pops per consumer
	)

	var (
		wg    sync.WaitGroup
		total atomic.Int64 // running sum of all popped values
	)
	stack := NewLockFreeStack[int]()

	// produce pushes the ops values owned by producer id.
	produce := func(id int) {
		defer wg.Done()
		base := id * ops
		for k := 0; k < ops; k++ {
			stack.Push(base + k)
		}
	}

	// consume removes exactly ops values. An empty stack does not count as
	// progress: the goroutine simply retries immediately (it spins, it does
	// not sleep) until a producer has pushed more.
	consume := func() {
		defer wg.Done()
		for taken := 0; taken < ops; {
			v, ok := stack.Pop()
			if !ok {
				continue
			}
			total.Add(int64(v))
			taken++
		}
	}

	wg.Add(goroutines)
	for id := 0; id < goroutines; id++ {
		go produce(id)
	}
	wg.Add(goroutines)
	for c := 0; c < goroutines; c++ {
		go consume()
	}
	wg.Wait()

	// 0 + 1 + ... + (n-1) == n*(n-1)/2
	n := goroutines * ops
	expectedSum := int64(n) * int64(n-1) / 2

	fmt.Printf("Concurrent stack test:\n")
	fmt.Printf(" Expected sum: %d\n", expectedSum)
	fmt.Printf(" Actual sum: %d\n", total.Load())
	fmt.Printf(" Stack is empty: %v\n", stack.IsEmpty())
}
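This example implements a lock-free stack using atomic pointer operations. Push and Pop each run a compare-and-swap retry loop on the head pointer, so concurrent pushes and pops are handled correctly. Because every Push allocates a fresh node and Go's garbage collector never reuses a node's memory while some goroutine still holds a pointer to it, this simple loop is not exposed to the ABA problem that affects the same algorithm under manual memory management. The concurrent test demonstrates that the stack correctly handles multiple producers and consumers.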
Lock-Free Queue
A lock-free queue allows multiple goroutines to enqueue and dequeue elements concurrently:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// node is one link of the queue's singly linked list. Its next pointer is
// atomic because enqueuers race to append behind the last node.
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// LockFreeQueue is a FIFO queue that multiple goroutines may enqueue to and
// dequeue from concurrently without locks.
//
// head always points at a sentinel node whose value is not part of the queue;
// the first real element is head.next. tail points at the last node or, for a
// short time while an enqueue is in flight, at the one before it.
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// NewLockFreeQueue returns an empty queue whose head and tail both point at a
// freshly allocated sentinel node.
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	dummy := new(node[T])
	q := new(LockFreeQueue[T])
	q.head.Store(dummy)
	q.tail.Store(dummy)
	return q
}

// Enqueue appends value to the back of the queue.
func (q *LockFreeQueue[T]) Enqueue(value T) {
	fresh := &node[T]{value: value}
	for {
		last := q.tail.Load()
		after := last.next.Load()

		// The tail moved between the two loads above; the pair is stale.
		if last != q.tail.Load() {
			continue
		}

		// Another enqueuer has linked a node but not yet advanced the tail.
		// Help it along, then start over.
		if after != nil {
			q.tail.CompareAndSwap(last, after)
			continue
		}

		// last really is the final node: try to hang the new node behind it.
		if last.next.CompareAndSwap(nil, fresh) {
			// Advancing the tail is best effort; if this CAS fails, some
			// other goroutine has already moved the tail forward.
			q.tail.CompareAndSwap(last, fresh)
			return
		}
	}
}

// Dequeue removes the element at the front of the queue and returns it
// together with true. When the queue is empty it returns the zero value of T
// and false.
func (q *LockFreeQueue[T]) Dequeue() (T, bool) {
	for {
		first := q.head.Load()
		last := q.tail.Load()
		after := first.next.Load()

		// The head moved while we were reading; the triple is stale.
		if first != q.head.Load() {
			continue
		}

		if first == last {
			if after == nil {
				// Only the sentinel is left.
				var none T
				return none, false
			}
			// An enqueue has linked a node but the tail still lags behind;
			// help advance it and retry.
			q.tail.CompareAndSwap(last, after)
			continue
		}

		// Read the value before the CAS. On success, after becomes the new
		// sentinel (it keeps its value field; nothing clears it).
		front := after.value
		if q.head.CompareAndSwap(first, after) {
			return front, true
		}
	}
}

// IsEmpty reports whether the queue looked empty when it was inspected: head
// and tail refer to the same node and that node has no successor.
func (q *LockFreeQueue[T]) IsEmpty() bool {
	first, last := q.head.Load(), q.tail.Load()
	if first != last {
		return false
	}
	return first.next.Load() == nil
}
// main walks through the basic queue operations on a single goroutine and
// then runs the concurrent producer/consumer test.
func main() {
	queue := NewLockFreeQueue[int]()

	for _, v := range []int{1, 2, 3} {
		queue.Enqueue(v)
	}

	// Four dequeues against three elements: the last one shows the
	// empty-queue result (zero value and false).
	for attempt := 0; attempt < 4; attempt++ {
		front, ok := queue.Dequeue()
		fmt.Printf("Dequeue: %d, %v\n", front, ok)
	}

	testConcurrentQueue()
}
// testConcurrentQueue runs 10 producers and 10 consumers against one queue.
// Each producer enqueues 1000 distinct integers (together covering 0..n-1)
// and each consumer keeps dequeuing until it has removed 1000 values. The sum
// and number of dequeued items are printed next to the expected sum.
func testConcurrentQueue() {
	const (
		goroutines = 10   // producers, and also consumers
		ops        = 1000 // enqueues per producer / successful dequeues per consumer
	)

	var (
		wg        sync.WaitGroup
		total     atomic.Int64 // running sum of all dequeued values
		processed atomic.Int64 // number of values dequeued so far
	)
	queue := NewLockFreeQueue[int]()

	// produce enqueues the ops values owned by producer id.
	produce := func(id int) {
		defer wg.Done()
		base := id * ops
		for k := 0; k < ops; k++ {
			queue.Enqueue(base + k)
		}
	}

	// consume removes exactly ops values. An empty queue does not count as
	// progress: the goroutine simply retries immediately (it spins, it does
	// not sleep) until a producer has enqueued more.
	consume := func() {
		defer wg.Done()
		for taken := 0; taken < ops; {
			v, ok := queue.Dequeue()
			if !ok {
				continue
			}
			total.Add(int64(v))
			processed.Add(1)
			taken++
		}
	}

	wg.Add(goroutines)
	for id := 0; id < goroutines; id++ {
		go produce(id)
	}
	wg.Add(goroutines)
	for c := 0; c < goroutines; c++ {
		go consume()
	}
	wg.Wait()

	// 0 + 1 + ... + (n-1) == n*(n-1)/2
	n := goroutines * ops
	expectedSum := int64(n) * int64(n-1) / 2

	fmt.Printf("Concurrent queue test:\n")
	fmt.Printf(" Expected sum: %d\n", expectedSum)
	fmt.Printf(" Actual sum: %d\n", total.Load())
	fmt.Printf(" Items processed: %d\n", processed.Load())
	fmt.Printf(" Queue is empty: %v\n", queue.IsEmpty())
}
This example implements a lock-free queue using the Michael-Scott algorithm, a classic non-blocking queue design. The implementation keeps a sentinel (dummy) node at the front of the list so that head and tail are never nil, which removes the special cases for an empty queue. The concurrent test demonstrates that the queue correctly handles multiple producers and consumers.