Advanced Synchronization Patterns
Beyond the basic primitives, Go enables sophisticated synchronization patterns by combining multiple primitives and leveraging channels.
Semaphore Implementation
Semaphores are not provided directly in the sync package but can be implemented using channels or a combination of mutex and condition variable:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ChannelSemaphore implements a counting semaphore using a buffered channel.
// Every element currently sitting in the channel buffer represents one permit
// that is held, so the channel capacity is the maximum number of holders.
type ChannelSemaphore struct {
	permits chan struct{}
}

// NewChannelSemaphore creates a new semaphore with the given number of permits.
// n is used directly as the capacity of the underlying channel.
func NewChannelSemaphore(n int) *ChannelSemaphore {
	return &ChannelSemaphore{
		permits: make(chan struct{}, n),
	}
}

// Acquire acquires a permit, blocking until one is available.
func (s *ChannelSemaphore) Acquire() {
	s.permits <- struct{}{}
}

// TryAcquire attempts to acquire a permit without blocking.
// It reports whether a permit was obtained.
func (s *ChannelSemaphore) TryAcquire() bool {
	select {
	case s.permits <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithTimeout attempts to acquire a permit, waiting at most timeout.
// It reports whether a permit was obtained. A permit that is free at the time
// of the call is always taken, even when timeout is zero or negative.
func (s *ChannelSemaphore) AcquireWithTimeout(timeout time.Duration) bool {
	// Fast path: with a non-positive timeout the context below is already
	// done, and select chooses randomly among ready cases, so without this
	// check a free permit could be missed.
	if s.TryAcquire() {
		return true
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	select {
	case s.permits <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// Release releases a permit.
// It panics if no permit is currently held.
func (s *ChannelSemaphore) Release() {
	select {
	case <-s.permits:
	default:
		panic("Semaphore release without acquire")
	}
}
// MutexSemaphore is a counting semaphore built from a mutex and a condition
// variable. count holds the number of permits currently free and maxSize the
// number of permits the semaphore was created with.
type MutexSemaphore struct {
	mu      sync.Mutex
	cond    *sync.Cond
	count   int
	maxSize int
}

// NewMutexSemaphore returns a semaphore that starts with all n permits free.
func NewMutexSemaphore(n int) *MutexSemaphore {
	sem := new(MutexSemaphore)
	sem.maxSize, sem.count = n, n
	sem.cond = sync.NewCond(&sem.mu)
	return sem
}

// Acquire takes one permit, sleeping on the condition variable until a
// permit is free.
func (s *MutexSemaphore) Acquire() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for {
		if s.count != 0 {
			s.count--
			return
		}
		// Wait releases mu while asleep and re-locks it before returning.
		s.cond.Wait()
	}
}

// TryAcquire takes one permit if one is free and reports whether it did so.
// It never blocks waiting for a permit.
func (s *MutexSemaphore) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	available := s.count != 0
	if available {
		s.count--
	}
	return available
}

// Release returns one permit and wakes a single waiting Acquire call.
// It panics if every permit is already free.
func (s *MutexSemaphore) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.count < s.maxSize {
		s.count++
		s.cond.Signal()
		return
	}
	panic("Semaphore release without acquire")
}
// demonstrateSemaphores exercises both semaphore implementations: five
// workers compete for a 3-permit ChannelSemaphore, then four workers compete
// for a 2-permit MutexSemaphore. Progress is printed to standard output.
func demonstrateSemaphores() {
	// One WaitGroup is reused for both phases; it is back at zero after
	// each Wait.
	var workers sync.WaitGroup

	fmt.Println("\n=== Channel-based Semaphore ===")
	chanSem := NewChannelSemaphore(3)

	// channelWorker first tries a timed acquire (timeout grows with the
	// worker id) and falls back to a blocking acquire if that times out.
	channelWorker := func(id int) {
		defer workers.Done()

		fmt.Printf("Worker %d: Attempting to acquire semaphore\n", id)
		patience := time.Duration(id) * 50 * time.Millisecond
		if chanSem.AcquireWithTimeout(patience) {
			fmt.Printf("Worker %d: Acquired with timeout\n", id)
		} else {
			fmt.Printf("Worker %d: Timeout expired, acquiring with blocking call\n", id)
			chanSem.Acquire()
			fmt.Printf("Worker %d: Acquired after blocking\n", id)
		}

		fmt.Printf("Worker %d: Working...\n", id)
		time.Sleep(200 * time.Millisecond) // simulated work

		fmt.Printf("Worker %d: Releasing semaphore\n", id)
		chanSem.Release()
	}

	for id := 1; id <= 5; id++ {
		workers.Add(1)
		go channelWorker(id)
		// Stagger the worker start times slightly.
		time.Sleep(10 * time.Millisecond)
	}
	workers.Wait()

	fmt.Println("\n=== Mutex-based Semaphore ===")
	condSem := NewMutexSemaphore(2)

	// mutexWorker first tries a non-blocking acquire and falls back to a
	// blocking acquire if no permit is free.
	mutexWorker := func(id int) {
		defer workers.Done()

		fmt.Printf("Worker %d: Attempting to acquire mutex semaphore\n", id)
		if condSem.TryAcquire() {
			fmt.Printf("Worker %d: Acquired without blocking\n", id)
		} else {
			fmt.Printf("Worker %d: Could not acquire immediately, blocking...\n", id)
			condSem.Acquire()
			fmt.Printf("Worker %d: Acquired after blocking\n", id)
		}

		fmt.Printf("Worker %d: Working...\n", id)
		time.Sleep(150 * time.Millisecond) // simulated work

		fmt.Printf("Worker %d: Releasing mutex semaphore\n", id)
		condSem.Release()
	}

	for id := 1; id <= 4; id++ {
		workers.Add(1)
		go mutexWorker(id)
		// Stagger the worker start times slightly.
		time.Sleep(10 * time.Millisecond)
	}
	workers.Wait()
}
// main is the entry point of the semaphore example; it runs
// demonstrateSemaphores and returns when that call returns.
func main() {
demonstrateSemaphores()
}
This example demonstrates two semaphore implementations:
- Channel-based Semaphore: Uses a buffered channel to represent permits.
- Mutex-based Semaphore: Uses a mutex and condition variable for more complex control.
Both implementations provide similar functionality but have different performance characteristics and capabilities.
Read-Write Preference Control
While sync.RWMutex gives preference to writers (once a writer is waiting, new readers block until that writer has acquired and released the lock), sometimes we need more control over access patterns:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// ReaderPreferenceRWMutex is a reader/writer lock that gives preference to
// readers over writers: a reader is only held back while a writer is actively
// writing, whereas a writer must wait until there are no active readers and
// no active writer.
//
// All fields other than mu are guarded by mu. Values must be created with
// NewReaderPreferenceRWMutex so the condition variables are bound to mu.
type ReaderPreferenceRWMutex struct {
	mu          sync.Mutex
	readerCount int32     // number of goroutines currently holding a read lock
	readerWait  sync.Cond // readers wait here while a writer is active
	writerWait  sync.Cond // writers wait here while readers or a writer are active
	writing     bool      // true while a writer holds the lock
}

// NewReaderPreferenceRWMutex creates a new, unlocked reader-preference RW mutex.
func NewReaderPreferenceRWMutex() *ReaderPreferenceRWMutex {
	rw := &ReaderPreferenceRWMutex{}
	rw.readerWait.L = &rw.mu
	rw.writerWait.L = &rw.mu
	return rw
}

// RLock acquires a read lock, blocking only while a writer holds the lock.
// Writers that are merely waiting do not hold readers back.
func (rw *ReaderPreferenceRWMutex) RLock() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	for rw.writing {
		rw.readerWait.Wait()
	}
	rw.readerCount++
}

// RUnlock releases a read lock and, if it was the last one, wakes one
// waiting writer. It panics if no read lock is held, because letting the
// reader count go negative would later admit a writer next to a reader.
func (rw *ReaderPreferenceRWMutex) RUnlock() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	if rw.readerCount == 0 {
		panic("RUnlock of unlocked ReaderPreferenceRWMutex")
	}
	rw.readerCount--
	if rw.readerCount == 0 {
		rw.writerWait.Signal()
	}
}

