Core Sync Package Primitives
The sync package provides several fundamental primitives that form the building blocks for concurrent programming in Go. Understanding their implementation details and behavior characteristics is essential for effective usage.
Mutex: Beyond Simple Locking
At its core, sync.Mutex provides mutual exclusion, ensuring that only one goroutine can access a protected resource at a time. However, its implementation and behavior have nuances worth exploring:
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
func mutexInternals() {
// Create a mutex
var mu sync.Mutex
// Counter to track goroutines that have acquired the lock
var counter int32
// Function that acquires the lock, increments counter, waits, then decrements
acquireLock := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Goroutine %d: Attempting to acquire lock\n", id)
mu.Lock()
// Atomically increment the counter
newCount := atomic.AddInt32(&counter, 1)
fmt.Printf("Goroutine %d: Acquired lock, active holders: %d\n", id, newCount)
// This should always be 1 if mutex is working correctly
if newCount != 1 {
panic(fmt.Sprintf("Mutex failure: %d goroutines holding lock", newCount))
}
// Simulate work while holding the lock
time.Sleep(50 * time.Millisecond)
// Atomically decrement the counter
atomic.AddInt32(&counter, -1)
fmt.Printf("Goroutine %d: Releasing lock\n", id)
mu.Unlock()
}
// Launch multiple goroutines to compete for the lock
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go acquireLock(i, &wg)
}
wg.Wait()
fmt.Println("All goroutines completed successfully")
// Demonstrate mutex starvation scenario
demonstrateMutexStarvation()
}
func demonstrateMutexStarvation() {
var mu sync.Mutex
// Track how many times each goroutine acquires the lock.
// Updated atomically so the final read below does not race with the workers.
var acquisitions [2]int64
// done is closed to signal the workers to stop; the workers only ever
// receive from it, so there is no risk of a send on a closed channel.
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(2)
// Goroutine 1: Acquires the lock briefly and releases quickly
go func() {
defer wg.Done()
for !isChannelClosed(done) {
mu.Lock()
atomic.AddInt64(&acquisitions[0], 1)
mu.Unlock()
// Give other goroutines a chance
runtime.Gosched()
}
}()
// Goroutine 2: Holds the lock for longer periods
go func() {
defer wg.Done()
for !isChannelClosed(done) {
mu.Lock()
atomic.AddInt64(&acquisitions[1], 1)
// Hold the lock longer
time.Sleep(1 * time.Millisecond)
mu.Unlock()
// Give other goroutines a chance
runtime.Gosched()
}
}()
// Let them run for a bit, then signal the workers to stop
time.Sleep(100 * time.Millisecond)
close(done)
wg.Wait()
fast := atomic.LoadInt64(&acquisitions[0])
slow := atomic.LoadInt64(&acquisitions[1])
fmt.Printf("Lock acquisition distribution:\n")
fmt.Printf("- Fast goroutine: %d acquisitions\n", fast)
fmt.Printf("- Slow goroutine: %d acquisitions\n", slow)
if slow > 0 {
fmt.Printf("- Ratio (fast/slow): %.2f\n", float64(fast)/float64(slow))
}
}
// isChannelClosed reports whether ch has been closed. It relies on nothing
// ever being sent on ch, so a successful receive can only mean "closed".
func isChannelClosed(ch chan bool) bool {
select {
case <-ch:
return true
default:
return false
}
}
func main() {
mutexInternals()
}
This example demonstrates several important aspects of mutex behavior:
- Mutual Exclusion Guarantee: The atomic counter verifies that only one goroutine holds the lock at any time.
- FIFO vs. Fairness: Go’s mutex doesn’t guarantee first-in-first-out ordering for waiting goroutines.
- Starvation Potential: The second part measures how unevenly acquisitions are distributed between a goroutine that grabs and releases the lock in a tight loop and one that holds it for longer periods.
Under the hood, Go’s mutex uses a combination of atomic operations and goroutine queuing. It normally employs a “barging” model in which a goroutine that has just released the lock, or a newly arriving one, may reacquire it before waiting goroutines are scheduled; this favors throughput but can delay waiters. Since Go 1.9 the mutex switches to a “starvation mode” once a waiter has been blocked for more than about a millisecond: ownership is then handed directly to the goroutine at the head of the queue, bounding worst-case wait times.
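As a related aside, Go 1.18 added a non-blocking TryLock method to sync.Mutex. The short sketch below (separate from the example above) shows the call shape; note that the Go documentation cautions that legitimate uses of TryLock are rare:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex

	// Hold the lock in the background so the TryLock below is contended.
	go func() {
		mu.Lock()
		time.Sleep(100 * time.Millisecond)
		mu.Unlock()
	}()
	time.Sleep(10 * time.Millisecond) // give the background goroutine time to lock

	// TryLock (Go 1.18+) returns immediately: true if the lock was acquired.
	if mu.TryLock() {
		fmt.Println("acquired the lock without waiting")
		mu.Unlock()
	} else {
		fmt.Println("lock is busy; skipping instead of blocking")
	}
}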
RWMutex: Optimizing for Read-Heavy Workloads
The sync.RWMutex extends the basic mutex concept by distinguishing between read and write operations, allowing multiple readers to access the protected resource simultaneously:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func rwMutexDemo() {
// Create a RWMutex
var rwMu sync.RWMutex
// Counters to track concurrent readers and writers
var readers int32
var writers int32
// Function for reader goroutines
reader := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
// Acquire read lock
fmt.Printf("Reader %d: Attempting to acquire read lock\n", id)
rwMu.RLock()
// Increment reader count and check for writers
currentReaders := atomic.AddInt32(&readers, 1)
currentWriters := atomic.LoadInt32(&writers)
fmt.Printf("Reader %d: Acquired read lock, concurrent readers: %d, writers: %d\n",
id, currentReaders, currentWriters)
// Verify no writers are active
if currentWriters > 0 {
panic(fmt.Sprintf("RWMutex failure: %d writers active during read lock", currentWriters))
}
// Simulate reading
time.Sleep(50 * time.Millisecond)
// Decrement reader count and release lock
atomic.AddInt32(&readers, -1)
rwMu.RUnlock()
fmt.Printf("Reader %d: Released read lock\n", id)
// Small pause between operations
time.Sleep(10 * time.Millisecond)
}
}
// Function for writer goroutines
writer := func(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 2; i++ {
// Acquire write lock
fmt.Printf("Writer %d: Attempting to acquire write lock\n", id)
rwMu.Lock()
// Increment writer count and check for other writers and readers
currentWriters := atomic.AddInt32(&writers, 1)
currentReaders := atomic.LoadInt32(&readers)
fmt.Printf("Writer %d: Acquired write lock, concurrent writers: %d, readers: %d\n",
id, currentWriters, currentReaders)
// Verify no other writers or readers are active
if currentWriters != 1 {
panic(fmt.Sprintf("RWMutex failure: %d writers active concurrently", currentWriters))
}
if currentReaders > 0 {
panic(fmt.Sprintf("RWMutex failure: %d readers active during write lock", currentReaders))
}
// Simulate writing
time.Sleep(100 * time.Millisecond)
// Decrement writer count and release lock
atomic.AddInt32(&writers, -1)
rwMu.Unlock()
fmt.Printf("Writer %d: Released write lock\n", id)
// Small pause between operations
time.Sleep(20 * time.Millisecond)
}
}
// Launch multiple readers and writers
var wg sync.WaitGroup
// Start readers
for i := 1; i <= 5; i++ {
wg.Add(1)
go reader(i, &wg)
}
// Start writers
for i := 1; i <= 2; i++ {
wg.Add(1)
go writer(i, &wg)
}
wg.Wait()
fmt.Println("All reader and writer goroutines completed successfully")
// Demonstrate writer preference
demonstrateWriterPreference()
}
func demonstrateWriterPreference() {
var rwMu sync.RWMutex
// Signal channels
readerReady := make(chan bool)
writerReady := make(chan bool)
done := make(chan bool)
// Start a long-running reader
go func() {
rwMu.RLock()
readerReady <- true
// Hold read lock until done
<-done
rwMu.RUnlock()
}()
// Wait for reader to acquire lock
<-readerReady
// Start a writer that will block
go func() {
writerReady <- true
// This will block until reader releases lock
rwMu.Lock()
rwMu.Unlock()
done <- true
}()
// Wait for writer to be ready
<-writerReady
// Try to acquire another read lock after writer is waiting
// This demonstrates writer preference - new readers will block
// if there's a writer waiting
readBlocked := make(chan bool)
go func() {
// Small delay to ensure writer is waiting
time.Sleep(10 * time.Millisecond)
fmt.Println("New reader attempting to acquire lock while writer is waiting...")
// This should block because writer is waiting
rwMu.RLock()
rwMu.RUnlock()
readBlocked <- true
}()
// Let the system run for a bit
select {
case <-readBlocked:
fmt.Println("UNEXPECTED: New reader acquired lock while writer was waiting")
case <-time.After(50 * time.Millisecond):
fmt.Println("EXPECTED: New reader blocked because writer is waiting (writer preference)")
}
// Signal the original reader to release the lock
done <- true
// Wait for everything to complete
<-done
<-readBlocked
}
func main() {
rwMutexDemo()
}
Key insights about RWMutex:
- Reader Concurrency: Multiple goroutines can hold read locks simultaneously, as verified by the concurrent reader count.
- Writer Exclusivity: Write locks exclude both readers and other writers.
- Writer Preference: When both readers and writers are waiting, writers are generally given preference to prevent writer starvation in read-heavy workloads.
The RWMutex implementation combines a mutex (to serialize writers), two runtime semaphores (one to block writers, one to block readers), and atomically updated reader counts. This makes it significantly more complex than a basic mutex but enables higher throughput in read-heavy scenarios.
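In practice, the most common reason to reach for RWMutex is a read-mostly data structure such as an in-memory cache. Here is a minimal sketch of that idiom (the cache type and its methods are illustrative, not part of the sync package):

package main

import (
	"fmt"
	"sync"
)

// cache is a read-mostly map guarded by an RWMutex: many concurrent
// Get calls can proceed in parallel, while Set takes the exclusive lock.
type cache struct {
	mu   sync.RWMutex
	data map[string]string
}

func newCache() *cache {
	return &cache{data: make(map[string]string)}
}

func (c *cache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

func (c *cache) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
}

func main() {
	c := newCache()
	c.Set("greeting", "hello")
	if v, ok := c.Get("greeting"); ok {
		fmt.Println(v)
	}
}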
WaitGroup: Coordinating Task Completion
The sync.WaitGroup provides a mechanism to wait for a collection of goroutines to finish their work:
package main
import (
"fmt"
"sync"
"time"
)
func waitGroupPatterns() {
// Basic WaitGroup usage
basicWaitGroup()
// Dynamic task generation
dynamicWaitGroup()
// Hierarchical WaitGroups
hierarchicalWaitGroups()
}
func basicWaitGroup() {
fmt.Println("\n=== Basic WaitGroup ===")
var wg sync.WaitGroup
// Launch multiple workers
for i := 1; i <= 3; i++ {
// Increment counter before launching goroutine
wg.Add(1)
go func(id int) {
// Ensure counter is decremented when goroutine completes
defer wg.Done()
fmt.Printf("Worker %d: Starting\n", id)
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
fmt.Printf("Worker %d: Completed\n", id)
}(i)
}
// Wait for all workers to complete
fmt.Println("Main: Waiting for workers to complete")
wg.Wait()
fmt.Println("Main: All workers completed")
}
func dynamicWaitGroup() {
fmt.Println("\n=== Dynamic WaitGroup ===")
var wg sync.WaitGroup
// Channel for dynamic task generation
tasks := make(chan int, 10)
// Launch worker pool
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
fmt.Printf("Worker %d: Started, processing tasks\n", workerID)
// Process tasks until channel is closed
for taskID := range tasks {
fmt.Printf("Worker %d: Processing task %d\n", workerID, taskID)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Worker %d: Completed task %d\n", workerID, taskID)
}
fmt.Printf("Worker %d: Shutting down\n", workerID)
}(i)
}
// Generate tasks
fmt.Println("Main: Generating tasks")
for i := 1; i <= 6; i++ {
tasks <- i
}
// Signal no more tasks
fmt.Println("Main: Closing task channel")
close(tasks)
// Wait for all workers to complete
fmt.Println("Main: Waiting for workers to complete")
wg.Wait()
fmt.Println("Main: All workers completed")
}
func hierarchicalWaitGroups() {
fmt.Println("\n=== Hierarchical WaitGroups ===")
var parentWg sync.WaitGroup
// Launch parent tasks
for i := 1; i <= 2; i++ {
parentWg.Add(1)
go func(parentID int) {
defer parentWg.Done()
fmt.Printf("Parent %d: Starting\n", parentID)
// Each parent launches multiple child tasks
var childWg sync.WaitGroup
for j := 1; j <= 3; j++ {
childWg.Add(1)
go func(parentID, childID int) {
defer childWg.Done()
fmt.Printf("Parent %d, Child %d: Starting\n", parentID, childID)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Parent %d, Child %d: Completed\n", parentID, childID)
}(parentID, j)
}
// Wait for all children of this parent to complete
fmt.Printf("Parent %d: Waiting for children\n", parentID)
childWg.Wait()
fmt.Printf("Parent %d: All children completed\n", parentID)
}(i)
}
// Wait for all parent tasks to complete
fmt.Println("Main: Waiting for parent tasks to complete")
parentWg.Wait()
fmt.Println("Main: All parent tasks completed")
}
func main() {
waitGroupPatterns()
}
This example demonstrates several WaitGroup patterns:
- Basic Coordination: Using Add/Done/Wait to coordinate multiple goroutines.
- Dynamic Task Processing: Combining WaitGroup with channels for worker pools.
- Hierarchical Coordination: Nesting WaitGroups to coordinate multi-level task hierarchies.
Internally, WaitGroup packs its counter and the number of blocked waiters into a single 64-bit value that is updated with atomic operations, so Add and Done are cheap even across many goroutines. Wait parks the calling goroutine on a runtime semaphore, which the final Done releases once the counter reaches zero.
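One practical limitation is that Wait blocks indefinitely. A common workaround, sketched below with an illustrative waitTimeout helper (not part of the standard library), runs Wait in its own goroutine and selects on a channel that goroutine closes:

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for the WaitGroup or gives up after the given duration.
// It returns true if the WaitGroup finished in time. Note that on timeout the
// workers (and the helper goroutine) keep running; this only bounds the wait.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(200 * time.Millisecond) // simulated slow worker
	}()

	if waitTimeout(&wg, 50*time.Millisecond) {
		fmt.Println("workers finished in time")
	} else {
		fmt.Println("timed out waiting for workers")
	}
}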
Once: Ensuring Single Execution
The sync.Once primitive guarantees that a function is executed exactly once, even when called from multiple goroutines:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func oncePatterns() {
// Basic Once usage
basicOnce()
// Once with error handling
onceWithErrorHandling()
// Multiple Once objects
multipleOnce()
}
func basicOnce() {
fmt.Println("\n=== Basic Once ===")
var once sync.Once
var counter int32
// Function to be executed once
initFunc := func() {
// Simulate initialization work
time.Sleep(100 * time.Millisecond)
atomic.AddInt32(&counter, 1)
fmt.Println("Initialization done")
}
// Launch multiple goroutines that all try to execute initFunc
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d: Calling initialization\n", id)
once.Do(initFunc)
fmt.Printf("Goroutine %d: Initialization call returned\n", id)
}(i)
}
wg.Wait()
fmt.Printf("Counter value: %d (should be 1)\n", atomic.LoadInt32(&counter))
}
func onceWithErrorHandling() {
fmt.Println("\n=== Once with Error Handling ===")
var once sync.Once
var initErr error
var initialized int32
// Function that might fail
initFunc := func() {
// Simulate initialization that might fail
fmt.Println("Performing initialization...")
time.Sleep(100 * time.Millisecond)
// Simulate a failure
// initErr = fmt.Errorf("initialization failed")
// If no error, mark as initialized
if initErr == nil {
atomic.StoreInt32(&initialized, 1)
fmt.Println("Initialization succeeded")
} else {
fmt.Printf("Initialization failed: %v\n", initErr)
}
}
// Wrapper function for initialization with error checking
initWithErrorCheck := func() error {
once.Do(initFunc)
return initErr
}
// Try initialization from multiple goroutines
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d: Attempting initialization\n", id)
err := initWithErrorCheck()
if err != nil {
fmt.Printf("Goroutine %d: Initialization error: %v\n", id, err)
} else {
fmt.Printf("Goroutine %d: Initialization successful\n", id)
}
}(i)
}
wg.Wait()
fmt.Printf("Final initialization state: %v\n", atomic.LoadInt32(&initialized) == 1)
}
func multipleOnce() {
fmt.Println("\n=== Multiple Once Objects ===")
// Create a map of Once objects for different resources
var mu sync.Mutex
resources := make(map[string]*sync.Once)
// Function to get or create a Once object for a resource
getResourceOnce := func(name string) *sync.Once {
mu.Lock()
defer mu.Unlock()
if once, exists := resources[name]; exists {
return once
}
once := new(sync.Once)
resources[name] = once
return once
}
// Function to initialize a resource
initResource := func(name string) {
fmt.Printf("Initializing resource: %s\n", name)
time.Sleep(50 * time.Millisecond)
fmt.Printf("Resource initialized: %s\n", name)
}
// Launch goroutines that initialize different resources
var wg sync.WaitGroup
resourceNames := []string{"db", "cache", "logger", "db", "cache"}
for i, name := range resourceNames {
wg.Add(1)
go func(id int, resourceName string) {
defer wg.Done()
fmt.Printf("Goroutine %d: Initializing %s\n", id, resourceName)
once := getResourceOnce(resourceName)
once.Do(func() { initResource(resourceName) })
fmt.Printf("Goroutine %d: %s initialization call returned\n", id, resourceName)
}(i, name)
}
wg.Wait()
// Check how many unique resources were initialized
mu.Lock()
fmt.Printf("Number of unique resources: %d\n", len(resources))
mu.Unlock()
}
func main() {
oncePatterns()
}
Key insights about Once:
- Guaranteed Single Execution: The function passed to Do() executes exactly once, regardless of how many goroutines call it.
- Blocking Behavior: All calls to Do() block until the first call completes.
- Error Handling: Once doesn’t provide built-in error handling, but patterns can be implemented to address this.
- Multiple Once Objects: Different initialization tasks require separate Once instances.
Internally, Once pairs an atomic “done” flag with a mutex: the fast path is a single atomic load, while the slow path takes the mutex and runs the function, so concurrent callers block until the first call completes and subsequent calls return almost immediately.
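Go 1.21 added generic helpers, sync.OnceFunc, sync.OnceValue, and sync.OnceValues, that bundle a Once with the function it guards. OnceValues maps naturally onto the error-handling pattern above because it caches both the result and the error of the first call. A minimal sketch (the config.json path is purely illustrative):

package main

import (
	"fmt"
	"os"
	"sync"
)

// loadConfig caches both the result and the error of the first call;
// later calls return the same pair without re-reading the file.
// "config.json" is a hypothetical path used only for illustration.
var loadConfig = sync.OnceValues(func() ([]byte, error) {
	fmt.Println("reading config file...")
	return os.ReadFile("config.json")
})

func main() {
	for i := 1; i <= 3; i++ {
		data, err := loadConfig()
		fmt.Printf("call %d: %d bytes, err=%v\n", i, len(data), err)
	}
}

As with Once itself, a failed first call is not retried; if retry-on-error semantics are needed, an explicit flag such as the one in the earlier example is still required.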
Cond: Coordinating Based on Conditions
The sync.Cond primitive provides a way for goroutines to wait for a condition to become true:
package main
import (
"fmt"
"sync"
"time"
)
func condPatterns() {
// Basic condition variable usage
basicCond()
// Broadcast vs. Signal
broadcastVsSignal()
// Producer-consumer pattern
producerConsumer()
}
func basicCond() {
fmt.Println("\n=== Basic Condition Variable ===")
// Create a condition variable with its associated mutex
var mu sync.Mutex
cond := sync.NewCond(&mu)
// Shared state protected by the mutex
ready := false
// Launch a goroutine that will set the condition
go func() {
// Simulate some preparation work
time.Sleep(300 * time.Millisecond)
// Update the shared state and signal waiters
mu.Lock()
ready = true
fmt.Println("Signaler: Condition is now true")
cond.Signal()
mu.Unlock()
}()
// Wait for the condition to become true
fmt.Println("Waiter: Waiting for condition")
mu.Lock()
// While loop pattern to handle spurious wakeups
for !ready {
cond.Wait() // Atomically releases mutex and blocks until signaled
}
mu.Unlock()
fmt.Println("Waiter: Condition is true, proceeding")
}
func broadcastVsSignal() {
fmt.Println("\n=== Broadcast vs. Signal ===")
var mu sync.Mutex
cond := sync.NewCond(&mu)
// Shared state
ready := false
// Launch multiple waiters
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Waiter %d: Waiting for condition\n", id)
mu.Lock()
for !ready {
cond.Wait()
fmt.Printf("Waiter %d: Woke up, checking condition\n", id)
}
fmt.Printf("Waiter %d: Condition is true, proceeding\n", id)
mu.Unlock()
}(i)
}
// Give waiters time to start waiting
time.Sleep(100 * time.Millisecond)
// Signal vs. Broadcast demonstration
fmt.Println("\nDemonstrating Signal (wakes only one waiter):")
mu.Lock()
cond.Signal()
mu.Unlock()
// Give signaled waiter time to process
time.Sleep(100 * time.Millisecond)
fmt.Println("\nDemonstrating Broadcast (wakes all waiters):")
mu.Lock()
ready = true
cond.Broadcast()
mu.Unlock()
wg.Wait()
}
func producerConsumer() {
fmt.Println("\n=== Producer-Consumer with Cond ===")
var mu sync.Mutex
notEmpty := sync.NewCond(&mu)
notFull := sync.NewCond(&mu)
// Bounded buffer
const capacity = 3
buffer := make([]int, 0, capacity)
// Producer function
producer := func(items int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= items; i++ {
mu.Lock()
// Wait while buffer is full
for len(buffer) == capacity {
fmt.Println("Producer: Buffer full, waiting")
notFull.Wait()
}
// Add item to buffer
buffer = append(buffer, i)
fmt.Printf("Producer: Added item %d, buffer size: %d\n", i, len(buffer))
// Signal that buffer is not empty
notEmpty.Signal()
mu.Unlock()
// Simulate variable production rate
time.Sleep(50 * time.Millisecond)
}
fmt.Println("Producer: Finished producing items")
}
// Flag indicating the producer has finished; declared before the consumer
// closure so the closure can capture it (read and written only while holding mu)
producerDone := false
// Consumer function
consumer := func(wg *sync.WaitGroup) {
defer wg.Done()
// Consume 8 items (more than producer will produce)
for consumed := 0; consumed < 8; consumed++ {
mu.Lock()
// Wait while buffer is empty
for len(buffer) == 0 {
fmt.Println("Consumer: Buffer empty, waiting")
notEmpty.Wait()
// Check if we should exit (producer is done and buffer is empty)
if producerDone && len(buffer) == 0 {
fmt.Println("Consumer: No more items, exiting")
mu.Unlock()
return
}
}
// Remove item from buffer
item := buffer[0]
buffer = buffer[1:]
fmt.Printf("Consumer: Removed item %d, buffer size: %d\n", item, len(buffer))
// Signal that buffer is not full
notFull.Signal()
mu.Unlock()
// Simulate processing
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Consumer: Finished consuming items")
}
// Start producer and consumer
var wg sync.WaitGroup
wg.Add(2)
go producer(5, &wg)
go consumer(&wg)
// Wait for producer to finish
go func() {
time.Sleep(500 * time.Millisecond)
mu.Lock()
producerDone = true
// Wake up consumer if it's waiting
notEmpty.Signal()
mu.Unlock()
}()
wg.Wait()
fmt.Println("Main: Producer and consumer have finished")
}
func main() {
condPatterns()
}
Key insights about Cond:
- Wait Atomically Releases Lock: The Wait method atomically releases the mutex and blocks the goroutine.
- Spurious Wakeups: Always use a loop to check the condition when waiting.
- Signal vs. Broadcast: Signal wakes one waiter, while Broadcast wakes all waiters.
- Producer-Consumer Pattern: Cond variables are ideal for implementing bounded buffer patterns with multiple conditions.
Internally, Cond is built on a runtime notification list: each call to Wait takes a ticket and parks the goroutine, Signal wakes the goroutine that has been waiting the longest, and Broadcast wakes all of them. A woken goroutine must still reacquire the associated Locker before Wait returns, which is another reason the condition should always be rechecked in a loop.
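It is also worth noting that plain channels often substitute for Cond in Go code: closing a channel acts as a one-shot Broadcast that every current and future receiver observes. A minimal sketch of the earlier “wait until ready” example, rewritten with a channel:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	ready := make(chan struct{})
	var wg sync.WaitGroup

	// Several waiters block until ready is closed.
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ready // unblocks for every waiter once the channel is closed
			fmt.Printf("waiter %d: proceeding\n", id)
		}(i)
	}

	// Do the preparation work, then "broadcast" by closing the channel.
	time.Sleep(100 * time.Millisecond)
	close(ready)
	wg.Wait()
}

Cond remains the better fit when the condition changes repeatedly and waiters must re-check shared state under the lock, as in the bounded-buffer example above.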
Pool: Managing Reusable Resources
The sync.Pool provides a way to reuse temporary objects, reducing allocation pressure on the garbage collector:
package main
import (
"bytes"
"fmt"
"sync"
"time"
)
func poolPatterns() {
// Basic pool usage
basicPool()
// Pool with custom initialization
customInitPool()
// Pool performance benchmark
poolPerformance()
}
func basicPool() {
fmt.Println("\n=== Basic Pool Usage ===")
// Create a pool of byte buffers
var bufferPool = sync.Pool{
New: func() interface{} {
buffer := new(bytes.Buffer)
fmt.Println("Creating new buffer")
return buffer
},
}
// Get a buffer from the pool
fmt.Println("Getting first buffer:")
buffer1 := bufferPool.Get().(*bytes.Buffer)
// Use the buffer
buffer1.WriteString("Hello, World!")
fmt.Printf("Buffer 1 contents: %s\n", buffer1.String())
// Reset and return the buffer to the pool
buffer1.Reset()
bufferPool.Put(buffer1)
fmt.Println("Returned buffer 1 to pool")
// Get another buffer (should reuse the one we just put back)
fmt.Println("Getting second buffer:")
buffer2 := bufferPool.Get().(*bytes.Buffer)
// Check if it's the same buffer
fmt.Printf("Buffer 2 is buffer 1: %v\n", buffer1 == buffer2)
// Use the buffer again
buffer2.WriteString("Reused buffer")
fmt.Printf("Buffer 2 contents: %s\n", buffer2.String())
// Return to pool
buffer2.Reset()
bufferPool.Put(buffer2)
}
func customInitPool() {
fmt.Println("\n=== Pool with Custom Initialization ===")
// Create a pool of slices with custom capacity
var slicePool = sync.Pool{
New: func() interface{} {
slice := make([]int, 0, 1024)
fmt.Println("Created new slice with capacity 1024")
return &slice
},
}
// Function that uses a slice from the pool
processData := func(id int, data []int) {
// Get a slice from the pool
slicePtr := slicePool.Get().(*[]int)
slice := (*slicePtr)[:0] // Reset length but keep capacity
// Use the slice
slice = append(slice, data...)
fmt.Printf("Worker %d: Processing %d items with slice capacity %d\n",
id, len(slice), cap(slice))
// Return slice to pool
*slicePtr = slice[:0] // Clear again before returning
slicePool.Put(slicePtr)
}
// Use the pool from multiple goroutines
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Generate some test data
data := make([]int, id*100)
for j := range data {
data[j] = j
}
processData(id, data)
}(i)
}
wg.Wait()
}
func poolPerformance() {
fmt.Println("\n=== Pool Performance Benchmark ===")
// Create a pool of large buffers
var bufferPool = sync.Pool{
New: func() interface{} {
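// Note: because the pool stores slice values rather than a pointer type
// such as *[]byte, every Put boxes the slice header into an interface{},
// which allocates a little on each iteration. Kept simple for this demo;
// prefer pointer types in production code.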
return make([]byte, 1024*1024) // 1MB buffer
},
}
// Function that either uses the pool or allocates directly
benchmark := func(usePool bool, iterations int) time.Duration {
start := time.Now()
for i := 0; i < iterations; i++ {
var buffer []byte
if usePool {
// Get from pool
buffer = bufferPool.Get().([]byte)
// Use the buffer
buffer[0] = 1
// Return to pool
bufferPool.Put(buffer)
} else {
// Allocate directly
buffer = make([]byte, 1024*1024)
buffer[0] = 1
// Buffer will be garbage collected
}
}
return time.Since(start)
}
// Run benchmarks
iterations := 1000
// Warm up the pool
benchmark(true, 100)
// Benchmark with and without pool
withoutPoolTime := benchmark(false, iterations)
withPoolTime := benchmark(true, iterations)
fmt.Printf("Without Pool: %v for %d iterations\n", withoutPoolTime, iterations)
fmt.Printf("With Pool: %v for %d iterations\n", withPoolTime, iterations)
fmt.Printf("Performance improvement: %.2fx\n",
float64(withoutPoolTime)/float64(withPoolTime))
}
func main() {
poolPatterns()
}
Key insights about Pool:
- Temporary Object Reuse: Pool reduces garbage collection pressure by reusing objects.
- No Guarantees: Items may be removed from the pool at any time, especially during garbage collection.
- Thread Safety: Pool is safe for concurrent use by multiple goroutines.
- Reset Before Reuse: Always reset objects before returning them to the pool.
Internally, Pool uses per-P (processor) caches to minimize contention. Each P has its own local pool, and items are stolen from other Ps only when the local pool is empty. This design makes Pool highly efficient for concurrent access patterns.
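A compact version of the idiomatic usage pattern, get, defer the Put, and reset before use, with a pointer type so the interface{} conversion does not allocate, might look like the sketch below (formatGreeting is an illustrative function, not taken from the examples above):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// formatGreeting borrows a buffer from the pool and returns it when done.
// Resetting before use (rather than only before Put) guards against stale
// data if any caller forgets to reset.
func formatGreeting(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(buf)
	buf.Reset()
	buf.WriteString("Hello, ")
	buf.WriteString(name)
	return buf.String()
}

func main() {
	fmt.Println(formatGreeting("Gopher"))
	fmt.Println(formatGreeting("World"))
}

Returning buf.String() before the deferred Put runs is safe because the string conversion copies the bytes out of the buffer.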
Map: Concurrent Access to Maps
The sync.Map provides a concurrent map implementation optimized for specific access patterns:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func mapPatterns() {
// Basic Map usage
basicMap()
// Load or store pattern
loadOrStore()
// Range iteration
rangeMap()
}
func basicMap() {
fmt.Println("\n=== Basic Map Operations ===")
// Create a concurrent map
var m sync.Map
// Store values
m.Store("key1", 100)
m.Store("key2", 200)
m.Store("key3", 300)
// Load values
value, ok := m.Load("key2")
fmt.Printf("Load key2: value=%v, found=%v\n", value, ok)
value, ok = m.Load("nonexistent")
fmt.Printf("Load nonexistent: value=%v, found=%v\n", value, ok)
// Delete a value
m.Delete("key1")
value, ok = m.Load("key1")
fmt.Printf("After delete, load key1: value=%v, found=%v\n", value, ok)
}
func loadOrStore() {
fmt.Println("\n=== LoadOrStore Pattern ===")
// Create a concurrent map
var m sync.Map
// Function to get or initialize a counter
getCounter := func(key string) *int64 {
// Try to load existing counter
value, ok := m.Load(key)
if ok {
return value.(*int64)
}
// Create new counter
counter := new(int64)
// Try to store it (might race with another goroutine)
actual, loaded := m.LoadOrStore(key, counter)
if loaded {
// Another goroutine beat us to it
return actual.(*int64)
}
// We stored our counter
return counter
}
// Use counters from multiple goroutines
var wg sync.WaitGroup
keys := []string{"a", "b", "a", "c", "b", "a"}
for i, key := range keys {
wg.Add(1)
go func(id int, k string) {
defer wg.Done()
// Get or initialize counter for this key
counter := getCounter(k)
// Increment counter
newValue := atomic.AddInt64(counter, 1)
fmt.Printf("Goroutine %d: Incremented counter for key %s to %d\n",
id, k, newValue)
}(i, key)
}
wg.Wait()
// Print final counter values
fmt.Println("Final counter values:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %s: %d\n", key, *value.(*int64))
return true
})
}
func rangeMap() {
fmt.Println("\n=== Range Iteration ===")
// Create and populate a map
var m sync.Map
for i := 1; i <= 5; i++ {
key := fmt.Sprintf("key%d", i)
m.Store(key, i*10)
}
// Iterate over all key-value pairs
fmt.Println("Map contents:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %v: %v\n", key, value)
return true // continue iteration
})
// Selective iteration (stop after finding a specific key)
fmt.Println("Searching for key3:")
found := false
m.Range(func(key, value interface{}) bool {
fmt.Printf(" Checking %v\n", key)
if key.(string) == "key3" {
fmt.Printf(" Found key3 with value %v\n", value)
found = true
return false // stop iteration
}
return true // continue iteration
})
if !found {
fmt.Println(" key3 not found")
}
}
func main() {
mapPatterns()
}
Key insights about Map:
- Optimized Use Cases: sync.Map is optimized for cases where keys are stable (few deletions) and either:
  - Each key is accessed by only one goroutine (partitioned access), or
  - Many goroutines read and few write (read-heavy workloads)
- No Length: Unlike a built-in map, a sync.Map cannot be passed to len(); counting entries requires a full Range pass.
- Type Assertions Required: Values are stored as interface{}, requiring type assertions when retrieved.
- Atomic Operations: LoadOrStore provides an atomic “check and set” operation.
Internally, sync.Map uses a clever combination of an immutable read-only map and a dirty map with a mutex. This design allows for lock-free reads in the common case, falling back to the dirty map only when necessary.
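Because values round-trip through interface{}, a thin generic wrapper is a common way to keep call sites type-safe. A minimal sketch (the TypedMap name is illustrative), assuming Go 1.18 or later:

package main

import (
	"fmt"
	"sync"
)

// TypedMap is a small generic wrapper that hides the type assertions
// required when using sync.Map directly.
type TypedMap[K comparable, V any] struct {
	m sync.Map
}

func (t *TypedMap[K, V]) Store(key K, value V) {
	t.m.Store(key, value)
}

func (t *TypedMap[K, V]) Load(key K) (V, bool) {
	v, ok := t.m.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

func main() {
	var scores TypedMap[string, int]
	scores.Store("alice", 42)
	if v, ok := scores.Load("alice"); ok {
		fmt.Println("alice:", v) // no type assertion needed at the call site
	}
}

For write-heavy workloads or frequently deleted keys, a plain map guarded by a Mutex or RWMutex is usually the better default.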