Channel Orchestration Techniques

Beyond individual patterns, channels can be orchestrated to create sophisticated concurrent systems. These techniques combine multiple patterns to solve complex coordination problems.

Timeout and Cancellation Patterns

Proper timeout and cancellation handling is essential for robust concurrent systems:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// timeoutOperation runs a simulated unit of work in the background and
// gives up on it once the supplied timeout elapses.
//
// The work sleeps for a random duration below one second. On success the
// completion message is returned with a nil error; on timeout the result
// is empty and the error names the timeout that expired.
func timeoutOperation(timeout time.Duration) (result string, err error) {
	// Capacity 1 lets the worker hand over its value and exit even when
	// the caller has already returned because of the timeout.
	done := make(chan string, 1)

	go func() {
		workTime := time.Duration(rand.Intn(1000)) * time.Millisecond
		time.Sleep(workTime)
		done <- "Operation completed successfully"
	}()

	// Whichever channel becomes ready first decides the outcome.
	expired := time.After(timeout)
	select {
	case <-expired:
		err = fmt.Errorf("operation timed out after %v", timeout)
	case result = <-done:
	}
	return result, err
}

// contextAwareOperation runs a simulated five-step job in the background
// and stops waiting for it as soon as ctx is done.
//
// It returns the completion message when the job finishes first, or an
// empty string together with ctx.Err() when the context ends first.
func contextAwareOperation(ctx context.Context) (string, error) {
	// Buffered so the worker never blocks on delivery after the caller left.
	done := make(chan string, 1)

	go func() {
		const (
			steps     = 5
			stepDelay = 200 * time.Millisecond
		)
		for remaining := steps; remaining > 0; remaining-- {
			tick := time.After(stepDelay)
			select {
			case <-tick:
				// One more slice of work finished.
				fmt.Println("Operation in progress...")
			case <-ctx.Done():
				// Abandon the job; nothing is sent on done.
				return
			}
		}
		done <- "Operation completed successfully"
	}()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case msg := <-done:
		return msg, nil
	}
}

// multiStageTimeout runs three stages in order (connect, process,
// finalize), each under its own independent deadline, and stops at the
// first stage that fails. The returned error wraps the stage's error and
// is prefixed with the stage name.
func multiStageTimeout() error {
	// runStage announces a stage and executes it under a fresh timeout
	// derived from the background context.
	runStage := func(banner, name string, limit time.Duration, maxMillis int) error {
		ctx, cancel := context.WithTimeout(context.Background(), limit)
		defer cancel()

		fmt.Println(banner)
		return stageOperation(ctx, name, maxMillis)
	}

	// Short timeout: connecting should be quick.
	if err := runStage("Stage 1: Connecting...", "connect", 500*time.Millisecond, 300); err != nil {
		return fmt.Errorf("connect failed: %w", err)
	}

	// Longest timeout: processing is the heavy stage.
	if err := runStage("Stage 2: Processing...", "process", 2*time.Second, 1000); err != nil {
		return fmt.Errorf("process failed: %w", err)
	}

	// Medium timeout for the wrap-up stage.
	if err := runStage("Stage 3: Finalizing...", "finalize", 1*time.Second, 500); err != nil {
		return fmt.Errorf("finalize failed: %w", err)
	}

	fmt.Println("All stages completed successfully")
	return nil
}

