Distributed Coordination Patterns

In distributed systems, coordinating activities across multiple nodes is a common challenge. Go's channels and sync primitives only coordinate goroutines within a single process; to coordinate across processes and machines, they are typically paired with an external coordination service. The patterns below build on etcd, whose leases, sessions, and watches map naturally onto Go's concurrency model.
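
All three examples share the same dependencies: a reachable etcd endpoint (the code assumes localhost:2379) and the official v3 client module, installed with go get go.etcd.io/etcd/client/v3. Each one creates a concurrency.Session whose lease is kept alive in the background; if a node crashes or stays partitioned past the session TTL, etcd automatically deletes any lock, leadership, or semaphore keys attached to that lease.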

Distributed Mutex with etcd

The first example implements a distributed mutex on top of etcd's concurrency package, so that only one process at a time can hold the lock for a given path:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// DistributedMutex wraps etcd's mutex for distributed locking. Lock/Unlock are
// re-entrant via lockCount, but the struct itself is not goroutine-safe: use
// one instance per goroutine, or protect it with your own sync.Mutex.
type DistributedMutex struct {
	client    *clientv3.Client
	session   *concurrency.Session
	mutex     *concurrency.Mutex
	lockPath  string
	nodeID    string
	isLocked  bool
	lockCount int
}

// NewDistributedMutex creates a new distributed mutex
func NewDistributedMutex(endpoints []string, lockPath string) (*DistributedMutex, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	// Create the mutex
	mutex := concurrency.NewMutex(session, lockPath)

	return &DistributedMutex{
		client:   client,
		session:  session,
		mutex:    mutex,
		lockPath: lockPath,
		nodeID:   nodeID,
		isLocked: false,
	}, nil
}

// Lock acquires the distributed mutex
func (dm *DistributedMutex) Lock(ctx context.Context) error {
	if dm.isLocked {
		dm.lockCount++
		return nil // Already locked by this instance
	}

	// Try to acquire the lock
	if err := dm.mutex.Lock(ctx); err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}

	dm.isLocked = true
	dm.lockCount = 1
	log.Printf("Node %s acquired lock on %s", dm.nodeID, dm.lockPath)
	return nil
}

// Unlock releases the distributed mutex
func (dm *DistributedMutex) Unlock(ctx context.Context) error {
	if !dm.isLocked {
		return fmt.Errorf("mutex is not locked")
	}

	dm.lockCount--
	if dm.lockCount > 0 {
		return nil // Still held by other operations
	}

	// Release the lock
	if err := dm.mutex.Unlock(ctx); err != nil {
		return fmt.Errorf("failed to release lock: %w", err)
	}

	dm.isLocked = false
	log.Printf("Node %s released lock on %s", dm.nodeID, dm.lockPath)
	return nil
}

// Close releases resources
func (dm *DistributedMutex) Close() {
	if dm.isLocked {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = dm.Unlock(ctx)
	}
	dm.session.Close()
	dm.client.Close()
}

// IsLocked returns whether this instance holds the lock
func (dm *DistributedMutex) IsLocked() bool {
	return dm.isLocked
}

// GetLockOwner returns the key of the current lock holder, if any.
// concurrency.Mutex stores one key per contender under lockPath (with an
// empty value), so the holder is the key with the oldest create revision.
func (dm *DistributedMutex) GetLockOwner(ctx context.Context) (string, error) {
	resp, err := dm.client.Get(ctx, dm.lockPath, clientv3.WithFirstCreate()...)
	if err != nil {
		return "", fmt.Errorf("failed to get lock info: %w", err)
	}

	if len(resp.Kvs) == 0 {
		return "", nil // No lock owner
	}

	return string(resp.Kvs[0].Key), nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	lockPath := "/locks/my-critical-section"

	// Create the distributed mutex
	mutex, err := NewDistributedMutex(endpoints, lockPath)
	if err != nil {
		log.Fatalf("Failed to create distributed mutex: %v", err)
	}
	defer mutex.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Try to acquire the lock
	log.Printf("Attempting to acquire lock...")
	if err := mutex.Lock(ctx); err != nil {
		log.Fatalf("Failed to acquire lock: %v", err)
	}

	// Simulate doing work while holding the lock
	log.Printf("Lock acquired! Performing critical section work...")
	time.Sleep(5 * time.Second)

	// Release the lock
	if err := mutex.Unlock(ctx); err != nil {
		log.Fatalf("Failed to release lock: %v", err)
	}
	log.Printf("Lock released")
}
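
In real services the critical section is usually a function call, and it is easy to forget the unlock on an error path. A small helper makes the pairing automatic; the sketch below is illustrative (withLock is not part of the etcd API) and reuses the DistributedMutex type defined above:

// withLock acquires the distributed mutex, runs fn, and always attempts to
// release the lock afterwards, even if fn returns an error.
func withLock(ctx context.Context, dm *DistributedMutex, fn func(context.Context) error) error {
	if err := dm.Lock(ctx); err != nil {
		return fmt.Errorf("acquire lock: %w", err)
	}
	defer func() {
		// Use an independent timeout so the unlock can still go through
		// even if the caller's context has already expired.
		unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := dm.Unlock(unlockCtx); err != nil {
			log.Printf("failed to release lock: %v", err)
		}
	}()
	return fn(ctx)
}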

Leader Election Pattern

Leader election lets a group of nodes agree on a single coordinator, so that work which must run in exactly one place at a time (scheduling, migrations, periodic cleanup) has an unambiguous owner:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// LeaderElection manages the leader election process. The isLeader field and
// leaderCh channel are updated only by the goroutine started in WatchLeader.
type LeaderElection struct {
	client     *clientv3.Client
	session    *concurrency.Session
	election   *concurrency.Election
	nodeID     string
	electionID string
	isLeader   bool
	leaderCh   chan bool
	stopCh     chan struct{}
}

// NewLeaderElection creates a new leader election instance
func NewLeaderElection(endpoints []string, electionID string) (*LeaderElection, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	// Create the election
	election := concurrency.NewElection(session, electionID)

	return &LeaderElection{
		client:     client,
		session:    session,
		election:   election,
		nodeID:     nodeID,
		electionID: electionID,
		isLeader:   false,
		leaderCh:   make(chan bool, 1),
		stopCh:     make(chan struct{}),
	}, nil
}

// Campaign starts the leader election process. It blocks until this node is
// elected, the context is cancelled, or an error occurs. Leadership state and
// notifications are managed solely by WatchLeader, so Campaign does not write
// to leaderCh itself (a second sender could race with WatchLeader or send on
// the channel after it has been closed).
func (le *LeaderElection) Campaign(ctx context.Context) error {
	// Start campaigning for leadership; this blocks until elected.
	if err := le.election.Campaign(ctx, le.nodeID); err != nil {
		return fmt.Errorf("failed to campaign: %w", err)
	}

	log.Printf("Node %s won the campaign for %s", le.nodeID, le.electionID)
	return nil
}

// Resign gives up leadership. WatchLeader notices the deleted election key
// and delivers the corresponding notification on leaderCh.
func (le *LeaderElection) Resign(ctx context.Context) error {
	if !le.isLeader {
		return nil // Not the leader
	}

	if err := le.election.Resign(ctx); err != nil {
		return fmt.Errorf("failed to resign: %w", err)
	}

	log.Printf("Node %s resigned leadership for %s", le.nodeID, le.electionID)
	return nil
}

// IsLeader returns whether this node is the leader
func (le *LeaderElection) IsLeader() bool {
	return le.isLeader
}

// LeaderChanges returns a channel that receives leadership change notifications
func (le *LeaderElection) LeaderChanges() <-chan bool {
	return le.leaderCh
}

// WatchLeader watches for leader changes. The goroutine it starts is the only
// writer to isLeader and the only sender on leaderCh, which it closes on exit.
func (le *LeaderElection) WatchLeader(ctx context.Context) {
	go func() {
		defer close(le.leaderCh)

		for {
			select {
			case <-le.stopCh:
				return
			case <-ctx.Done():
				return
			default:
			}

			// Look up the current leader. ErrElectionNoLeader simply means
			// nobody holds the election key right now.
			currentLeader := ""
			resp, err := le.election.Leader(ctx)
			switch {
			case err == concurrency.ErrElectionNoLeader:
				// No leader at the moment; treat it as "not us".
			case err != nil:
				log.Printf("Error getting leader: %v", err)
				time.Sleep(1 * time.Second)
				continue
			default:
				currentLeader = string(resp.Kvs[0].Value)
			}

			isLeader := currentLeader == le.nodeID

			// If leadership status changed, notify
			if isLeader != le.isLeader {
				le.isLeader = isLeader
				le.leaderCh <- isLeader
				if isLeader {
					log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
				} else {
					log.Printf("Node %s lost leadership for %s", le.nodeID, le.electionID)
				}
			}

			if currentLeader == "" {
				// Nothing to watch yet; poll again shortly.
				time.Sleep(1 * time.Second)
				continue
			}

			// Watch the current leader's key so the loop wakes up on changes.
			watchCh := le.client.Watch(ctx, string(resp.Kvs[0].Key))
			select {
			case <-watchCh:
				// Leader key changed; loop and check again.
			case <-le.stopCh:
				return
			case <-ctx.Done():
				return
			}
		}
	}()
}

// Close releases resources
func (le *LeaderElection) Close() {
	close(le.stopCh)
	if le.isLeader {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = le.Resign(ctx)
	}
	le.session.Close()
	le.client.Close()
}

// GetCurrentLeader returns the current leader's ID
func (le *LeaderElection) GetCurrentLeader(ctx context.Context) (string, error) {
	resp, err := le.election.Leader(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get leader: %w", err)
	}

	if len(resp.Kvs) == 0 {
		return "", nil // No leader
	}

	return string(resp.Kvs[0].Value), nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	electionID := "/elections/my-service-leader"

	// Create the leader election
	election, err := NewLeaderElection(endpoints, electionID)
	if err != nil {
		log.Fatalf("Failed to create leader election: %v", err)
	}
	defer election.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Start watching for leader changes
	election.WatchLeader(ctx)

	// Start campaigning for leadership
	log.Printf("Node %s starting campaign...", election.nodeID)
	go func() {
		if err := election.Campaign(ctx); err != nil {
			log.Printf("Campaign error: %v", err)
		}
	}()

	// Handle leadership changes
	for isLeader := range election.LeaderChanges() {
		if isLeader {
			log.Printf("I am now the leader! Starting leader tasks...")
			// Perform leader-specific work
			time.Sleep(10 * time.Second)
			
			// Simulate stepping down
			log.Printf("Resigning leadership...")
			if err := election.Resign(ctx); err != nil {
				log.Printf("Error resigning: %v", err)
			}
		} else {
			log.Printf("I am now a follower. Waiting for leadership...")
			// Perform follower-specific work
			
			// After some time, campaign again
			time.Sleep(5 * time.Second)
			log.Printf("Starting new campaign...")
			go func() {
				if err := election.Campaign(ctx); err != nil {
					log.Printf("Campaign error: %v", err)
				}
			}()
		}
	}
}
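
The WatchLeader loop above polls the election prefix and re-creates a watch on every pass. The concurrency package also offers Election.Observe, which streams the current leader to a channel whenever it changes. The sketch below (observeLeader is an illustrative helper, assuming the same session and election setup as above) shows how a node might simply follow along:

// observeLeader logs every leadership change until ctx is cancelled. Each
// value received from Observe carries the current leader's key/value pair;
// the value is whatever the leader proclaimed in Campaign (the nodeID here).
func observeLeader(ctx context.Context, election *concurrency.Election, selfID string) {
	for resp := range election.Observe(ctx) {
		if len(resp.Kvs) == 0 {
			continue
		}
		leader := string(resp.Kvs[0].Value)
		if leader == selfID {
			log.Printf("observed: this node (%s) is the leader", selfID)
		} else {
			log.Printf("observed: current leader is %s", leader)
		}
	}
}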

Distributed Semaphore

A distributed semaphore caps how many holders, across all participating nodes, can use a resource at the same time, for example limiting the total number of connections to a rate-limited backend:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// DistributedSemaphore implements a semaphore that works across multiple nodes
type DistributedSemaphore struct {
	client    *clientv3.Client
	session   *concurrency.Session
	semPath   string
	nodeID    string
	count     int
	resources map[string]struct{}
	mu        sync.Mutex
}

// NewDistributedSemaphore creates a new distributed semaphore
func NewDistributedSemaphore(endpoints []string, semPath string, count int) (*DistributedSemaphore, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	return &DistributedSemaphore{
		client:    client,
		session:   session,
		semPath:   semPath,
		nodeID:    nodeID,
		count:     count,
		resources: make(map[string]struct{}),
	}, nil
}

// Acquire attempts to acquire a slot in the semaphore. It optimistically
// writes a claim key under semPath, then keeps the slot only if the claim is
// among the first `count` claims by creation order; otherwise it backs off.
func (ds *DistributedSemaphore) Acquire(ctx context.Context) (string, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Generate a unique resource ID
	resourceID := fmt.Sprintf("%s/%s-%d", ds.semPath, ds.nodeID, time.Now().UnixNano())

	// Create the claim key, attached to the session lease so it disappears
	// automatically if this node dies.
	putResp, err := ds.client.Put(ctx, resourceID, ds.nodeID, clientv3.WithLease(ds.session.Lease()))
	if err != nil {
		return "", fmt.Errorf("failed to create resource: %w", err)
	}

	// List all claims and count how many were created before ours.
	resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
	if err != nil {
		// Try to clean up
		_, _ = ds.client.Delete(ctx, resourceID)
		return "", fmt.Errorf("failed to get resources: %w", err)
	}

	older := 0
	for _, kv := range resp.Kvs {
		if kv.CreateRevision < putResp.Header.Revision {
			older++
		}
	}

	// If `count` or more claims are older than ours, the semaphore is full:
	// release our claim and report failure. Ranking by create revision lets
	// earlier claimants keep their slots instead of everyone backing off.
	if older >= ds.count {
		_, _ = ds.client.Delete(ctx, resourceID)
		return "", fmt.Errorf("semaphore limit reached")
	}

	// We successfully acquired a slot
	ds.resources[resourceID] = struct{}{}
	log.Printf("Node %s acquired resource %s", ds.nodeID, resourceID)
	return resourceID, nil
}

// Release releases a previously acquired resource
func (ds *DistributedSemaphore) Release(ctx context.Context, resourceID string) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Check if we own this resource
	if _, exists := ds.resources[resourceID]; !exists {
		return fmt.Errorf("resource not owned by this semaphore instance")
	}

	// Delete the resource key
	_, err := ds.client.Delete(ctx, resourceID)
	if err != nil {
		return fmt.Errorf("failed to delete resource: %w", err)
	}

	// Remove from our local tracking
	delete(ds.resources, resourceID)
	log.Printf("Node %s released resource %s", ds.nodeID, resourceID)
	return nil
}

// TryAcquireWithTimeout attempts to acquire a resource with a timeout
func (ds *DistributedSemaphore) TryAcquireWithTimeout(ctx context.Context, timeout time.Duration) (string, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Try to acquire immediately first, bounded by the timeout
	resourceID, err := ds.Acquire(timeoutCtx)
	if err == nil {
		return resourceID, nil
	}

	// If that fails, retry with exponential backoff
	backoff := 50 * time.Millisecond
	maxBackoff := 1 * time.Second

	for {
		select {
		case <-timeoutCtx.Done():
			return "", fmt.Errorf("timeout waiting for semaphore: %w", timeoutCtx.Err())
		case <-time.After(backoff):
			// Try again, still bounded by the timeout
			resourceID, err := ds.Acquire(timeoutCtx)
			if err == nil {
				return resourceID, nil
			}

			// Increase backoff for next attempt
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
	}
}

// Close releases all resources and cleans up
func (ds *DistributedSemaphore) Close() {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Release all acquired resources
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for resourceID := range ds.resources {
		_, _ = ds.client.Delete(ctx, resourceID)
		delete(ds.resources, resourceID)
	}

	ds.session.Close()
	ds.client.Close()
}

// GetAvailable returns the number of available slots (never negative, even if
// transient extra claims exist while other nodes are backing off)
func (ds *DistributedSemaphore) GetAvailable(ctx context.Context) (int, error) {
	resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
	if err != nil {
		return 0, fmt.Errorf("failed to get resources: %w", err)
	}

	available := ds.count - len(resp.Kvs)
	if available < 0 {
		available = 0
	}
	return available, nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	semPath := "/semaphores/connection-limit"
	maxConnections := 3

	// Create the distributed semaphore
	sem, err := NewDistributedSemaphore(endpoints, semPath, maxConnections)
	if err != nil {
		log.Fatalf("Failed to create distributed semaphore: %v", err)
	}
	defer sem.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Simulate multiple workers trying to acquire resources
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			log.Printf("Worker %d trying to acquire resource...", workerID)
			resourceID, err := sem.TryAcquireWithTimeout(ctx, 5*time.Second)
			if err != nil {
				log.Printf("Worker %d failed to acquire resource: %v", workerID, err)
				return
			}

			log.Printf("Worker %d acquired resource %s", workerID, resourceID)

			// Simulate doing work
			time.Sleep(2 * time.Second)

			// Release the resource
			if err := sem.Release(ctx, resourceID); err != nil {
				log.Printf("Worker %d failed to release resource: %v", workerID, err)
			} else {
				log.Printf("Worker %d released resource %s", workerID, resourceID)
			}
		}(i)
	}

	// Wait for all workers to finish
	wg.Wait()
}
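
A final caveat that applies to all three patterns: every key these examples create is attached to the session's lease, so if a node stays partitioned for longer than the session TTL, etcd deletes its lock, leadership key, and semaphore claims on the server side while the local process may still believe it holds them. The session exposes a Done channel for detecting this locally; a minimal sketch (watchSession is a name introduced here, not part of the client API) shows one way to react:

// watchSession blocks until either ctx ends or the etcd session's lease
// expires. On expiry, any coordination keys held through the session are
// already gone server-side, so the caller must stop assuming it holds them.
func watchSession(ctx context.Context, session *concurrency.Session, onExpired func()) {
	select {
	case <-ctx.Done():
	case <-session.Done():
		log.Printf("etcd session expired; locks and semaphore slots held by this node were released")
		onExpired()
	}
}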