Distributed Coordination Patterns
In distributed systems, coordinating activity across multiple nodes is a common challenge. Go's concurrency primitives stop at the process boundary, but combined with a coordination service such as etcd they extend naturally into cross-node coordination patterns. The examples below use the official Go client, go.etcd.io/etcd/client/v3, and assume an etcd endpoint reachable at localhost:2379.
Distributed Mutex with etcd
This example implements a distributed mutex on top of etcd's concurrency package:
package main
import (
"context"
"fmt"
"log"
"os"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistributedMutex wraps etcd's mutex for distributed locking
type DistributedMutex struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
lockPath string
nodeID string
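	// isLocked and lockCount implement simple reentrancy counting.
	// NOTE: this type is not safe for concurrent use by multiple goroutines;
	// guard it with a sync.Mutex if it is shared.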
isLocked bool
lockCount int
}
// NewDistributedMutex creates a new distributed mutex
func NewDistributedMutex(endpoints []string, lockPath string) (*DistributedMutex, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
// Create the mutex
mutex := concurrency.NewMutex(session, lockPath)
return &DistributedMutex{
client: client,
session: session,
mutex: mutex,
lockPath: lockPath,
nodeID: nodeID,
isLocked: false,
}, nil
}
// Lock acquires the distributed mutex, blocking until the lock is held or ctx
// is cancelled. Repeated calls by the current holder are counted and must be
// matched by Unlock calls.
func (dm *DistributedMutex) Lock(ctx context.Context) error {
if dm.isLocked {
dm.lockCount++
return nil // Already locked by this instance
}
// Try to acquire the lock
if err := dm.mutex.Lock(ctx); err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
dm.isLocked = true
dm.lockCount = 1
log.Printf("Node %s acquired lock on %s", dm.nodeID, dm.lockPath)
return nil
}
// Unlock releases the distributed mutex
func (dm *DistributedMutex) Unlock(ctx context.Context) error {
if !dm.isLocked {
return fmt.Errorf("mutex is not locked")
}
dm.lockCount--
if dm.lockCount > 0 {
return nil // Still held by other operations
}
// Release the lock
if err := dm.mutex.Unlock(ctx); err != nil {
return fmt.Errorf("failed to release lock: %w", err)
}
dm.isLocked = false
log.Printf("Node %s released lock on %s", dm.nodeID, dm.lockPath)
return nil
}
// Close releases resources
func (dm *DistributedMutex) Close() {
if dm.isLocked {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = dm.Unlock(ctx)
}
dm.session.Close()
dm.client.Close()
}
// IsLocked returns whether this instance holds the lock
func (dm *DistributedMutex) IsLocked() bool {
return dm.isLocked
}
// GetLockOwner returns the key of the current lock holder, if any.
// Note: concurrency.Mutex stores an empty value under its key, so the key
// (which embeds the holder's lease ID) is more useful than the value here.
func (dm *DistributedMutex) GetLockOwner(ctx context.Context) (string, error) {
	// WithFirstCreate selects the oldest key under the prefix, i.e. the current holder.
	resp, err := dm.client.Get(ctx, dm.lockPath, clientv3.WithFirstCreate()...)
	if err != nil {
		return "", fmt.Errorf("failed to get lock info: %w", err)
	}
	if len(resp.Kvs) == 0 {
		return "", nil // No lock owner
	}
	return string(resp.Kvs[0].Key), nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
lockPath := "/locks/my-critical-section"
// Create the distributed mutex
mutex, err := NewDistributedMutex(endpoints, lockPath)
if err != nil {
log.Fatalf("Failed to create distributed mutex: %v", err)
}
defer mutex.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Try to acquire the lock
log.Printf("Attempting to acquire lock...")
if err := mutex.Lock(ctx); err != nil {
log.Fatalf("Failed to acquire lock: %v", err)
}
// Simulate doing work while holding the lock
log.Printf("Lock acquired! Performing critical section work...")
time.Sleep(5 * time.Second)
// Release the lock
if err := mutex.Unlock(ctx); err != nil {
log.Fatalf("Failed to release lock: %v", err)
}
log.Printf("Lock released")
}
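Lock blocks until the mutex is free. If your client version provides it, concurrency.Mutex also exposes a non-blocking TryLock that returns concurrency.ErrLocked when another session holds the lock. A minimal sketch of a non-blocking variant for the DistributedMutex type above (the TryLock wrapper method is an addition, not part of the example):
// TryLock attempts to acquire the distributed mutex without blocking.
// It returns false when another session currently holds the lock.
func (dm *DistributedMutex) TryLock(ctx context.Context) (bool, error) {
	if dm.isLocked {
		dm.lockCount++
		return true, nil // Already held by this instance
	}
	// Mutex.TryLock fails fast with concurrency.ErrLocked instead of waiting.
	err := dm.mutex.TryLock(ctx)
	if err == concurrency.ErrLocked {
		return false, nil // Held by another session
	}
	if err != nil {
		return false, fmt.Errorf("failed to try-lock: %w", err)
	}
	dm.isLocked = true
	dm.lockCount = 1
	log.Printf("Node %s acquired lock on %s (non-blocking)", dm.nodeID, dm.lockPath)
	return true, nil
}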
Leader Election Pattern
Leader election lets a group of nodes agree on a single coordinator, which is essential when a task must run on exactly one node at a time. etcd's concurrency package provides an Election primitive for this:
package main
import (
"context"
"fmt"
"log"
"os"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// LeaderElection manages the leader election process
type LeaderElection struct {
client *clientv3.Client
session *concurrency.Session
election *concurrency.Election
nodeID string
electionID string
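	// NOTE: isLeader and leaderCh are written by Campaign/Resign and by the
	// WatchLeader goroutine; a production implementation should guard them
	// with a sync.Mutex (or atomics) and ensure leaderCh always has a reader.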
isLeader bool
leaderCh chan bool
stopCh chan struct{}
}
// NewLeaderElection creates a new leader election instance
func NewLeaderElection(endpoints []string, electionID string) (*LeaderElection, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
// Create the election
election := concurrency.NewElection(session, electionID)
return &LeaderElection{
client: client,
session: session,
election: election,
nodeID: nodeID,
electionID: electionID,
isLeader: false,
leaderCh: make(chan bool, 1),
stopCh: make(chan struct{}),
}, nil
}
// Campaign campaigns for leadership and blocks until this node is elected or ctx is cancelled.
func (le *LeaderElection) Campaign(ctx context.Context) error {
// Start campaigning for leadership
if err := le.election.Campaign(ctx, le.nodeID); err != nil {
return fmt.Errorf("failed to campaign: %w", err)
}
le.isLeader = true
le.leaderCh <- true
log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
return nil
}
// Resign gives up leadership
func (le *LeaderElection) Resign(ctx context.Context) error {
if !le.isLeader {
return nil // Not the leader
}
if err := le.election.Resign(ctx); err != nil {
return fmt.Errorf("failed to resign: %w", err)
}
le.isLeader = false
le.leaderCh <- false
log.Printf("Node %s resigned leadership for %s", le.nodeID, le.electionID)
return nil
}
// IsLeader returns whether this node is the leader
func (le *LeaderElection) IsLeader() bool {
return le.isLeader
}
// LeaderChanges returns a channel that receives leadership change notifications
func (le *LeaderElection) LeaderChanges() <-chan bool {
return le.leaderCh
}
// WatchLeader watches for leader changes
func (le *LeaderElection) WatchLeader(ctx context.Context) {
go func() {
defer close(le.leaderCh)
for {
select {
case <-le.stopCh:
return
case <-ctx.Done():
return
default:
// Get the current leader
resp, err := le.election.Leader(ctx)
if err != nil {
log.Printf("Error getting leader: %v", err)
time.Sleep(1 * time.Second)
continue
}
currentLeader := string(resp.Kvs[0].Value)
isLeader := currentLeader == le.nodeID
// If leadership status changed, notify
if isLeader != le.isLeader {
le.isLeader = isLeader
le.leaderCh <- isLeader
if isLeader {
log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
} else {
log.Printf("Node %s lost leadership for %s", le.nodeID, le.electionID)
}
}
			// Watch the current leader's key; any update or delete wakes us to
			// re-check leadership. (Election.Observe, shown after this example,
			// is a simpler alternative to this hand-rolled loop.)
			watchCh := le.client.Watch(ctx, string(resp.Kvs[0].Key))
select {
case <-watchCh:
// Leader changed, loop and check again
case <-le.stopCh:
return
case <-ctx.Done():
return
}
}
}
}()
}
// Close resigns any held leadership and releases resources.
func (le *LeaderElection) Close() {
	close(le.stopCh)
	if le.isLeader {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		// Resign directly on the election rather than via le.Resign: leaderCh
		// may already be closed by the watcher goroutine during shutdown.
		if err := le.election.Resign(ctx); err != nil {
			log.Printf("Error resigning during close: %v", err)
		}
		le.isLeader = false
	}
	le.session.Close()
	le.client.Close()
}
// GetCurrentLeader returns the current leader's ID
func (le *LeaderElection) GetCurrentLeader(ctx context.Context) (string, error) {
resp, err := le.election.Leader(ctx)
if err != nil {
return "", fmt.Errorf("failed to get leader: %w", err)
}
if len(resp.Kvs) == 0 {
return "", nil // No leader
}
return string(resp.Kvs[0].Value), nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
electionID := "/elections/my-service-leader"
// Create the leader election
election, err := NewLeaderElection(endpoints, electionID)
if err != nil {
log.Fatalf("Failed to create leader election: %v", err)
}
defer election.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Start watching for leader changes
election.WatchLeader(ctx)
// Start campaigning for leadership
log.Printf("Node %s starting campaign...", election.nodeID)
go func() {
if err := election.Campaign(ctx); err != nil {
log.Printf("Campaign error: %v", err)
}
}()
// Handle leadership changes
for isLeader := range election.LeaderChanges() {
if isLeader {
log.Printf("I am now the leader! Starting leader tasks...")
// Perform leader-specific work
time.Sleep(10 * time.Second)
// Simulate stepping down
log.Printf("Resigning leadership...")
if err := election.Resign(ctx); err != nil {
log.Printf("Error resigning: %v", err)
}
} else {
log.Printf("I am now a follower. Waiting for leadership...")
// Perform follower-specific work
// After some time, campaign again
time.Sleep(5 * time.Second)
log.Printf("Starting new campaign...")
go func() {
if err := election.Campaign(ctx); err != nil {
log.Printf("Campaign error: %v", err)
}
}()
}
}
}
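The hand-rolled watch loop above works, but the concurrency package also provides Election.Observe, which streams the current leader's proclaimed value as it changes. A minimal sketch, reusing the LeaderElection type from above (the observeLeader helper is an addition, not part of the example):
// observeLeader logs each change of leadership until ctx is cancelled.
// Election.Observe sends a GetResponse whenever a new value is proclaimed.
func (le *LeaderElection) observeLeader(ctx context.Context) {
	for resp := range le.election.Observe(ctx) {
		if len(resp.Kvs) == 0 {
			continue
		}
		leader := string(resp.Kvs[0].Value)
		log.Printf("Observed leader for %s: %s (self=%v)",
			le.electionID, leader, leader == le.nodeID)
	}
}
Observe avoids creating a new watcher on every iteration and closes its channel when the context ends.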
Distributed Semaphore
A distributed semaphore limits how many clients, across all participating nodes, may hold a resource at the same time. This implementation tracks each holder as a lease-backed key under a common prefix:
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistributedSemaphore implements a semaphore that works across multiple nodes
type DistributedSemaphore struct {
client *clientv3.Client
session *concurrency.Session
semPath string
nodeID string
count int
resources map[string]struct{}
mu sync.Mutex
}
// NewDistributedSemaphore creates a new distributed semaphore
func NewDistributedSemaphore(endpoints []string, semPath string, count int) (*DistributedSemaphore, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
return &DistributedSemaphore{
client: client,
session: session,
semPath: semPath,
nodeID: nodeID,
count: count,
resources: make(map[string]struct{}),
}, nil
}
// Acquire attempts to acquire a resource from the semaphore
func (ds *DistributedSemaphore) Acquire(ctx context.Context) (string, error) {
ds.mu.Lock()
defer ds.mu.Unlock()
// Generate a unique resource ID
resourceID := fmt.Sprintf("%s/%s-%d", ds.semPath, ds.nodeID, time.Now().UnixNano())
// Try to create the resource key with a lease
_, err := ds.client.Put(ctx, resourceID, ds.nodeID, clientv3.WithLease(ds.session.Lease()))
if err != nil {
return "", fmt.Errorf("failed to create resource: %w", err)
}
// Get all resources to check if we're within the limit
resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
if err != nil {
// Try to clean up
_, _ = ds.client.Delete(ctx, resourceID)
return "", fmt.Errorf("failed to get resources: %w", err)
}
	// Check whether we are within the semaphore limit. This count-based check
	// never over-admits, but two nodes racing for the last slot may both back
	// off; a fairer check orders holders by create revision (see the sketch
	// after this example).
	if len(resp.Kvs) > ds.count {
		// Over the limit: remove our key and report failure so the caller can retry
		_, _ = ds.client.Delete(ctx, resourceID)
		return "", fmt.Errorf("semaphore limit reached")
}
// We successfully acquired a resource
ds.resources[resourceID] = struct{}{}
log.Printf("Node %s acquired resource %s", ds.nodeID, resourceID)
return resourceID, nil
}
// Release releases a previously acquired resource
func (ds *DistributedSemaphore) Release(ctx context.Context, resourceID string) error {
ds.mu.Lock()
defer ds.mu.Unlock()
// Check if we own this resource
if _, exists := ds.resources[resourceID]; !exists {
return fmt.Errorf("resource not owned by this semaphore instance")
}
// Delete the resource key
_, err := ds.client.Delete(ctx, resourceID)
if err != nil {
return fmt.Errorf("failed to delete resource: %w", err)
}
// Remove from our local tracking
delete(ds.resources, resourceID)
log.Printf("Node %s released resource %s", ds.nodeID, resourceID)
return nil
}
// TryAcquireWithTimeout attempts to acquire a resource with a timeout
func (ds *DistributedSemaphore) TryAcquireWithTimeout(ctx context.Context, timeout time.Duration) (string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
	// Try to acquire immediately first, bounded by the overall timeout
	resourceID, err := ds.Acquire(timeoutCtx)
if err == nil {
return resourceID, nil
}
// If that fails, retry with exponential backoff
backoff := 50 * time.Millisecond
maxBackoff := 1 * time.Second
for {
select {
case <-timeoutCtx.Done():
return "", fmt.Errorf("timeout waiting for semaphore: %w", timeoutCtx.Err())
case <-time.After(backoff):
			// Try again, still bounded by the overall timeout
			resourceID, err := ds.Acquire(timeoutCtx)
if err == nil {
return resourceID, nil
}
// Increase backoff for next attempt
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
// Close releases all resources and cleans up
func (ds *DistributedSemaphore) Close() {
ds.mu.Lock()
defer ds.mu.Unlock()
// Release all acquired resources
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for resourceID := range ds.resources {
_, _ = ds.client.Delete(ctx, resourceID)
delete(ds.resources, resourceID)
}
ds.session.Close()
ds.client.Close()
}
// GetAvailable returns the number of available resources
func (ds *DistributedSemaphore) GetAvailable(ctx context.Context) (int, error) {
resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
if err != nil {
return 0, fmt.Errorf("failed to get resources: %w", err)
}
	// A transient race can briefly leave more holder keys than count; clamp at zero.
	available := ds.count - len(resp.Kvs)
	if available < 0 {
		available = 0
	}
	return available, nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
semPath := "/semaphores/connection-limit"
maxConnections := 3
// Create the distributed semaphore
sem, err := NewDistributedSemaphore(endpoints, semPath, maxConnections)
if err != nil {
log.Fatalf("Failed to create distributed semaphore: %v", err)
}
defer sem.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Simulate multiple workers trying to acquire resources
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
log.Printf("Worker %d trying to acquire resource...", workerID)
resourceID, err := sem.TryAcquireWithTimeout(ctx, 5*time.Second)
if err != nil {
log.Printf("Worker %d failed to acquire resource: %v", workerID, err)
return
}
log.Printf("Worker %d acquired resource %s", workerID, resourceID)
// Simulate doing work
time.Sleep(2 * time.Second)
// Release the resource
if err := sem.Release(ctx, resourceID); err != nil {
log.Printf("Worker %d failed to release resource: %v", workerID, err)
} else {
log.Printf("Worker %d released resource %s", workerID, resourceID)
}
}(i)
}
// Wait for all workers to finish
wg.Wait()
}
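As noted in Acquire, the count-based admission check can spuriously reject a node that races for the last slot. A fairer variant orders the holder keys by etcd create revision and admits a key only if it falls within the first count entries. A minimal sketch of that check, reusing the DistributedSemaphore fields from above (withinLimit is a hypothetical helper, not part of the example):
// withinLimit reports whether resourceID is among the count oldest keys under
// semPath, using create revisions as a stable, cluster-wide admission order.
func (ds *DistributedSemaphore) withinLimit(ctx context.Context, resourceID string) (bool, error) {
	// Fetch all holder keys sorted by creation order (oldest first).
	resp, err := ds.client.Get(ctx, ds.semPath,
		clientv3.WithPrefix(),
		clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
	if err != nil {
		return false, fmt.Errorf("failed to list holders: %w", err)
	}
	// Our key is admitted only if it appears within the first count entries.
	for i, kv := range resp.Kvs {
		if string(kv.Key) == resourceID {
			return i < ds.count, nil
		}
	}
	return false, fmt.Errorf("resource key %s not found", resourceID)
}
Acquire could call this helper after its Put and, instead of failing outright, watch the prefix and retry when an older key disappears.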