// stageOperation simulates one stage of a multi-stage operation.
//
// It waits for a random duration in [0, maxDuration) milliseconds and then
// prints a completion line for the stage called name. A maxDuration of
// zero or less means "no simulated delay". If ctx is already done, or
// becomes done before the delay elapses, ctx.Err() is returned instead.
func stageOperation(ctx context.Context, name string, maxDuration int) error {
	// Report a dead context deterministically rather than letting it race
	// against a zero-length timer in the select below.
	if err := ctx.Err(); err != nil {
		return err
	}

	// rand.Intn panics for n <= 0, so only draw a delay for a positive bound.
	var duration time.Duration
	if maxDuration > 0 {
		duration = time.Duration(rand.Intn(maxDuration)) * time.Millisecond
	}

	// A stoppable timer is released promptly when the context wins.
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-timer.C:
		fmt.Printf("Stage '%s' completed in %v\n", name, duration)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// propagatingCancellation demonstrates how cancellation propagates through a pipeline
func propagatingCancellation() {
	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Ensure cancellation in all cases
	
	// Create a pipeline with this context
	nums := generateNumbers(ctx, 1, 100)
	squares := squareNumbers(ctx, nums)
	results := filterNumbers(ctx, squares, func(n int) bool {
		return n%10 == 0 // Only numbers divisible by 10
	})
	
	// Process results for a while, then cancel
	go func() {
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Cancelling pipeline...")
		cancel()
	}()
	
	// Consume results until cancellation
	for result := range results {
		fmt.Printf("Result: %d\n", result)
		time.Sleep(100 * time.Millisecond) // Slow consumer
	}
	
	fmt.Println("Pipeline terminated")
}

// generateNumbers emits the integers start..end (inclusive) on the
// returned channel and closes it afterwards. Emission stops early, with a
// log line, once ctx is done.
func generateNumbers(ctx context.Context, start, end int) <-chan int {
	stream := make(chan int)

	go func() {
		defer close(stream)

		next := start
		for next <= end {
			select {
			case stream <- next:
				// Delivered; move on to the following value.
				next++
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			}
		}
	}()

	return stream
}

// squareNumbers reads integers from in and emits the square of each one on
// the returned channel, which is closed when in is exhausted. A send that
// is pending when ctx becomes done is abandoned and the stage exits.
func squareNumbers(ctx context.Context, in <-chan int) <-chan int {
	squared := make(chan int)

	go func() {
		defer close(squared)

		for {
			n, open := <-in
			if !open {
				return
			}
			select {
			case squared <- n * n:
				// Delivered downstream.
			case <-ctx.Done():
				fmt.Println("Square operation cancelled")
				return
			}
		}
	}()

	return squared
}

// filterNumbers forwards only those values from in for which predicate
// returns true. The returned channel is closed when in is exhausted. A
// send that is pending when ctx becomes done is abandoned and the stage
// exits.
func filterNumbers(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
	kept := make(chan int)

	go func() {
		defer close(kept)

		for n := range in {
			if !predicate(n) {
				continue // Rejected values are simply skipped
			}
			select {
			case kept <- n:
				// Delivered downstream.
			case <-ctx.Done():
				fmt.Println("Filter operation cancelled")
				return
			}
		}
	}()

	return kept
}

// main runs the four timeout and cancellation demos one after another.
func main() {
	// report prints the outcome of an operation that yields (string, error).
	report := func(result string, err error) {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
			return
		}
		fmt.Printf("Result: %s\n", result)
	}

	// Simple timeout
	fmt.Println("=== Simple Timeout Pattern ===")
	report(timeoutOperation(500 * time.Millisecond))

	// Context cancellation driven by a deadline
	fmt.Println("\n=== Context Cancellation Pattern ===")
	ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond)
	defer cancel()
	report(contextAwareOperation(ctx))

	// Multi-stage timeout
	fmt.Println("\n=== Multi-stage Timeout Pattern ===")
	stagesErr := multiStageTimeout()
	if stagesErr != nil {
		fmt.Printf("Error: %v\n", stagesErr)
	}

	// Cancellation propagating through a pipeline
	fmt.Println("\n=== Cancellation Propagation Pattern ===")
	propagatingCancellation()
}

Key timeout and cancellation patterns:

  1. Simple timeout: Using select with time.After to implement timeouts
  2. Context-based cancellation: Using Go’s context package for cancellation propagation
  3. Multi-stage timeouts: Different timeouts for different stages of an operation
  4. Cancellation propagation: Ensuring cancellation signals flow through all parts of a system

Coordinating Multiple Goroutines

