Master Go’s synchronization primitives including mutexes.

#Go’s sync package contains the building blocks for safe concurrent programming. When multiple goroutines access shared data, you need synchronization to prevent race conditions and data corruption.

The sync package provides several key primitives:

  • Mutex: Mutual exclusion locks for protecting shared resources
  • RWMutex: Reader-writer locks for read-heavy workloads
  • WaitGroup: Waiting for a collection of goroutines to finish
  • Once: Ensuring a function runs exactly once
  • Cond: Condition variables for complex coordination
  • Pool: Object pools for reducing garbage collection pressure

When to Use Sync Primitives

Go’s philosophy is “Don’t communicate by sharing memory; share memory by communicating.” Channels are often the right choice for goroutine coordination. However, sync primitives are essential when:

  • Protecting shared state that multiple goroutines access
  • Implementing low-level concurrent data structures
  • Optimizing performance-critical code paths
  • Building higher-level synchronization abstractions

Common Pitfalls

Synchronization bugs are notoriously difficult to debug. Common mistakes include:

  • Deadlocks: Goroutines waiting for each other indefinitely
  • Race conditions: Unsynchronized access to shared data
  • Lock contention: Too many goroutines competing for the same lock
  • Forgetting to unlock: Causing permanent blocking

This guide covers each primitive in detail, with practical examples and guidance on when to use each approach.


Core Sync Package Primitives

The sync package provides several fundamental primitives that form the building blocks for concurrent programming in Go. Understanding their implementation details and behavior characteristics is essential for effective usage.

Mutex: Beyond Simple Locking

At its core, sync.Mutex provides mutual exclusion, ensuring that only one goroutine can access a protected resource at a time. However, its implementation and behavior have nuances worth exploring:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

func mutexInternals() {
	// Create a mutex
	var mu sync.Mutex
	
	// Counter to track goroutines that have acquired the lock
	var counter int32
	
	// Function that acquires the lock, increments counter, waits, then decrements
	acquireLock := func(id int, wg *sync.WaitGroup) {
		defer wg.Done()
		
		fmt.Printf("Goroutine %d: Attempting to acquire lock\n", id)
		mu.Lock()
		
		// Atomically increment the counter
		newCount := atomic.AddInt32(&counter, 1)
		fmt.Printf("Goroutine %d: Acquired lock, active holders: %d\n", id, newCount)
		
		// This should always be 1 if mutex is working correctly
		if newCount != 1 {
			panic(fmt.Sprintf("Mutex failure: %d goroutines holding lock", newCount))
		}
		
		// Simulate work while holding the lock
		time.Sleep(50 * time.Millisecond)
		
		// Atomically decrement the counter
		atomic.AddInt32(&counter, -1)
		fmt.Printf("Goroutine %d: Releasing lock\n", id)
		mu.Unlock()
	}
	
	// Launch multiple goroutines to compete for the lock
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go acquireLock(i, &wg)
	}
	
	wg.Wait()
	fmt.Println("All goroutines completed successfully")
	
	// Demonstrate mutex starvation scenario
	demonstrateMutexStarvation()
}

func demonstrateMutexStarvation() {
	var mu sync.Mutex
	
	// Track how many times each goroutine acquires the lock
	acquisitions := make([]int, 2)
	
	// Done channel to signal completion
	done := make(chan bool)
	
	// Launch two goroutines with different behaviors
	
	// Goroutine 1: Acquires lock briefly and releases quickly
	go func() {
		for i := 0; i < 1000 && !isChannelClosed(done); i++ {
			mu.Lock()
			acquisitions[0]++
			mu.Unlock()
			// Give other goroutines a chance
			runtime.Gosched()
		}
		done <- true
	}()
	
	// Goroutine 2: Holds lock for longer periods
	go func() {
		for i := 0; i < 100 && !isChannelClosed(done); i++ {
			mu.Lock()
			acquisitions[1]++
			// Hold the lock longer
			time.Sleep(1 * time.Millisecond)
			mu.Unlock()
			// Give other goroutines a chance
			runtime.Gosched()
		}
		done <- true
	}()
	
	// Let them run for a bit
	time.Sleep(100 * time.Millisecond)
	close(done)
	
	// Give goroutines time to finish
	time.Sleep(10 * time.Millisecond)
	
	fmt.Printf("Lock acquisition distribution:\n")
	fmt.Printf("- Fast goroutine: %d acquisitions\n", acquisitions[0])
	fmt.Printf("- Slow goroutine: %d acquisitions\n", acquisitions[1])
	fmt.Printf("- Ratio (fast/slow): %.2f\n", float64(acquisitions[0])/float64(acquisitions[1]))
}

// Helper function to check if a channel is closed
func isChannelClosed(ch chan bool) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	mutexInternals()
}

This example demonstrates several important aspects of mutex behavior:

  1. Mutual Exclusion Guarantee: The atomic counter verifies that only one goroutine holds the lock at any time.
  2. FIFO vs. Fairness: Go’s mutex doesn’t guarantee first-in-first-out ordering for waiting goroutines.
  3. Starvation Potential: The second part demonstrates how goroutines that hold locks briefly can starve those that need them for longer periods.

Under the hood, Go’s mutex uses a combination of atomic operations and goroutine queuing. It employs a “barging” model where a releasing goroutine may immediately reacquire the lock if it attempts to do so before other waiting goroutines are scheduled, which can lead to starvation in certain scenarios.

RWMutex: Optimizing for Read-Heavy Workloads

The sync.RWMutex extends the basic mutex concept by distinguishing between read and write operations, allowing multiple readers to access the protected resource simultaneously:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func rwMutexDemo() {
	// Create a RWMutex
	var rwMu sync.RWMutex
	
	// Counters to track concurrent readers and writers
	var readers int32
	var writers int32
	
	// Function for reader goroutines
	reader := func(id int, wg *sync.WaitGroup) {
		defer wg.Done()
		
		for i := 0; i < 3; i++ {
			// Acquire read lock
			fmt.Printf("Reader %d: Attempting to acquire read lock\n", id)
			rwMu.RLock()
			
			// Increment reader count and check for writers
			currentReaders := atomic.AddInt32(&readers, 1)
			currentWriters := atomic.LoadInt32(&writers)
			fmt.Printf("Reader %d: Acquired read lock, concurrent readers: %d, writers: %d\n", 
				id, currentReaders, currentWriters)
			
			// Verify no writers are active
			if currentWriters > 0 {
				panic(fmt.Sprintf("RWMutex failure: %d writers active during read lock", currentWriters))
			}
			
			// Simulate reading
			time.Sleep(50 * time.Millisecond)
			
			// Decrement reader count and release lock
			atomic.AddInt32(&readers, -1)
			rwMu.RUnlock()
			fmt.Printf("Reader %d: Released read lock\n", id)
			
			// Small pause between operations
			time.Sleep(10 * time.Millisecond)
		}
	}
	
	// Function for writer goroutines
	writer := func(id int, wg *sync.WaitGroup) {
		defer wg.Done()
		
		for i := 0; i < 2; i++ {
			// Acquire write lock
			fmt.Printf("Writer %d: Attempting to acquire write lock\n", id)
			rwMu.Lock()
			
			// Increment writer count and check for other writers and readers
			currentWriters := atomic.AddInt32(&writers, 1)
			currentReaders := atomic.LoadInt32(&readers)
			fmt.Printf("Writer %d: Acquired write lock, concurrent writers: %d, readers: %d\n", 
				id, currentWriters, currentReaders)
			
			// Verify no other writers or readers are active
			if currentWriters != 1 {
				panic(fmt.Sprintf("RWMutex failure: %d writers active concurrently", currentWriters))
			}
			if currentReaders > 0 {
				panic(fmt.Sprintf("RWMutex failure: %d readers active during write lock", currentReaders))
			}
			
			// Simulate writing
			time.Sleep(100 * time.Millisecond)
			
			// Decrement writer count and release lock
			atomic.AddInt32(&writers, -1)
			rwMu.Unlock()
			fmt.Printf("Writer %d: Released write lock\n", id)
			
			// Small pause between operations
			time.Sleep(20 * time.Millisecond)
		}
	}
	
	// Launch multiple readers and writers
	var wg sync.WaitGroup
	
	// Start readers
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go reader(i, &wg)
	}
	
	// Start writers
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go writer(i, &wg)
	}
	
	wg.Wait()
	fmt.Println("All reader and writer goroutines completed successfully")
	
	// Demonstrate writer preference
	demonstrateWriterPreference()
}

func demonstrateWriterPreference() {
	var rwMu sync.RWMutex
	
	// Signal channels
	readerReady := make(chan bool)
	writerReady := make(chan bool)
	done := make(chan bool)
	
	// Start a long-running reader
	go func() {
		rwMu.RLock()
		readerReady <- true
		
		// Hold read lock until done
		<-done
		rwMu.RUnlock()
	}()
	
	// Wait for reader to acquire lock
	<-readerReady
	
	// Start a writer that will block
	go func() {
		writerReady <- true
		
		// This will block until reader releases lock
		rwMu.Lock()
		rwMu.Unlock()
		done <- true
	}()
	
	// Wait for writer to be ready
	<-writerReady
	
	// Try to acquire another read lock after writer is waiting
	// This demonstrates writer preference - new readers will block
	// if there's a writer waiting
	readBlocked := make(chan bool)
	go func() {
		// Small delay to ensure writer is waiting
		time.Sleep(10 * time.Millisecond)
		
		fmt.Println("New reader attempting to acquire lock while writer is waiting...")
		
		// This should block because writer is waiting
		rwMu.RLock()
		rwMu.RUnlock()
		
		readBlocked <- true
	}()
	
	// Let the system run for a bit
	select {
	case <-readBlocked:
		fmt.Println("UNEXPECTED: New reader acquired lock while writer was waiting")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("EXPECTED: New reader blocked because writer is waiting (writer preference)")
	}
	
	// Signal the original reader to release the lock
	done <- true
	
	// Wait for everything to complete
	<-done
	<-readBlocked
}

