Go Sync Package Mastery: Synchronization Primitives
Master Go’s synchronization primitives including mutexes.
Go’s sync package contains the building blocks for safe concurrent programming. When multiple goroutines access shared data, you need synchronization to prevent race conditions and data corruption.
The sync package provides several key primitives:
- Mutex: Mutual exclusion locks for protecting shared resources
- RWMutex: Reader-writer locks for read-heavy workloads
- WaitGroup: Waiting for a collection of goroutines to finish
- Once: Ensuring a function runs exactly once
- Cond: Condition variables for complex coordination
- Pool: Object pools for reducing garbage collection pressure
- Map: A concurrent map for specific access patterns
When to Use Sync Primitives
Go’s philosophy is “Don’t communicate by sharing memory; share memory by communicating.” Channels are often the right choice for goroutine coordination. However, sync primitives are essential when:
- Protecting shared state that multiple goroutines access
- Implementing low-level concurrent data structures
- Optimizing performance-critical code paths
- Building higher-level synchronization abstractions
Common Pitfalls
Synchronization bugs are notoriously difficult to debug. Common mistakes include:
- Deadlocks: Goroutines waiting for each other indefinitely
- Race conditions: Unsynchronized access to shared data
- Lock contention: Too many goroutines competing for the same lock
- Forgetting to unlock: Causing permanent blocking
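As a minimal sketch of how the last pitfall is usually avoided, the program below pairs every Lock with a deferred Unlock. SafeCounter and countConcurrently are hypothetical names chosen for illustration, not part of the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter is a hypothetical example type; mu guards n.
type SafeCounter struct {
	mu sync.Mutex
	n  int
}

// Inc increments the counter. The deferred Unlock runs on every return
// path, including panics, which avoids the "forgot to unlock" pitfall.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value returns the current count under the same lock.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countConcurrently starts `workers` goroutines that each call Inc
// `perWorker` times, then returns the final count.
func countConcurrently(workers, perWorker int) int {
	var c SafeCounter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(50, 100)) // prints 5000
}
```

Running a program like this with `go run -race` is a practical way to catch unsynchronized access: removing the lock from Inc should make the race detector report the conflicting writes.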
This guide covers each primitive in detail, with practical examples and guidance on when to use each approach.
Core Sync Package Primitives
The sync package provides several fundamental primitives that form the building blocks for concurrent programming in Go. Understanding their implementation details and behavior characteristics is essential for effective usage.
Mutex: Beyond Simple Locking
At its core, sync.Mutex provides mutual exclusion, ensuring that only one goroutine can access a protected resource at a time. However, its implementation and behavior have nuances worth exploring:
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
func mutexInternals() {
// Create a mutex
var mu sync.Mutex
// Counter to track goroutines that have acquired the lock
var counter int32
// Function that acquires the lock, increments counter, waits, then decrements
acquireLock := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Goroutine %d: Attempting to acquire lock\n", id)
mu.Lock()
// Atomically increment the counter
newCount := atomic.AddInt32(&counter, 1)
fmt.Printf("Goroutine %d: Acquired lock, active holders: %d\n", id, newCount)
// This should always be 1 if mutex is working correctly
if newCount != 1 {
panic(fmt.Sprintf("Mutex failure: %d goroutines holding lock", newCount))
}
// Simulate work while holding the lock
time.Sleep(50 * time.Millisecond)
// Atomically decrement the counter
atomic.AddInt32(&counter, -1)
fmt.Printf("Goroutine %d: Releasing lock\n", id)
mu.Unlock()
}
// Launch multiple goroutines to compete for the lock
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go acquireLock(i, &wg)
}
wg.Wait()
fmt.Println("All goroutines completed successfully")
// Demonstrate mutex starvation scenario
demonstrateMutexStarvation()
}
func demonstrateMutexStarvation() {
var mu sync.Mutex
// Track how many times each goroutine acquires the lock
acquisitions := make([]int, 2)
// Done channel; it is only ever closed, never sent on, because a send
// on a closed channel would panic
done := make(chan bool)
// WaitGroup so the counters are read only after both goroutines have exited
var wg sync.WaitGroup
wg.Add(2)
// Launch two goroutines with different behaviors
// Goroutine 1: Acquires lock briefly and releases quickly
go func() {
defer wg.Done()
for i := 0; i < 1000 && !isChannelClosed(done); i++ {
mu.Lock()
acquisitions[0]++
mu.Unlock()
// Give other goroutines a chance
runtime.Gosched()
}
}()
// Goroutine 2: Holds lock for longer periods
go func() {
defer wg.Done()
for i := 0; i < 100 && !isChannelClosed(done); i++ {
mu.Lock()
acquisitions[1]++
// Hold the lock longer
time.Sleep(1 * time.Millisecond)
mu.Unlock()
// Give other goroutines a chance
runtime.Gosched()
}
}()
// Let them run for a bit
time.Sleep(100 * time.Millisecond)
close(done)
// Wait for both goroutines to exit before reading the counters
wg.Wait()
fmt.Printf("Lock acquisition distribution:\n")
fmt.Printf("- Fast goroutine: %d acquisitions\n", acquisitions[0])
fmt.Printf("- Slow goroutine: %d acquisitions\n", acquisitions[1])
// Guard against division by zero if the slow goroutine never ran
if acquisitions[1] > 0 {
fmt.Printf("- Ratio (fast/slow): %.2f\n", float64(acquisitions[0])/float64(acquisitions[1]))
}
}
// Helper function to check if a channel is closed
func isChannelClosed(ch chan bool) bool {
select {
case <-ch:
return true
default:
return false
}
}
func main() {
mutexInternals()
}
This example demonstrates several important aspects of mutex behavior:
- Mutual Exclusion Guarantee: The atomic counter verifies that only one goroutine holds the lock at any time.
- No Strict FIFO Ordering: Waiters queue in FIFO order, but a woken waiter still competes with newly arriving goroutines and can lose to them.
- Starvation Potential: The second part measures how unevenly two goroutines with different hold times acquire the lock. The exact ratio depends on the scheduler and the Go version.
Under the hood, Go’s mutex uses a combination of atomic operations and goroutine queuing. In normal mode it employs a “barging” model where a goroutine that is already running may acquire the lock ahead of waiters that have not been scheduled yet. To bound the resulting unfairness, since Go 1.9 a waiter that fails to acquire the lock for more than 1ms switches the mutex into starvation mode, in which Unlock hands ownership directly to the waiter at the front of the queue.
RWMutex: Optimizing for Read-Heavy Workloads
The sync.RWMutex extends the basic mutex concept by distinguishing between read and write operations, allowing multiple readers to access the protected resource simultaneously:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func rwMutexDemo() {
// Create a RWMutex
var rwMu sync.RWMutex
// Counters to track concurrent readers and writers
var readers int32
var writers int32
// Function for reader goroutines
reader := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
// Acquire read lock
fmt.Printf("Reader %d: Attempting to acquire read lock\n", id)
rwMu.RLock()
// Increment reader count and check for writers
currentReaders := atomic.AddInt32(&readers, 1)
currentWriters := atomic.LoadInt32(&writers)
fmt.Printf("Reader %d: Acquired read lock, concurrent readers: %d, writers: %d\n",
id, currentReaders, currentWriters)
// Verify no writers are active
if currentWriters > 0 {
panic(fmt.Sprintf("RWMutex failure: %d writers active during read lock", currentWriters))
}
// Simulate reading
time.Sleep(50 * time.Millisecond)
// Decrement reader count and release lock
atomic.AddInt32(&readers, -1)
rwMu.RUnlock()
fmt.Printf("Reader %d: Released read lock\n", id)
// Small pause between operations
time.Sleep(10 * time.Millisecond)
}
}
// Function for writer goroutines
writer := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 2; i++ {
// Acquire write lock
fmt.Printf("Writer %d: Attempting to acquire write lock\n", id)
rwMu.Lock()
// Increment writer count and check for other writers and readers
currentWriters := atomic.AddInt32(&writers, 1)
currentReaders := atomic.LoadInt32(&readers)
fmt.Printf("Writer %d: Acquired write lock, concurrent writers: %d, readers: %d\n",
id, currentWriters, currentReaders)
// Verify no other writers or readers are active
if currentWriters != 1 {
panic(fmt.Sprintf("RWMutex failure: %d writers active concurrently", currentWriters))
}
if currentReaders > 0 {
panic(fmt.Sprintf("RWMutex failure: %d readers active during write lock", currentReaders))
}
// Simulate writing
time.Sleep(100 * time.Millisecond)
// Decrement writer count and release lock
atomic.AddInt32(&writers, -1)
rwMu.Unlock()
fmt.Printf("Writer %d: Released write lock\n", id)
// Small pause between operations
time.Sleep(20 * time.Millisecond)
}
}
// Launch multiple readers and writers
var wg sync.WaitGroup
// Start readers
for i := 1; i <= 5; i++ {
wg.Add(1)
go reader(i, &wg)
}
// Start writers
for i := 1; i <= 2; i++ {
wg.Add(1)
go writer(i, &wg)
}
wg.Wait()
fmt.Println("All reader and writer goroutines completed successfully")
// Demonstrate writer preference
demonstrateWriterPreference()
}
func demonstrateWriterPreference() {
var rwMu sync.RWMutex
// Signal channels
readerReady := make(chan bool)
writerReady := make(chan bool)
done := make(chan bool)
// Start a long-running reader
go func() {
rwMu.RLock()
readerReady <- true
// Hold read lock until done
<-done
rwMu.RUnlock()
}()
// Wait for reader to acquire lock
<-readerReady
// Start a writer that will block
go func() {
writerReady <- true
// This will block until reader releases lock
rwMu.Lock()
rwMu.Unlock()
done <- true
}()
// Wait for writer to be ready
<-writerReady
// Try to acquire another read lock after writer is waiting
// This demonstrates writer preference - new readers will block
// if there's a writer waiting
readBlocked := make(chan bool)
go func() {
// Small delay to ensure writer is waiting
time.Sleep(10 * time.Millisecond)
fmt.Println("New reader attempting to acquire lock while writer is waiting...")
// This should block because writer is waiting
rwMu.RLock()
rwMu.RUnlock()
readBlocked <- true
}()
// Let the system run for a bit
select {
case <-readBlocked:
fmt.Println("UNEXPECTED: New reader acquired lock while writer was waiting")
case <-time.After(50 * time.Millisecond):
fmt.Println("EXPECTED: New reader blocked because writer is waiting (writer preference)")
}
// Signal the original reader to release the lock
done <- true
// Wait for everything to complete
<-done
<-readBlocked
}
func main() {
rwMutexDemo()
}
Key insights about RWMutex:
- Reader Concurrency: Multiple goroutines can hold read locks simultaneously, as verified by the concurrent reader count.
- Writer Exclusivity: Write locks exclude both readers and other writers.
- Writer Preference: Once a writer is blocked in Lock, new RLock calls block until that writer has acquired and released the lock. This prevents a steady stream of readers from starving writers in read-heavy workloads.
The RWMutex implementation uses a clever combination of a mutex and semaphore to track readers and coordinate with writers. This makes it significantly more complex than a basic mutex but enables higher throughput in read-heavy scenarios.
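For a more typical use than the instrumented demo above, here is a small sketch of a read-mostly store guarded by an RWMutex. The Settings type and its methods are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Settings is a hypothetical read-mostly key/value store; mu guards data.
type Settings struct {
	mu   sync.RWMutex
	data map[string]string
}

// NewSettings returns an empty store with an initialized map.
func NewSettings() *Settings {
	return &Settings{data: make(map[string]string)}
}

// Get takes the read lock, so any number of Get calls can run in parallel.
func (s *Settings) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set takes the write lock, excluding all readers and other writers.
func (s *Settings) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func main() {
	s := NewSettings()
	s.Set("mode", "fast")

	// Several readers can hold the read lock at the same time.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if v, ok := s.Get("mode"); !ok || v != "fast" {
				panic("unexpected value")
			}
		}()
	}
	wg.Wait()

	v, _ := s.Get("mode")
	fmt.Println(v) // prints fast
}
```

Whether this beats a plain Mutex depends on the workload: with short critical sections and few cores, the extra bookkeeping of an RWMutex may not pay off, so it is worth measuring.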
WaitGroup: Coordinating Task Completion
The sync.WaitGroup provides a mechanism to wait for a collection of goroutines to finish their work:
package main
import (
"fmt"
"sync"
"time"
)
func waitGroupPatterns() {
// Basic WaitGroup usage
basicWaitGroup()
// Dynamic task generation
dynamicWaitGroup()
// Hierarchical WaitGroups
hierarchicalWaitGroups()
}
func basicWaitGroup() {
fmt.Println("\n=== Basic WaitGroup ===")
var wg sync.WaitGroup
// Launch multiple workers
for i := 1; i <= 3; i++ {
// Increment counter before launching goroutine
wg.Add(1)
go func(id int) {
// Ensure counter is decremented when goroutine completes
defer wg.Done()
fmt.Printf("Worker %d: Starting\n", id)
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
fmt.Printf("Worker %d: Completed\n", id)
}(i)
}
// Wait for all workers to complete
fmt.Println("Main: Waiting for workers to complete")
wg.Wait()
fmt.Println("Main: All workers completed")
}
func dynamicWaitGroup() {
fmt.Println("\n=== Dynamic WaitGroup ===")
var wg sync.WaitGroup
// Channel for dynamic task generation
tasks := make(chan int, 10)
// Launch worker pool
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
fmt.Printf("Worker %d: Started, processing tasks\n", workerID)
// Process tasks until channel is closed
for taskID := range tasks {
fmt.Printf("Worker %d: Processing task %d\n", workerID, taskID)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Worker %d: Completed task %d\n", workerID, taskID)
}
fmt.Printf("Worker %d: Shutting down\n", workerID)
}(i)
}
// Generate tasks
fmt.Println("Main: Generating tasks")
for i := 1; i <= 6; i++ {
tasks <- i
}
// Signal no more tasks
fmt.Println("Main: Closing task channel")
close(tasks)
// Wait for all workers to complete
fmt.Println("Main: Waiting for workers to complete")
wg.Wait()
fmt.Println("Main: All workers completed")
}
func hierarchicalWaitGroups() {
fmt.Println("\n=== Hierarchical WaitGroups ===")
var parentWg sync.WaitGroup
// Launch parent tasks
for i := 1; i <= 2; i++ {
parentWg.Add(1)
go func(parentID int) {
defer parentWg.Done()
fmt.Printf("Parent %d: Starting\n", parentID)
// Each parent launches multiple child tasks
var childWg sync.WaitGroup
for j := 1; j <= 3; j++ {
childWg.Add(1)
go func(parentID, childID int) {
defer childWg.Done()
fmt.Printf("Parent %d, Child %d: Starting\n", parentID, childID)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Parent %d, Child %d: Completed\n", parentID, childID)
}(parentID, j)
}
// Wait for all children of this parent to complete
fmt.Printf("Parent %d: Waiting for children\n", parentID)
childWg.Wait()
fmt.Printf("Parent %d: All children completed\n", parentID)
}(i)
}
// Wait for all parent tasks to complete
fmt.Println("Main: Waiting for parent tasks to complete")
parentWg.Wait()
fmt.Println("Main: All parent tasks completed")
}
func main() {
waitGroupPatterns()
}
This example demonstrates several WaitGroup patterns:
- Basic Coordination: Using Add/Done/Wait to coordinate multiple goroutines.
- Dynamic Task Processing: Combining WaitGroup with channels for worker pools.
- Hierarchical Coordination: Nesting WaitGroups to coordinate multi-level task hierarchies.
Internally, WaitGroup maintains its counter with atomic operations, so Add and Done never take a mutex, which keeps coordination cheap even across many goroutines. Goroutines blocked in Wait park on a runtime semaphore and are released when the counter reaches zero.
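A common follow-up question is how to collect results from the goroutines a WaitGroup coordinates. One simple approach, sketched below with a hypothetical squareAll helper, is to give each goroutine its own slot in a pre-sized slice so that no additional locking is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll computes the square of each input in its own goroutine.
// Each goroutine writes only to results[i], so no mutex is needed;
// wg.Wait() guarantees all writes are complete before the slice is read.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, v := range inputs {
		wg.Add(1) // Add before starting the goroutine, never inside it
		go func(i, v int) {
			defer wg.Done()
			results[i] = v * v
		}(i, v)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```

Passing i and v as arguments mirrors the style used in the examples above and keeps the sketch correct on Go versions before 1.22, where loop variables were shared across iterations.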
Once: Ensuring Single Execution
The sync.Once primitive guarantees that a function is executed exactly once, even when called from multiple goroutines:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func oncePatterns() {
// Basic Once usage
basicOnce()
// Once with error handling
onceWithErrorHandling()
// Multiple Once objects
multipleOnce()
}
func basicOnce() {
fmt.Println("\n=== Basic Once ===")
var once sync.Once
var counter int32
// Function to be executed once
initFunc := func() {
// Simulate initialization work
time.Sleep(100 * time.Millisecond)
atomic.AddInt32(&counter, 1)
fmt.Println("Initialization done")
}
// Launch multiple goroutines that all try to execute initFunc
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d: Calling initialization\n", id)
once.Do(initFunc)
fmt.Printf("Goroutine %d: Initialization call returned\n", id)
}(i)
}
wg.Wait()
fmt.Printf("Counter value: %d (should be 1)\n", atomic.LoadInt32(&counter))
}
func onceWithErrorHandling() {
fmt.Println("\n=== Once with Error Handling ===")
var once sync.Once
var initErr error
var initialized int32
// Function that might fail
initFunc := func() {
// Simulate initialization that might fail
fmt.Println("Performing initialization...")
time.Sleep(100 * time.Millisecond)
// Simulate a failure
// initErr = fmt.Errorf("initialization failed")
// If no error, mark as initialized
if initErr == nil {
atomic.StoreInt32(&initialized, 1)
fmt.Println("Initialization succeeded")
} else {
fmt.Printf("Initialization failed: %v\n", initErr)
}
}
// Wrapper function for initialization with error checking
initWithErrorCheck := func() error {
once.Do(initFunc)
return initErr
}
// Try initialization from multiple goroutines
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d: Attempting initialization\n", id)
err := initWithErrorCheck()
if err != nil {
fmt.Printf("Goroutine %d: Initialization error: %v\n", id, err)
} else {
fmt.Printf("Goroutine %d: Initialization successful\n", id)
}
}(i)
}
wg.Wait()
fmt.Printf("Final initialization state: %v\n", atomic.LoadInt32(&initialized) == 1)
}
func multipleOnce() {
fmt.Println("\n=== Multiple Once Objects ===")
// Create a map of Once objects for different resources
var mu sync.Mutex
resources := make(map[string]*sync.Once)
// Function to get or create a Once object for a resource
getResourceOnce := func(name string) *sync.Once {
mu.Lock()
defer mu.Unlock()
if once, exists := resources[name]; exists {
return once
}
once := new(sync.Once)
resources[name] = once
return once
}
// Function to initialize a resource
initResource := func(name string) {
fmt.Printf("Initializing resource: %s\n", name)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Resource initialized: %s\n", name)
}
// Launch goroutines that initialize different resources
var wg sync.WaitGroup
resourceNames := []string{"db", "cache", "logger", "db", "cache"}
for i, name := range resourceNames {
wg.Add(1)
go func(id int, resourceName string) {
defer wg.Done()
fmt.Printf("Goroutine %d: Initializing %s\n", id, resourceName)
once := getResourceOnce(resourceName)
once.Do(func() { initResource(resourceName) })
fmt.Printf("Goroutine %d: %s initialization call returned\n", id, resourceName)
}(i, name)
}
wg.Wait()
// Check how many unique resources were initialized
mu.Lock()
fmt.Printf("Number of unique resources: %d\n", len(resources))
mu.Unlock()
}
func main() {
oncePatterns()
}
Key insights about Once:
- Guaranteed Single Execution: The function passed to Do() executes exactly once, regardless of how many goroutines call it.
- Blocking Behavior: Calls to Do() that arrive while the first call is still running block until it completes; calls made afterwards return immediately.
- Error Handling: Once doesn’t provide built-in error handling, but patterns can be implemented to address this.
- Multiple Once Objects: Different initialization tasks require separate Once instances.
Internally, Once uses a combination of a mutex and an atomic flag to ensure the function is executed exactly once while allowing concurrent calls to proceed efficiently after initialization.
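Because Once marks itself done even when the initialization fails, a failed initialization is never retried. When a retry is needed, one option is a small mutex-based helper that records completion only on success. The sketch below is an assumption-laden illustration: retryOnce and runDemo are hypothetical names, not part of the sync package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// retryOnce is a hypothetical helper, not part of the sync package.
// Unlike sync.Once, it marks itself done only when f succeeds.
type retryOnce struct {
	mu   sync.Mutex
	done bool
}

// Do runs f unless a previous call already succeeded. Holding mu while
// f runs makes concurrent callers wait, much as sync.Once does.
func (o *retryOnce) Do(f func() error) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done {
		return nil
	}
	if err := f(); err != nil {
		return err // not marked done, so the next call retries
	}
	o.done = true
	return nil
}

// runDemo calls Do three times with a function that fails on its first
// attempt, and returns how many times that function actually ran.
func runDemo() int {
	var o retryOnce
	attempts := 0
	connect := func() error {
		attempts++
		if attempts == 1 {
			return errors.New("temporary failure")
		}
		return nil
	}
	for i := 0; i < 3; i++ {
		_ = o.Do(connect)
	}
	return attempts
}

func main() {
	fmt.Println(runDemo()) // prints 2
}
```

The trade-off is that every call takes the mutex, whereas sync.Once uses an atomic fast path after the first call. For initialization that is called very frequently, that difference may matter.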
Cond: Coordinating Based on Conditions
The sync.Cond primitive provides a way for goroutines to wait for a condition to become true:
package main
import (
"fmt"
"sync"
"time"
)
func condPatterns() {
// Basic condition variable usage
basicCond()
// Broadcast vs. Signal
broadcastVsSignal()
// Producer-consumer pattern
producerConsumer()
}
func basicCond() {
fmt.Println("\n=== Basic Condition Variable ===")
// Create a condition variable with its associated mutex
var mu sync.Mutex
cond := sync.NewCond(&mu)
// Shared state protected by the mutex
ready := false
// Launch a goroutine that will set the condition
go func() {
// Simulate some preparation work
time.Sleep(300 * time.Millisecond)
// Update the shared state and signal waiters
mu.Lock()
ready = true
fmt.Println("Signaler: Condition is now true")
cond.Signal()
mu.Unlock()
}()
// Wait for the condition to become true
fmt.Println("Waiter: Waiting for condition")
mu.Lock()
// Loop so the condition is rechecked after every wakeup
for !ready {
cond.Wait() // Atomically releases mutex and blocks until signaled
}
mu.Unlock()
fmt.Println("Waiter: Condition is true, proceeding")
}
func broadcastVsSignal() {
fmt.Println("\n=== Broadcast vs. Signal ===")
var mu sync.Mutex
cond := sync.NewCond(&mu)
// Shared state
ready := false
// Launch multiple waiters
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Waiter %d: Waiting for condition\n", id)
mu.Lock()
for !ready {
cond.Wait()
fmt.Printf("Waiter %d: Woke up, checking condition\n", id)
}
fmt.Printf("Waiter %d: Condition is true, proceeding\n", id)
mu.Unlock()
}(i)
}
// Give waiters time to start waiting
time.Sleep(100 * time.Millisecond)
// Signal vs. Broadcast demonstration
fmt.Println("\nDemonstrating Signal (wakes only one waiter):")
mu.Lock()
cond.Signal()
mu.Unlock()
// Give signaled waiter time to process
time.Sleep(100 * time.Millisecond)
fmt.Println("\nDemonstrating Broadcast (wakes all waiters):")
mu.Lock()
ready = true
cond.Broadcast()
mu.Unlock()
wg.Wait()
}
func producerConsumer() {
fmt.Println("\n=== Producer-Consumer with Cond ===")
var mu sync.Mutex
notEmpty := sync.NewCond(&mu)
notFull := sync.NewCond(&mu)
// Bounded buffer
const capacity = 3
buffer := make([]int, 0, capacity)
// Producer function
producer := func(items int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= items; i++ {
mu.Lock()
// Wait while buffer is full
for len(buffer) == capacity {
fmt.Println("Producer: Buffer full, waiting")
notFull.Wait()
}
// Add item to buffer
buffer = append(buffer, i)
fmt.Printf("Producer: Added item %d, buffer size: %d\n", i, len(buffer))
// Signal that buffer is not empty
notEmpty.Signal()
mu.Unlock()
// Simulate variable production rate
time.Sleep(50 * time.Millisecond)
}
fmt.Println("Producer: Finished producing items")
}
// Flag indicating the producer is done; guarded by mu. It is declared
// before the consumer closure, which could not reference it otherwise.
var producerDone bool
// Consumer function
consumer := func(wg *sync.WaitGroup) {
defer wg.Done()
// Try to consume 8 items (more than the producer will produce)
for consumed := 0; consumed < 8; consumed++ {
mu.Lock()
// Wait while buffer is empty
for len(buffer) == 0 {
// Check before waiting so that a signal sent earlier is not missed
if producerDone {
fmt.Println("Consumer: No more items, exiting")
mu.Unlock()
return
}
fmt.Println("Consumer: Buffer empty, waiting")
notEmpty.Wait()
}
// Remove item from buffer
item := buffer[0]
buffer = buffer[1:]
fmt.Printf("Consumer: Removed item %d, buffer size: %d\n", item, len(buffer))
// Signal that buffer is not full
notFull.Signal()
mu.Unlock()
// Simulate processing
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Consumer: Finished consuming items")
}
// Start producer and consumer
var wg sync.WaitGroup
wg.Add(2)
go producer(5, &wg)
go consumer(&wg)
// Mark the producer as done after 500ms (producing 5 items takes about 250ms)
go func() {
time.Sleep(500 * time.Millisecond)
mu.Lock()
producerDone = true
// Wake up consumer if it's waiting
notEmpty.Signal()
mu.Unlock()
}()
wg.Wait()
fmt.Println("Main: Producer and consumer have finished")
}
func main() {
condPatterns()
}
Key insights about Cond:
- Wait Atomically Releases Lock: The Wait method atomically releases the mutex and blocks the goroutine.
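- Recheck in a Loop: In Go, Wait returns only after a Signal or Broadcast, but another goroutine may change the shared state before the woken waiter reacquires the mutex. Always recheck the condition in a loop.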
- Signal vs. Broadcast: Signal wakes one waiter, while Broadcast wakes all waiters.
- Producer-Consumer Pattern: Cond variables are ideal for implementing bounded buffer patterns with multiple conditions.
Internally, Cond maintains a linked list of waiting goroutines. When Signal is called, it moves the first waiter to the ready queue. When Broadcast is called, it moves all waiters to the ready queue. This implementation ensures efficient signaling without unnecessary context switches.
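The same ideas can be packaged into a reusable type. The following sketch assumes a hypothetical Queue with Put, Get, and Close methods: Put uses Signal because one new item can satisfy only one consumer, while Close uses Broadcast because every waiting consumer needs to observe the shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical unbounded FIFO queue guarded by a mutex and a Cond.
type Queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []int
	closed bool
}

// NewQueue wires the Cond to the queue's own mutex.
func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Put appends an item and wakes one waiting consumer.
func (q *Queue) Put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// Close marks the queue as finished and wakes every waiting consumer.
func (q *Queue) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// Get blocks until an item is available, or returns false once the
// queue is closed and drained.
func (q *Queue) Get() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return 0, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}

// sumAll produces 1..n while a consumer goroutine drains and sums the queue.
func sumAll(n int) int {
	q := NewQueue()
	sum := 0
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			v, ok := q.Get()
			if !ok {
				return
			}
			sum += v
		}
	}()
	for i := 1; i <= n; i++ {
		q.Put(i)
	}
	q.Close()
	wg.Wait() // the consumer has exited, so reading sum is safe
	return sum
}

func main() {
	fmt.Println(sumAll(10)) // prints 55
}
```

In many programs a buffered channel expresses this more directly; a Cond-based queue is mainly worth considering when the wait condition is richer than "a value is available".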
Pool: Managing Reusable Resources
The sync.Pool provides a way to reuse temporary objects, reducing allocation pressure on the garbage collector:
package main
import (
"bytes"
"fmt"
"sync"
"time"
)
func poolPatterns() {
// Basic pool usage
basicPool()
// Pool with custom initialization
customInitPool()
// Pool performance benchmark
poolPerformance()
}
func basicPool() {
fmt.Println("\n=== Basic Pool Usage ===")
// Create a pool of byte buffers
var bufferPool = sync.Pool{
New: func() interface{} {
buffer := new(bytes.Buffer)
fmt.Println("Creating new buffer")
return buffer
},
}
// Get a buffer from the pool
fmt.Println("Getting first buffer:")
buffer1 := bufferPool.Get().(*bytes.Buffer)
// Use the buffer
buffer1.WriteString("Hello, World!")
fmt.Printf("Buffer 1 contents: %s\n", buffer1.String())
// Reset and return the buffer to the pool
buffer1.Reset()
bufferPool.Put(buffer1)
fmt.Println("Returned buffer 1 to pool")
// Get another buffer (should reuse the one we just put back)
fmt.Println("Getting second buffer:")
buffer2 := bufferPool.Get().(*bytes.Buffer)
// Check if it's the same buffer
fmt.Printf("Buffer 2 is buffer 1: %v\n", buffer1 == buffer2)
// Use the buffer again
buffer2.WriteString("Reused buffer")
fmt.Printf("Buffer 2 contents: %s\n", buffer2.String())
// Return to pool
buffer2.Reset()
bufferPool.Put(buffer2)
}
func customInitPool() {
fmt.Println("\n=== Pool with Custom Initialization ===")
// Create a pool of slices with custom capacity
var slicePool = sync.Pool{
New: func() interface{} {
slice := make([]int, 0, 1024)
fmt.Println("Created new slice with capacity 1024")
return &slice
},
}
// Function that uses a slice from the pool
processData := func(id int, data []int) {
// Get a slice from the pool
slicePtr := slicePool.Get().(*[]int)
slice := (*slicePtr)[:0] // Reset length but keep capacity
// Use the slice
slice = append(slice, data...)
fmt.Printf("Worker %d: Processing %d items with slice capacity %d\n",
id, len(slice), cap(slice))
// Return slice to pool
*slicePtr = slice[:0] // Clear again before returning
slicePool.Put(slicePtr)
}
// Use the pool from multiple goroutines
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Generate some test data
data := make([]int, id*100)
for j := range data {
data[j] = j
}
processData(id, data)
}(i)
}
wg.Wait()
}
func poolPerformance() {
fmt.Println("\n=== Pool Performance Benchmark ===")
// Create a pool of large buffers
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024*1024) // 1MB buffer
},
}
// Function that either uses the pool or allocates directly
benchmark := func(usePool bool, iterations int) time.Duration {
start := time.Now()
for i := 0; i < iterations; i++ {
var buffer []byte
if usePool {
// Get from pool
buffer = bufferPool.Get().([]byte)
// Use the buffer
buffer[0] = 1
// Return to pool
bufferPool.Put(buffer)
} else {
// Allocate directly
buffer = make([]byte, 1024*1024)
buffer[0] = 1
// Buffer will be garbage collected
}
}
return time.Since(start)
}
// Run benchmarks
iterations := 1000
// Warm up the pool
benchmark(true, 100)
// Benchmark with and without pool
withoutPoolTime := benchmark(false, iterations)
withPoolTime := benchmark(true, iterations)
fmt.Printf("Without Pool: %v for %d iterations\n", withoutPoolTime, iterations)
fmt.Printf("With Pool: %v for %d iterations\n", withPoolTime, iterations)
fmt.Printf("Performance improvement: %.2fx\n",
float64(withoutPoolTime)/float64(withPoolTime))
}
func main() {
poolPatterns()
}
Key insights about Pool:
- Temporary Object Reuse: Pool reduces garbage collection pressure by reusing objects.
- No Guarantees: Items may be removed from the pool at any time, especially during garbage collection.
- Thread Safety: Pool is safe for concurrent use by multiple goroutines.
- Reset Before Reuse: Always reset objects before returning them to the pool.
Internally, Pool uses per-P (processor) caches to minimize contention. Each P has its own local pool, and items are stolen from other Ps only when the local pool is empty. This design makes Pool highly efficient for concurrent access patterns.
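In day-to-day code, the Get, use, Reset, Put cycle is often wrapped in a single function so callers cannot forget a step. The sketch below assumes a hypothetical greet helper and a package-level bufPool; both names are illustrative.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out *bytes.Buffer values; New runs only when the pool
// has nothing to reuse.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// greet is a hypothetical helper that formats a message with a pooled buffer.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	// Reset before Put so the next user never sees stale bytes.
	defer func() {
		buf.Reset()
		bufPool.Put(buf)
	}()
	buf.WriteString("hello, ")
	buf.WriteString(name)
	// String copies the bytes, so the result stays valid after the
	// buffer goes back to the pool.
	return buf.String()
}

func main() {
	fmt.Println(greet("gopher")) // prints hello, gopher
	fmt.Println(greet("pool"))   // prints hello, pool
}
```

Note that returning buf.Bytes() instead of buf.String() would be a bug here, because the returned slice would alias memory that another goroutine may overwrite after the buffer is reused.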
Map: Concurrent Access to Maps
The sync.Map provides a concurrent map implementation optimized for specific access patterns:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func mapPatterns() {
// Basic Map usage
basicMap()
// Load or store pattern
loadOrStore()
// Range iteration
rangeMap()
}
func basicMap() {
fmt.Println("\n=== Basic Map Operations ===")
// Create a concurrent map
var m sync.Map
// Store values
m.Store("key1", 100)
m.Store("key2", 200)
m.Store("key3", 300)
// Load values
value, ok := m.Load("key2")
fmt.Printf("Load key2: value=%v, found=%v\n", value, ok)
value, ok = m.Load("nonexistent")
fmt.Printf("Load nonexistent: value=%v, found=%v\n", value, ok)
// Delete a value
m.Delete("key1")
value, ok = m.Load("key1")
fmt.Printf("After delete, load key1: value=%v, found=%v\n", value, ok)
}
func loadOrStore() {
fmt.Println("\n=== LoadOrStore Pattern ===")
// Create a concurrent map
var m sync.Map
// Function to get or initialize a counter
getCounter := func(key string) *int64 {
// Try to load existing counter
value, ok := m.Load(key)
if ok {
return value.(*int64)
}
// Create new counter
counter := new(int64)
// Try to store it (might race with another goroutine)
actual, loaded := m.LoadOrStore(key, counter)
if loaded {
// Another goroutine beat us to it
return actual.(*int64)
}
// We stored our counter
return counter
}
// Use counters from multiple goroutines
var wg sync.WaitGroup
keys := []string{"a", "b", "a", "c", "b", "a"}
for i, key := range keys {
wg.Add(1)
go func(id int, k string) {
defer wg.Done()
// Get or initialize counter for this key
counter := getCounter(k)
// Increment counter
newValue := atomic.AddInt64(counter, 1)
fmt.Printf("Goroutine %d: Incremented counter for key %s to %d\n",
id, k, newValue)
}(i, key)
}
wg.Wait()
// Print final counter values
fmt.Println("Final counter values:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %s: %d\n", key, *value.(*int64))
return true
})
}
func rangeMap() {
fmt.Println("\n=== Range Iteration ===")
// Create and populate a map
var m sync.Map
for i := 1; i <= 5; i++ {
key := fmt.Sprintf("key%d", i)
m.Store(key, i*10)
}
// Iterate over all key-value pairs
fmt.Println("Map contents:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %v: %v\n", key, value)
return true // continue iteration
})
// Selective iteration (stop after finding a specific key)
fmt.Println("Searching for key3:")
found := false
m.Range(func(key, value interface{}) bool {
fmt.Printf(" Checking %v\n", key)
if key.(string) == "key3" {
fmt.Printf(" Found key3 with value %v\n", value)
found = true
return false // stop iteration
}
return true // continue iteration
})
if !found {
fmt.Println(" key3 not found")
}
}
func main() {
mapPatterns()
}
Key insights about Map:
- Optimized Use Cases: According to the package documentation, sync.Map is optimized for two access patterns:
- Each key is written once but read many times, as in caches that only grow
- Multiple goroutines read, write, and overwrite entries for disjoint sets of keys
- No Length Method: Unlike built-in maps, sync.Map doesn’t provide a length method.
- Type Assertions Required: Values are stored as interface{}, requiring type assertions when retrieved.
- Atomic Operations: LoadOrStore provides an atomic “check and set” operation.
Internally, sync.Map uses a clever combination of an immutable read-only map and a dirty map with a mutex. This design allows for lock-free reads in the common case, falling back to the dirty map only when necessary.
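Two of the points above, the missing length method and the required type assertions, can be softened with a thin wrapper. The following sketch uses a hypothetical IntMap type; it is one possible shape for such a wrapper, not an API the sync package provides.

```go
package main

import (
	"fmt"
	"sync"
)

// IntMap is a hypothetical typed wrapper that hides sync.Map's
// interface{} values behind string keys and int values.
type IntMap struct {
	m sync.Map
}

// Store sets the value for key.
func (im *IntMap) Store(key string, value int) { im.m.Store(key, value) }

// Load returns the value for key; the type assertion lives in one place.
func (im *IntMap) Load(key string) (int, bool) {
	v, ok := im.m.Load(key)
	if !ok {
		return 0, false
	}
	return v.(int), true
}

// Len counts entries by walking the map with Range. It is O(n), and
// under concurrent writes the result is only an approximation.
func (im *IntMap) Len() int {
	n := 0
	im.m.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}

func main() {
	var im IntMap
	im.Store("a", 1)
	im.Store("b", 2)
	im.Store("a", 3) // overwrites the earlier value for "a"
	v, _ := im.Load("a")
	fmt.Println(v, im.Len()) // prints 3 2
}
```

If Len is called often, keeping a separate atomic counter alongside the map may be cheaper, at the cost of updating it consistently on every insert and delete.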
Advanced Synchronization Patterns
Beyond the basic primitives, Go enables sophisticated synchronization patterns by combining multiple primitives and leveraging channels.
Semaphore Implementation
Semaphores are not provided directly in the sync package (a weighted semaphore is available separately in golang.org/x/sync/semaphore), but they can be implemented using a buffered channel or a combination of mutex and condition variable:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ChannelSemaphore implements a semaphore using a buffered channel
type ChannelSemaphore struct {
permits chan struct{}
}
// NewChannelSemaphore creates a new semaphore with the given number of permits
func NewChannelSemaphore(n int) *ChannelSemaphore {
return &ChannelSemaphore{
permits: make(chan struct{}, n),
}
}
// Acquire acquires a permit, blocking until one is available
func (s *ChannelSemaphore) Acquire() {
s.permits <- struct{}{}
}
// TryAcquire attempts to acquire a permit without blocking
func (s *ChannelSemaphore) TryAcquire() bool {
select {
case s.permits <- struct{}{}:
return true
default:
return false
}
}
// AcquireWithTimeout attempts to acquire a permit with a timeout
func (s *ChannelSemaphore) AcquireWithTimeout(timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case s.permits <- struct{}{}:
return true
case <-ctx.Done():
return false
}
}
// Release releases a permit
func (s *ChannelSemaphore) Release() {
select {
case <-s.permits:
default:
panic("Semaphore release without acquire")
}
}
// MutexSemaphore implements a semaphore using mutex and condition variable
type MutexSemaphore struct {
mu sync.Mutex
cond *sync.Cond
count int
maxSize int
}
// NewMutexSemaphore creates a new semaphore with the given number of permits
func NewMutexSemaphore(n int) *MutexSemaphore {
s := &MutexSemaphore{
maxSize: n,
count: n,
}
s.cond = sync.NewCond(&s.mu)
return s
}
// Acquire acquires a permit, blocking until one is available
func (s *MutexSemaphore) Acquire() {
s.mu.Lock()
defer s.mu.Unlock()
for s.count == 0 {
s.cond.Wait()
}
s.count--
}
// TryAcquire attempts to acquire a permit without blocking
func (s *MutexSemaphore) TryAcquire() bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.count == 0 {
return false
}
s.count--
return true
}
// Release releases a permit
func (s *MutexSemaphore) Release() {
s.mu.Lock()
defer s.mu.Unlock()
if s.count >= s.maxSize {
panic("Semaphore release without acquire")
}
s.count++
s.cond.Signal()
}
func demonstrateSemaphores() {
fmt.Println("\n=== Channel-based Semaphore ===")
// Create a semaphore with 3 permits
sem := NewChannelSemaphore(3)
// Launch workers that need to acquire the semaphore
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d: Attempting to acquire semaphore\n", id)
// Try with timeout first
if sem.AcquireWithTimeout(time.Duration(id) * 50 * time.Millisecond) {
fmt.Printf("Worker %d: Acquired with timeout\n", id)
} else {
fmt.Printf("Worker %d: Timeout expired, acquiring with blocking call\n", id)
sem.Acquire()
fmt.Printf("Worker %d: Acquired after blocking\n", id)
}
// Simulate work
fmt.Printf("Worker %d: Working...\n", id)
time.Sleep(200 * time.Millisecond)
// Release the semaphore
fmt.Printf("Worker %d: Releasing semaphore\n", id)
sem.Release()
}(i)
// Small delay between starting workers
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
fmt.Println("\n=== Mutex-based Semaphore ===")
// Create a mutex-based semaphore with 2 permits
mutexSem := NewMutexSemaphore(2)
// Launch workers
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d: Attempting to acquire mutex semaphore\n", id)
// Try non-blocking acquire first
if mutexSem.TryAcquire() {
fmt.Printf("Worker %d: Acquired without blocking\n", id)
} else {
fmt.Printf("Worker %d: Could not acquire immediately, blocking...\n", id)
mutexSem.Acquire()
fmt.Printf("Worker %d: Acquired after blocking\n", id)
}
// Simulate work
fmt.Printf("Worker %d: Working...\n", id)
time.Sleep(150 * time.Millisecond)
// Release the semaphore
fmt.Printf("Worker %d: Releasing mutex semaphore\n", id)
mutexSem.Release()
}(i)
// Small delay between starting workers
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
}
func main() {
demonstrateSemaphores()
}
This example demonstrates two semaphore implementations:
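- Channel-based Semaphore: Uses a buffered channel to represent permits. Because acquiring a permit is a channel send, it composes with select for timeouts and cancellation.
- Mutex-based Semaphore: Uses a mutex and condition variable, keeping the permit count as an explicit integer that is easy to inspect or extend.
Both implementations provide the same basic operations. The main trade-off is that sync.Cond has no timed wait, which is why only the channel-based version offers AcquireWithTimeout.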
Read-Write Preference Control
sync.RWMutex blocks new readers once a writer is waiting, which in effect gives writers preference. Sometimes we need more control over access patterns:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// ReaderPreferenceRWMutex gives preference to readers over writers
type ReaderPreferenceRWMutex struct {
mu sync.Mutex
readerCount int32
readerWait sync.Cond
writerWait sync.Cond
writing bool
}
// NewReaderPreferenceRWMutex creates a new reader-preference RW mutex
func NewReaderPreferenceRWMutex() *ReaderPreferenceRWMutex {
rw := &ReaderPreferenceRWMutex{}
rw.readerWait.L = &rw.mu
rw.writerWait.L = &rw.mu
return rw
}
// RLock acquires a read lock
func (rw *ReaderPreferenceRWMutex) RLock() {
rw.mu.Lock()
// Wait if someone is writing
for rw.writing {
rw.readerWait.Wait()
}
atomic.AddInt32(&rw.readerCount, 1)
rw.mu.Unlock()
}
// RUnlock releases a read lock
func (rw *ReaderPreferenceRWMutex) RUnlock() {
rw.mu.Lock()
atomic.AddInt32(&rw.readerCount, -1)
// If no more readers, signal a waiting writer
if atomic.LoadInt32(&rw.readerCount) == 0 {
rw.writerWait.Signal()
}
rw.mu.Unlock()
}
// Lock acquires a write lock
func (rw *ReaderPreferenceRWMutex) Lock() {
rw.mu.Lock()
// Wait until there are no readers and no one is writing
for atomic.LoadInt32(&rw.readerCount) > 0 || rw.writing {
rw.writerWait.Wait()
}
rw.writing = true
rw.mu.Unlock()
}
// Unlock releases a write lock
func (rw *ReaderPreferenceRWMutex) Unlock() {
rw.mu.Lock()
rw.writing = false
// Signal all waiting readers (preference to readers)
rw.readerWait.Broadcast()
// Also signal one writer
rw.writerWait.Signal()
rw.mu.Unlock()
}
func demonstrateRWPreference() {
fmt.Println("\n=== Reader Preference RWMutex ===")
// Create a reader-preference RW mutex
rwMutex := NewReaderPreferenceRWMutex()
// Shared resource
var sharedData int
// Track operation counts
var readCount, writeCount int32
// Start time for measuring operations
startTime := time.Now()
endTime := startTime.Add(1 * time.Second)
// Launch reader goroutines
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for time.Now().Before(endTime) {
// Acquire read lock
rwMutex.RLock()
// Read shared data
_ = sharedData
atomic.AddInt32(&readCount, 1)
// Hold lock briefly
time.Sleep(5 * time.Millisecond)
// Release read lock
rwMutex.RUnlock()
// Small pause between reads
time.Sleep(5 * time.Millisecond)
}
}(i)
}
// Launch writer goroutines
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for time.Now().Before(endTime) {
// Acquire write lock
rwMutex.Lock()
// Update shared data
sharedData++
atomic.AddInt32(&writeCount, 1)
// Hold lock briefly
time.Sleep(10 * time.Millisecond)
// Release write lock
rwMutex.Unlock()
// Small pause between writes
time.Sleep(20 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
fmt.Printf("Ratio (reads/writes): %.2f\n",
float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
// Compare with standard RWMutex (which has writer preference)
compareWithStandardRWMutex()
}
func compareWithStandardRWMutex() {
fmt.Println("\n=== Standard RWMutex (Writer Preference) ===")
// Create a standard RW mutex
var rwMutex sync.RWMutex
// Shared resource
var sharedData int
// Track operation counts
var readCount, writeCount int32
// Start time for measuring operations
startTime := time.Now()
endTime := startTime.Add(1 * time.Second)
// Launch reader goroutines
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for time.Now().Before(endTime) {
// Acquire read lock
rwMutex.RLock()
// Read shared data
_ = sharedData
atomic.AddInt32(&readCount, 1)
// Hold lock briefly
time.Sleep(5 * time.Millisecond)
// Release read lock
rwMutex.RUnlock()
// Small pause between reads
time.Sleep(5 * time.Millisecond)
}
}(i)
}
// Launch writer goroutines
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for time.Now().Before(endTime) {
// Acquire write lock
rwMutex.Lock()
// Update shared data
sharedData++
atomic.AddInt32(&writeCount, 1)
// Hold lock briefly
time.Sleep(10 * time.Millisecond)
// Release write lock
rwMutex.Unlock()
// Small pause between writes
time.Sleep(20 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Printf("Reader operations: %d\n", atomic.LoadInt32(&readCount))
fmt.Printf("Writer operations: %d\n", atomic.LoadInt32(&writeCount))
fmt.Printf("Ratio (reads/writes): %.2f\n",
float64(atomic.LoadInt32(&readCount))/float64(atomic.LoadInt32(&writeCount)))
}
func main() {
demonstrateRWPreference()
}
This example demonstrates:
- Custom RWMutex Implementation: A reader-preference RWMutex that prioritizes readers over writers.
- Performance Comparison: Comparing the custom implementation with the standard RWMutex.
The choice between reader and writer preference depends on the specific workload and requirements of your application.
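The writer preference of the standard sync.RWMutex can be observed directly. The sketch below (the function name writerBlocksNewReaders is made up for illustration) holds a read lock, queues a writer behind it, and then polls TryRLock, which is available from Go 1.18. Once the writer has announced itself, new read attempts are refused even though only a reader holds the lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// writerBlocksNewReaders reports whether a new TryRLock fails while a writer
// is queued behind an existing reader.
func writerBlocksNewReaders() bool {
	var rw sync.RWMutex
	rw.RLock() // an existing reader holds the lock

	done := make(chan struct{})
	go func() {
		rw.Lock() // the writer queues behind the existing reader
		rw.Unlock()
		close(done)
	}()

	// Poll until the writer has announced itself. The 2 second bound is an
	// arbitrary safety limit for this sketch.
	blocked := false
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if rw.TryRLock() {
			rw.RUnlock() // writer not queued yet; undo and retry
			time.Sleep(time.Millisecond)
			continue
		}
		blocked = true
		break
	}

	rw.RUnlock() // release the original reader so the writer can proceed
	<-done
	return blocked
}

func main() {
	fmt.Println("new readers blocked by pending writer:", writerBlocksNewReaders())
}
```

The reader-preference mutex shown earlier behaves the opposite way: its RLock only waits while a write is in progress, so a steady stream of readers can starve writers.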
Atomic Operations and Lock-Free Programming
For simple shared state, atomic operations can provide synchronization without locks:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func atomicPatterns() {
// Basic atomic operations
basicAtomic()
// Compare-and-swap patterns
casPatterns()
// Lock-free counter
lockFreeCounter()
}
func basicAtomic() {
fmt.Println("\n=== Basic Atomic Operations ===")
// Atomic integer operations
var counter int64
// Add
atomic.AddInt64(&counter, 10)
fmt.Printf("After adding 10: %d\n", atomic.LoadInt64(&counter))
// Subtract (add negative)
atomic.AddInt64(&counter, -5)
fmt.Printf("After subtracting 5: %d\n", atomic.LoadInt64(&counter))
// Store
atomic.StoreInt64(&counter, 42)
fmt.Printf("After storing 42: %d\n", atomic.LoadInt64(&counter))
// Swap (returns old value)
old := atomic.SwapInt64(&counter, 100)
fmt.Printf("Swapped %d with 100, now: %d\n", old, atomic.LoadInt64(&counter))
}
func casPatterns() {
fmt.Println("\n=== Compare-And-Swap Patterns ===")
var value int64 = 100
// Basic CAS
swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
fmt.Printf("CAS 100->200: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
// Failed CAS (wrong expected value)
swapped = atomic.CompareAndSwapInt64(&value, 100, 300)
fmt.Printf("CAS 100->300: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))
// Increment with CAS (retry loop)
incrementWithCAS := func(addr *int64, delta int64) {
for {
old := atomic.LoadInt64(addr)
updated := old + delta // named to avoid shadowing the builtin new
if atomic.CompareAndSwapInt64(addr, old, updated) {
return
}
// If CAS failed, another goroutine changed the value; loop and try again
}
}
incrementWithCAS(&value, 50)
fmt.Printf("After CAS increment by 50: %d\n", atomic.LoadInt64(&value))
}
// LockFreeCounter is a counter that can be incremented concurrently without locks
type LockFreeCounter struct {
value uint64
}
// Increment atomically increments the counter and returns the new value
func (c *LockFreeCounter) Increment() uint64 {
return atomic.AddUint64(&c.value, 1)
}
// Load atomically loads the counter value
func (c *LockFreeCounter) Load() uint64 {
return atomic.LoadUint64(&c.value)
}
// LockBasedCounter is a counter that uses a mutex for synchronization
type LockBasedCounter struct {
mu sync.Mutex
value uint64
}
// Increment increments the counter and returns the new value
func (c *LockBasedCounter) Increment() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
return c.value
}
// Load returns the counter value
func (c *LockBasedCounter) Load() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func lockFreeCounter() {
fmt.Println("\n=== Lock-Free vs. Lock-Based Counter Benchmark ===")
// Create counters
lockFree := &LockFreeCounter{}
lockBased := &LockBasedCounter{}
// Benchmark function
benchmark := func(name string, counter interface{}, goroutines, iterations int) time.Duration {
var wg sync.WaitGroup
// Create a function that increments the appropriate counter
var incrementFn func()
switch c := counter.(type) {
case *LockFreeCounter:
incrementFn = func() { c.Increment() }
case *LockBasedCounter:
incrementFn = func() { c.Increment() }
}
// Start timing
start := time.Now()
// Launch goroutines
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Perform increments
for j := 0; j < iterations; j++ {
incrementFn()
}
}()
}
// Wait for all goroutines to complete
wg.Wait()
// Return elapsed time
return time.Since(start)
}
// Run benchmarks
goroutines := 4
iterations := 1000000
lockFreeTime := benchmark("Lock-Free", lockFree, goroutines, iterations)
lockBasedTime := benchmark("Lock-Based", lockBased, goroutines, iterations)
// Verify results
fmt.Printf("Lock-Free Counter: %d\n", lockFree.Load())
fmt.Printf("Lock-Based Counter: %d\n", lockBased.Load())
// Compare performance
fmt.Printf("Lock-Free Time: %v\n", lockFreeTime)
fmt.Printf("Lock-Based Time: %v\n", lockBasedTime)
fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}
func main() {
atomicPatterns()
}
Key insights about atomic operations:
- Hardware Support: Atomic operations are implemented using special CPU instructions.
- No Locks Required: They provide synchronization without the overhead of locks.
- Limited Operations: Only simple operations like load, store, add, and compare-and-swap are available.
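- Performance Advantage: For simple cases such as counters and flags, atomic operations are usually faster than a mutex, although heavy contention on a single variable still serializes on one cache line and narrows the gap.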
Lock-free programming using atomic operations can provide substantial performance benefits but requires careful design to ensure correctness.
Custom Synchronization Primitives
While Go’s standard library provides a solid foundation, some scenarios require specialized synchronization primitives. Let’s explore how to build custom primitives that address specific needs.
Countdown Latch
A countdown latch allows one or more goroutines to wait until a set of operations completes:
package main
import (
"fmt"
"sync"
"time"
)
// CountdownLatch is a synchronization aid that allows one or more goroutines
// to wait until a set of operations being performed in other goroutines completes.
type CountdownLatch struct {
count int
mu sync.Mutex
cond *sync.Cond
}
// NewCountdownLatch creates a new countdown latch initialized with the given count.
func NewCountdownLatch(count int) *CountdownLatch {
latch := &CountdownLatch{
count: count,
}
latch.cond = sync.NewCond(&latch.mu)
return latch
}
// CountDown decrements the count of the latch, releasing all waiting goroutines
// when the count reaches zero.
func (l *CountdownLatch) CountDown() {
l.mu.Lock()
defer l.mu.Unlock()
if l.count <= 0 {
return
}
l.count--
if l.count == 0 {
l.cond.Broadcast()
}
}
// Await causes the current goroutine to wait until the latch has counted down to zero.
func (l *CountdownLatch) Await() {
l.mu.Lock()
defer l.mu.Unlock()
for l.count > 0 {
l.cond.Wait()
}
}
// TryAwait causes the current goroutine to wait until the latch has counted down to zero
// or the specified timeout elapses. It returns true if the count reached zero.
func (l *CountdownLatch) TryAwait(timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
// sync.Cond has no timed wait, so schedule a broadcast at the deadline.
// Taking the lock before broadcasting ensures a waiter cannot miss the
// wake-up between checking the deadline and calling Wait.
timer := time.AfterFunc(timeout, func() {
l.mu.Lock()
defer l.mu.Unlock()
l.cond.Broadcast()
})
defer timer.Stop()
l.mu.Lock()
defer l.mu.Unlock()
for l.count > 0 {
if !time.Now().Before(deadline) {
return false
}
// Woken either by CountDown reaching zero or by the deadline broadcast
l.cond.Wait()
}
return true
}
// GetCount returns the current count.
func (l *CountdownLatch) GetCount() int {
l.mu.Lock()
defer l.mu.Unlock()
return l.count
}
func demonstrateCountdownLatch() {
fmt.Println("\n=== Countdown Latch ===")
// Create a countdown latch with count 3
latch := NewCountdownLatch(3)
// Start a goroutine that waits on the latch
go func() {
fmt.Println("Waiter: Waiting for latch to reach zero")
latch.Await()
fmt.Println("Waiter: Latch reached zero, proceeding")
}()
// Start another goroutine that waits with timeout
go func() {
fmt.Println("Timeout Waiter: Waiting with 2 second timeout")
if latch.TryAwait(2 * time.Second) {
fmt.Println("Timeout Waiter: Latch reached zero within timeout")
} else {
fmt.Println("Timeout Waiter: Timeout expired before latch reached zero")
}
}()
// Simulate work being completed
for i := 1; i <= 3; i++ {
time.Sleep(500 * time.Millisecond)
fmt.Printf("Main: Counting down latch (%d remaining)\n", latch.GetCount()-1)
latch.CountDown()
}
// Give waiters time to print their messages
time.Sleep(100 * time.Millisecond)
}
func main() {
demonstrateCountdownLatch()
}
Cyclic Barrier
A cyclic barrier enables a group of goroutines to wait for each other to reach a common execution point:
package main
import (
"fmt"
"sync"
"time"
)
// CyclicBarrier is a synchronization aid that allows a set of goroutines to all
// wait for each other to reach a common barrier point.
type CyclicBarrier struct {
parties int
count int
generation int
barrierAction func()
mu sync.Mutex
cond *sync.Cond
}
// NewCyclicBarrier creates a new cyclic barrier that will trip when the given
// number of parties are waiting upon it.
func NewCyclicBarrier(parties int, barrierAction func()) *CyclicBarrier {
barrier := &CyclicBarrier{
parties: parties,
count: parties,
barrierAction: barrierAction,
}
barrier.cond = sync.NewCond(&barrier.mu)
return barrier
}
// Await causes the current goroutine to wait until all parties have invoked await
// on this barrier. If the current goroutine is the last to arrive, the barrier action
// is executed and the barrier is reset.
func (b *CyclicBarrier) Await() int {
b.mu.Lock()
defer b.mu.Unlock()
generation := b.generation
// Decrement count and check if we're the last to arrive
b.count--
index := b.parties - b.count - 1
if b.count == 0 {
// We're the last to arrive
if b.barrierAction != nil {
// Execute the barrier action
b.barrierAction()
}
// Reset the barrier
b.count = b.parties
b.generation++
// Wake up all waiting goroutines
b.cond.Broadcast()
return index
}
// Wait until the barrier is tripped or reset
for generation == b.generation {
b.cond.Wait()
}
return index
}
// Reset resets the barrier to its initial state.
func (b *CyclicBarrier) Reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.count = b.parties
b.generation++
b.cond.Broadcast()
}
// GetNumberWaiting returns the number of parties currently waiting at the barrier.
func (b *CyclicBarrier) GetNumberWaiting() int {
b.mu.Lock()
defer b.mu.Unlock()
return b.parties - b.count
}
func demonstrateCyclicBarrier() {
fmt.Println("\n=== Cyclic Barrier ===")
// Number of workers
numWorkers := 3
// Create a barrier action
barrierAction := func() {
fmt.Println("Barrier Action: All workers reached the barrier, executing barrier action")
}
// Create a cyclic barrier
barrier := NewCyclicBarrier(numWorkers, barrierAction)
// Create a wait group to wait for all workers to complete
var wg sync.WaitGroup
// Launch workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for phase := 1; phase <= 3; phase++ {
// Simulate work before reaching the barrier
workTime := time.Duration(id*100) * time.Millisecond
fmt.Printf("Worker %d: Working on phase %d for %v\n", id, phase, workTime)
time.Sleep(workTime)
fmt.Printf("Worker %d: Reached barrier for phase %d\n", id, phase)
index := barrier.Await()
fmt.Printf("Worker %d: Crossed barrier for phase %d (arrival index: %d)\n", id, phase, index)
// Small pause between phases
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Println("All workers completed all phases")
}
func main() {
demonstrateCyclicBarrier()
}
These custom synchronization primitives demonstrate how Go’s basic primitives can be combined to create more specialized tools. Each addresses specific coordination patterns that aren’t directly provided by the standard library.
Performance Optimization Techniques
Effective synchronization isn’t just about correctness—it’s also about performance. Let’s explore techniques to optimize synchronization in Go applications.
Lock Granularity
The granularity of locks significantly impacts performance:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// CoarseGrainedMap uses a single lock for the entire map
type CoarseGrainedMap struct {
mu sync.RWMutex
data map[string]int
}
// NewCoarseGrainedMap creates a new coarse-grained map
func NewCoarseGrainedMap() *CoarseGrainedMap {
return &CoarseGrainedMap{
data: make(map[string]int),
}
}
// Get retrieves a value from the map
func (m *CoarseGrainedMap) Get(key string) (int, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}
// Set stores a value in the map
func (m *CoarseGrainedMap) Set(key string, value int) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = value
}
// FineGrainedMap uses a separate lock for each shard
type FineGrainedMap struct {
shards []*mapShard
shardMask int
}
type mapShard struct {
mu sync.RWMutex
data map[string]int
}
// NewFineGrainedMap creates a new fine-grained map with the specified number of shards
func NewFineGrainedMap(numShards int) *FineGrainedMap {
// Round up to power of 2
shardCount := 1
for shardCount < numShards {
shardCount *= 2
}
m := &FineGrainedMap{
shards: make([]*mapShard, shardCount),
shardMask: shardCount - 1,
}
for i := 0; i < shardCount; i++ {
m.shards[i] = &mapShard{
data: make(map[string]int),
}
}
return m
}
// getShard returns the shard for the given key
func (m *FineGrainedMap) getShard(key string) *mapShard {
// Simple hash function
hash := 0
for i := 0; i < len(key); i++ {
hash = 31*hash + int(key[i])
}
return m.shards[hash&m.shardMask]
}
// Get retrieves a value from the map
func (m *FineGrainedMap) Get(key string) (int, bool) {
shard := m.getShard(key)
shard.mu.RLock()
defer shard.mu.RUnlock()
val, ok := shard.data[key]
return val, ok
}
// Set stores a value in the map
func (m *FineGrainedMap) Set(key string, value int) {
shard := m.getShard(key)
shard.mu.Lock()
defer shard.mu.Unlock()
shard.data[key] = value
}
func benchmarkMaps() {
fmt.Println("\n=== Lock Granularity Benchmark ===")
// Create maps
coarse := NewCoarseGrainedMap()
fine := NewFineGrainedMap(16)
// Prepare keys
const keyCount = 1000
keys := make([]string, keyCount)
for i := 0; i < keyCount; i++ {
keys[i] = fmt.Sprintf("key%d", i)
}
// Benchmark function
benchmark := func(name string, m interface{}, readPct int, goroutines int) time.Duration {
var wg sync.WaitGroup
var ops int64
// Create getter and setter functions
var getFn func(string) (int, bool)
var setFn func(string, int)
switch mp := m.(type) {
case *CoarseGrainedMap:
getFn = mp.Get
setFn = mp.Set
case *FineGrainedMap:
getFn = mp.Get
setFn = mp.Set
}
// Start timing
start := time.Now()
// Launch goroutines
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Local random number generator
seed := int64(id)
// Perform operations
for j := 0; j < 10000; j++ {
// Determine operation type
seed = (seed*1103515245 + 12345) & 0x7fffffff
isRead := int(seed%100) < readPct
// Select a key
seed = (seed*1103515245 + 12345) & 0x7fffffff
key := keys[seed%keyCount]
if isRead {
// Read operation
_, _ = getFn(key)
} else {
// Write operation
setFn(key, int(seed))
}
atomic.AddInt64(&ops, 1)
}
}(i)
}
// Wait for all goroutines to complete
wg.Wait()
// Return elapsed time
elapsed := time.Since(start)
fmt.Printf("%s: %v for %d operations (%.2f ops/ms)\n",
name, elapsed, ops, float64(ops)/(float64(elapsed)/float64(time.Millisecond)))
return elapsed
}
// Run benchmarks with different read/write ratios
fmt.Println("Read-heavy workload (95% reads):")
coarseTime := benchmark("Coarse-grained", coarse, 95, 8)
fineTime := benchmark("Fine-grained", fine, 95, 8)
fmt.Printf("Improvement: %.2fx\n", float64(coarseTime)/float64(fineTime))
fmt.Println("\nBalanced workload (50% reads):")
coarseTime = benchmark("Coarse-grained", coarse, 50, 8)
fineTime = benchmark("Fine-grained", fine, 50, 8)
fmt.Printf("Improvement: %.2fx\n", float64(coarseTime)/float64(fineTime))
}
func main() {
benchmarkMaps()
}
Key insights about lock granularity:
- Coarse-grained Locking: Uses fewer locks but causes more contention.
- Fine-grained Locking: Uses more locks but reduces contention.
- Sharding: Divides data into independent partitions, each with its own lock.
- Performance Impact: The optimal granularity depends on the access pattern and concurrency level.
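Shard selection depends on a hash that spreads keys evenly. As a small sketch, the function below swaps the hand-rolled polynomial hash for FNV-1a from the standard hash/fnv package and masks the result, assuming the shard count is a power of two as in the sharded map. The names shardIndex and distribution are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex maps key to a shard in [0, shardCount).
// Assumption: shardCount is a power of two, so masking equals modulo.
func shardIndex(key string, shardCount uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() & (shardCount - 1)
}

// distribution counts how many of n generated keys land in each shard.
func distribution(n int, shardCount uint32) []int {
	counts := make([]int, shardCount)
	for i := 0; i < n; i++ {
		counts[shardIndex(fmt.Sprintf("key%d", i), shardCount)]++
	}
	return counts
}

func main() {
	// Prints the number of keys per shard for 1000 keys over 16 shards.
	fmt.Println(distribution(1000, 16))
}
```

Inspecting the per-shard counts for a representative key set is a quick way to check that no single shard, and therefore no single lock, absorbs most of the traffic.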
Lock Contention Profiling
Identifying lock contention is crucial for optimizing synchronization:
package main
import (
"fmt"
"sync"
"time"
)
// MutexWithMetrics adds instrumentation to track lock contention
type MutexWithMetrics struct {
mu sync.Mutex
name string
acquisitions int64
contentions int64
totalWaitTime time.Duration
maxWaitTime time.Duration
}
// NewMutexWithMetrics creates a new instrumented mutex
func NewMutexWithMetrics(name string) *MutexWithMetrics {
return &MutexWithMetrics{
name: name,
}
}
// Lock acquires the mutex, tracking contention metrics
func (m *MutexWithMetrics) Lock() {
start := time.Now()
// Try to acquire the lock without blocking
if m.mu.TryLock() {
// Acquired without contention
m.acquisitions++
return
}
// Contention occurred, acquire with blocking
m.mu.Lock()
// Update the counter only after acquiring the lock to avoid a data race
m.contentions++
// Calculate wait time
waitTime := time.Since(start)
m.totalWaitTime += waitTime
if waitTime > m.maxWaitTime {
m.maxWaitTime = waitTime
}
m.acquisitions++
}
// Unlock releases the mutex
func (m *MutexWithMetrics) Unlock() {
m.mu.Unlock()
}
// ReportMetrics prints contention statistics
func (m *MutexWithMetrics) ReportMetrics() {
fmt.Printf("=== Mutex Metrics: %s ===\n", m.name)
fmt.Printf("Total acquisitions: %d\n", m.acquisitions)
fmt.Printf("Contentions: %d (%.2f%%)\n", m.contentions,
float64(m.contentions)*100/float64(m.acquisitions))
if m.contentions > 0 {
fmt.Printf("Average wait time: %v\n", m.totalWaitTime/time.Duration(m.contentions))
fmt.Printf("Maximum wait time: %v\n", m.maxWaitTime)
}
}
func demonstrateMutexMetrics() {
fmt.Println("\n=== Lock Contention Profiling ===")
// Create instrumented mutexes
highContentionMutex := NewMutexWithMetrics("High Contention")
lowContentionMutex := NewMutexWithMetrics("Low Contention")
// Function that simulates work with the mutex
simulateWork := func(m *MutexWithMetrics, holdTime time.Duration, iterations int) {
for i := 0; i < iterations; i++ {
m.Lock()
time.Sleep(holdTime)
m.Unlock()
// Pause between acquisitions
time.Sleep(1 * time.Millisecond)
}
}
// Launch goroutines with high contention
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
simulateWork(highContentionMutex, 5*time.Millisecond, 20)
}()
}
// Launch goroutines with low contention
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
simulateWork(lowContentionMutex, 1*time.Millisecond, 20)
}()
}
wg.Wait()
// Report metrics
highContentionMutex.ReportMetrics()
lowContentionMutex.ReportMetrics()
}
func main() {
demonstrateMutexMetrics()
}
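Besides hand-written instrumentation, the Go runtime can sample mutex contention itself. The sketch below enables sampling with runtime.SetMutexProfileFraction and dumps the built-in "mutex" profile through runtime/pprof in its text form. The function collectMutexProfile and the workload inside it are invented for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"runtime/pprof"
	"sync"
	"time"
)

// collectMutexProfile creates contention on a mutex and returns the
// runtime's mutex profile as text.
func collectMutexProfile() (string, error) {
	// A rate of 1 samples every contention event; restore the old rate on exit.
	prev := runtime.SetMutexProfileFraction(1)
	defer runtime.SetMutexProfileFraction(prev)

	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				mu.Lock()
				time.Sleep(time.Millisecond) // hold the lock so others must wait
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	var buf bytes.Buffer
	// debug=1 selects the human-readable text format.
	if err := pprof.Lookup("mutex").WriteTo(&buf, 1); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := collectMutexProfile()
	if err != nil {
		fmt.Println("profile error:", err)
		return
	}
	fmt.Print(out)
}
```

In a long-running service, a lower sampling rate is usually preferable, since recording every contention event adds overhead of its own.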
Lock-Free Techniques
For some data structures, lock-free approaches can provide significant performance benefits:
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// LockFreeQueue is a simple lock-free queue implementation
type LockFreeQueue struct {
head atomic.Pointer[node]
tail atomic.Pointer[node]
count int64
}
type node struct {
value interface{}
next atomic.Pointer[node]
}
// NewLockFreeQueue creates a new lock-free queue
func NewLockFreeQueue() *LockFreeQueue {
q := &LockFreeQueue{}
// Create sentinel node
sentinel := &node{}
q.head.Store(sentinel)
q.tail.Store(sentinel)
return q
}
// Enqueue adds an item to the queue
func (q *LockFreeQueue) Enqueue(value interface{}) {
// Create new node
newNode := &node{value: value}
for {
// Get current tail
tail := q.tail.Load()
next := tail.next.Load()
// Check if tail is still valid
if tail != q.tail.Load() {
continue
}
// If tail.next is not nil, tail is falling behind
if next != nil {
// Try to advance tail
q.tail.CompareAndSwap(tail, next)
continue
}
// Try to link the new node
if tail.next.CompareAndSwap(nil, newNode) {
// Success, try to advance tail
q.tail.CompareAndSwap(tail, newNode)
atomic.AddInt64(&q.count, 1)
return
}
}
}
// Dequeue removes and returns an item from the queue
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
for {
// Get current head and tail
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
// Check if head is still valid
if head != q.head.Load() {
continue
}
// If queue is empty
if head == tail {
if next == nil {
return nil, false
}
// Tail is falling behind, try to advance it
q.tail.CompareAndSwap(tail, next)
continue
}
// Queue is not empty, try to dequeue
if q.head.CompareAndSwap(head, next) {
value := next.value
atomic.AddInt64(&q.count, -1)
return value, true
}
}
}
// Size returns the approximate size of the queue
func (q *LockFreeQueue) Size() int {
return int(atomic.LoadInt64(&q.count))
}
// LockBasedQueue is a simple lock-based queue implementation
type LockBasedQueue struct {
mu sync.Mutex
head *node
tail *node
count int
}
// NewLockBasedQueue creates a new lock-based queue
func NewLockBasedQueue() *LockBasedQueue {
sentinel := &node{}
return &LockBasedQueue{
head: sentinel,
tail: sentinel,
}
}
// Enqueue adds an item to the queue
func (q *LockBasedQueue) Enqueue(value interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
newNode := &node{value: value}
q.tail.next.Store(newNode)
q.tail = newNode
q.count++
}
// Dequeue removes and returns an item from the queue
func (q *LockBasedQueue) Dequeue() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if q.head == q.tail {
return nil, false
}
next := q.head.next.Load()
q.head = next
q.count--
return next.value, true
}
// Size returns the size of the queue
func (q *LockBasedQueue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.count
}
func benchmarkQueues() {
fmt.Println("\n=== Lock-Free vs. Lock-Based Queue Benchmark ===")
// Create queues
lockFree := NewLockFreeQueue()
lockBased := NewLockBasedQueue()
// Benchmark function
benchmark := func(name string, enqueue, dequeue func(int) bool, iterations, goroutines int) time.Duration {
var wg sync.WaitGroup
// Start timing
start := time.Now()
// Launch producer goroutines
for i := 0; i < goroutines/2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Produce items
for j := 0; j < iterations; j++ {
enqueue(id*iterations + j)
}
}(i)
}
// Launch consumer goroutines
for i := 0; i < goroutines/2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Consume items
for j := 0; j < iterations; j++ {
for !dequeue(j) {
// Retry if queue is empty
runtime.Gosched()
}
}
}()
}
// Wait for all goroutines to complete
wg.Wait()
// Return elapsed time
return time.Since(start)
}
// Run benchmarks
iterations := 10000
goroutines := 8
lockFreeTime := benchmark("Lock-Free Queue",
func(i int) bool { lockFree.Enqueue(i); return true },
func(i int) bool { _, ok := lockFree.Dequeue(); return ok },
iterations, goroutines)
lockBasedTime := benchmark("Lock-Based Queue",
func(i int) bool { lockBased.Enqueue(i); return true },
func(i int) bool { _, ok := lockBased.Dequeue(); return ok },
iterations, goroutines)
// Compare performance
fmt.Printf("Lock-Free Queue Time: %v\n", lockFreeTime)
fmt.Printf("Lock-Based Queue Time: %v\n", lockBasedTime)
fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}
func main() {
benchmarkQueues()
}
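The same compare-and-swap retry pattern appears in a simpler form in a lock-free stack, which needs only one shared pointer instead of separate head and tail pointers. The following is a minimal sketch with hypothetical names (Stack, stackNode, pushPopSum). It relies on Go's garbage collector to keep nodes alive while other goroutines may still hold references to them.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type stackNode struct {
	value int
	next  *stackNode // written only before the node is published
}

// Stack is a hypothetical lock-free LIFO built on a single atomic pointer.
type Stack struct {
	top atomic.Pointer[stackNode]
}

// Push links a new node in front of the current top, retrying on conflict.
func (s *Stack) Push(v int) {
	n := &stackNode{value: v}
	for {
		old := s.top.Load()
		n.next = old
		if s.top.CompareAndSwap(old, n) {
			return
		}
	}
}

// Pop removes the top node, retrying on conflict; ok is false when empty.
func (s *Stack) Pop() (v int, ok bool) {
	for {
		old := s.top.Load()
		if old == nil {
			return 0, false
		}
		if s.top.CompareAndSwap(old, old.next) {
			return old.value, true
		}
	}
}

// pushPopSum pushes 1..n from n goroutines, then pops everything and
// returns how many items came out and their sum.
func pushPopSum(n int) (count, sum int) {
	var s Stack
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			s.Push(v)
		}(i)
	}
	wg.Wait()
	for {
		v, ok := s.Pop()
		if !ok {
			return count, sum
		}
		count++
		sum += v
	}
}

func main() {
	count, sum := pushPopSum(1000)
	fmt.Println(count, sum) // prints 1000 500500
}
```

Checking that every pushed value comes back exactly once, as pushPopSum does, is a cheap sanity test to run under the race detector before trusting a lock-free structure.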
Debugging and Monitoring Synchronization
Effective debugging and monitoring are essential for maintaining reliable concurrent systems.
Race Detection
Go’s race detector is a powerful tool for identifying data races:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// This function contains a data race
func demonstrateRace() {
fmt.Println("\n=== Data Race Example ===")
// Shared counter
counter := 0
// Launch goroutines that increment the counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // Race condition: read-modify-write without synchronization
}()
}
wg.Wait()
fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}
// This function fixes the race with a mutex
func demonstrateRaceFixed() {
fmt.Println("\n=== Data Race Fixed with Mutex ===")
// Shared counter with mutex
var counter int
var mu sync.Mutex
// Launch goroutines that increment the counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++ // Protected by mutex
mu.Unlock()
}()
}
wg.Wait()
fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}
// This function fixes the race with atomic operations
func demonstrateRaceFixedAtomic() {
fmt.Println("\n=== Data Race Fixed with Atomic Operations ===")
// Shared counter using atomic operations
var counter int64
// Launch goroutines that increment the counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // Atomic increment
}()
}
wg.Wait()
fmt.Printf("Final counter value: %d (expected 1000)\n", counter)
}
func main() {
// To detect races, run with: go run -race main.go
demonstrateRace()
demonstrateRaceFixed()
demonstrateRaceFixedAtomic()
}
To use the race detector, compile or run your program with the -race flag:
go run -race main.go
go test -race ./...
go build -race main.go
Deadlock Detection
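The Go runtime reports a deadlock only when every goroutine is blocked (“fatal error: all goroutines are asleep - deadlock!”). It does not detect partial deadlocks, where a few goroutines wait on each other’s mutexes while the rest of the program keeps running. Here’s a simple deadlock detector: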
package main

import (
	"fmt"
	"sync"
	"time"
)

// DeadlockDetector tracks which goroutine holds and waits for which lock
// so that it can report simple two-party lock cycles
type DeadlockDetector struct {
	mu         sync.Mutex
	goroutines map[int64][]string // goroutine ID -> held locks
	locks      map[string]int64   // lock name -> goroutine ID holding it
	waiting    map[int64]string   // goroutine ID -> lock it is waiting for
}

// NewDeadlockDetector creates a new deadlock detector
func NewDeadlockDetector() *DeadlockDetector {
	return &DeadlockDetector{
		goroutines: make(map[int64][]string),
		locks:      make(map[string]int64),
		waiting:    make(map[int64]string),
	}
}

// BeforeLock is called before acquiring a lock
func (d *DeadlockDetector) BeforeLock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	// Record the pending acquisition so other goroutines can see it
	d.waiting[goroutineID] = lockName
	// Check if this lock is held by another goroutine
	holderID, held := d.locks[lockName]
	if !held || holderID == goroutineID {
		return
	}
	// Check if the holder is waiting for a lock held by this goroutine
	wanted, isWaiting := d.waiting[holderID]
	if !isWaiting {
		return
	}
	if owner, ok := d.locks[wanted]; ok && owner == goroutineID {
		fmt.Printf("POTENTIAL DEADLOCK DETECTED:\n")
		fmt.Printf("  Goroutine %d holds %v and wants %s\n", goroutineID, d.goroutines[goroutineID], lockName)
		fmt.Printf("  Goroutine %d holds %v and wants %s\n", holderID, d.goroutines[holderID], wanted)
	}
}

// AfterLock is called after acquiring a lock
func (d *DeadlockDetector) AfterLock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	// The goroutine is no longer waiting; record that it holds this lock
	delete(d.waiting, goroutineID)
	d.goroutines[goroutineID] = append(d.goroutines[goroutineID], lockName)
	d.locks[lockName] = goroutineID
}

// BeforeUnlock is called before releasing a lock
func (d *DeadlockDetector) BeforeUnlock(goroutineID int64, lockName string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	// Remove this lock from the goroutine's held locks
	locks := d.goroutines[goroutineID]
	for i, l := range locks {
		if l == lockName {
			d.goroutines[goroutineID] = append(locks[:i], locks[i+1:]...)
			break
		}
	}
	// Remove this lock from the locks map
	delete(d.locks, lockName)
}

// InstrumentedMutex is a mutex with deadlock detection
type InstrumentedMutex struct {
	mu       sync.Mutex
	name     string
	detector *DeadlockDetector
}

// NewInstrumentedMutex creates a new instrumented mutex
func NewInstrumentedMutex(name string, detector *DeadlockDetector) *InstrumentedMutex {
	return &InstrumentedMutex{
		name:     name,
		detector: detector,
	}
}

// Lock acquires the mutex
func (m *InstrumentedMutex) Lock() {
	// Placeholder ID: Go has no public API for goroutine IDs, so a real
	// implementation would need the caller to supply one
	goroutineID := int64(123)
	m.detector.BeforeLock(goroutineID, m.name)
	m.mu.Lock()
	m.detector.AfterLock(goroutineID, m.name)
}

// Unlock releases the mutex
func (m *InstrumentedMutex) Unlock() {
	goroutineID := int64(123) // Placeholder ID, see Lock
	m.detector.BeforeUnlock(goroutineID, m.name)
	m.mu.Unlock()
}

func demonstrateDeadlockDetection() {
	fmt.Println("\n=== Deadlock Detection ===")
	// Create a deadlock detector
	detector := NewDeadlockDetector()
	// Create instrumented mutexes
	mutex1 := NewInstrumentedMutex("mutex1", detector)
	mutex2 := NewInstrumentedMutex("mutex2", detector)
	// Simulate a potential deadlock scenario by driving the detector
	// directly, without actually blocking on the mutexes
	go func() {
		goroutineID := int64(1)
		fmt.Println("Goroutine 1: Acquiring mutex1")
		detector.BeforeLock(goroutineID, mutex1.name)
		// mutex1.Lock() - simulated
		detector.AfterLock(goroutineID, mutex1.name)
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Goroutine 1: Acquiring mutex2")
		detector.BeforeLock(goroutineID, mutex2.name)
		// This would block in a real deadlock
	}()
	go func() {
		goroutineID := int64(2)
		fmt.Println("Goroutine 2: Acquiring mutex2")
		detector.BeforeLock(goroutineID, mutex2.name)
		// mutex2.Lock() - simulated
		detector.AfterLock(goroutineID, mutex2.name)
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Goroutine 2: Acquiring mutex1")
		detector.BeforeLock(goroutineID, mutex1.name)
		// This would block in a real deadlock
	}()
	// Give time for the simulation to run
	time.Sleep(300 * time.Millisecond)
}

func main() {
	demonstrateDeadlockDetection()
}
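Detection reports a lock cycle after the fact; a common complementary technique is to prevent the cycle by always acquiring locks in a fixed order. The following is a minimal sketch of that idea, not part of the detector above. The Account type, its id field, and the transfer function are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Account is a hypothetical type; id gives every lock a fixed rank.
type Account struct {
	id      int
	mu      sync.Mutex
	balance int
}

// transfer moves amount between two accounts. It always locks the account
// with the smaller id first, so two opposite transfers can never each hold
// one lock while waiting for the other.
func transfer(from, to *Account, amount int) {
	if from == to {
		return // locking the same mutex twice would self-deadlock
	}
	first, second := from, to
	if second.id < first.id {
		first, second = second, first
	}
	first.mu.Lock()
	defer first.mu.Unlock()
	second.mu.Lock()
	defer second.mu.Unlock()

	from.balance -= amount
	to.balance += amount
}

func main() {
	a := &Account{id: 1, balance: 1000}
	b := &Account{id: 2, balance: 1000}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); transfer(a, b, 1) }()
		go func() { defer wg.Done(); transfer(b, a, 1) }()
	}
	wg.Wait()
	fmt.Println(a.balance, b.balance) // prints 1000 1000
}
```

Without the ordering step, transfer(a, b) and transfer(b, a) running concurrently could reproduce the mutex1/mutex2 scenario simulated above.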
Production Implementation Strategies
Implementing synchronization in production systems requires careful consideration of various factors.
Choosing the Right Primitive
Selecting the appropriate synchronization primitive depends on the specific requirements:
| Primitive | Use Case | Advantages | Disadvantages |
|---|---|---|---|
| Mutex | Exclusive access to shared resources | Simple, well-understood | Can cause contention |
| RWMutex | Read-heavy workloads | Higher throughput for reads | More complex; extra overhead when writes are frequent |
| atomic | Simple counters and flags | Low overhead, no blocking | Limited operations |
| WaitGroup | Waiting for multiple goroutines | Simple coordination | No error propagation or cancellation |
| Once | One-time initialization | Goroutine-safe lazy initialization | Limited to single execution |
| Cond | Producer-consumer patterns | Efficient signaling | Complex usage patterns |
| Pool | Object reuse | Reduces GC pressure | No size control; pooled objects may be dropped at any time |
| Map | Concurrent map access | Optimized for specific patterns | Type assertions required |
| Channel | Communication between goroutines | Clear ownership semantics | Can be less efficient |
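To make two rows of the table concrete, here is a small sketch that combines an RWMutex for a read-mostly map with an atomic counter for a simple statistic. The Settings type and its methods are hypothetical, chosen only to illustrate how each primitive matches its use case.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Settings is a hypothetical read-mostly store.
type Settings struct {
	reads int64 // accessed only via sync/atomic; kept first for 64-bit alignment
	mu    sync.RWMutex
	data  map[string]string
}

// NewSettings returns an empty store.
func NewSettings() *Settings {
	return &Settings{data: make(map[string]string)}
}

// Get takes the read lock, so many readers can proceed together.
func (s *Settings) Get(key string) (string, bool) {
	atomic.AddInt64(&s.reads, 1)
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set takes the write lock, excluding readers and other writers.
func (s *Settings) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Reads reports how many times Get has been called.
func (s *Settings) Reads() int64 {
	return atomic.LoadInt64(&s.reads)
}

func main() {
	s := NewSettings()
	s.Set("mode", "fast")

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Get("mode")
		}()
	}
	wg.Wait()

	v, _ := s.Get("mode")
	fmt.Println(v, s.Reads()) // prints fast 101
}
```

The counter needs no lock because a single atomic add is the whole operation, while the map needs the RWMutex because Go maps are not safe for concurrent reads and writes.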
Balancing Performance and Simplicity
When implementing synchronization in production systems, consider these guidelines:
- Start Simple: Begin with the simplest approach that ensures correctness.
- Measure First: Profile your application to identify actual bottlenecks before optimizing.
- Consider Contention: High-contention scenarios may benefit from more sophisticated approaches.
- Readability Matters: Complex synchronization is error-prone; prioritize clarity.
- Test Thoroughly: Use the race detector and stress testing to verify correctness.
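As a rough illustration of the "Measure First" guideline, the sketch below compares a mutex-guarded counter with an atomic one under parallel load. It uses testing.Benchmark so that it runs as an ordinary program; in a real project these would more likely be Benchmark functions in a _test.go file. The function names and the sink variable are assumptions made for this example, and the numbers depend entirely on your hardware and Go version.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
)

// sink keeps the final counts observable so the loops do real work.
var sink int64

// benchMutex measures a mutex-guarded increment under parallel load.
func benchMutex(b *testing.B) {
	var mu sync.Mutex
	var n int64
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.Lock()
			n++
			mu.Unlock()
		}
	})
	sink = n
}

// benchAtomic measures the same increment using sync/atomic.
func benchAtomic(b *testing.B) {
	var n int64
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			atomic.AddInt64(&n, 1)
		}
	})
	sink = atomic.LoadInt64(&n)
}

func main() {
	// testing.Benchmark runs a benchmark function outside of "go test".
	fmt.Println("mutex: ", testing.Benchmark(benchMutex))
	fmt.Println("atomic:", testing.Benchmark(benchAtomic))
}
```

A micro-benchmark like this only shows the cost of the primitive itself; profiling the real application is still needed to confirm that the lock is an actual bottleneck.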
Scaling Considerations
As your application scales, synchronization strategies may need to evolve:
- Partitioning: Divide data and workloads to minimize contention.
- Locality: Keep related data and operations together to reduce synchronization needs.
- Batching: Process multiple items in a single critical section to amortize synchronization costs.
- Relaxed Consistency: Consider whether eventual consistency is acceptable for some operations.
- Lock-Free Techniques: For extreme performance requirements, consider lock-free algorithms.
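The partitioning idea can be sketched as a sharded map, where each shard has its own lock and a hash of the key selects the shard. This is a minimal illustration under assumed names (ShardedCounter, shardCount); the shard count of 16 is arbitrary and would need tuning against measured contention.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 16 // assumed value; tune by measuring contention

// shard is one partition with its own lock.
type shard struct {
	mu sync.Mutex
	m  map[string]int
}

// ShardedCounter is a hypothetical partitioned map of counters.
type ShardedCounter struct {
	shards [shardCount]shard
}

// NewShardedCounter allocates the map inside every shard.
func NewShardedCounter() *ShardedCounter {
	sc := &ShardedCounter{}
	for i := 0; i < shardCount; i++ {
		sc.shards[i].m = make(map[string]int)
	}
	return sc
}

// shardFor hashes the key to pick the partition that owns it.
func (sc *ShardedCounter) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &sc.shards[h.Sum32()%shardCount]
}

// Add locks only the shard that owns key, so updates to keys in
// different shards do not contend with each other.
func (sc *ShardedCounter) Add(key string, delta int) {
	s := sc.shardFor(key)
	s.mu.Lock()
	s.m[key] += delta
	s.mu.Unlock()
}

// Get returns the current value for key.
func (sc *ShardedCounter) Get(key string) int {
	s := sc.shardFor(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[key]
}

func main() {
	sc := NewShardedCounter()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			key := fmt.Sprintf("worker-%d", id%4)
			for j := 0; j < 1000; j++ {
				sc.Add(key, 1)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(sc.Get("worker-0")) // prints 2000
}
```

The trade-off is that operations spanning several keys, such as a consistent snapshot of the whole map, now need to lock multiple shards in a fixed order.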
Mastering the Synchronization Spectrum
Throughout this exploration of Go’s synchronization primitives, we’ve journeyed from basic mutual exclusion to sophisticated custom synchronization tools. We’ve examined the internal workings of core primitives, explored advanced patterns that combine multiple mechanisms, and analyzed performance optimization techniques.
The key to effective synchronization in Go isn’t just knowing which primitive to use, but understanding the trade-offs each one presents. By mastering these primitives and patterns, you can build concurrent systems that are both correct and efficient, striking the right balance between safety and performance.
As you apply these techniques in your own applications, remember that synchronization is both an art and a science. The science lies in understanding the guarantees and limitations of each primitive; the art lies in designing systems that minimize contention while maintaining correctness. With the knowledge gained from this guide, you’re well-equipped to navigate the complexities of concurrent programming in Go and build systems that leverage the full power of modern hardware.