Complex concurrent systems often require sophisticated coordination between multiple goroutines:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Barrier is a reusable synchronization point: callers of Wait block until
// the required number of goroutines have arrived, then all are released
// and the barrier resets for the next cycle.
type Barrier struct {
	count      int        // arrivals recorded in the current cycle
	mutex      sync.Mutex // guards count and generation
	cond       *sync.Cond // broadcast each time a cycle completes
	required   int        // arrivals needed to release the barrier
	generation uint64     // incremented each time the barrier is released
}

// NewBarrier creates a barrier that releases its waiters once required
// goroutines have called Wait. With required <= 1 every Wait call returns
// immediately.
func NewBarrier(required int) *Barrier {
	b := &Barrier{required: required}
	b.cond = sync.NewCond(&b.mutex)
	return b
}

// Wait blocks until the required number of goroutines have reached the
// barrier in the current cycle. The last arrival resets the barrier and
// wakes everyone else.
func (b *Barrier) Wait() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	b.count++
	if b.count >= b.required {
		// Last goroutine to arrive: start a new cycle and release the rest.
		b.count = 0
		b.generation++
		b.cond.Broadcast()
		return
	}

	// Wait in a loop on an explicit predicate, as sync.Cond recommends:
	// a waiter proceeds only once the cycle it arrived in has completed.
	arrivedIn := b.generation
	for arrivedIn == b.generation {
		b.cond.Wait()
	}
}

// WorkGroup runs a fixed number of workers through a sequence of phases,
// holding every worker at a barrier at the end of each phase.
type WorkGroup struct {
	size      int            // number of workers
	wg        sync.WaitGroup // tracks workers that are still running
	startChan chan struct{}  // closed to release all workers at once
	doneChan  chan struct{}  // closed after every worker has finished
	barriers  []*Barrier     // one barrier per phase
}

// NewWorkGroup prepares a group of size workers that will run through the
// given number of phases. Nothing starts until Start is called.
func NewWorkGroup(size int, phases int) *WorkGroup {
	// One barrier per phase, each sized for the whole group.
	barriers := make([]*Barrier, phases)
	for i := range barriers {
		barriers[i] = NewBarrier(size)
	}

	return &WorkGroup{
		size:      size,
		startChan: make(chan struct{}),
		doneChan:  make(chan struct{}),
		barriers:  barriers,
	}
}

// Start launches the workers. Each worker calls work(id, phase) for every
// phase in order and waits at that phase's barrier before moving on.
// Start returns without waiting; use Wait to block until completion.
func (g *WorkGroup) Start(work func(id, phase int)) {
	worker := func(id int) {
		defer g.wg.Done()

		// Hold until the whole group has been launched.
		<-g.startChan

		for phase, barrier := range g.barriers {
			work(id, phase)
			barrier.Wait() // Nobody enters the next phase early
		}
	}

	g.wg.Add(g.size)
	for id := 0; id < g.size; id++ {
		go worker(id)
	}

	// Release every worker simultaneously.
	close(g.startChan)

	// Translate "all workers finished" into a closed channel.
	go func() {
		defer close(g.doneChan)
		g.wg.Wait()
	}()
}

// Wait blocks until every worker has completed every phase.
func (g *WorkGroup) Wait() {
	<-g.doneChan
}

// Rendezvous lets goroutines meet in pairs: a call to Arrive blocks until
// a partner has arrived as well. The same Rendezvous can be used for any
// number of successive pairings.
type Rendezvous struct {
	firstArrived  chan int      // carries the first arrival's ID to its partner
	secondArrived chan struct{} // signals a waiting first arrival that its partner is here
}

// NewRendezvous creates a new rendezvous point.
func NewRendezvous() *Rendezvous {
	return &Rendezvous{
		firstArrived:  make(chan int),
		secondArrived: make(chan struct{}),
	}
}