func main() {
	rwMutexDemo()
}

Key insights about RWMutex:

  1. Reader Concurrency: Multiple goroutines can hold read locks simultaneously, as verified by the concurrent reader count.
  2. Writer Exclusivity: Write locks exclude both readers and other writers.
  3. Writer Preference: When both readers and writers are waiting, writers are generally given preference to prevent writer starvation in read-heavy workloads.

The RWMutex implementation uses a clever combination of a mutex and semaphore to track readers and coordinate with writers. This makes it significantly more complex than a basic mutex but enables higher throughput in read-heavy scenarios.

WaitGroup: Coordinating Task Completion

The sync.WaitGroup provides a mechanism to wait for a collection of goroutines to finish their work:

package main

import (
	"fmt"
	"sync"
	"time"
)

func waitGroupPatterns() {
	// Basic WaitGroup usage
	basicWaitGroup()
	
	// Dynamic task generation
	dynamicWaitGroup()
	
	// Hierarchical WaitGroups
	hierarchicalWaitGroups()
}

func basicWaitGroup() {
	fmt.Println("\n=== Basic WaitGroup ===")
	var wg sync.WaitGroup
	
	// Launch multiple workers
	for i := 1; i <= 3; i++ {
		// Increment counter before launching goroutine
		wg.Add(1)
		
		go func(id int) {
			// Ensure counter is decremented when goroutine completes
			defer wg.Done()
			
			fmt.Printf("Worker %d: Starting\n", id)
			time.Sleep(time.Duration(id) * 100 * time.Millisecond)
			fmt.Printf("Worker %d: Completed\n", id)
		}(i)
	}
	
	// Wait for all workers to complete
	fmt.Println("Main: Waiting for workers to complete")
	wg.Wait()
	fmt.Println("Main: All workers completed")
}

func dynamicWaitGroup() {
	fmt.Println("\n=== Dynamic WaitGroup ===")
	var wg sync.WaitGroup
	
	// Channel for dynamic task generation
	tasks := make(chan int, 10)
	
	// Launch worker pool
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			
			fmt.Printf("Worker %d: Started, processing tasks\n", workerID)
			
			// Process tasks until channel is closed
			for taskID := range tasks {
				fmt.Printf("Worker %d: Processing task %d\n", workerID, taskID)
				time.Sleep(50 * time.Millisecond)
				fmt.Printf("Worker %d: Completed task %d\n", workerID, taskID)
			}
			
			fmt.Printf("Worker %d: Shutting down\n", workerID)
		}(i)
	}
	
	// Generate tasks
	fmt.Println("Main: Generating tasks")
	for i := 1; i <= 6; i++ {
		tasks <- i
	}
	
	// Signal no more tasks
	fmt.Println("Main: Closing task channel")
	close(tasks)
	
	// Wait for all workers to complete
	fmt.Println("Main: Waiting for workers to complete")
	wg.Wait()
	fmt.Println("Main: All workers completed")
}

func hierarchicalWaitGroups() {
	fmt.Println("\n=== Hierarchical WaitGroups ===")
	var parentWg sync.WaitGroup
	
	// Launch parent tasks
	for i := 1; i <= 2; i++ {
		parentWg.Add(1)
		
		go func(parentID int) {
			defer parentWg.Done()
			
			fmt.Printf("Parent %d: Starting\n", parentID)
			
			// Each parent launches multiple child tasks
			var childWg sync.WaitGroup
			
			for j := 1; j <= 3; j++ {
				childWg.Add(1)
				
				go func(parentID, childID int) {
					defer childWg.Done()
					
					fmt.Printf("Parent %d, Child %d: Starting\n", parentID, childID)
					time.Sleep(50 * time.Millisecond)
					fmt.Printf("Parent %d, Child %d: Completed\n", parentID, childID)
				}(parentID, j)
			}
			
			// Wait for all children of this parent to complete
			fmt.Printf("Parent %d: Waiting for children\n", parentID)
			childWg.Wait()
			fmt.Printf("Parent %d: All children completed\n", parentID)
		}(i)
	}
	
	// Wait for all parent tasks to complete
	fmt.Println("Main: Waiting for parent tasks to complete")
	parentWg.Wait()
	fmt.Println("Main: All parent tasks completed")
}

func main() {
	waitGroupPatterns()
}

This example demonstrates several WaitGroup patterns:

  1. Basic Coordination: Using Add/Done/Wait to coordinate multiple goroutines.
  2. Dynamic Task Processing: Combining WaitGroup with channels for worker pools.
  3. Hierarchical Coordination: Nesting WaitGroups to coordinate multi-level task hierarchies.

Internally, WaitGroup uses atomic operations to maintain its counter, making it highly efficient for coordination across many goroutines. The implementation avoids mutex locks for the common Add/Done operations, only using synchronization when Wait is called.

Once: Ensuring Single Execution

The sync.Once primitive guarantees that a function is executed exactly once, even when called from multiple goroutines:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func oncePatterns() {
	// Basic Once usage
	basicOnce()
	
	// Once with error handling
	onceWithErrorHandling()
	
	// Multiple Once objects
	multipleOnce()
}

func basicOnce() {
	fmt.Println("\n=== Basic Once ===")
	var once sync.Once
	var counter int32
	
	// Function to be executed once
	initFunc := func() {
		// Simulate initialization work
		time.Sleep(100 * time.Millisecond)
		atomic.AddInt32(&counter, 1)
		fmt.Println("Initialization done")
	}
	
	// Launch multiple goroutines that all try to execute initFunc
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Goroutine %d: Calling initialization\n", id)
			once.Do(initFunc)
			fmt.Printf("Goroutine %d: Initialization call returned\n", id)
		}(i)
	}
	
	wg.Wait()
	fmt.Printf("Counter value: %d (should be 1)\n", atomic.LoadInt32(&counter))
}

func onceWithErrorHandling() {
	fmt.Println("\n=== Once with Error Handling ===")
	var once sync.Once
	var initErr error
	var initialized int32
	
	// Function that might fail
	initFunc := func() {
		// Simulate initialization that might fail
		fmt.Println("Performing initialization...")
		time.Sleep(100 * time.Millisecond)
		
		// Simulate a failure
		// initErr = fmt.Errorf("initialization failed")
		
		// If no error, mark as initialized
		if initErr == nil {
			atomic.StoreInt32(&initialized, 1)
			fmt.Println("Initialization succeeded")
		} else {
			fmt.Printf("Initialization failed: %v\n", initErr)
		}
	}
	
	// Wrapper function for initialization with error checking
	initWithErrorCheck := func() error {
		once.Do(initFunc)
		return initErr
	}
	
	// Try initialization from multiple goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Goroutine %d: Attempting initialization\n", id)
			err := initWithErrorCheck()
			if err != nil {
				fmt.Printf("Goroutine %d: Initialization error: %v\n", id, err)
			} else {
				fmt.Printf("Goroutine %d: Initialization successful\n", id)
			}
		}(i)
	}
	
	wg.Wait()
	fmt.Printf("Final initialization state: %v\n", atomic.LoadInt32(&initialized) == 1)
}

func multipleOnce() {
	fmt.Println("\n=== Multiple Once Objects ===")
	
	// Create a map of Once objects for different resources
	var mu sync.Mutex
	resources := make(map[string]*sync.Once)
	
	// Function to get or create a Once object for a resource
	getResourceOnce := func(name string) *sync.Once {
		mu.Lock()
		defer mu.Unlock()
		
		if once, exists := resources[name]; exists {
			return once
		}
		
		once := new(sync.Once)
		resources[name] = once
		return once
	}
	
	// Function to initialize a resource
	initResource := func(name string) {
		fmt.Printf("Initializing resource: %s\n", name)
		time.Sleep(50 * time.Millisecond)
		fmt.Printf("Resource initialized: %s\n", name)
	}
	
	// Launch goroutines that initialize different resources
	var wg sync.WaitGroup
	
	resourceNames := []string{"db", "cache", "logger", "db", "cache"}
	for i, name := range resourceNames {
		wg.Add(1)
		go func(id int, resourceName string) {
			defer wg.Done()
			
			fmt.Printf("Goroutine %d: Initializing %s\n", id, resourceName)
			once := getResourceOnce(resourceName)
			once.Do(func() { initResource(resourceName) })
			fmt.Printf("Goroutine %d: %s initialization call returned\n", id, resourceName)
		}(i, name)
	}
	
	wg.Wait()
	
	// Check how many unique resources were initialized
	mu.Lock()
	fmt.Printf("Number of unique resources: %d\n", len(resources))
	mu.Unlock()
}

func main() {
	oncePatterns()
}

Key insights about Once:

  1. Guaranteed Single Execution: The function passed to Do() executes exactly once, regardless of how many goroutines call it.
  2. Blocking Behavior: All calls to Do() block until the first call completes.
  3. Error Handling: Once doesn’t provide built-in error handling, but patterns can be implemented to address this.
  4. Multiple Once Objects: Different initialization tasks require separate Once instances.

Internally, Once uses a combination of a mutex and an atomic flag to ensure the function is executed exactly once while allowing concurrent calls to proceed efficiently after initialization.

Cond: Coordinating Based on Conditions

The sync.Cond primitive provides a way for goroutines to wait for a condition to become true:

package main

