Atomic Counters and Accumulators

Atomic operations are commonly used to implement high-performance counters and accumulators:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// AtomicCounter is a counter that is safe for concurrent use by multiple
// goroutines. The zero value is a ready-to-use counter starting at 0.
//
// The count is held in an atomic.Int64 rather than a bare int64 used with the
// atomic.*Int64 functions. atomic.Int64 is automatically 64-bit aligned (the
// bare functions require the caller to guarantee that on 386/ARM/32-bit MIPS)
// and cannot be read or written non-atomically by accident.
//
// An AtomicCounter must not be copied after first use.
type AtomicCounter struct {
	value atomic.Int64
}

// Increment adds 1 to the counter.
func (c *AtomicCounter) Increment() {
	c.value.Add(1)
}

// Add adds n, which may be negative, to the counter.
func (c *AtomicCounter) Add(n int64) {
	c.value.Add(n)
}

// Value returns the current count. While other goroutines are still updating
// the counter, the result is a point-in-time snapshot.
func (c *AtomicCounter) Value() int64 {
	return c.value.Load()
}

// AtomicAccumulator accumulates int64 samples, tracking their sum, count,
// minimum and maximum. All methods are safe for concurrent use.
//
// Each field is updated by its own atomic operation, so while Add calls are in
// flight the four values are not captured as one snapshot. Add applies a
// sample to min and max first, then to sum, then to count. Stats and Average
// load count first. Go's atomic operations are sequentially consistent, so
// every sample included in a loaded count is already reflected in min, max
// and sum. A non-zero count is therefore never reported together with the
// internal "no sample yet" sentinels. (sum may additionally include samples
// whose count increment has not landed yet.)
//
// The zero value is NOT ready to use: min and max would start at 0 instead of
// the sentinels. Create accumulators with NewAtomicAccumulator. An
// AtomicAccumulator must not be copied after first use.
type AtomicAccumulator struct {
	sum   atomic.Int64
	count atomic.Int64
	min   atomic.Int64 // running minimum; math.MaxInt64 until the first sample
	max   atomic.Int64 // running maximum; math.MinInt64 until the first sample
}

// NewAtomicAccumulator returns an empty accumulator. Its min and max are
// primed with the extreme int64 values, so the first sample becomes both the
// minimum and the maximum.
func NewAtomicAccumulator() *AtomicAccumulator {
	a := &AtomicAccumulator{}
	a.min.Store(1<<63 - 1) // math.MaxInt64
	a.max.Store(-1 << 63)  // math.MinInt64
	return a
}

// Add records value in the accumulator.
func (a *AtomicAccumulator) Add(value int64) {
	// Lower the running minimum. The loop exits as soon as value is no longer
	// smaller than the current minimum (possibly because a concurrent Add
	// installed an even smaller one), or when our CAS succeeds.
	for cur := a.min.Load(); value < cur; cur = a.min.Load() {
		if a.min.CompareAndSwap(cur, value) {
			break
		}
	}
	// Raise the running maximum, symmetrically.
	for cur := a.max.Load(); value > cur; cur = a.max.Load() {
		if a.max.CompareAndSwap(cur, value) {
			break
		}
	}
	// Publish sum before count, and both after min/max; see the type comment.
	a.sum.Add(value)
	a.count.Add(1)
}

// Stats returns the sum, count, minimum and maximum of the recorded samples.
// If no sample has been counted yet, it returns all zeros rather than the
// internal sentinel values.
func (a *AtomicAccumulator) Stats() (sum, count, min, max int64) {
	count = a.count.Load()
	if count == 0 {
		return 0, 0, 0, 0
	}
	// count is loaded first, so every counted sample has already updated min,
	// max and sum (see the type comment).
	min = a.min.Load()
	max = a.max.Load()
	sum = a.sum.Load()
	return sum, count, min, max
}

// Average returns the mean of the recorded samples, or 0 if none have been
// counted yet.
func (a *AtomicAccumulator) Average() float64 {
	count := a.count.Load()
	if count == 0 {
		return 0
	}
	return float64(a.sum.Load()) / float64(count)
}

// main first hammers an AtomicCounter from 100 goroutines doing 1000
// increments each and prints the total. It then feeds an AtomicAccumulator
// from 10 goroutines, each adding 100 values from a small linear congruential
// sequence seeded with 1..10, and prints the resulting statistics.
func main() {
	const (
		counterWorkers   = 100
		incrementsPerJob = 1000
		accWorkers       = 10
		samplesPerWorker = 100
	)

	// Phase 1: concurrent increments.
	var counter AtomicCounter
	var counterWG sync.WaitGroup
	counterWG.Add(counterWorkers)
	for w := 0; w < counterWorkers; w++ {
		go func() {
			defer counterWG.Done()
			for k := 0; k < incrementsPerJob; k++ {
				counter.Increment()
			}
		}()
	}
	counterWG.Wait()
	fmt.Printf("Counter value: %d\n", counter.Value())

	// Phase 2: concurrent samples into the accumulator.
	acc := NewAtomicAccumulator()
	var accWG sync.WaitGroup
	accWG.Add(accWorkers)
	for w := 1; w <= accWorkers; w++ {
		go func(state int64) {
			defer accWG.Done()
			for k := 0; k < samplesPerWorker; k++ {
				// Linear congruential step, reduced modulo 10000.
				state = (state*1103515245 + 12345) % 10000
				acc.Add(state)
				time.Sleep(time.Microsecond)
			}
		}(int64(w))
	}
	accWG.Wait()

	total, n, lo, hi := acc.Stats()
	mean := acc.Average()

	fmt.Printf("Accumulator stats:\n")
	fmt.Printf("  Count: %d\n", n)
	fmt.Printf("  Sum: %d\n", total)
	fmt.Printf("  Min: %d\n", lo)
	fmt.Printf("  Max: %d\n", hi)
	fmt.Printf("  Avg: %.2f\n", mean)
}

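This example demonstrates how to implement thread-safe counters and accumulators using atomic operations. The AtomicAccumulator is particularly interesting because it tracks the minimum and maximum with compare-and-swap loops. A goroutine reads the current extreme and stops if its value would not change it. Otherwise it retries the CAS until the CAS succeeds or it observes a value that makes the update unnecessary. Note that each field is updated by its own atomic operation, so a reader running concurrently with Add may see a sample reflected in some fields but not yet in others. The statistics are only guaranteed to be mutually consistent once all writers have finished (here, after wg.Wait()).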

Advanced Lock-Free Data Structures

Building on these atomic primitives, we can implement lock-free data structures: containers that many goroutines can access concurrently without mutexes, and in which a goroutine that stalls mid-operation can never prevent the others from making progress.

Lock-Free Stack

A lock-free stack is a fundamental data structure that allows multiple goroutines to push and pop elements concurrently:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// LockFreeStack is a LIFO stack that is safe for concurrent use by multiple
// goroutines without a mutex. All state lives behind a single atomic head
// pointer. Push and Pop retry a compare-and-swap on head until it succeeds
// (the Treiber-stack technique). The zero value is an empty, ready-to-use
// stack.
type LockFreeStack[T any] struct {
	// head is the top element; nil means the stack is empty.
	head atomic.Pointer[node[T]]
}

// node is a single stack cell. Its fields are written only while the node is
// still private to Push, before the CAS that publishes it, and are never
// modified afterwards. Goroutines that obtain a node from head may therefore
// read its fields without atomics.
type node[T any] struct {
	value T
	next  *node[T] // the element below this one; nil at the bottom
}

// NewLockFreeStack returns a pointer to a new, empty stack.
func NewLockFreeStack[T any]() *LockFreeStack[T] {
	return new(LockFreeStack[T])
}

// Push places value on top of the stack.
//
// Every call allocates a fresh node. Go's garbage collector never reuses a
// node's memory while any goroutine still references it, so the classic ABA
// hazard of CAS-based stacks cannot occur here.
func (s *LockFreeStack[T]) Push(value T) {
	fresh := &node[T]{value: value}
	for {
		// Link the still-private node above the current top, then try to
		// publish it. A failed CAS means head moved; re-read and retry.
		fresh.next = s.head.Load()
		if s.head.CompareAndSwap(fresh.next, fresh) {
			return
		}
	}
}

// Pop removes the top element and returns it with true. If the stack is
// empty, it returns the zero value of T and false.
func (s *LockFreeStack[T]) Pop() (T, bool) {
	for top := s.head.Load(); top != nil; top = s.head.Load() {
		// Swing head down one level. A failed CAS means another goroutine
		// pushed or popped first, so reload and retry.
		if s.head.CompareAndSwap(top, top.next) {
			return top.value, true
		}
	}
	var empty T
	return empty, false
}

// Peek returns the top element and true without removing it, or the zero
// value of T and false if the stack is empty. Under concurrent use the result
// is a snapshot that may already be stale when the caller acts on it.
func (s *LockFreeStack[T]) Peek() (T, bool) {
	if top := s.head.Load(); top != nil {
		return top.value, true
	}
	var empty T
	return empty, false
}

// IsEmpty reports whether the stack had no elements at the moment head was
// read.
func (s *LockFreeStack[T]) IsEmpty() bool {
	top := s.head.Load()
	return top == nil
}

// main pushes 1, 2, 3 onto a stack, peeks at the top, then pops four times:
// three pops return 3, 2, 1 and the last reports an empty stack. Finally it
// runs the concurrent producer/consumer test.
func main() {
	stack := NewLockFreeStack[int]()
	for _, v := range []int{1, 2, 3} {
		stack.Push(v)
	}

	val, ok := stack.Peek()
	fmt.Printf("Peek: %d, %v\n", val, ok)

	for attempt := 0; attempt < 4; attempt++ {
		val, ok = stack.Pop()
		fmt.Printf("Pop: %d, %v\n", val, ok)
	}

	testConcurrentStack()
}

// testConcurrentStack runs producers and consumers against one stack at the
// same time. It prints the sum of all popped values next to the expected sum.
//
// Producer id pushes id*ops .. id*ops+ops-1, so together the producers push
// every integer in [0, n) exactly once, where n = goroutines*ops. Each
// consumer pops exactly ops values, so the consumers drain everything the
// producers push and the stack should end empty.
func testConcurrentStack() {
	const (
		ops        = 1000 // operations per goroutine
		goroutines = 10   // number of producers, and separately of consumers
	)

	stack := NewLockFreeStack[int]()
	var wg sync.WaitGroup
	var sum atomic.Int64 // sum of all popped values

	// Producers.
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < ops; j++ {
				stack.Push(id*ops + j)
			}
		}(i)
	}

	// Consumers.
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Count successful pops explicitly instead of rewinding the loop
			// index. An empty stack only means the producers are behind, so
			// retry immediately (a busy-wait, not a sleep). Goroutines are
			// preemptible since Go 1.14, so spinning consumers cannot starve
			// the producers indefinitely.
			for popped := 0; popped < ops; {
				if val, ok := stack.Pop(); ok {
					sum.Add(int64(val))
					popped++
				}
			}
		}()
	}

	wg.Wait()

	// Sum of 0 + 1 + ... + (n-1) is n*(n-1)/2.
	n := goroutines * ops
	expectedSum := int64(n) * int64(n-1) / 2

	fmt.Printf("Concurrent stack test:\n")
	fmt.Printf("  Expected sum: %d\n", expectedSum)
	fmt.Printf("  Actual sum: %d\n", sum.Load())
	fmt.Printf("  Stack is empty: %v\n", stack.IsEmpty())
}

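This example implements a lock-free stack (a Treiber stack) using atomic pointer operations. Push and Pop each retry a compare-and-swap on the head pointer until it succeeds, so concurrent pushes and pops never corrupt the list. The classic ABA problem of CAS-based stacks does not arise here. Every Push allocates a fresh node, and Go's garbage collector never reuses a node's memory while any goroutine still holds a pointer to it. The concurrent test runs multiple producers and consumers at once and prints the expected and actual sums, so you can confirm that no value was lost or duplicated.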

Lock-Free Queue

A lock-free queue allows multiple goroutines to enqueue and dequeue elements concurrently:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// LockFreeQueue is a FIFO queue that is safe for concurrent use by multiple
// goroutines without a mutex. It follows the Michael-Scott algorithm.
//
// head always points at a sentinel (dummy) node whose value is not part of
// the queue; the real elements start at head.next. tail points at the last
// node or, transiently, at a node before it. Any goroutine that notices tail
// lagging helps advance it with a CAS before retrying its own operation.
//
// The zero value is NOT usable (head and tail would be nil); create queues
// with NewLockFreeQueue.
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// node is a single queue cell. next is atomic because enqueuers link new
// nodes onto the current last node while other goroutines may be reading it.
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// NewLockFreeQueue returns an empty queue. Both head and tail point at one
// freshly allocated sentinel node.
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	dummy := new(node[T])
	q := new(LockFreeQueue[T])
	q.head.Store(dummy)
	q.tail.Store(dummy)
	return q
}

// Enqueue appends value to the back of the queue.
func (q *LockFreeQueue[T]) Enqueue(value T) {
	fresh := &node[T]{value: value}
	for {
		last := q.tail.Load()
		succ := last.next.Load()
		if last != q.tail.Load() {
			// tail moved while we were reading; our view is stale.
			continue
		}
		if succ != nil {
			// tail is lagging behind the real last node: help swing it
			// forward, then retry.
			q.tail.CompareAndSwap(last, succ)
			continue
		}
		if last.next.CompareAndSwap(nil, fresh) {
			// The node is linked in. Swinging tail may fail if another
			// goroutine already helped; either way the enqueue is complete.
			q.tail.CompareAndSwap(last, fresh)
			return
		}
	}
}

// Dequeue removes the element at the front of the queue and returns it with
// true. If the queue is empty, it returns the zero value of T and false.
//
// After a successful Dequeue, the node that held the returned value becomes
// the new sentinel and keeps referencing that value until a later Dequeue
// moves past it. For pointer-typed T this delays collection of that value.
func (q *LockFreeQueue[T]) Dequeue() (T, bool) {
	for {
		first := q.head.Load()
		last := q.tail.Load()
		succ := first.next.Load()
		if first != q.head.Load() {
			// head moved while we were reading; our view is stale.
			continue
		}
		if first == last {
			if succ == nil {
				// Only the sentinel remains: the queue is empty.
				var empty T
				return empty, false
			}
			// An enqueue has linked a node but not yet advanced tail; help it.
			q.tail.CompareAndSwap(last, succ)
			continue
		}
		// Read the value before the CAS: once head advances, succ becomes
		// the sentinel and a concurrent dequeuer may move past it.
		v := succ.value
		if q.head.CompareAndSwap(first, succ) {
			return v, true
		}
	}
}

// IsEmpty reports whether the queue had no elements at the moment it was
// inspected. Under concurrent use the answer may be stale immediately.
func (q *LockFreeQueue[T]) IsEmpty() bool {
	first, last := q.head.Load(), q.tail.Load()
	if first != last {
		return false
	}
	return first.next.Load() == nil
}

// main enqueues 1, 2, 3 and then dequeues four times: the first three
// dequeues return 1, 2, 3 in FIFO order and the last reports an empty queue.
// Finally it runs the concurrent producer/consumer test.
func main() {
	queue := NewLockFreeQueue[int]()
	for _, v := range []int{1, 2, 3} {
		queue.Enqueue(v)
	}

	for attempt := 0; attempt < 4; attempt++ {
		val, ok := queue.Dequeue()
		fmt.Printf("Dequeue: %d, %v\n", val, ok)
	}

	testConcurrentQueue()
}

// testConcurrentQueue runs producers and consumers against one queue at the
// same time. It prints the dequeued sum and item count next to the expected
// sum.
//
// Producer id enqueues id*ops .. id*ops+ops-1, so together the producers
// enqueue every integer in [0, n) exactly once, where n = goroutines*ops. Each
// consumer dequeues exactly ops values, so the consumers drain everything,
// Items processed should equal n, and the queue should end empty.
func testConcurrentQueue() {
	const (
		ops        = 1000 // operations per goroutine
		goroutines = 10   // number of producers, and separately of consumers
	)

	queue := NewLockFreeQueue[int]()
	var wg sync.WaitGroup
	var sum, count atomic.Int64 // totals over all dequeued values

	// Producers.
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < ops; j++ {
				queue.Enqueue(id*ops + j)
			}
		}(i)
	}

	// Consumers.
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Count successful dequeues explicitly instead of rewinding the
			// loop index. An empty queue only means the producers are behind,
			// so retry immediately (a busy-wait, not a sleep). Goroutines are
			// preemptible since Go 1.14, so spinning consumers cannot starve
			// the producers indefinitely.
			for taken := 0; taken < ops; {
				if val, ok := queue.Dequeue(); ok {
					sum.Add(int64(val))
					count.Add(1)
					taken++
				}
			}
		}()
	}

	wg.Wait()

	// Sum of 0 + 1 + ... + (n-1) is n*(n-1)/2.
	n := goroutines * ops
	expectedSum := int64(n) * int64(n-1) / 2

	fmt.Printf("Concurrent queue test:\n")
	fmt.Printf("  Expected sum: %d\n", expectedSum)
	fmt.Printf("  Actual sum: %d\n", sum.Load())
	fmt.Printf("  Items processed: %d\n", count.Load())
	fmt.Printf("  Queue is empty: %v\n", queue.IsEmpty())
}

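This example implements the Michael-Scott lock-free queue. A sentinel (dummy) node guarantees that head and tail are never nil: the queue is empty exactly when the sentinel's next pointer is nil, which removes the special cases an empty list would otherwise need. A goroutine may notice that tail is lagging behind the last node, because another enqueuer linked its node but has not yet advanced tail. In that case it helps advance tail before retrying. As a result, a goroutine that stalls between those two steps cannot prevent the others from making progress. The concurrent test runs multiple producers and consumers at once and prints the expected and actual sums, along with the number of items processed, for comparison.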