// Arrive blocks until a pair of goroutines has met.
//
// The second goroutine of a pair gets (firstID, true), where firstID is
// the ID passed by its partner. The first goroutine gets (0, false) once
// its partner has arrived.
func (r *Rendezvous) Arrive(id int) (int, bool) {
	select {
	case r.firstArrived <- id:
		// We are first: our ID has been handed over, now wait for the
		// partner's acknowledgement.
		<-r.secondArrived
		return 0, false
	case firstID := <-r.firstArrived:
		// We are second: release exactly one waiting first arrival. A send
		// is used instead of close so the channel remains usable for later
		// pairs; closing it would panic on the second pairing.
		r.secondArrived <- struct{}{}
		return firstID, true
	}
}

// main runs the barrier, phased work group and rendezvous demos in
// sequence. Each demo is awaited explicitly before the next one starts,
// so their output does not interleave and no goroutine is cut off when
// main returns.
func main() {
	// Demonstrate barrier synchronization
	fmt.Println("=== Barrier Synchronization ===")
	barrier := NewBarrier(3)

	var barrierDemo sync.WaitGroup
	for i := 0; i < 3; i++ {
		barrierDemo.Add(1)
		go func(id int) {
			defer barrierDemo.Done()
			fmt.Printf("Goroutine %d starting\n", id)
			time.Sleep(time.Duration(id*300) * time.Millisecond) // Different work times
			fmt.Printf("Goroutine %d arriving at barrier\n", id)
			barrier.Wait()
			fmt.Printf("Goroutine %d continuing after barrier\n", id)
		}(i)
	}
	// Let the barrier demo finish before the next demo begins.
	barrierDemo.Wait()

	// Demonstrate phased work group
	fmt.Println("\n=== Phased Work Group ===")
	workGroup := NewWorkGroup(4, 3)

	workGroup.Start(func(id, phase int) {
		fmt.Printf("Worker %d executing phase %d\n", id, phase)
		time.Sleep(time.Duration(100*(id+phase)) * time.Millisecond)
		fmt.Printf("Worker %d completed phase %d\n", id, phase)
	})

	workGroup.Wait()
	fmt.Println("All workers completed all phases")

	// Demonstrate rendezvous pattern
	fmt.Println("\n=== Rendezvous Pattern ===")
	rendezvous := NewRendezvous()

	var meeting sync.WaitGroup
	// arrive simulates one participant: it works for delay, then meets its
	// partner at the rendezvous and reports how the meeting went.
	arrive := func(name string, id int, delay time.Duration) {
		defer meeting.Done()
		fmt.Printf("Goroutine %s starting\n", name)
		time.Sleep(delay)
		fmt.Printf("Goroutine %s arriving at rendezvous\n", name)
		otherID, isSecond := rendezvous.Arrive(id)
		if isSecond {
			fmt.Printf("Goroutine %s met with Goroutine %d\n", name, otherID)
		} else {
			fmt.Printf("Goroutine %s continuing after rendezvous\n", name)
		}
	}

	meeting.Add(2)
	go arrive("A", 1, 300*time.Millisecond)
	go arrive("B", 2, 100*time.Millisecond)

	// Wait for both participants instead of sleeping for a fixed time.
	meeting.Wait()
}

These coordination techniques enable:

  1. Phased execution: Coordinating multiple goroutines through distinct phases
  2. Barrier synchronization: Ensuring all goroutines reach a common point before proceeding
  3. Rendezvous: Coordinating pairs of goroutines to meet at a common point
  4. Work distribution: Efficiently distributing work across multiple goroutines

Multiplexing and Demultiplexing Channels

Channel multiplexing (fan-in) combines multiple input channels into a single output channel, while demultiplexing (fan-out) splits a single input channel across multiple output channels:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// multiplexChannels merges every value received on the given input
// channels into one output channel. The output is closed after all inputs
// have been closed and drained.
func multiplexChannels(inputs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(inputs))

	// forward copies one input into the merged stream until it is closed.
	forward := func(id int, src <-chan int) {
		defer pending.Done()
		for {
			val, open := <-src
			if !open {
				return
			}
			fmt.Printf("Multiplexer receiving from input %d: %d\n", id, val)
			merged <- val
		}
	}

	for id, src := range inputs {
		go forward(id, src)
	}

	// Only close the output once no forwarder can send on it any more.
	go func() {
		pending.Wait()
		close(merged)
		fmt.Println("All input channels closed, multiplexer shutting down")
	}()

	return merged
}