import (
	"fmt"
	"sync"
	"time"
)

func condPatterns() {
	// Basic condition variable usage
	basicCond()
	
	// Broadcast vs. Signal
	broadcastVsSignal()
	
	// Producer-consumer pattern
	producerConsumer()
}

func basicCond() {
	fmt.Println("\n=== Basic Condition Variable ===")
	
	// Create a condition variable with its associated mutex
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	
	// Shared state protected by the mutex
	ready := false
	
	// Launch a goroutine that will set the condition
	go func() {
		// Simulate some preparation work
		time.Sleep(300 * time.Millisecond)
		
		// Update the shared state and signal waiters
		mu.Lock()
		ready = true
		fmt.Println("Signaler: Condition is now true")
		cond.Signal()
		mu.Unlock()
	}()
	
	// Wait for the condition to become true
	fmt.Println("Waiter: Waiting for condition")
	
	mu.Lock()
	// While loop pattern to handle spurious wakeups
	for !ready {
		cond.Wait() // Atomically releases mutex and blocks until signaled
	}
	mu.Unlock()
	
	fmt.Println("Waiter: Condition is true, proceeding")
}

func broadcastVsSignal() {
	fmt.Println("\n=== Broadcast vs. Signal ===")
	
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	
	// Shared state
	ready := false
	
	// Launch multiple waiters
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Waiter %d: Waiting for condition\n", id)
			
			mu.Lock()
			for !ready {
				cond.Wait()
				fmt.Printf("Waiter %d: Woke up, checking condition\n", id)
			}
			fmt.Printf("Waiter %d: Condition is true, proceeding\n", id)
			mu.Unlock()
		}(i)
	}
	
	// Give waiters time to start waiting
	time.Sleep(100 * time.Millisecond)
	
	// Signal vs. Broadcast demonstration
	fmt.Println("\nDemonstrating Signal (wakes only one waiter):")
	mu.Lock()
	cond.Signal()
	mu.Unlock()
	
	// Give signaled waiter time to process
	time.Sleep(100 * time.Millisecond)
	
	fmt.Println("\nDemonstrating Broadcast (wakes all waiters):")
	mu.Lock()
	ready = true
	cond.Broadcast()
	mu.Unlock()
	
	wg.Wait()
}

func producerConsumer() {
	fmt.Println("\n=== Producer-Consumer with Cond ===")
	
	var mu sync.Mutex
	notEmpty := sync.NewCond(&mu)
	notFull := sync.NewCond(&mu)
	
	// Bounded buffer
	const capacity = 3
	buffer := make([]int, 0, capacity)
	
	// Producer function
	producer := func(items int, wg *sync.WaitGroup) {
		defer wg.Done()
		
		for i := 1; i <= items; i++ {
			mu.Lock()
			
			// Wait while buffer is full
			for len(buffer) == capacity {
				fmt.Println("Producer: Buffer full, waiting")
				notFull.Wait()
			}
			
			// Add item to buffer
			buffer = append(buffer, i)
			fmt.Printf("Producer: Added item %d, buffer size: %d\n", i, len(buffer))
			
			// Signal that buffer is not empty
			notEmpty.Signal()
			
			mu.Unlock()
			
			// Simulate variable production rate
			time.Sleep(50 * time.Millisecond)
		}
		
		fmt.Println("Producer: Finished producing items")
	}
	
	// Consumer function
	consumer := func(wg *sync.WaitGroup) {
		defer wg.Done()
		
		// Consume 8 items (more than producer will produce)
		for consumed := 0; consumed < 8; consumed++ {
			mu.Lock()
			
			// Wait while buffer is empty
			for len(buffer) == 0 {
				fmt.Println("Consumer: Buffer empty, waiting")
				notEmpty.Wait()
				
				// Check if we should exit (producer is done and buffer is empty)
				if producerDone && len(buffer) == 0 {
					fmt.Println("Consumer: No more items, exiting")
					mu.Unlock()
					return
				}
			}
			
			// Remove item from buffer
			item := buffer[0]
			buffer = buffer[1:]
			fmt.Printf("Consumer: Removed item %d, buffer size: %d\n", item, len(buffer))
			
			// Signal that buffer is not full
			notFull.Signal()
			
			mu.Unlock()
			
			// Simulate processing
			time.Sleep(100 * time.Millisecond)
		}
		
		fmt.Println("Consumer: Finished consuming items")
	}
	
	// Flag to indicate producer is done
	var producerDone bool
	
	// Start producer and consumer
	var wg sync.WaitGroup
	wg.Add(2)
	
	go producer(5, &wg)
	go consumer(&wg)
	
	// Wait for producer to finish
	go func() {
		time.Sleep(500 * time.Millisecond)
		mu.Lock()
		producerDone = true
		// Wake up consumer if it's waiting
		notEmpty.Signal()
		mu.Unlock()
	}()
	
	wg.Wait()
	fmt.Println("Main: Producer and consumer have finished")
}

func main() {
	condPatterns()
}

Key insights about Cond:

  1. Wait Atomically Releases Lock: The Wait method atomically releases the mutex and blocks the goroutine.
  2. Spurious Wakeups: Always use a loop to check the condition when waiting.
  3. Signal vs. Broadcast: Signal wakes one waiter, while Broadcast wakes all waiters.
  4. Producer-Consumer Pattern: Cond variables are ideal for implementing bounded buffer patterns with multiple conditions.

Internally, Cond maintains a linked list of waiting goroutines. When Signal is called, it moves the first waiter to the ready queue. When Broadcast is called, it moves all waiters to the ready queue. This implementation ensures efficient signaling without unnecessary context switches.

Pool: Managing Reusable Resources

The sync.Pool provides a way to reuse temporary objects, reducing allocation pressure on the garbage collector:

package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

func poolPatterns() {
	// Basic pool usage
	basicPool()
	
	// Pool with custom initialization
	customInitPool()
	
	// Pool performance benchmark
	poolPerformance()
}

func basicPool() {
	fmt.Println("\n=== Basic Pool Usage ===")
	
	// Create a pool of byte buffers
	var bufferPool = sync.Pool{
		New: func() interface{} {
			buffer := new(bytes.Buffer)
			fmt.Println("Creating new buffer")
			return buffer
		},
	}
	
	// Get a buffer from the pool
	fmt.Println("Getting first buffer:")
	buffer1 := bufferPool.Get().(*bytes.Buffer)
	
	// Use the buffer
	buffer1.WriteString("Hello, World!")
	fmt.Printf("Buffer 1 contents: %s\n", buffer1.String())
	
	// Reset and return the buffer to the pool
	buffer1.Reset()
	bufferPool.Put(buffer1)
	fmt.Println("Returned buffer 1 to pool")
	
	// Get another buffer (should reuse the one we just put back)
	fmt.Println("Getting second buffer:")
	buffer2 := bufferPool.Get().(*bytes.Buffer)
	
	// Check if it's the same buffer
	fmt.Printf("Buffer 2 is buffer 1: %v\n", buffer1 == buffer2)
	
	// Use the buffer again
	buffer2.WriteString("Reused buffer")
	fmt.Printf("Buffer 2 contents: %s\n", buffer2.String())
	
	// Return to pool
	buffer2.Reset()
	bufferPool.Put(buffer2)
}

func customInitPool() {
	fmt.Println("\n=== Pool with Custom Initialization ===")
	
	// Create a pool of slices with custom capacity
	var slicePool = sync.Pool{
		New: func() interface{} {
			slice := make([]int, 0, 1024)
			fmt.Println("Created new slice with capacity 1024")
			return &slice
		},
	}
	
	// Function that uses a slice from the pool
	processData := func(id int, data []int) {
		// Get a slice from the pool
		slicePtr := slicePool.Get().(*[]int)
		slice := (*slicePtr)[:0] // Reset length but keep capacity
		
		// Use the slice
		slice = append(slice, data...)
		fmt.Printf("Worker %d: Processing %d items with slice capacity %d\n",
			id, len(slice), cap(slice))
		
		// Return slice to pool
		*slicePtr = slice[:0] // Clear again before returning
		slicePool.Put(slicePtr)
	}
	
	// Use the pool from multiple goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			// Generate some test data
			data := make([]int, id*100)
			for j := range data {
				data[j] = j
			}
			
			processData(id, data)
		}(i)
	}
	
	wg.Wait()
}

func poolPerformance() {
	fmt.Println("\n=== Pool Performance Benchmark ===")
	
	// Create a pool of large buffers
	var bufferPool = sync.Pool{
		New: func() interface{} {
			return make([]byte, 1024*1024) // 1MB buffer
		},
	}
	
	// Function that either uses the pool or allocates directly
	benchmark := func(usePool bool, iterations int) time.Duration {
		start := time.Now()
		
		for i := 0; i < iterations; i++ {
			var buffer []byte
			
			if usePool {
				// Get from pool
				buffer = bufferPool.Get().([]byte)
				
				// Use the buffer
				buffer[0] = 1
				
				// Return to pool
				bufferPool.Put(buffer)
			} else {
				// Allocate directly
				buffer = make([]byte, 1024*1024)
				buffer[0] = 1
				// Buffer will be garbage collected
			}
		}
		
		return time.Since(start)
	}
	
	// Run benchmarks
	iterations := 1000
	
	// Warm up the pool
	benchmark(true, 100)
	
	// Benchmark with and without pool
	withoutPoolTime := benchmark(false, iterations)
	withPoolTime := benchmark(true, iterations)
	
	fmt.Printf("Without Pool: %v for %d iterations\n", withoutPoolTime, iterations)
	fmt.Printf("With Pool: %v for %d iterations\n", withPoolTime, iterations)
	fmt.Printf("Performance improvement: %.2fx\n",
		float64(withoutPoolTime)/float64(withPoolTime))
}