// Lock acquires the write lock, blocking until there are no active readers
// and no active writer.
func (rw *ReaderPreferenceRWMutex) Lock() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	for rw.readerCount > 0 || rw.writing {
		rw.writerWait.Wait()
	}
	rw.writing = true
}

// Unlock releases the write lock. It wakes every waiting reader and one
// waiting writer; whichever re-acquires mu first proceeds, and a writer that
// finds readers active goes back to waiting. It panics if the write lock is
// not held.
func (rw *ReaderPreferenceRWMutex) Unlock() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	if !rw.writing {
		panic("Unlock of unlocked ReaderPreferenceRWMutex")
	}
	rw.writing = false
	// Wake all readers (they may share the lock) and a single writer.
	rw.readerWait.Broadcast()
	rw.writerWait.Signal()
}
// rwLocker is the set of lock operations the demo workload needs. Both
// *ReaderPreferenceRWMutex and *sync.RWMutex provide these four methods.
type rwLocker interface {
	RLock()
	RUnlock()
	Lock()
	Unlock()
}

// runRWWorkload runs 3 reader goroutines and 2 writer goroutines against rw
// for one second and prints how many read and write operations completed,
// together with their ratio.
func runRWWorkload(rw rwLocker) {
	// Shared resource protected by rw.
	var sharedData int
	// Operation counters, updated atomically by the workers.
	var readCount, writeCount int32

	// Every worker loops until this deadline.
	endTime := time.Now().Add(1 * time.Second)

	var wg sync.WaitGroup

	// Readers: hold the read lock for 5ms, then pause 5ms.
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for time.Now().Before(endTime) {
				rw.RLock()
				_ = sharedData
				atomic.AddInt32(&readCount, 1)
				time.Sleep(5 * time.Millisecond)
				rw.RUnlock()
				time.Sleep(5 * time.Millisecond)
			}
		}()
	}

	// Writers: hold the write lock for 10ms, then pause 20ms.
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for time.Now().Before(endTime) {
				rw.Lock()
				sharedData++
				atomic.AddInt32(&writeCount, 1)
				time.Sleep(10 * time.Millisecond)
				rw.Unlock()
				time.Sleep(20 * time.Millisecond)
			}
		}()
	}

	wg.Wait()

	reads := atomic.LoadInt32(&readCount)
	writes := atomic.LoadInt32(&writeCount)
	fmt.Printf("Reader operations: %d\n", reads)
	fmt.Printf("Writer operations: %d\n", writes)
	// Floating-point division: if no write completed this prints +Inf (or
	// NaN when no read completed either) instead of panicking.
	fmt.Printf("Ratio (reads/writes): %.2f\n", float64(reads)/float64(writes))
}