// demultiplexChannel splits a single input channel into n output channels.
//
// Each value is routed by its remainder modulo n, normalized to the range
// [0, n) so that negative values are routed as well. For consecutive
// non-negative integers this amounts to a round-robin distribution. All
// outputs are unbuffered, so every output needs an active reader. The
// outputs are closed once input is closed and drained.
//
// If n <= 0 there is nowhere to route to: an empty slice is returned and
// input is drained and discarded so that its producer is not blocked.
func demultiplexChannel(input <-chan int, n int) []<-chan int {
	if n <= 0 {
		go func() {
			for range input {
				// Discard: there are no outputs.
			}
		}()
		return []<-chan int{}
	}

	outputs := make([]chan int, n)
	// Receive-only views of the same channels for the caller
	readOnlyOutputs := make([]<-chan int, n)
	for i := range outputs {
		ch := make(chan int)
		outputs[i] = ch
		readOnlyOutputs[i] = ch
	}

	// Start the demultiplexer
	go func() {
		defer func() {
			for _, ch := range outputs {
				close(ch)
			}
			fmt.Println("Demultiplexer shutting down, all output channels closed")
		}()

		// Value-based distribution
		for val := range input {
			// Go's % keeps the sign of the dividend, so shift negative
			// remainders back into range before indexing.
			outChan := val % n
			if outChan < 0 {
				outChan += n
			}
			fmt.Printf("Demultiplexer sending %d to output %d\n", val, outChan)
			outputs[outChan] <- val
		}
	}()

	return readOnlyOutputs
}

// contentBasedDemux creates one output channel per predicate and copies
// each input value to every output whose predicate accepts it. A value
// may therefore appear on several outputs, or on none. All outputs are
// unbuffered and are closed once input is closed and drained.
func contentBasedDemux(input <-chan int, predicates ...func(int) bool) []<-chan int {
	routeCount := len(predicates)

	// sinks are the writable ends; views are the same channels exposed
	// receive-only to the caller.
	sinks := make([]chan int, routeCount)
	views := make([]<-chan int, routeCount)
	for i := 0; i < routeCount; i++ {
		c := make(chan int)
		sinks[i], views[i] = c, c
	}

	closeAll := func() {
		for i := range sinks {
			close(sinks[i])
		}
		fmt.Println("Content router shutting down, all output channels closed")
	}

	// The router goroutine
	go func() {
		defer closeAll()

		for {
			val, open := <-input
			if !open {
				return
			}
			// Offer the value to every route, in predicate order.
			for i := range predicates {
				if !predicates[i](val) {
					continue
				}
				fmt.Printf("Router sending %d to output %d\n", val, i)
				sinks[i] <- val
			}
		}
	}()

	return views
}

// broadcastChannel copies every input value to each of n output channels.
// Each output has a buffer of 5; when an output's buffer is full the value
// is dropped for that output (with a warning) rather than stalling the
// others. All outputs are closed once input is closed and drained.
func broadcastChannel(input <-chan int, n int) []<-chan int {
	// subscribers are the writable ends; views are the same channels
	// exposed receive-only to the caller.
	subscribers := make([]chan int, n)
	views := make([]<-chan int, n)
	for i := 0; i < n; i++ {
		c := make(chan int, 5) // Buffered so a brief stall does not lose data
		subscribers[i], views[i] = c, c
	}

	// deliver offers val to one subscriber without ever blocking.
	deliver := func(idx, val int) {
		select {
		case subscribers[idx] <- val:
			// Accepted.
		default:
			// No room left: log and move on.
			fmt.Printf("Warning: Output %d buffer full, dropping value %d\n", idx, val)
		}
	}

	// The broadcaster goroutine
	go func() {
		defer func() {
			for i := range subscribers {
				close(subscribers[i])
			}
			fmt.Println("Broadcaster shutting down, all output channels closed")
		}()

		for {
			val, open := <-input
			if !open {
				return
			}
			fmt.Printf("Broadcasting value: %d\n", val)
			for idx := range subscribers {
				deliver(idx, val)
			}
		}
	}()

	return views
}