func main() {
	poolPatterns()
}

Key insights about Pool:

  1. Temporary Object Reuse: Pool reduces garbage collection pressure by reusing objects.
  2. No Guarantees: Items may be removed from the pool at any time, especially during garbage collection.
  3. Thread Safety: Pool is safe for concurrent use by multiple goroutines.
  4. Reset Before Reuse: Always reset objects before returning them to the pool.

Internally, Pool uses per-P (processor) caches to minimize contention. Each P has its own local pool, and items are stolen from other Ps only when the local pool is empty. This design makes Pool highly efficient for concurrent access patterns.

Map: Concurrent Access to Maps

The sync.Map provides a concurrent map implementation optimized for specific access patterns:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func mapPatterns() {
	// Basic Map usage
	basicMap()
	
	// Load or store pattern
	loadOrStore()
	
	// Range iteration
	rangeMap()
}

func basicMap() {
	fmt.Println("\n=== Basic Map Operations ===")
	
	// Create a concurrent map
	var m sync.Map
	
	// Store values
	m.Store("key1", 100)
	m.Store("key2", 200)
	m.Store("key3", 300)
	
	// Load values
	value, ok := m.Load("key2")
	fmt.Printf("Load key2: value=%v, found=%v\n", value, ok)
	
	value, ok = m.Load("nonexistent")
	fmt.Printf("Load nonexistent: value=%v, found=%v\n", value, ok)
	
	// Delete a value
	m.Delete("key1")
	value, ok = m.Load("key1")
	fmt.Printf("After delete, load key1: value=%v, found=%v\n", value, ok)
}

func loadOrStore() {
	fmt.Println("\n=== LoadOrStore Pattern ===")
	
	// Create a concurrent map
	var m sync.Map
	
	// Function to get or initialize a counter
	getCounter := func(key string) *int64 {
		// Try to load existing counter
		value, ok := m.Load(key)
		if ok {
			return value.(*int64)
		}
		
		// Create new counter
		counter := new(int64)
		
		// Try to store it (might race with another goroutine)
		actual, loaded := m.LoadOrStore(key, counter)
		if loaded {
			// Another goroutine beat us to it
			return actual.(*int64)
		}
		
		// We stored our counter
		return counter
	}
	
	// Use counters from multiple goroutines
	var wg sync.WaitGroup
	keys := []string{"a", "b", "a", "c", "b", "a"}
	
	for i, key := range keys {
		wg.Add(1)
		go func(id int, k string) {
			defer wg.Done()
			
			// Get or initialize counter for this key
			counter := getCounter(k)
			
			// Increment counter
			newValue := atomic.AddInt64(counter, 1)
			fmt.Printf("Goroutine %d: Incremented counter for key %s to %d\n",
				id, k, newValue)
		}(i, key)
	}
	
	wg.Wait()
	
	// Print final counter values
	fmt.Println("Final counter values:")
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("  %s: %d\n", key, *value.(*int64))
		return true
	})
}

func rangeMap() {
	fmt.Println("\n=== Range Iteration ===")
	
	// Create and populate a map
	var m sync.Map
	for i := 1; i <= 5; i++ {
		key := fmt.Sprintf("key%d", i)
		m.Store(key, i*10)
	}
	
	// Iterate over all key-value pairs
	fmt.Println("Map contents:")
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("  %v: %v\n", key, value)
		return true // continue iteration
	})
	
	// Selective iteration (stop after finding a specific key)
	fmt.Println("Searching for key3:")
	found := false
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("  Checking %v\n", key)
		if key.(string) == "key3" {
			fmt.Printf("  Found key3 with value %v\n", value)
			found = true
			return false // stop iteration
		}
		return true // continue iteration
	})
	
	if !found {
		fmt.Println("  key3 not found")
	}
}

func main() {
	mapPatterns()
}

Key insights about Map:

  1. Optimized Use Cases: Sync.Map is optimized for cases where keys are stable (few deletions) and either:
    • Each key is accessed by only one goroutine (partitioned access)
    • Many goroutines read and few write (read-heavy workloads)
  2. No Length Method: Unlike built-in maps, sync.Map doesn’t provide a length method.
  3. Type Assertions Required: Values are stored as interface{}, requiring type assertions when retrieved.
  4. Atomic Operations: LoadOrStore provides an atomic “check and set” operation.

Internally, sync.Map uses a clever combination of an immutable read-only map and a dirty map with a mutex. This design allows for lock-free reads in the common case, falling back to the dirty map only when necessary.

Advanced Synchronization Patterns

Beyond the basic primitives, Go enables sophisticated synchronization patterns by combining multiple primitives and leveraging channels.

Semaphore Implementation

Semaphores are not provided directly in the sync package but can be implemented using channels or a combination of mutex and condition variable:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ChannelSemaphore implements a semaphore using a buffered channel
type ChannelSemaphore struct {
	permits chan struct{}
}

// NewChannelSemaphore creates a new semaphore with the given number of permits
func NewChannelSemaphore(n int) *ChannelSemaphore {
	return &ChannelSemaphore{
		permits: make(chan struct{}, n),
	}
}

// Acquire acquires a permit, blocking until one is available
func (s *ChannelSemaphore) Acquire() {
	s.permits <- struct{}{}
}

// TryAcquire attempts to acquire a permit without blocking
func (s *ChannelSemaphore) TryAcquire() bool {
	select {
	case s.permits <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithTimeout attempts to acquire a permit with a timeout
func (s *ChannelSemaphore) AcquireWithTimeout(timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	
	select {
	case s.permits <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// Release releases a permit
func (s *ChannelSemaphore) Release() {
	select {
	case <-s.permits:
	default:
		panic("Semaphore release without acquire")
	}
}

// MutexSemaphore implements a semaphore using mutex and condition variable
type MutexSemaphore struct {
	mu      sync.Mutex
	cond    *sync.Cond
	count   int
	maxSize int
}

// NewMutexSemaphore creates a new semaphore with the given number of permits
func NewMutexSemaphore(n int) *MutexSemaphore {
	s := &MutexSemaphore{
		maxSize: n,
		count:   n,
	}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire acquires a permit, blocking until one is available
func (s *MutexSemaphore) Acquire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	for s.count == 0 {
		s.cond.Wait()
	}
	
	s.count--
}

// TryAcquire attempts to acquire a permit without blocking
func (s *MutexSemaphore) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	if s.count == 0 {
		return false
	}
	
	s.count--
	return true
}

// Release releases a permit
func (s *MutexSemaphore) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	if s.count >= s.maxSize {
		panic("Semaphore release without acquire")
	}
	
	s.count++
	s.cond.Signal()
}

func demonstrateSemaphores() {
	fmt.Println("\n=== Channel-based Semaphore ===")
	
	// Create a semaphore with 3 permits
	sem := NewChannelSemaphore(3)
	
	// Launch workers that need to acquire the semaphore
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Worker %d: Attempting to acquire semaphore\n", id)
			
			// Try with timeout first
			if sem.AcquireWithTimeout(time.Duration(id) * 50 * time.Millisecond) {
				fmt.Printf("Worker %d: Acquired with timeout\n", id)
			} else {
				fmt.Printf("Worker %d: Timeout expired, acquiring with blocking call\n", id)
				sem.Acquire()
				fmt.Printf("Worker %d: Acquired after blocking\n", id)
			}
			
			// Simulate work
			fmt.Printf("Worker %d: Working...\n", id)
			time.Sleep(200 * time.Millisecond)
			
			// Release the semaphore
			fmt.Printf("Worker %d: Releasing semaphore\n", id)
			sem.Release()
		}(i)
		
		// Small delay between starting workers
		time.Sleep(10 * time.Millisecond)
	}
	
	wg.Wait()
	
	fmt.Println("\n=== Mutex-based Semaphore ===")
	
	// Create a mutex-based semaphore with 2 permits
	mutexSem := NewMutexSemaphore(2)
	
	// Launch workers
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			fmt.Printf("Worker %d: Attempting to acquire mutex semaphore\n", id)
			
			// Try non-blocking acquire first
			if mutexSem.TryAcquire() {
				fmt.Printf("Worker %d: Acquired without blocking\n", id)
			} else {
				fmt.Printf("Worker %d: Could not acquire immediately, blocking...\n", id)
				mutexSem.Acquire()
				fmt.Printf("Worker %d: Acquired after blocking\n", id)
			}
			
			// Simulate work
			fmt.Printf("Worker %d: Working...\n", id)
			time.Sleep(150 * time.Millisecond)
			
			// Release the semaphore
			fmt.Printf("Worker %d: Releasing mutex semaphore\n", id)
			mutexSem.Release()
		}(i)
		
		// Small delay between starting workers
		time.Sleep(10 * time.Millisecond)
	}
	
	wg.Wait()
}

func main() {
	demonstrateSemaphores()
}

This example demonstrates two semaphore implementations:

  1. Channel-based Semaphore: Uses a buffered channel to represent permits.
  2. Mutex-based Semaphore: Uses a mutex and condition variable for more complex control.

Both implementations provide similar functionality but have different performance characteristics and capabilities.

Read-Write Preference Control

While sync.RWMutex gives preference to writers, sometimes we need more control over access patterns:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// ReaderPreferenceRWMutex gives preference to readers over writers
type ReaderPreferenceRWMutex struct {
	mu          sync.Mutex
	readerCount int32
	readerWait  sync.Cond
	writerWait  sync.Cond
	writing     bool
}