// demonstrateRWPreference runs the reader/writer workload against the custom
// reader-preference mutex and then against sync.RWMutex for comparison.
func demonstrateRWPreference() {
	fmt.Println("\n=== Reader Preference RWMutex ===")
	runRWWorkload(NewReaderPreferenceRWMutex())

	// Compare with standard RWMutex (which has writer preference)
	compareWithStandardRWMutex()
}

// compareWithStandardRWMutex runs the same reader/writer workload against a
// standard sync.RWMutex.
func compareWithStandardRWMutex() {
	fmt.Println("\n=== Standard RWMutex (Writer Preference) ===")
	var rwMutex sync.RWMutex
	runRWWorkload(&rwMutex)
}
// main is the entry point of the read-write preference example; it runs
// demonstrateRWPreference and returns when that call returns.
func main() {
demonstrateRWPreference()
}
This example demonstrates:
- Custom RWMutex Implementation: A reader-preference RWMutex that prioritizes readers over writers.
- Performance Comparison: Comparing the custom implementation with the standard RWMutex.
The choice between reader and writer preference depends on the specific workload and requirements of your application.
Atomic Operations and Lock-Free Programming
For simple shared state, atomic operations can provide synchronization without locks:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// atomicPatterns runs the atomic-operation demos one after another: the basic
// operations, the compare-and-swap patterns, and the counter benchmark.
func atomicPatterns() {
	demos := []func(){
		basicAtomic,     // basic atomic operations
		casPatterns,     // compare-and-swap patterns
		lockFreeCounter, // lock-free counter
	}
	for _, run := range demos {
		run()
	}
}
// basicAtomic walks through the basic sync/atomic integer operations on one
// int64 (add, add a negative delta, store, swap) and prints the value after
// each step.
func basicAtomic() {
	fmt.Println("\n=== Basic Atomic Operations ===")

	var n int64
	// current reads n atomically for printing.
	current := func() int64 { return atomic.LoadInt64(&n) }

	atomic.AddInt64(&n, 10)
	fmt.Printf("After adding 10: %d\n", current())

	// There is no dedicated subtract: add a negative delta instead.
	atomic.AddInt64(&n, -5)
	fmt.Printf("After subtracting 5: %d\n", current())

	atomic.StoreInt64(&n, 42)
	fmt.Printf("After storing 42: %d\n", current())

	// Swap stores the new value and hands back the previous one.
	previous := atomic.SwapInt64(&n, 100)
	fmt.Printf("Swapped %d with 100, now: %d\n", previous, current())
}
// casPatterns demonstrates compare-and-swap on an int64: a CAS that succeeds,
// a CAS that fails because the expected value is stale, and an increment
// built from a load/CAS retry loop. Results are printed to standard output.
func casPatterns() {
	fmt.Println("\n=== Compare-And-Swap Patterns ===")

	var value int64 = 100

	// Basic CAS: value is 100, so it is replaced by 200.
	swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
	fmt.Printf("CAS 100->200: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))

	// Failed CAS: value is now 200, not the expected 100, so nothing changes.
	swapped = atomic.CompareAndSwapInt64(&value, 100, 300)
	fmt.Printf("CAS 100->300: success=%v, value=%d\n", swapped, atomic.LoadInt64(&value))

	// incrementWithCAS adds delta to *addr using a retry loop: it re-reads
	// the current value and retries whenever the CAS fails.
	incrementWithCAS := func(addr *int64, delta int64) {
		for {
			current := atomic.LoadInt64(addr)
			if atomic.CompareAndSwapInt64(addr, current, current+delta) {
				return
			}
		}
	}

	incrementWithCAS(&value, 50)
	fmt.Printf("After CAS increment by 50: %d\n", atomic.LoadInt64(&value))
}
// LockFreeCounter is a counter whose value is only read and modified through
// sync/atomic operations, so it can be shared between goroutines without a
// lock.
type LockFreeCounter struct {
	value uint64
}

// Increment atomically adds one to the counter and returns the updated value.
func (c *LockFreeCounter) Increment() uint64 {
	const step = 1
	return atomic.AddUint64(&c.value, step)
}

// Load atomically reads the current counter value.
func (c *LockFreeCounter) Load() uint64 {
	current := atomic.LoadUint64(&c.value)
	return current
}
// LockBasedCounter is a counter whose value is guarded by a mutex.
type LockBasedCounter struct {
	mu    sync.Mutex
	value uint64
}

// Increment adds one to the counter while holding the mutex and returns the
// updated value.
func (c *LockBasedCounter) Increment() uint64 {
	c.mu.Lock()
	c.value++
	updated := c.value
	c.mu.Unlock()
	return updated
}

// Load returns the current counter value, read while holding the mutex.
func (c *LockBasedCounter) Load() uint64 {
	c.mu.Lock()
	snapshot := c.value
	c.mu.Unlock()
	return snapshot
}
// lockFreeCounter times the same concurrent increment workload against a
// LockFreeCounter and a LockBasedCounter, then prints both final counts, both
// elapsed times, and the lock-based/lock-free time ratio.
func lockFreeCounter() {
	fmt.Println("\n=== Lock-Free vs. Lock-Based Counter Benchmark ===")

	lockFree := &LockFreeCounter{}
	lockBased := &LockBasedCounter{}

	// benchmark starts the given number of goroutines, each calling
	// counter.Increment iterations times, and returns the wall-clock time
	// from just before the first goroutine starts until all have finished.
	// Taking a one-method interface lets both counter types be passed
	// directly, with no type switch that could leave the call target unset.
	benchmark := func(counter interface{ Increment() uint64 }, goroutines, iterations int) time.Duration {
		var wg sync.WaitGroup

		start := time.Now()
		for i := 0; i < goroutines; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := 0; j < iterations; j++ {
					counter.Increment()
				}
			}()
		}
		wg.Wait()

		return time.Since(start)
	}

	// Run benchmarks
	goroutines := 4
	iterations := 1000000
	lockFreeTime := benchmark(lockFree, goroutines, iterations)
	lockBasedTime := benchmark(lockBased, goroutines, iterations)

	// Verify results: each counter should have been incremented
	// goroutines*iterations times.
	fmt.Printf("Lock-Free Counter: %d\n", lockFree.Load())
	fmt.Printf("Lock-Based Counter: %d\n", lockBased.Load())

	// Compare performance
	fmt.Printf("Lock-Free Time: %v\n", lockFreeTime)
	fmt.Printf("Lock-Based Time: %v\n", lockBasedTime)
	fmt.Printf("Performance Ratio: %.2fx\n", float64(lockBasedTime)/float64(lockFreeTime))
}
// main is the entry point of the atomic operations example; it runs
// atomicPatterns and returns when that call returns.
func main() {
atomicPatterns()
}
Key insights about atomic operations:
- Hardware Support: Atomic operations are implemented using special CPU instructions.
- No Locks Required: They provide synchronization without the overhead of locks.
- Limited Operations: Only simple operations like load, store, add, and compare-and-swap are available.
- Performance Advantage: Atomic operations are significantly faster than mutex-based synchronization for simple cases.
Lock-free programming using atomic operations can provide substantial performance benefits but requires careful design to ensure correctness.