// main runs the multiplexing, demultiplexing and content-based routing
// demos in sequence. Each demo ends when its consumers have drained their
// channels, which is tracked with a sync.WaitGroup rather than a fixed
// sleep. The "shutting down" lines are printed by the helper goroutines
// after they close their channels, so they may appear slightly after the
// consumers finish.
func main() {
	var wg sync.WaitGroup

	// Create input channels for multiplexing demo
	fmt.Println("=== Channel Multiplexing ===")
	inputs := make([]chan int, 3)
	for i := range inputs {
		inputs[i] = make(chan int)
		i := i // Create new variable to avoid closure problem

		// Start producer for each input channel
		go func() {
			defer close(inputs[i])
			for j := 0; j < 3; j++ {
				val := i*10 + j
				fmt.Printf("Producer %d sending: %d\n", i, val)
				inputs[i] <- val
				time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
			}
		}()
	}

	// Convert to read-only channels for multiplexing
	readOnlyInputs := make([]<-chan int, len(inputs))
	for i, ch := range inputs {
		readOnlyInputs[i] = ch
	}

	// Multiplex the channels
	multiplexed := multiplexChannels(readOnlyInputs...)

	// Consume multiplexed output
	wg.Add(1)
	go func() {
		defer wg.Done()
		for val := range multiplexed {
			fmt.Printf("Consumer received from multiplexed channel: %d\n", val)
		}
	}()

	// Wait for multiplexing demo to complete
	wg.Wait()

	// Demonstrate demultiplexing
	fmt.Println("\n=== Channel Demultiplexing ===")
	input := make(chan int)

	// Start producer for input channel
	go func() {
		defer close(input)
		for i := 0; i < 10; i++ {
			fmt.Printf("Demux producer sending: %d\n", i)
			input <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Demultiplex into 3 output channels
	outputs := demultiplexChannel(input, 3)

	// Start consumers for each output channel
	for i, ch := range outputs {
		wg.Add(1)
		i := i // Create new variable to avoid closure problem
		ch := ch

		go func() {
			defer wg.Done()
			for val := range ch {
				fmt.Printf("Demux consumer %d received: %d\n", i, val)
			}
		}()
	}

	// Wait for demultiplexing demo to complete
	wg.Wait()

	// Demonstrate content-based routing
	fmt.Println("\n=== Content-Based Routing ===")
	routerInput := make(chan int)

	// Define predicates for routing
	isEven := func(n int) bool { return n%2 == 0 }
	isDivisibleBy3 := func(n int) bool { return n%3 == 0 }
	isGreaterThan5 := func(n int) bool { return n > 5 }

	// Create router
	routedOutputs := contentBasedDemux(routerInput, isEven, isDivisibleBy3, isGreaterThan5)

	// Start consumers for each routed output
	for i, ch := range routedOutputs {
		wg.Add(1)
		i := i // Create new variable to avoid closure problem
		ch := ch

		go func() {
			defer wg.Done()
			for val := range ch {
				var condition string
				switch i {
				case 0:
					condition = "even"
				case 1:
					condition = "divisible by 3"
				case 2:
					condition = "greater than 5"
				}
				fmt.Printf("Router consumer %d received %d (%s)\n", i, val, condition)
			}
		}()
	}

	// Send values to the router
	go func() {
		defer close(routerInput)
		for i := 0; i < 10; i++ {
			fmt.Printf("Router producer sending: %d\n", i)
			routerInput <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Wait for routing demo to complete
	wg.Wait()
}

These multiplexing and demultiplexing patterns enable:

  1. Channel consolidation: Combining multiple input sources into a single stream
  2. Load distribution: Distributing work across multiple workers
  3. Content-based routing: Directing messages based on their content
  4. Broadcasting: Sending the same message to multiple recipients