// NewReaderPreferenceRWMutex creates a new reader-preference RW mutex
func NewReaderPreferenceRWMutex() *ReaderPreferenceRWMutex {
	rw := &ReaderPreferenceRWMutex{}
	rw.readerWait.L = &rw.mu
	rw.writerWait.L = &rw.mu
	return rw
}

// RLock acquires a read lock
func (rw *ReaderPreferenceRWMutex) RLock() {
	rw.mu.Lock()
	// Wait if someone is writing
	for rw.writing {
		rw.readerWait.Wait()
	}
	atomic.AddInt32(&rw.readerCount, 1)
	rw.mu.Unlock()
}

// RUnlock releases a read lock
func (rw *ReaderPreferenceRWMutex) RUnlock() {
	rw.mu.Lock()
	atomic.AddInt32(&rw.readerCount, -1)
	// If no more readers, signal a waiting writer
	if atomic.LoadInt32(&rw.readerCount) == 0 {
		rw.writerWait.Signal()
	}
	rw.mu.Unlock()
}

// Lock acquires a write lock
func (rw *ReaderPreferenceRWMutex) Lock() {
	rw.mu.Lock()
	// Wait until there are no readers and no one is writing
	for atomic.LoadInt32(&rw.readerCount) > 0 || rw.writing {
		rw.writerWait.Wait()
	}
	rw.writing = true
	rw.mu.Unlock()
}

// Unlock releases a write lock
func (rw *ReaderPreferenceRWMutex) Unlock() {
	rw.mu.Lock()
	rw.writing = false
	// Signal all waiting readers (preference to readers)
	rw.readerWait.Broadcast()
	// Also signal one writer
	rw.writerWait.Signal()
	rw.mu.Unlock()
}

func demonstrateRWPreference() {
	fmt.Println("\n=== Reader Preference RWMutex ===")
	
	// Create a reader-preference RW mutex
	rwMutex := NewReaderPreferenceRWMutex()
	
	// Shared resource
	var sharedData int
	
	// Track operation counts
	var readCount, writeCount int32
	
	// Start time for measuring operations
	startTime := time.Now()
	endTime := startTime.Add(1 * time.Second)
	
	// Launch reader goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire read lock
				rwMutex.RLock()
				
				// Read shared data
				_ = sharedData
				atomic.AddInt32(&readCount, 1)
				
				// Hold lock briefly
				time.Sleep(5 * time.Millisecond)
				
				// Release read lock
				rwMutex.RUnlock()
				
				// Small pause between reads
				time.Sleep(5 * time.Millisecond)
			}
		}(i)
	}
	
	// Launch writer goroutines
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire write lock
				rwMutex.Lock()
				
				// Update shared data
				sharedData++
				atomic.AddInt32(&writeCount, 1)
				
				// Hold lock briefly
				time.Sleep(10 * time.Millisecond)
				
				// Release write lock
				rwMutex.Unlock()
				
				// Small pause between writes
				time.Sleep(20 * time.Millisecond)
			}
		}(i)
	}
	
	wg.Wait()
	
	fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
	fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
	fmt.Printf("Ratio (reads/writes): %.2f\n",
		float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
	
	// Compare with standard RWMutex (which has writer preference)
	compareWithStandardRWMutex()
}

func compareWithStandardRWMutex() {
	fmt.Println("\n=== Standard RWMutex (Writer Preference) ===")
	
	// Create a standard RW mutex
	var rwMutex sync.RWMutex
	
	// Shared resource
	var sharedData int
	
	// Track operation counts
	var readCount, writeCount int32
	
	// Start time for measuring operations
	startTime := time.Now()
	endTime := startTime.Add(1 * time.Second)
	
	// Launch reader goroutines
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire read lock
				rwMutex.RLock()
				
				// Read shared data
				_ = sharedData
				atomic.AddInt32(&readCount, 1)
				
				// Hold lock briefly
				time.Sleep(5 * time.Millisecond)
				
				// Release read lock
				rwMutex.RUnlock()
				
				// Small pause between reads
				time.Sleep(5 * time.Millisecond)
			}
		}(i)
	}
	
	// Launch writer goroutines
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for time.Now().Before(endTime) {
				// Acquire write lock
				rwMutex.Lock()
				
				// Update shared data
				sharedData++
				atomic.AddInt32(&writeCount, 1)
				
				// Hold lock briefly
				time.Sleep(10 * time.Millisecond)
				
				// Release write lock
				rwMutex.Unlock()
				
				// Small pause between writes
				time.Sleep(20 * time.Millisecond)
			}
		}(i)
	}
	
	wg.Wait()
	
	fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
	fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
	fmt.Printf("Ratio (reads/writes): %.2f\n",
		float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
}

func main() {
	demonstrateRWPreference()
}

This example demonstrates:

  1. Custom RWMutex Implementation: A reader-preference RWMutex that prioritizes readers over writers.
  2. Performance Comparison: Comparing the custom implementation with the standard RWMutex.

The choice between reader and writer preference depends on the specific workload and requirements of your application.

Atomic Operations and Lock-Free Programming

For simple shared state, atomic operations can provide synchronization without locks:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func atomicPatterns() {
	// Basic atomic operations
	basicAtomic()
	
	// Compare-and-swap patterns
	casPatterns()
	
	// Lock-free counter
	lockFreeCounter()
}

func basicAtomic() {
	fmt.Println("\n=== Basic Atomic Operations ===")
	
	// Atomic integer operations
	var counter int64
	
	// Add
	atomic.AddInt64(&counter, 10)
	fmt.Printf("After adding 10: %d\n", atomic.LoadInt64(&counter))
	
	// Subtract (add negative)
	atomic.AddInt64(&counter, -5)
	fmt.Printf("After subtracting 5: %d\n", atomic.LoadInt64(&counter))
	
	// Store
	atomic.StoreInt64(&counter, 42)
	fmt.Printf("After storing 42: %d\n", atomic.LoadInt64(&counter))
	
	// Swap (returns old value)
	old := atomic.SwapInt64(&counter, 100)
	fmt.Printf("Swapped %d with 100, now: %d\n", old, atomic.LoadInt64(&counter))
}

func casPatterns() {
	fmt.Println("\n=== Compare-And-Swap Patterns ===")
	
	var value int64 = 100
	
	// Basic CAS
	swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
	fmt.Printf("CAS 100->200: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
	
	// Failed CAS (wrong expected value)
	swapped = atomic.CompareAndSwapInt64(&value, 100, 300)
	fmt.Printf("CAS 100->300: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
	
	// Increment with CAS (retry loop)
	incrementWithCAS := func(addr *int64, delta int64) {
		for {
			old := atomic.LoadInt64(addr)
			new := old + delta
			if atomic.CompareAndSwapInt64(addr, old, new) {
				return
			}
			// If CAS failed, loop and try again
		}
	}
	
	incrementWithCAS(&value, 50)
	fmt.Printf("After CAS increment by 50: %d\n", atomic.LoadInt64(&value))
}

// LockFreeCounter is a counter that can be incremented concurrently without locks
type LockFreeCounter struct {
	value uint64
}

// Increment atomically increments the counter and returns the new value
func (c *LockFreeCounter) Increment() uint64 {
	return atomic.AddUint64(&c.value, 1)
}

// Load atomically loads the counter value
func (c *LockFreeCounter) Load() uint64 {
	return atomic.LoadUint64(&c.value)
}

// LockBasedCounter is a counter that uses a mutex for synchronization
type LockBasedCounter struct {
	mu    sync.Mutex
	value uint64
}

// Increment increments the counter and returns the new value
func (c *LockBasedCounter) Increment() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
	return c.value
}

// Load returns the counter value
func (c *LockBasedCounter) Load() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func lockFreeCounter() {
	fmt.Println("\n=== Lock-Free vs. Lock-Based Counter Benchmark ===")
	
	// Create counters
	lockFree := &LockFreeCounter{}
	lockBased := &LockBasedCounter{}
	
	// Benchmark function
	benchmark := func(name string, counter interface{}, goroutines, iterations int) time.Duration {
		var wg sync.WaitGroup
		
		// Create a function that increments the appropriate counter
		var incrementFn func()
		
		switch c := counter.(type) {
		case *LockFreeCounter:
			incrementFn = func() { c.Increment() }
		case *LockBasedCounter:
			incrementFn = func() { c.Increment() }
		}
		
		// Start timing
		start := time.Now()
		
		// Launch goroutines
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				
				// Perform increments
				for j := 0; j < iterations; j++ {
					incrementFn()
				}
			}()
		}
		
		// Wait for all goroutines to complete
		wg.Wait()
		
		// Return elapsed time
		return time.Since(start)
	}
	
	// Run benchmarks
	goroutines := 4
	iterations := 1000000
	
	lockFreeTime := benchmark("Lock-Free", lockFree, goroutines, iterations)
	lockBasedTime := benchmark("Lock-Based", lockBased, goroutines, iterations)
	
	// Verify results
	fmt.Printf("Lock-Free Counter: %d\n", lockFree.Load())
	fmt.Printf("Lock-Based Counter: %d\n", lockBased.Load())
	
	// Compare performance
	fmt.Printf("Lock-Free Time: %v\n", lockFreeTime)
	fmt.Printf("Lock-Based Time: %v\n", lockBasedTime)
	fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}

func main() {
	atomicPatterns()
}

Key insights about atomic operations:

  1. Hardware Support: Atomic operations are implemented using special CPU instructions.
  2. No Locks Required: They provide synchronization without the overhead of locks.
  3. Limited Operations: Only simple operations like load, store, add, and compare-and-swap are available.
  4. Performance Advantage: Atomic operations are significantly faster than mutex-based synchronization for simple cases.

Lock-free programming using atomic operations can provide substantial performance benefits but requires careful design to ensure correctness.

