Advanced Channel Patterns

Channels are Go’s primary mechanism for communication between goroutines, but distributed systems demand more sophisticated patterns to handle complex coordination scenarios.

Fan-Out/Fan-In Pattern

The fan-out/fan-in pattern distributes work across multiple goroutines and then collects the results:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Task represents a unit of work to be processed
type Task struct {
	ID     int
	Input  string
	Result string
	Err    error
}

// fanOut feeds tasks onto a channel that multiple worker goroutines will consume
func fanOut(ctx context.Context, tasks []Task) <-chan Task {
	tasksCh := make(chan Task)
	
	go func() {
		defer close(tasksCh)
		
		for _, task := range tasks {
			select {
			case tasksCh <- task:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return tasksCh
}

// processTask simulates the work for a single task, returning early on cancellation
func processTask(ctx context.Context, task Task) Task {
	// Simulate processing time
	select {
	case <-time.After(100 * time.Millisecond):
		task.Result = fmt.Sprintf("Processed: %s", task.Input)
		return task
	case <-ctx.Done():
		task.Err = ctx.Err()
		return task
	}
}

// worker processes tasks from the input channel and sends results to the output channel
func worker(ctx context.Context, id int, tasksCh <-chan Task, resultsCh chan<- Task) {
	for task := range tasksCh {
		// Do the simulated work; processTask returns early if the context is cancelled
		task = processTask(ctx, task)
		
		if task.Err == nil {
			task.Result = fmt.Sprintf("Worker %d processed: %s", id, task.Input)
			
			// Simulate occasional failures
			if task.ID%7 == 0 {
				task.Err = fmt.Errorf("processing error on task %d", task.ID)
				task.Result = ""
			}
		}
		
		// Send the result, or stop if the context is cancelled
		select {
		case resultsCh <- task:
		case <-ctx.Done():
			return
		}
	}
}

// fanIn merges results from multiple worker channels into a single channel
func fanIn(ctx context.Context, resultChs ...<-chan Task) <-chan Task {
	multiplexedCh := make(chan Task)
	var wg sync.WaitGroup
	wg.Add(len(resultChs))
	
	// Start a forwarding goroutine for each worker's results channel
	for _, ch := range resultChs {
		go func(ch <-chan Task) {
			defer wg.Done()
			for result := range ch {
				select {
				case multiplexedCh <- result:
				case <-ctx.Done():
					return
				}
			}
		}(ch)
	}
	
	// Close the multiplexed channel once every forwarder is done
	go func() {
		wg.Wait()
		close(multiplexedCh)
	}()
	
	return multiplexedCh
}

// distributeAndProcess wires up the complete fan-out/fan-in pattern
func distributeAndProcess(ctx context.Context, tasks []Task, workerCount int) []Task {
	// Fan out: feed tasks onto a shared channel that all workers read from
	tasksCh := fanOut(ctx, tasks)
	
	// Start the workers, each writing results to its own channel
	resultChs := make([]<-chan Task, workerCount)
	for i := 0; i < workerCount; i++ {
		ch := make(chan Task)
		resultChs[i] = ch
		go func(workerID int, out chan<- Task) {
			defer close(out)
			worker(ctx, workerID, tasksCh, out)
		}(i, ch)
	}
	
	// Fan in: merge the per-worker channels and collect the results
	collectedResults := make([]Task, 0, len(tasks))
	for result := range fanIn(ctx, resultChs...) {
		collectedResults = append(collectedResults, result)
	}
	
	return collectedResults
}

func main() {
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Generate sample tasks
	tasks := make([]Task, 20)
	for i := 0; i < 20; i++ {
		tasks[i] = Task{
			ID:    i,
			Input: fmt.Sprintf("Task %d input", i),
		}
	}
	
	// Process tasks using fan-out/fan-in pattern
	results := distributeAndProcess(ctx, tasks, 5)
	
	// Print results
	fmt.Println("Results:")
	successCount := 0
	failureCount := 0
	
	for _, result := range results {
		if result.Err != nil {
			fmt.Printf("Task %d failed: %v\n", result.ID, result.Err)
			failureCount++
		} else {
			fmt.Printf("Task %d succeeded: %s\n", result.ID, result.Result)
			successCount++
		}
	}
	
	fmt.Printf("\nSummary: %d succeeded, %d failed\n", successCount, failureCount)
}

This pattern is particularly useful for distributed systems where you need to:

  • Process a large number of tasks in parallel
  • Handle failures gracefully
  • Collect and aggregate results efficiently
  • Implement backpressure mechanisms (see the sketch after this list)
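On the last point, the unbuffered channels in the example already provide backpressure: a send blocks until a worker is ready to receive. When producers should be allowed to run slightly ahead, a bounded buffer is the usual compromise. The following is a minimal sketch rather than part of the program above; it reuses the Task type for illustration, and boundedProducer and maxInFlight are hypothetical names.

// boundedProducer applies backpressure with a bounded buffer: the send blocks
// as soon as maxInFlight tasks are queued, so the producer can never run
// arbitrarily far ahead of the workers draining the channel.
func boundedProducer(ctx context.Context, inputs []string, maxInFlight int) <-chan Task {
	tasksCh := make(chan Task, maxInFlight) // the buffer size is the backpressure limit
	
	go func() {
		defer close(tasksCh)
		for i, in := range inputs {
			select {
			case tasksCh <- Task{ID: i, Input: in}: // blocks while the buffer is full
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return tasksCh
}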

Multiplexing with Select

In distributed systems, you often need to coordinate multiple channels with different priorities and timeouts:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// Event represents a message in our system
type Event struct {
	Source  string
	Type    string
	Payload interface{}
	Time    time.Time
}

// EventSource generates events from different parts of a distributed system
func EventSource(ctx context.Context, name string, interval time.Duration) <-chan Event {
	ch := make(chan Event)
	
	go func() {
		defer close(ch)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		
		for {
			select {
			case <-ticker.C:
				// Generate an event
				event := Event{
					Source:  name,
					Type:    fmt.Sprintf("event-type-%d", rand.Intn(3)),
					Payload: fmt.Sprintf("data from %s", name),
					Time:    time.Now(),
				}
				
				// Try to send the event
				select {
				case ch <- event:
					// Event sent successfully
				case <-ctx.Done():
					return
				case <-time.After(50 * time.Millisecond):
					// Couldn't send within timeout, log and continue
					fmt.Printf("Warning: Dropped event from %s due to backpressure\n", name)
				}
				
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return ch
}

// PriorityMultiplexer combines multiple event sources with priority handling
func PriorityMultiplexer(ctx context.Context, sources map[string]<-chan Event, priorities map[string]int) <-chan Event {
	multiplexed := make(chan Event)
	
	go func() {
		defer close(multiplexed)
		
		// Poll sources in descending priority order, so any event pulled off a
		// channel is always the most important one currently available and is
		// never silently discarded in favor of a higher-priority source
		names := make([]string, 0, len(sources))
		for name := range sources {
			names = append(names, name)
		}
		sort.Slice(names, func(i, j int) bool {
			return priorities[names[i]] > priorities[names[j]]
		})
		
		// Keep track of active sources
		remaining := len(sources)
		
		for remaining > 0 {
			forwarded := false
			
			for _, name := range names {
				ch := sources[name]
				if ch == nil {
					continue // This source is already closed
				}
				
				// Non-blocking receive: skip sources with nothing pending
				select {
				case event, ok := <-ch:
					if !ok {
						// Source is closed, remove it
						sources[name] = nil
						remaining--
						continue
					}
					
					// Forward the event before looking at lower-priority sources
					select {
					case multiplexed <- event:
						fmt.Printf("Forwarded event from %s (priority %d)\n",
							name, priorities[name])
						forwarded = true
					case <-ctx.Done():
						return
					}
				default:
					// No event available from this source right now
				}
				
				if forwarded {
					break // Restart the pass from the highest-priority source
				}
			}
			
			// No events available from any source, wait a bit before polling again
			if !forwarded {
				select {
				case <-time.After(10 * time.Millisecond):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	
	return multiplexed
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create event sources with different priorities
	sources := make(map[string]<-chan Event)
	priorities := make(map[string]int)
	
	// High priority source (critical system events)
	sources["critical"] = EventSource(ctx, "critical", 500*time.Millisecond)
	priorities["critical"] = 10
	
	// Medium priority source (user actions)
	sources["user"] = EventSource(ctx, "user", 200*time.Millisecond)
	priorities["user"] = 5
	
	// Low priority source (metrics)
	sources["metrics"] = EventSource(ctx, "metrics", 100*time.Millisecond)
	priorities["metrics"] = 1
	
	// Multiplex the sources with priority handling
	multiplexed := PriorityMultiplexer(ctx, sources, priorities)
	
	// Process the multiplexed events
	for event := range multiplexed {
		fmt.Printf("Processed: %s event from %s at %v\n", 
			event.Type, event.Source, event.Time.Format(time.RFC3339Nano))
	}
}

This pattern enables sophisticated event handling in distributed systems by:

  • Prioritizing critical events over less important ones (a lighter two-level idiom is sketched after this list)
  • Implementing backpressure to prevent overwhelming consumers
  • Gracefully handling source failures
  • Efficiently multiplexing multiple event streams
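When there are only two priority levels, the polling multiplexer above is heavier than necessary; the standard nested-select idiom gives high-priority events first refusal without timers. The following is a generic sketch rather than part of the program above: drainWithPriority and handle are hypothetical names, and it assumes both channels remain open.

// drainWithPriority always prefers highPriority: the non-blocking outer select
// takes any pending high-priority event first, and only when none is waiting
// does the inner select block on both channels.
func drainWithPriority(ctx context.Context, highPriority, lowPriority <-chan Event, handle func(Event)) {
	for {
		select {
		case e := <-highPriority:
			handle(e)
			continue
		default:
		}
		
		select {
		case e := <-highPriority:
			handle(e)
		case e := <-lowPriority:
			handle(e)
		case <-ctx.Done():
			return
		}
	}
}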

Timed Channel Operations

In distributed systems, timeouts are crucial for preventing deadlocks and ensuring responsiveness:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// Result represents the outcome of a distributed operation
type Result struct {
	Value interface{}
	Err   error
}

// simulateDistributedOperation mimics a call to a remote service with variable latency
func simulateDistributedOperation(ctx context.Context, name string, minLatency, maxLatency time.Duration) <-chan Result {
	resultCh := make(chan Result, 1) // Buffered to prevent goroutine leak
	
	go func() {
		// Calculate a random latency between min and max
		latency := minLatency + time.Duration(rand.Int63n(int64(maxLatency-minLatency)))
		
		// Simulate processing
		select {
		case <-time.After(latency):
			// 10% chance of error
			if rand.Intn(10) == 0 {
				resultCh <- Result{nil, fmt.Errorf("%s operation failed", name)}
			} else {
				resultCh <- Result{fmt.Sprintf("%s result", name), nil}
			}
		case <-ctx.Done():
			resultCh <- Result{nil, ctx.Err()}
		}
		
		close(resultCh)
	}()
	
	return resultCh
}

// firstResponse returns the first successful result, or the last error
// observed if every operation fails or the timeout expires
func firstResponse(ctx context.Context, timeout time.Duration, operations ...func(context.Context) <-chan Result) Result {
	// Create a context with timeout; cancelling it on return stops any
	// operations that are still in flight once we have our answer
	opCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	
	// Successes race for a single slot; errors are buffered so every
	// operation can report one without blocking
	successCh := make(chan Result, 1)
	errCh := make(chan Result, len(operations))
	
	// Launch all operations concurrently
	for _, op := range operations {
		go func(operation func(context.Context) <-chan Result) {
			result := <-operation(opCtx)
			
			if result.Err == nil {
				// Forward the first success; later successes are simply dropped
				select {
				case successCh <- result:
				default:
				}
				return
			}
			errCh <- result
		}(op)
	}
	
	// Return the first success, the last error once every operation has
	// failed, or a timeout error if the deadline expires first
	var lastErr Result
	for i := 0; i < len(operations); i++ {
		select {
		case result := <-successCh:
			return result
		case lastErr = <-errCh:
			// Keep waiting; another operation may still succeed
		case <-opCtx.Done():
			return Result{nil, opCtx.Err()}
		}
	}
	return lastErr
}

func main() {
	// Seed the random number generator (unnecessary on Go 1.20+, where the
	// global generator is seeded automatically)
	rand.Seed(time.Now().UnixNano())
	
	// Create a parent context
	ctx := context.Background()
	
	// Define operations with different latency profiles
	fastButUnreliable := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "fast-service", 50*time.Millisecond, 150*time.Millisecond)
	}
	
	mediumLatency := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "medium-service", 100*time.Millisecond, 300*time.Millisecond)
	}
	
	slowButReliable := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "slow-service", 200*time.Millisecond, 500*time.Millisecond)
	}
	
	// Try multiple operations with a timeout
	fmt.Println("Executing distributed operations with redundancy...")
	result := firstResponse(ctx, 400*time.Millisecond, fastButUnreliable, mediumLatency, slowButReliable)
	
	if result.Err != nil {
		fmt.Printf("All operations failed or timed out: %v\n", result.Err)
	} else {
		fmt.Printf("Operation succeeded with result: %v\n", result.Value)
	}
	
	// Demonstrate a more complex scenario with multiple attempts
	fmt.Println("\nExecuting with multiple attempts...")
	
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("Attempt %d...\n", attempt)
		
		// Increase timeout with each attempt
		timeout := time.Duration(attempt) * 200 * time.Millisecond
		
		result = firstResponse(ctx, timeout, fastButUnreliable, mediumLatency, slowButReliable)
		
		if result.Err == nil {
			fmt.Printf("Success on attempt %d: %v\n", attempt, result.Value)
			break
		} else {
			fmt.Printf("Attempt %d failed: %v\n", attempt, result.Err)
		}
	}
}

This pattern is essential for distributed systems where:

  • Services may have variable latency
  • You need to implement redundancy across multiple services
  • Graceful degradation is required when services are slow or unavailable
  • Timeouts must be carefully managed to prevent cascading failures (see the sketch below)
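On the last point, one common way to keep per-hop timeouts from compounding is to give the whole request a single deadline via the context and let every downstream call inherit whatever budget remains. The following is an illustrative sketch rather than part of the program above; callWithBudget and callRemote are hypothetical names.

// callWithBudget gives an entire request one deadline and lets every downstream
// call observe the same, shrinking budget instead of stacking independent
// timeouts whose worst cases add up.
func callWithBudget(parent context.Context, budget time.Duration,
	callRemote func(context.Context) (string, error)) (string, error) {
	
	ctx, cancel := context.WithTimeout(parent, budget)
	defer cancel()
	
	// Each hop can check the remaining budget and fail fast if it is too small
	if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) < 10*time.Millisecond {
		return "", context.DeadlineExceeded
	}
	
	return callRemote(ctx) // the remote call sees the same deadline
}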