Advanced Synchronization Patterns

Beyond the basic primitives, Go enables sophisticated synchronization patterns by combining multiple primitives and leveraging channels.

Semaphore Implementation

The sync package does not provide a semaphore type directly, but one can be built from a buffered channel or from a mutex paired with a condition variable:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ChannelSemaphore implements a semaphore using a buffered channel
type ChannelSemaphore struct {
	permits chan struct{}
}

// NewChannelSemaphore creates a new semaphore with the given number of permits
func NewChannelSemaphore(n int) *ChannelSemaphore {
	return &ChannelSemaphore{
		permits: make(chan struct{}, n),
	}
}

// Acquire acquires a permit, blocking until one is available
func (s *ChannelSemaphore) Acquire() {
	s.permits <- struct{}{}
}

// TryAcquire attempts to acquire a permit without blocking
func (s *ChannelSemaphore) TryAcquire() bool {
	select {
	case s.permits <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithTimeout attempts to acquire a permit with a timeout
func (s *ChannelSemaphore) AcquireWithTimeout(timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	
	select {
	case s.permits <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// Release releases a permit
func (s *ChannelSemaphore) Release() {
	select {
	case <-s.permits:
	default:
		panic("Semaphore release without acquire")
	}
}

// MutexSemaphore implements a semaphore using mutex and condition variable
type MutexSemaphore struct {
	mu      sync.Mutex
	cond    *sync.Cond
	count   int
	maxSize int
}

// NewMutexSemaphore creates a new semaphore with the given number of permits
func NewMutexSemaphore(n int) *MutexSemaphore {
	s := &MutexSemaphore{
		maxSize: n,
		count:   n,
	}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire acquires a permit, blocking until one is available
func (s *MutexSemaphore) Acquire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	for s.count == 0 {
		s.cond.Wait()
	}
	
	s.count--
}

// TryAcquire attempts to acquire a permit without blocking
func (s *MutexSemaphore) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	if s.count == 0 {
		return false
	}
	
	s.count--
	return true
}

// Release releases a permit
func (s *MutexSemaphore) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	if s.count >= s.maxSize {
		panic("Semaphore release without acquire")
	}
	
	s.count++
	s.cond.Signal()
}

func demonstrateSemaphores() {
	fmt.Println("\n=== Channel-based Semaphore ===")
	
	// Create a semaphore with 3 permits
	sem := NewChannelSemaphore(3)
	
	// Launch workers that need to acquire the semaphore
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Worker %d: Attempting to acquire semaphore\n", id)
			
			// Try with timeout first
			if sem.AcquireWithTimeout(time.Duration(id) * 50 * time.Millisecond) {
				fmt.Printf("Worker %d: Acquired with timeout\n", id)
			} else {
				fmt.Printf("Worker %d: Timeout expired, acquiring with blocking call\n", id)
				sem.Acquire()
				fmt.Printf("Worker %d: Acquired after blocking\n", id)
			}
			
			// Simulate work
			fmt.Printf("Worker %d: Working...\n", id)
			time.Sleep(200 * time.Millisecond)
			
			// Release the semaphore
			fmt.Printf("Worker %d: Releasing semaphore\n", id)
			sem.Release()
		}(i)
		
		// Small delay between starting workers
		time.Sleep(10 * time.Millisecond)
	}
	
	wg.Wait()
	
	fmt.Println("\n=== Mutex-based Semaphore ===")
	
	// Create a mutex-based semaphore with 2 permits
	mutexSem := NewMutexSemaphore(2)
	
	// Launch workers
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Worker %d: Attempting to acquire mutex semaphore\n", id)
			
			// Try non-blocking acquire first
			if mutexSem.TryAcquire() {
				fmt.Printf("Worker %d: Acquired without blocking\n", id)
			} else {
				fmt.Printf("Worker %d: Could not acquire immediately, blocking...\n", id)
				mutexSem.Acquire()
				fmt.Printf("Worker %d: Acquired after blocking\n", id)
			}
			
			// Simulate work
			fmt.Printf("Worker %d: Working...\n", id)
			time.Sleep(150 * time.Millisecond)
			
			// Release the semaphore
			fmt.Printf("Worker %d: Releasing mutex semaphore\n", id)
			mutexSem.Release()
		}(i)
		
		// Small delay between starting workers
		time.Sleep(10 * time.Millisecond)
	}
	
	wg.Wait()
}

func main() {
	demonstrateSemaphores()
}

This example demonstrates two semaphore implementations:

  1. Channel-based Semaphore: Uses a buffered channel to represent permits.
  2. Mutex-based Semaphore: Uses a mutex and condition variable for more complex control.

Both provide the same core operations but differ in character: the channel-based version is simpler and composes naturally with select, contexts, and timeouts, while the mutex-and-condition version keeps an explicit permit count, which makes it easier to extend and to detect misuse such as releasing more permits than were acquired.
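
Before relying on a hand-rolled semaphore in production, note that the golang.org/x/sync/semaphore package already provides a context-aware weighted semaphore, so the implementations above are mostly useful for understanding the mechanics. Below is a minimal sketch of the same three-permit setup using that package; it assumes the golang.org/x/sync module has been added to your project, and the surrounding program is illustrative rather than part of the example above:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Three permits, mirroring NewChannelSemaphore(3) above.
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()

	// Blocking acquire; returns an error only if the context is cancelled.
	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	fmt.Println("permit acquired")

	// Non-blocking acquire.
	if sem.TryAcquire(1) {
		fmt.Println("second permit acquired")
		sem.Release(1)
	}

	// Timed acquire via a context deadline.
	timedCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()
	if err := sem.Acquire(timedCtx, 1); err != nil {
		fmt.Println("timed acquire failed:", err)
	} else {
		sem.Release(1)
	}

	sem.Release(1)
}

The weighted API also lets a single caller acquire several permits at once, something neither hand-rolled version above supports directly.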

Read-Write Preference Control

While sync.RWMutex leans toward writers (once a writer is waiting, new readers block until that writer has run), sometimes we need more control over access patterns:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// ReaderPreferenceRWMutex gives preference to readers over writers
type ReaderPreferenceRWMutex struct {
	mu          sync.Mutex
	readerCount int32
	readerWait  sync.Cond
	writerWait  sync.Cond
	writing     bool
}

// NewReaderPreferenceRWMutex creates a new reader-preference RW mutex
func NewReaderPreferenceRWMutex() *ReaderPreferenceRWMutex {
	rw := &ReaderPreferenceRWMutex{}
	rw.readerWait.L = &rw.mu
	rw.writerWait.L = &rw.mu
	return rw
}

// RLock acquires a read lock
func (rw *ReaderPreferenceRWMutex) RLock() {
	rw.mu.Lock()
	// Wait if someone is writing
	for rw.writing {
		rw.readerWait.Wait()
	}
	rw.readerCount++ // readerCount is only touched while mu is held, so no atomic is needed
	rw.mu.Unlock()
}

// RUnlock releases a read lock
func (rw *ReaderPreferenceRWMutex) RUnlock() {
	rw.mu.Lock()
	rw.readerCount--
	// If no more readers, signal a waiting writer (readerCount is guarded by mu)
	if rw.readerCount == 0 {
		rw.writerWait.Signal()
	}
	rw.mu.Unlock()
}

// Lock acquires a write lock
func (rw *ReaderPreferenceRWMutex) Lock() {
	rw.mu.Lock()
	// Wait until there are no readers and no one is writing
	for rw.readerCount > 0 || rw.writing {
		rw.writerWait.Wait()
	}
	rw.writing = true
	rw.mu.Unlock()
}

// Unlock releases a write lock
func (rw *ReaderPreferenceRWMutex) Unlock() {
	rw.mu.Lock()
	rw.writing = false
	// Signal all waiting readers (preference to readers)
	rw.readerWait.Broadcast()
	// Also signal one writer
	rw.writerWait.Signal()
	rw.mu.Unlock()
}

func demonstrateRWPreference() {
	fmt.Println("\n=== Reader Preference RWMutex ===")
	
	// Create a reader-preference RW mutex
	rwMutex := NewReaderPreferenceRWMutex()
	
	// Shared resource
	var sharedData int
	
	// Track operation counts
	var readCount, writeCount int32
	
	// Start time for measuring operations
	startTime := time.Now()
	endTime := startTime.Add(1 * time.Second)
	
	// Launch reader goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire read lock
				rwMutex.RLock()
				
				// Read shared data
				_ = sharedData
				atomic.AddInt32(&readCount, 1)
				
				// Hold lock briefly
				time.Sleep(5 * time.Millisecond)
				
				// Release read lock
				rwMutex.RUnlock()
				
				// Small pause between reads
				time.Sleep(5 * time.Millisecond)
			}
		}(i)
	}
	
	// Launch writer goroutines
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire write lock
				rwMutex.Lock()
				
				// Update shared data
				sharedData++
				atomic.AddInt32(&writeCount, 1)
				
				// Hold lock briefly
				time.Sleep(10 * time.Millisecond)
				
				// Release write lock
				rwMutex.Unlock()
				
				// Small pause between writes
				time.Sleep(20 * time.Millisecond)
			}
		}(i)
	}
	
	wg.Wait()
	
	fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
	fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
	fmt.Printf("Ratio (reads/writes): %.2f\n",
		float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
	
	// Compare with standard RWMutex (which has writer preference)
	compareWithStandardRWMutex()
}

func compareWithStandardRWMutex() {
	fmt.Println("\n=== Standard RWMutex (Writer Preference) ===")
	
	// Create a standard RW mutex
	var rwMutex sync.RWMutex
	
	// Shared resource
	var sharedData int
	
	// Track operation counts
	var readCount, writeCount int32
	
	// Start time for measuring operations
	startTime := time.Now()
	endTime := startTime.Add(1 * time.Second)
	
	// Launch reader goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire read lock
				rwMutex.RLock()
				
				// Read shared data
				_ = sharedData
				atomic.AddInt32(&readCount, 1)
				
				// Hold lock briefly
				time.Sleep(5 * time.Millisecond)
				
				// Release read lock
				rwMutex.RUnlock()
				
				// Small pause between reads
				time.Sleep(5 * time.Millisecond)
			}
		}(i)
	}
	
	// Launch writer goroutines
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire write lock
				rwMutex.Lock()
				
				// Update shared data
				sharedData++
				atomic.AddInt32(&writeCount, 1)
				
				// Hold lock briefly
				time.Sleep(10 * time.Millisecond)
				
				// Release write lock
				rwMutex.Unlock()
				
				// Small pause between writes
				time.Sleep(20 * time.Millisecond)
			}
		}(i)
	}
	
	wg.Wait()
	
	fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
	fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
	fmt.Printf("Ratio (reads/writes): %.2f\n",
		float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
}

func main() {
	demonstrateRWPreference()
}

This example demonstrates:

  1. Custom RWMutex Implementation: A reader-preference RWMutex that prioritizes readers over writers.
  2. Performance Comparison: Comparing the custom implementation with the standard RWMutex.

The choice between reader and writer preference depends on the specific workload and requirements of your application.
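
One way to keep that choice flexible is to hide the lock behind a small interface so the preference policy can be swapped per workload. The sketch below is illustrative rather than part of the examples above (rwLocker and guardedCounter are names introduced here); it compiles against sync.RWMutex as shown, and the ReaderPreferenceRWMutex defined earlier satisfies the same interface:

package main

import (
	"fmt"
	"sync"
)

// rwLocker abstracts over read-write locks so callers can choose the
// preference policy that suits their workload.
type rwLocker interface {
	Lock()
	Unlock()
	RLock()
	RUnlock()
}

// guardedCounter protects a value with whichever rwLocker is supplied.
type guardedCounter struct {
	lock  rwLocker
	value int
}

func (g *guardedCounter) get() int {
	g.lock.RLock()
	defer g.lock.RUnlock()
	return g.value
}

func (g *guardedCounter) increment() {
	g.lock.Lock()
	defer g.lock.Unlock()
	g.value++
}

func main() {
	// Write-heavy or balanced workload: the standard lock is a good default.
	c := &guardedCounter{lock: &sync.RWMutex{}}

	// Read-heavy workload: swap in the reader-preference lock from the
	// example above instead, e.g.
	// c := &guardedCounter{lock: NewReaderPreferenceRWMutex()}

	c.increment()
	fmt.Println("value:", c.get())
}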

Atomic Operations and Lock-Free Programming

For simple shared state, atomic operations can provide synchronization without locks:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func atomicPatterns() {
	// Basic atomic operations
	basicAtomic()
	
	// Compare-and-swap patterns
	casPatterns()
	
	// Lock-free counter
	lockFreeCounter()
}

func basicAtomic() {
	fmt.Println("\n=== Basic Atomic Operations ===")
	
	// Atomic integer operations
	var counter int64
	
	// Add
	atomic.AddInt64(&counter, 10)
	fmt.Printf("After adding 10: %d\n", atomic.LoadInt64(&counter))
	
	// Subtract (add negative)
	atomic.AddInt64(&counter, -5)
	fmt.Printf("After subtracting 5: %d\n", atomic.LoadInt64(&counter))
	
	// Store
	atomic.StoreInt64(&counter, 42)
	fmt.Printf("After storing 42: %d\n", atomic.LoadInt64(&counter))
	
	// Swap (returns old value)
	old := atomic.SwapInt64(&counter, 100)
	fmt.Printf("Swapped %d with 100, now: %d\n", old, atomic.LoadInt64(&counter))
}

func casPatterns() {
	fmt.Println("\n=== Compare-And-Swap Patterns ===")
	
	var value int64 = 100
	
	// Basic CAS
	swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
	fmt.Printf("CAS 100->200: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
	
	// Failed CAS (wrong expected value)
	swapped = atomic.CompareAndSwapInt64(&value, 100, 300)
	fmt.Printf("CAS 100->300: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
	
	// Increment with CAS (retry loop)
	incrementWithCAS := func(addr *int64, delta int64) {
		for {
			oldVal := atomic.LoadInt64(addr)
			newVal := oldVal + delta
			if atomic.CompareAndSwapInt64(addr, oldVal, newVal) {
				return
			}
			// If CAS failed, loop and try again
		}
	}
	
	incrementWithCAS(&value, 50)
	fmt.Printf("After CAS increment by 50: %d\n", atomic.LoadInt64(&value))
}

// LockFreeCounter is a counter that can be incremented concurrently without locks
type LockFreeCounter struct {
	value uint64
}

// Increment atomically increments the counter and returns the new value
func (c *LockFreeCounter) Increment() uint64 {
	return atomic.AddUint64(&c.value, 1)
}

// Load atomically loads the counter value
func (c *LockFreeCounter) Load() uint64 {
	return atomic.LoadUint64(&c.value)
}

// LockBasedCounter is a counter that uses a mutex for synchronization
type LockBasedCounter struct {
	mu    sync.Mutex
	value uint64
}

// Increment increments the counter and returns the new value
func (c *LockBasedCounter) Increment() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
	return c.value
}

// Load returns the counter value
func (c *LockBasedCounter) Load() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func lockFreeCounter() {
	fmt.Println("\n=== Lock-Free vs. Lock-Based Counter Benchmark ===")
	
	// Create counters
	lockFree := &LockFreeCounter{}
	lockBased := &LockBasedCounter{}
	
	// Benchmark function
	benchmark := func(name string, counter interface{}, goroutines, iterations int) time.Duration {
		var wg sync.WaitGroup
		
		// Create a function that increments the appropriate counter
		var incrementFn func()
		
		switch c := counter.(type) {
		case *LockFreeCounter:
			incrementFn = func() { c.Increment() }
		case *LockBasedCounter:
			incrementFn = func() { c.Increment() }
		}
		
		fmt.Printf("%s: %d goroutines x %d increments each\n", name, goroutines, iterations)

		// Start timing
		start := time.Now()
		
		// Launch goroutines
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				
				// Perform increments
				for j := 0; j < iterations; j++ {
					incrementFn()
				}
			}()
		}
		
		// Wait for all goroutines to complete
		wg.Wait()
		
		// Return elapsed time
		return time.Since(start)
	}
	
	// Run benchmarks
	goroutines := 4
	iterations := 1000000
	
	lockFreeTime := benchmark("Lock-Free", lockFree, goroutines, iterations)
	lockBasedTime := benchmark("Lock-Based", lockBased, goroutines, iterations)
	
	// Verify results
	fmt.Printf("Lock-Free Counter: %d\n", lockFree.Load())
	fmt.Printf("Lock-Based Counter: %d\n", lockBased.Load())
	
	// Compare performance
	fmt.Printf("Lock-Free Time: %v\n", lockFreeTime)
	fmt.Printf("Lock-Based Time: %v\n", lockBasedTime)
	fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}

func main() {
	atomicPatterns()
}

Key insights about atomic operations:

  1. Hardware Support: Atomic operations are implemented using special CPU instructions.
  2. No Locks Required: They provide synchronization without the overhead of locks.
  3. Limited Operations: Only simple operations such as load, store, add, swap, and compare-and-swap are available, and only on specific integer and pointer types.
  4. Performance Advantage: For simple counters and flags, atomic operations are typically noticeably faster than mutex-based synchronization, as the benchmark above illustrates.

Lock-free programming using atomic operations can provide substantial performance benefits but requires careful design to ensure correctness.
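
Since Go 1.19, the sync/atomic package also offers typed values such as atomic.Int64, atomic.Uint64, atomic.Bool, and atomic.Pointer[T], which keep the pointer plumbing out of user code and make it harder to mix atomic and plain access to the same variable. Below is a brief sketch of the lock-free counter rewritten with a typed atomic; TypedCounter is an illustrative name, not part of the example above:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// TypedCounter wraps atomic.Uint64; the zero value is ready to use.
type TypedCounter struct {
	value atomic.Uint64
}

// Increment atomically adds one and returns the new value.
func (c *TypedCounter) Increment() uint64 {
	return c.value.Add(1)
}

// Load atomically reads the current value.
func (c *TypedCounter) Load() uint64 {
	return c.value.Load()
}

func main() {
	var counter TypedCounter
	var wg sync.WaitGroup

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				counter.Increment()
			}
		}()
	}
	wg.Wait()

	fmt.Println("final count:", counter.Load()) // always 4000
}

The typed integer values support the same Load, Store, Add, Swap, and CompareAndSwap operations as the function-based API used earlier.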