Custom Synchronization Primitives

While Go’s standard library provides a solid foundation, some scenarios require specialized synchronization primitives. Let’s explore how to build custom primitives that address specific needs.

Countdown Latch

A countdown latch allows one or more goroutines to wait until a set of operations completes:

package main

import (
	"fmt"
	"sync"
	"time"
)

// CountdownLatch is a synchronization aid that allows one or more goroutines
// to wait until a set of operations being performed in other goroutines completes.
type CountdownLatch struct {
	count int
	mu    sync.Mutex
	cond  *sync.Cond
}

// NewCountdownLatch creates a new countdown latch initialized with the given count.
func NewCountdownLatch(count int) *CountdownLatch {
	latch := &CountdownLatch{
		count: count,
	}
	latch.cond = sync.NewCond(&latch.mu)
	return latch
}

// CountDown decrements the count of the latch, releasing all waiting goroutines
// when the count reaches zero.
func (l *CountdownLatch) CountDown() {
	l.mu.Lock()
	defer l.mu.Unlock()
	
	if l.count <= 0 {
		return
	}
	
	l.count--
	if l.count == 0 {
		l.cond.Broadcast()
	}
}

// Await causes the current goroutine to wait until the latch has counted down to zero.
func (l *CountdownLatch) Await() {
	l.mu.Lock()
	defer l.mu.Unlock()
	
	for l.count > 0 {
		l.cond.Wait()
	}
}

// TryAwait causes the current goroutine to wait until the latch has counted down to zero
// or the specified timeout elapses.
func (l *CountdownLatch) TryAwait(timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	
	l.mu.Lock()
	defer l.mu.Unlock()
	
	for l.count > 0 {
		if time.Now().After(deadline) {
			return false
		}
		
		// Set a short timeout for the condition variable wait
		waitTimer := time.NewTimer(50 * time.Millisecond)
		waitCh := make(chan struct{})
		
		go func() {
			l.cond.Wait()
			close(waitCh)
		}()
		
		l.mu.Unlock()
		select {
		case <-waitCh:
			// Condition was signaled
			waitTimer.Stop()
			l.mu.Lock()
		case <-waitTimer.C:
			// Timer expired, reacquire lock and check condition again
			l.mu.Lock()
		}
	}
	
	return true
}

// GetCount returns the current count.
func (l *CountdownLatch) GetCount() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.count
}

func demonstrateCountdownLatch() {
	fmt.Println("\n=== Countdown Latch ===")
	
	// Create a countdown latch with count 3
	latch := NewCountdownLatch(3)
	
	// Start a goroutine that waits on the latch
	go func() {
		fmt.Println("Waiter: Waiting for latch to reach zero")
		latch.Await()
		fmt.Println("Waiter: Latch reached zero, proceeding")
	}()
	
	// Start another goroutine that waits with timeout
	go func() {
		fmt.Println("Timeout Waiter: Waiting with 2 second timeout")
		if latch.TryAwait(2 * time.Second) {
			fmt.Println("Timeout Waiter: Latch reached zero within timeout")
		} else {
			fmt.Println("Timeout Waiter: Timeout expired before latch reached zero")
		}
	}()
	
	// Simulate work being completed
	for i := 1; i <= 3; i++ {
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Main: Counting down latch (%d remaining)\n", latch.GetCount()-1)
		latch.CountDown()
	}
	
	// Give waiters time to print their messages
	time.Sleep(100 * time.Millisecond)
}

func main() {
	demonstrateCountdownLatch()
}

Cyclic Barrier

A cyclic barrier enables a group of goroutines to wait for each other to reach a common execution point:

package main

import (
	"fmt"
	"sync"
	"time"
)

// CyclicBarrier is a synchronization aid that allows a set of goroutines to all
// wait for each other to reach a common barrier point.
type CyclicBarrier struct {
	parties       int
	count         int
	generation    int
	barrierAction func()
	mu            sync.Mutex
	cond          *sync.Cond
}

// NewCyclicBarrier creates a new cyclic barrier that will trip when the given
// number of parties are waiting upon it.
func NewCyclicBarrier(parties int, barrierAction func()) *CyclicBarrier {
	barrier := &CyclicBarrier{
		parties:       parties,
		count:         parties,
		barrierAction: barrierAction,
	}
	barrier.cond = sync.NewCond(&barrier.mu)
	return barrier
}

// Await causes the current goroutine to wait until all parties have invoked await
// on this barrier. If the current goroutine is the last to arrive, the barrier action
// is executed and the barrier is reset.
func (b *CyclicBarrier) Await() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	
	generation := b.generation
	
	// Decrement count and check if we're the last to arrive
	b.count--
	index := b.parties - b.count - 1
	
	if b.count == 0 {
		// We're the last to arrive
		if b.barrierAction != nil {
			// Execute the barrier action
			b.barrierAction()
		}
		
		// Reset the barrier
		b.count = b.parties
		b.generation++
		
		// Wake up all waiting goroutines
		b.cond.Broadcast()
		return index
	}
	
	// Wait until the barrier is tripped or reset
	for generation == b.generation {
		b.cond.Wait()
	}
	
	return index
}

// Reset resets the barrier to its initial state.
func (b *CyclicBarrier) Reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	
	b.count = b.parties
	b.generation++
	b.cond.Broadcast()
}

// GetNumberWaiting returns the number of parties currently waiting at the barrier.
func (b *CyclicBarrier) GetNumberWaiting() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.parties - b.count
}

func demonstrateCyclicBarrier() {
	fmt.Println("\n=== Cyclic Barrier ===")
	
	// Number of workers
	numWorkers := 3
	
	// Create a barrier action
	barrierAction := func() {
		fmt.Println("Barrier Action: All workers reached the barrier, executing barrier action")
	}
	
	// Create a cyclic barrier
	barrier := NewCyclicBarrier(numWorkers, barrierAction)
	
	// Create a wait group to wait for all workers to complete
	var wg sync.WaitGroup
	
	// Launch workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for phase := 1; phase <= 3; phase++ {
				// Simulate work before reaching the barrier
				workTime := time.Duration(id*100) * time.Millisecond
				fmt.Printf("Worker %d: Working on phase %d for %v\n", id, phase, workTime)
				time.Sleep(workTime)
				
				fmt.Printf("Worker %d: Reached barrier for phase %d\n", id, phase)
				index := barrier.Await()
				fmt.Printf("Worker %d: Crossed barrier for phase %d (arrival index: %d)\n", id, phase, index)
				
				// Small pause between phases
				time.Sleep(50 * time.Millisecond)
			}
		}(i)
	}
	
	wg.Wait()
	fmt.Println("All workers completed all phases")
}

func main() {
	demonstrateCyclicBarrier()
}

These custom synchronization primitives demonstrate how Go’s basic primitives can be combined to create more specialized tools. Each addresses specific coordination patterns that aren’t directly provided by the standard library.

Performance Optimization Techniques

Effective synchronization isn’t just about correctness—it’s also about performance. Let’s explore techniques to optimize synchronization in Go applications.

Lock Granularity

The granularity of locks significantly impacts performance:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// CoarseGrainedMap uses a single lock for the entire map
type CoarseGrainedMap struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewCoarseGrainedMap creates a new coarse-grained map
func NewCoarseGrainedMap() *CoarseGrainedMap {
	return &CoarseGrainedMap{
		data: make(map[string]int),
	}
}

// Get retrieves a value from the map
func (m *CoarseGrainedMap) Get(key string) (int, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	val, ok := m.data[key]
	return val, ok
}

// Set stores a value in the map
func (m *CoarseGrainedMap) Set(key string, value int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
}

// FineGrainedMap uses a separate lock for each shard
type FineGrainedMap struct {
	shards    []*mapShard
	shardMask int
}

type mapShard struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewFineGrainedMap creates a new fine-grained map with the specified number of shards
func NewFineGrainedMap(numShards int) *FineGrainedMap {
	// Round up to power of 2
	shardCount := 1
	for shardCount < numShards {
		shardCount *= 2
	}
	
	m := &FineGrainedMap{
		shards:    make([]*mapShard, shardCount),
		shardMask: shardCount - 1,
	}
	
	for i := 0; i < shardCount; i++ {
		m.shards[i] = &mapShard{
			data: make(map[string]int),
		}
	}
	
	return m
}

// getShard returns the shard for the given key
func (m *FineGrainedMap) getShard(key string) *mapShard {
	// Simple hash function
	hash := 0
	for i := 0; i < len(key); i++ {
		hash = 31*hash + int(key[i])
	}
	return m.shards[hash&m.shardMask]
}

// Get retrieves a value from the map
func (m *FineGrainedMap) Get(key string) (int, bool) {
	shard := m.getShard(key)
	shard.mu.RLock()
	defer shard.mu.RUnlock()
	val, ok := shard.data[key]
	return val, ok
}

// Set stores a value in the map
func (m *FineGrainedMap) Set(key string, value int) {
	shard := m.getShard(key)
	shard.mu.Lock()
	defer shard.mu.Unlock()
	shard.data[key] = value
}

func benchmarkMaps() {
	fmt.Println("\n=== Lock Granularity Benchmark ===")
	
	// Create maps
	coarse := NewCoarseGrainedMap()
	fine := NewFineGrainedMap(16)
	
	// Prepare keys
	const keyCount = 1000
	keys := make([]string, keyCount)
	for i := 0; i < keyCount; i++ {
		keys[i] = fmt.Sprintf("key%d", i)
	}
	
	// Benchmark function
	benchmark := func(name string, m interface{}, readPct int, goroutines int) time.Duration {
		var wg sync.WaitGroup
		var ops int64
		
		// Create getter and setter functions
		var getFn func(string) (int, bool)
		var setFn func(string, int)
		
		switch mp := m.(type) {
		case *CoarseGrainedMap:
			getFn = mp.Get
			setFn = mp.Set
		case *FineGrainedMap:
			getFn = mp.Get
			setFn = mp.Set
		}
		
		// Start timing
		start := time.Now()
		
		// Launch goroutines
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				
				// Local random number generator
				seed := int64(id)
				
				// Perform operations
				for j := 0; j < 10000; j++ {
					// Determine operation type
					seed = (seed*1103515245 + 12345) & 0x7fffffff
					isRead := int(seed%100) < readPct
					
					// Select a key
					seed = (seed*1103515245 + 12345) & 0x7fffffff
					key := keys[seed%keyCount]
					
					if isRead {
						// Read operation
						_, _ = getFn(key)
					} else {
						// Write operation
						setFn(key, int(seed))
					}
					
					atomic.AddInt64(&ops, 1)
				}
			}(i)
		}
		
		// Wait for all goroutines to complete
		wg.Wait()
		
		// Return elapsed time
		elapsed := time.Since(start)
		fmt.Printf("%s: %v for %d operations (%.2f ops/ms)\n",
			name, elapsed, ops, float64(ops)/(float64(elapsed)/float64(time.Millisecond)))
		
		return elapsed
	}
	
	// Run benchmarks with different read/write ratios
	fmt.Println("Read-heavy workload (95% reads):")
	coarseTime := benchmark("Coarse-grained", coarse, 95, 8)
	fineTime := benchmark("Fine-grained", fine, 95, 8)
	fmt.Printf("Improvement: %.2fx\n", float64(coarseTime)/float64(fineTime))
	
	fmt.Println("\nBalanced workload (50% reads):")
	coarseTime = benchmark("Coarse-grained", coarse, 50, 8)
	fineTime = benchmark("Fine-grained", fine, 50, 8)
	fmt.Printf("Improvement: %.2fx\n", float64(coarseTime)/float64(fineTime))
}

func main() {
	benchmarkMaps()
}

Key insights about lock granularity:

  1. Coarse-grained Locking: Uses fewer locks but causes more contention.
  2. Fine-grained Locking: Uses more locks but reduces contention.
  3. Sharding: Divides data into independent partitions, each with its own lock.
  4. Performance Impact: The optimal granularity depends on the access pattern and concurrency level.

Lock Contention Profiling

Identifying lock contention is crucial for optimizing synchronization:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// MutexWithMetrics adds instrumentation to track lock contention
type MutexWithMetrics struct {
	mu            sync.Mutex
	name          string
	acquisitions  int64
	contentions   int64
	totalWaitTime time.Duration
	maxWaitTime   time.Duration
}

// NewMutexWithMetrics creates a new instrumented mutex
func NewMutexWithMetrics(name string) *MutexWithMetrics {
	return &MutexWithMetrics{
		name: name,
	}
}

// Lock acquires the mutex, tracking contention metrics
func (m *MutexWithMetrics) Lock() {
	start := time.Now()
	
	// Try to acquire the lock without blocking
	if m.mu.TryLock() {
		// Acquired without contention
		m.acquisitions++
		return
	}
	
	// Contention occurred, acquire with blocking
	m.contentions++
	m.mu.Lock()
	
	// Calculate wait time
	waitTime := time.Since(start)
	m.totalWaitTime += waitTime
	if waitTime > m.maxWaitTime {
		m.maxWaitTime = waitTime
	}
	
	m.acquisitions++
}

// Unlock releases the mutex
func (m *MutexWithMetrics) Unlock() {
	m.mu.Unlock()
}

// ReportMetrics prints contention statistics
func (m *MutexWithMetrics) ReportMetrics() {
	fmt.Printf("=== Mutex Metrics: %s ===\n", m.name)
	fmt.Printf("Total acquisitions: %d\n", m.acquisitions)
	fmt.Printf("Contentions: %d (%.2f%%)\n", m.contentions,
		float64(m.contentions)*100/float64(m.acquisitions))
	
	if m.contentions > 0 {
		fmt.Printf("Average wait time: %v\n", m.totalWaitTime/time.Duration(m.contentions))
		fmt.Printf("Maximum wait time: %v\n", m.maxWaitTime)
	}
}

func demonstrateMutexMetrics() {
	fmt.Println("\n=== Lock Contention Profiling ===")
	
	// Create instrumented mutexes
	highContentionMutex := NewMutexWithMetrics("High Contention")
	lowContentionMutex := NewMutexWithMetrics("Low Contention")
	
	// Function that simulates work with the mutex
	simulateWork := func(m *MutexWithMetrics, holdTime time.Duration, iterations int) {
		for i := 0; i < iterations; i++ {
			m.Lock()
			time.Sleep(holdTime)
			m.Unlock()
			
			// Pause between acquisitions
			time.Sleep(1 * time.Millisecond)
		}
	}
	
	// Launch goroutines with high contention
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			simulateWork(highContentionMutex, 5*time.Millisecond, 20)
		}()
	}
	
	// Launch goroutines with low contention
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			simulateWork(lowContentionMutex, 1*time.Millisecond, 20)
		}()
	}
	
	wg.Wait()
	
	// Report metrics
	highContentionMutex.ReportMetrics()
	lowContentionMutex.ReportMetrics()
}

func main() {
	demonstrateMutexMetrics()
}

Lock-Free Techniques

For some data structures, lock-free approaches can provide significant performance benefits:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// LockFreeQueue is a simple lock-free queue implementation
type LockFreeQueue struct {
	head  atomic.Pointer[node]
	tail  atomic.Pointer[node]
	count int64
}

type node struct {
	value interface{}
	next  atomic.Pointer[node]
}

// NewLockFreeQueue creates a new lock-free queue
func NewLockFreeQueue() *LockFreeQueue {
	q := &LockFreeQueue{}
	
	// Create sentinel node
	sentinel := &node{}
	q.head.Store(sentinel)
	q.tail.Store(sentinel)
	
	return q
}

// Enqueue adds an item to the queue
func (q *LockFreeQueue) Enqueue(value interface{}) {
	// Create new node
	newNode := &node{value: value}
	
	for {
		// Get current tail
		tail := q.tail.Load()
		next := tail.next.Load()
		
		// Check if tail is still valid
		if tail != q.tail.Load() {
			continue
		}
		
		// If tail.next is not nil, tail is falling behind
		if next != nil {
			// Try to advance tail
			q.tail.CompareAndSwap(tail, next)
			continue
		}
		
		// Try to link the new node
		if tail.next.CompareAndSwap(nil, newNode) {
			// Success, try to advance tail
			q.tail.CompareAndSwap(tail, newNode)
			atomic.AddInt64(&q.count, 1)
			return
		}
	}
}

// Dequeue removes and returns an item from the queue
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
	for {
		// Get current head and tail
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		
		// Check if head is still valid
		if head != q.head.Load() {
			continue
		}
		
		// If queue is empty
		if head == tail {
			if next == nil {
				return nil, false
			}
			
			// Tail is falling behind, try to advance it
			q.tail.CompareAndSwap(tail, next)
			continue
		}
		
		// Queue is not empty, try to dequeue
		if q.head.CompareAndSwap(head, next) {
			value := next.value
			atomic.AddInt64(&q.count, -1)
			return value, true
		}
	}
}

// Size returns the approximate size of the queue
func (q *LockFreeQueue) Size() int {
	return int(atomic.LoadInt64(&q.count))
}

// LockBasedQueue is a simple lock-based queue implementation
type LockBasedQueue struct {
	mu    sync.Mutex
	head  *node
	tail  *node
	count int
}

// NewLockBasedQueue creates a new lock-based queue
func NewLockBasedQueue() *LockBasedQueue {
	sentinel := &node{}
	return &LockBasedQueue{
		head: sentinel,
		tail: sentinel,
	}
}

// Enqueue adds an item to the queue
func (q *LockBasedQueue) Enqueue(value interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()
	
	newNode := &node{value: value}
	q.tail.next.Store(newNode)
	q.tail = newNode
	q.count++
}

// Dequeue removes and returns an item from the queue
func (q *LockBasedQueue) Dequeue() (interface{}, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	
	if q.head == q.tail {
		return nil, false
	}
	
	next := q.head.next.Load()
	q.head = next
	q.count--
	
	return next.value, true
}

// Size returns the size of the queue
func (q *LockBasedQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.count
}

func benchmarkQueues() {
	fmt.Println("\n=== Lock-Free vs. Lock-Based Queue Benchmark ===")
	
	// Create queues
	lockFree := NewLockFreeQueue()
	lockBased := NewLockBasedQueue()
	
	// Benchmark function
	benchmark := func(name string, enqueue, dequeue func(int) bool, iterations, goroutines int) time.Duration {
		var wg sync.WaitGroup
		
		// Start timing
		start := time.Now()
		
		// Launch producer goroutines
		for i := 0; i < goroutines/2; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				
				// Produce items
				for j := 0; j < iterations; j++ {
					enqueue(id*iterations + j)
				}
			}(i)
		}
		
		// Launch consumer goroutines
		for i := 0; i < goroutines/2; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				
				// Consume items
				for j := 0; j < iterations; j++ {
					for !dequeue(j) {
						// Retry if queue is empty
						runtime.Gosched()
					}
				}
			}()
		}
		
		// Wait for all goroutines to complete
		wg.Wait()
		
		// Return elapsed time
		return time.Since(start)
	}
	
	// Run benchmarks
	iterations := 10000
	goroutines := 8
	
	lockFreeTime := benchmark("Lock-Free Queue",
		func(i int) bool { lockFree.Enqueue(i); return true },
		func(i int) bool { _, ok := lockFree.Dequeue(); return ok },
		iterations, goroutines)
	
	lockBasedTime := benchmark("Lock-Based Queue",
		func(i int) bool { lockBased.Enqueue(i); return true },
		func(i int) bool { _, ok := lockBased.Dequeue(); return ok },
		iterations, goroutines)
	
	// Compare performance
	fmt.Printf("Lock-Free Queue Time: %v\n", lockFreeTime)
	fmt.Printf("Lock-Based Queue Time: %v\n", lockBasedTime)
	fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}

func main() {
	benchmarkQueues()
}

Debugging and Monitoring Synchronization

Effective debugging and monitoring are essential for maintaining reliable concurrent systems.

Race Detection

Go’s race detector is a powerful tool for identifying data races:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// This function contains a data race
func demonstrateRace() {
	fmt.Println("\n=== Data Race Example ===")
	
	// Shared counter
	counter := 0
	
	// Launch goroutines that increment the counter
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter++ // Race condition: read-modify-write without synchronization
		}()
	}
	
	wg.Wait()
	fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}

// This function fixes the race with a mutex
func demonstrateRaceFixed() {
	fmt.Println("\n=== Data Race Fixed with Mutex ===")
	
	// Shared counter with mutex
	var counter int
	var mu sync.Mutex
	
	// Launch goroutines that increment the counter
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			counter++ // Protected by mutex
			mu.Unlock()
		}()
	}
	
	wg.Wait()
	fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}

// This function fixes the race with atomic operations
func demonstrateRaceFixedAtomic() {
	fmt.Println("\n=== Data Race Fixed with Atomic Operations ===")
	
	// Shared counter using atomic operations
	var counter int64
	
	// Launch goroutines that increment the counter
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1) // Atomic increment
		}()
	}
	
	wg.Wait()
	fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}

func main() {
	// To detect races, run with: go run -race main.go
	demonstrateRace()
	demonstrateRaceFixed()
	demonstrateRaceFixedAtomic()
}

To use the race detector, compile or run your program with the -race flag:

go run -race main.go
go test -race ./...
go build -race main.go

Deadlock Detection

Go provides built-in deadlock detection for goroutines blocked in channel operations, but not for mutex-based deadlocks. Here’s a simple deadlock detector:

package main

import (
	"fmt"
	"sync"
	"time"
)

// DeadlockDetector monitors mutex acquisitions to detect potential deadlocks
type DeadlockDetector struct {
	mu         sync.Mutex
	goroutines map[int64][]string // goroutine ID -> held locks
	locks      map[string]int64   // lock name -> goroutine ID holding it
}

// NewDeadlockDetector creates a new deadlock detector
func NewDeadlockDetector() *DeadlockDetector {
	return &DeadlockDetector{
		goroutines: make(map[int64][]string),
		locks:      make(map[string]int64),
	}
}

// BeforeLock is called before acquiring a lock
func (d *DeadlockDetector) BeforeLock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	// Check if this lock is held by another goroutine
	if holderID, exists := d.locks[lockName]; exists {
		// Check what locks the holder has
		holderLocks := d.goroutines[holderID]
		
		// Check if the holder is waiting for any locks held by this goroutine
		for _, heldLock := range d.goroutines[goroutineID] {
			if holderWaiting, exists := d.locks[heldLock]; exists && holderWaiting == holderID {
				fmt.Printf("POTENTIAL DEADLOCK DETECTED:\n")
				fmt.Printf("  Goroutine %d holds %v and wants %s\n", goroutineID, d.goroutines[goroutineID], lockName)
				fmt.Printf("  Goroutine %d holds %s and wants %v\n", holderID, lockName, holderLocks)
				return
			}
		}
	}
}

// AfterLock is called after acquiring a lock
func (d *DeadlockDetector) AfterLock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	// Record that this goroutine holds this lock
	d.goroutines[goroutineID] = append(d.goroutines[goroutineID], lockName)
	d.locks[lockName] = goroutineID
}

// BeforeUnlock is called before releasing a lock
func (d *DeadlockDetector) BeforeUnlock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	// Remove this lock from the goroutine's held locks
	locks := d.goroutines[goroutineID]
	for i, l := range locks {
		if l == lockName {
			d.goroutines[goroutineID] = append(locks[:i], locks[i+1:]...)
			break
		}
	}
	
	// Remove this lock from the locks map
	delete(d.locks, lockName)
}

// InstrumentedMutex is a mutex with deadlock detection
type InstrumentedMutex struct {
	mu       sync.Mutex
	name     string
	detector *DeadlockDetector
}

// NewInstrumentedMutex creates a new instrumented mutex
func NewInstrumentedMutex(name string, detector *DeadlockDetector) *InstrumentedMutex {
	return &InstrumentedMutex{
		name:     name,
		detector: detector,
	}
}

// Lock acquires the mutex
func (m *InstrumentedMutex) Lock() {
	goroutineID := 123 // In a real implementation, get the actual goroutine ID
	
	m.detector.BeforeLock(goroutineID, m.name)
	m.mu.Lock()
	m.detector.AfterLock(goroutineID, m.name)
}

// Unlock releases the mutex
func (m *InstrumentedMutex) Unlock() {
	goroutineID := 123 // In a real implementation, get the actual goroutine ID
	
	m.detector.BeforeUnlock(goroutineID, m.name)
	m.mu.Unlock()
}

func demonstrateDeadlockDetection() {
	fmt.Println("\n=== Deadlock Detection ===")
	
	// Create a deadlock detector
	detector := NewDeadlockDetector()
	
	// Create instrumented mutexes
	mutex1 := NewInstrumentedMutex("mutex1", detector)
	mutex2 := NewInstrumentedMutex("mutex2", detector)
	
	// Simulate a potential deadlock scenario
	go func() {
		goroutineID := int64(1)
		
		fmt.Println("Goroutine 1: Acquiring mutex1")
		detector.BeforeLock(goroutineID, "mutex1")
		// mutex1.Lock() - simulated
		detector.AfterLock(goroutineID, "mutex1")
		
		time.Sleep(100 * time.Millisecond)
		
		fmt.Println("Goroutine 1: Acquiring mutex2")
		detector.BeforeLock(goroutineID, "mutex2")
		// This would block in a real deadlock
	}()
	
	go func() {
		goroutineID := int64(2)
		
		fmt.Println("Goroutine 2: Acquiring mutex2")
		detector.BeforeLock(goroutineID, "mutex2")
		// mutex2.Lock() - simulated
		detector.AfterLock(goroutineID, "mutex2")
		
		time.Sleep(100 * time.Millisecond)
		
		fmt.Println("Goroutine 2: Acquiring mutex1")
		detector.BeforeLock(goroutineID, "mutex1")
		// This would block in a real deadlock
	}()
	
	// Give time for the simulation to run
	time.Sleep(300 * time.Millisecond)
}

func main() {
	demonstrateDeadlockDetection()
}

Production Implementation Strategies

Implementing synchronization in production systems requires careful consideration of various factors.

Choosing the Right Primitive

Selecting the appropriate synchronization primitive depends on the specific requirements:

Primitive Use Case Advantages Disadvantages
Mutex Exclusive access to shared resources Simple, well-understood Can cause contention
RWMutex Read-heavy workloads Higher throughput for reads More complex, writer starvation
atomic Simple counters and flags Low overhead, no blocking Limited operations
WaitGroup Waiting for multiple goroutines Simple coordination One-time use pattern
Once One-time initialization Thread-safe singleton Limited to single execution
Cond Producer-consumer patterns Efficient signaling Complex usage patterns
Pool Object reuse Reduces GC pressure No size control
Map Concurrent map access Optimized for specific patterns Type assertions required
Channel Communication between goroutines Clear ownership semantics Can be less efficient

Balancing Performance and Simplicity

When implementing synchronization in production systems, consider these guidelines:

  1. Start Simple: Begin with the simplest approach that ensures correctness.
  2. Measure First: Profile your application to identify actual bottlenecks before optimizing.
  3. Consider Contention: High-contention scenarios may benefit from more sophisticated approaches.
  4. Readability Matters: Complex synchronization is error-prone; prioritize clarity.
  5. Test Thoroughly: Use the race detector and stress testing to verify correctness.

Scaling Considerations

As your application scales, synchronization strategies may need to evolve:

  1. Partitioning: Divide data and workloads to minimize contention.
  2. Locality: Keep related data and operations together to reduce synchronization needs.
  3. Batching: Process multiple items in a single critical section to amortize synchronization costs.
  4. Relaxed Consistency: Consider whether eventual consistency is acceptable for some operations.
  5. Lock-Free Techniques: For extreme performance requirements, consider lock-free algorithms.

Mastering the Synchronization Spectrum

Throughout this exploration of Go’s synchronization primitives, we’ve journeyed from basic mutual exclusion to sophisticated custom synchronization tools. We’ve examined the internal workings of core primitives, explored advanced patterns that combine multiple mechanisms, and analyzed performance optimization techniques.

The key to effective synchronization in Go isn’t just knowing which primitive to use, but understanding the trade-offs each one presents. By mastering these primitives and patterns, you can build concurrent systems that are both correct and efficient, striking the right balance between safety and performance.

As you apply these techniques in your own applications, remember that synchronization is both an art and a science. The science lies in understanding the guarantees and limitations of each primitive; the art lies in designing systems that minimize contention while maintaining correctness. With the knowledge gained from this guide, you’re well-equipped to navigate the complexities of concurrent programming in Go and build systems that leverage the full power of modern hardware.