Channel Orchestration Techniques
Beyond individual patterns, channels can be orchestrated to create sophisticated concurrent systems. These techniques combine multiple patterns to solve complex coordination problems.
Timeout and Cancellation Patterns
Proper timeout and cancellation handling is essential for robust concurrent systems:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// timeoutOperation runs a simulated task in the background and waits for it
// for at most timeout.
//
// The task sleeps for a random duration below one second and then reports
// success. If that report arrives first it is returned with a nil error;
// otherwise the result is empty and the error names the timeout that elapsed.
func timeoutOperation(timeout time.Duration) (result string, err error) {
	// Capacity 1 lets the worker deliver its message and exit even when the
	// caller has already given up waiting.
	done := make(chan string, 1)

	go func() {
		delay := time.Duration(rand.Intn(1000)) * time.Millisecond
		time.Sleep(delay)
		done <- "Operation completed successfully"
	}()

	expired := time.After(timeout)
	select {
	case <-expired:
		return "", fmt.Errorf("operation timed out after %v", timeout)
	case msg := <-done:
		return msg, nil
	}
}
// contextAwareOperation performs a simulated five-step task that can be
// abandoned through ctx.
//
// Every step takes 200ms and prints a progress line. The worker checks ctx
// between steps and exits silently once ctx is done. The function returns the
// success message when all steps finish, or ctx.Err() if ctx ends first.
func contextAwareOperation(ctx context.Context) (string, error) {
	const (
		steps    = 5
		stepTime = 200 * time.Millisecond
	)

	// Buffered so the worker never blocks on delivery after the caller left.
	done := make(chan string, 1)

	go func() {
		for step := 0; step < steps; step++ {
			select {
			case <-time.After(stepTime):
				fmt.Println("Operation in progress...")
			case <-ctx.Done():
				// Cancelled: give up without producing a result.
				return
			}
		}
		done <- "Operation completed successfully"
	}()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case msg := <-done:
		return msg, nil
	}
}
// multiStageTimeout runs three consecutive stages (connect, process,
// finalize), each guarded by its own timeout: 500ms, 2s and 1s respectively.
//
// It stops at the first failing stage and returns that stage's error wrapped
// as "<stage> failed: ...". It returns nil when every stage completes.
//
// Each stage's context is cancelled as soon as the stage ends, rather than
// when the whole function returns, so its timer is released promptly.
func multiStageTimeout() error {
	stages := []struct {
		banner  string
		name    string
		timeout time.Duration
		maxMs   int // upper bound, in milliseconds, passed to stageOperation
	}{
		{"Stage 1: Connecting...", "connect", 500 * time.Millisecond, 300},
		{"Stage 2: Processing...", "process", 2 * time.Second, 1000},
		{"Stage 3: Finalizing...", "finalize", 1 * time.Second, 500},
	}

	for _, stage := range stages {
		// The closure scopes the deferred cancel to a single stage.
		err := func() error {
			ctx, cancel := context.WithTimeout(context.Background(), stage.timeout)
			defer cancel()
			fmt.Println(stage.banner)
			return stageOperation(ctx, stage.name, stage.maxMs)
		}()
		if err != nil {
			return fmt.Errorf("%s failed: %w", stage.name, err)
		}
	}

	fmt.Println("All stages completed successfully")
	return nil
}
// stageOperation simulates one stage of a multi-stage operation.
//
// It waits for a random duration in [0, maxDuration) milliseconds and then
// prints a completion line and returns nil. If ctx is done before the wait
// finishes, or is already done on entry, it returns ctx.Err() instead.
//
// A maxDuration of zero or less means "no simulated work": the stage completes
// immediately instead of panicking inside rand.Intn.
func stageOperation(ctx context.Context, name string, maxDuration int) error {
	// An already-expired context must never be reported as a success, even
	// when the simulated delay happens to be zero.
	if err := ctx.Err(); err != nil {
		return err
	}

	var duration time.Duration
	if maxDuration > 0 {
		duration = time.Duration(rand.Intn(maxDuration)) * time.Millisecond
	}

	// An explicit timer can be stopped when the context wins the race.
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-timer.C:
		fmt.Printf("Stage '%s' completed in %v\n", name, duration)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// propagatingCancellation builds a generate -> square -> filter pipeline whose
// stages all share one cancellable context, consumes its output slowly, and
// cancels the context after 500ms so that the stages shut down.
func propagatingCancellation() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // also covers the case where the pipeline ends on its own

	// Keep only values divisible by 10.
	divisibleByTen := func(n int) bool {
		return n%10 == 0
	}
	source := generateNumbers(ctx, 1, 100)
	squared := squareNumbers(ctx, source)
	results := filterNumbers(ctx, squared, divisibleByTen)

	// Pull the plug while the consumer below is still reading.
	time.AfterFunc(500*time.Millisecond, func() {
		fmt.Println("Cancelling pipeline...")
		cancel()
	})

	// Drain until the last stage closes its output.
	for {
		value, open := <-results
		if !open {
			break
		}
		fmt.Printf("Result: %d\n", value)
		time.Sleep(100 * time.Millisecond) // deliberately slow consumer
	}
	fmt.Println("Pipeline terminated")
}
// generateNumbers emits the integers start..end (inclusive) on the returned
// channel and closes it afterwards. Emission stops early, with a log line,
// once ctx is done.
func generateNumbers(ctx context.Context, start, end int) <-chan int {
	stream := make(chan int)

	produce := func() {
		defer close(stream)
		for next := start; next <= end; next++ {
			select {
			case stream <- next:
				// delivered; move on to the next value
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			}
		}
	}
	go produce()

	return stream
}
// squareNumbers reads integers from in and emits each one squared on the
// returned channel. The output is closed when in is closed, or when ctx is
// done while a squared value is waiting to be delivered.
func squareNumbers(ctx context.Context, in <-chan int) <-chan int {
	squared := make(chan int)

	go func() {
		defer close(squared)
		for {
			value, open := <-in
			if !open {
				return
			}
			select {
			case squared <- value * value:
				// delivered; fetch the next input
			case <-ctx.Done():
				fmt.Println("Square operation cancelled")
				return
			}
		}
	}()

	return squared
}
// filterNumbers forwards to the returned channel only those values from in
// for which predicate reports true. The output is closed when in is closed,
// or when ctx is done while a matching value is waiting to be delivered.
func filterNumbers(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
	kept := make(chan int)

	go func() {
		defer close(kept)
		for candidate := range in {
			if !predicate(candidate) {
				continue // rejected values are simply dropped
			}
			select {
			case kept <- candidate:
				// delivered; examine the next input
			case <-ctx.Done():
				fmt.Println("Filter operation cancelled")
				return
			}
		}
	}()

	return kept
}
// main runs the four timeout and cancellation demonstrations in order:
// a plain select-based timeout, a context with a deadline, per-stage
// timeouts, and cancellation flowing through a pipeline.
func main() {
	// report prints the outcome of an operation returning (string, error).
	report := func(result string, err error) {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
			return
		}
		fmt.Printf("Result: %s\n", result)
	}

	fmt.Println("=== Simple Timeout Pattern ===")
	report(timeoutOperation(500 * time.Millisecond))

	fmt.Println("\n=== Context Cancellation Pattern ===")
	// 700ms is shorter than the five 200ms steps the operation needs.
	ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond)
	defer cancel()
	report(contextAwareOperation(ctx))

	fmt.Println("\n=== Multi-stage Timeout Pattern ===")
	err := multiStageTimeout()
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	}

	fmt.Println("\n=== Cancellation Propagation Pattern ===")
	propagatingCancellation()
}
Key timeout and cancellation patterns:
- Simple timeout: Using `select` with `time.After` to implement timeouts
- Context-based cancellation: Using Go’s `context` package for cancellation propagation
- Multi-stage timeouts: Different timeouts for different stages of an operation
- Cancellation propagation: Ensuring cancellation signals flow through all parts of a system
Coordinating Multiple Goroutines
Complex concurrent systems often require sophisticated coordination between multiple goroutines:
package main
import (
"fmt"
"sync"
"time"
)
// Barrier is a reusable synchronization point: callers of Wait block until
// the required number of goroutines have arrived, then all are released and
// the barrier resets for the next round.
type Barrier struct {
	count      int        // arrivals in the current round
	mutex      sync.Mutex // guards count and generation
	cond       *sync.Cond // signalled when a round completes
	required   int        // arrivals needed to open the barrier
	generation uint64     // incremented every time the barrier opens
}

// NewBarrier creates a barrier that opens once required goroutines have
// called Wait. A required value of 1 or less yields a barrier that never
// blocks.
func NewBarrier(required int) *Barrier {
	b := &Barrier{required: required}
	b.cond = sync.NewCond(&b.mutex)
	return b
}

// Wait blocks until the required number of goroutines have reached the
// barrier in the current round. The last arrival resets the barrier and wakes
// the others, so the same Barrier can be used for successive rounds.
func (b *Barrier) Wait() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	b.count++
	// ">=" rather than "==" so a non-positive required value cannot leave
	// every caller blocked forever.
	if b.count >= b.required {
		b.count = 0
		b.generation++
		b.cond.Broadcast()
		return
	}

	// Wait on an explicit predicate, as sync.Cond recommends: a waiter is
	// released only once its own round has completed.
	arrivedIn := b.generation
	for arrivedIn == b.generation {
		b.cond.Wait()
	}
}
// WorkGroup drives a fixed number of worker goroutines through a sequence of
// phases, holding every worker at a barrier after each phase.
type WorkGroup struct {
	size      int            // number of workers
	wg        sync.WaitGroup // tracks running workers
	startChan chan struct{}  // closed to release the workers
	doneChan  chan struct{}  // closed once every worker has finished
	barriers  []*Barrier     // one barrier per phase
}

// NewWorkGroup prepares a work group of size workers and one barrier, sized
// for all workers, for each of the given phases.
func NewWorkGroup(size int, phases int) *WorkGroup {
	perPhase := make([]*Barrier, phases)
	for p := range perPhase {
		perPhase[p] = NewBarrier(size)
	}

	return &WorkGroup{
		size:      size,
		startChan: make(chan struct{}),
		doneChan:  make(chan struct{}),
		barriers:  perPhase,
	}
}

// Start launches the workers and returns without waiting for them. Each worker
// calls work(id, phase) for every phase in order and synchronizes with the
// others at that phase's barrier before moving on. Start closes the start
// channel, so it must be called at most once per WorkGroup.
func (g *WorkGroup) Start(work func(id, phase int)) {
	runWorker := func(id int) {
		defer g.wg.Done()

		// Hold until every worker has been launched.
		<-g.startChan

		for phase, barrier := range g.barriers {
			work(id, phase)
			barrier.Wait()
		}
	}

	g.wg.Add(g.size)
	for id := 0; id < g.size; id++ {
		go runWorker(id)
	}

	// Release all workers at once.
	close(g.startChan)

	// Translate WaitGroup completion into a channel close for Wait.
	go func() {
		g.wg.Wait()
		close(g.doneChan)
	}()
}

// Wait blocks until every worker started by Start has completed all phases.
func (g *WorkGroup) Wait() {
	<-g.doneChan
}
// Rendezvous is a meeting point at which goroutines pair up: the first to
// arrive blocks until a second one arrives. It can be reused for further
// pairs.
type Rendezvous struct {
	firstArrived  chan int      // carries the first arrival's ID to the second
	secondArrived chan struct{} // second arrival's acknowledgement to the first
}

// NewRendezvous creates a new rendezvous point.
func NewRendezvous() *Rendezvous {
	return &Rendezvous{
		firstArrived:  make(chan int),
		secondArrived: make(chan struct{}),
	}
}

// Arrive blocks until another goroutine also calls Arrive.
//
// The second goroutine of a pair receives the first one's ID and true. The
// first goroutine receives 0 and false; it does not learn its partner's ID.
func (r *Rendezvous) Arrive(id int) (int, bool) {
	select {
	case r.firstArrived <- id:
		// We arrived first and our ID has been taken; wait for the
		// partner's acknowledgement.
		<-r.secondArrived
		return 0, false
	case firstID := <-r.firstArrived:
		// We arrived second. Acknowledge with a send rather than close():
		// closing would make every later rendezvous panic on a second close
		// and let later first arrivals return without waiting.
		r.secondArrived <- struct{}{}
		return firstID, true
	}
}
// main runs the three coordination demonstrations one after another: barrier
// synchronization, a phased work group, and a rendezvous between two
// goroutines.
//
// Every demo goroutine is tracked by a WaitGroup, so each section finishes
// before the next heading is printed and the program exits only after all
// output has been produced, without relying on a fixed sleep.
func main() {
	var demo sync.WaitGroup

	fmt.Println("=== Barrier Synchronization ===")
	barrier := NewBarrier(3)
	for i := 0; i < 3; i++ {
		demo.Add(1)
		go func(id int) {
			defer demo.Done()
			fmt.Printf("Goroutine %d starting\n", id)
			// Staggered work times so the goroutines arrive at different moments.
			time.Sleep(time.Duration(id*300) * time.Millisecond)
			fmt.Printf("Goroutine %d arriving at barrier\n", id)
			barrier.Wait()
			fmt.Printf("Goroutine %d continuing after barrier\n", id)
		}(i)
	}
	demo.Wait()

	fmt.Println("\n=== Phased Work Group ===")
	workGroup := NewWorkGroup(4, 3)
	workGroup.Start(func(id, phase int) {
		fmt.Printf("Worker %d executing phase %d\n", id, phase)
		time.Sleep(time.Duration(100*(id+phase)) * time.Millisecond)
		fmt.Printf("Worker %d completed phase %d\n", id, phase)
	})
	workGroup.Wait()
	fmt.Println("All workers completed all phases")

	fmt.Println("\n=== Rendezvous Pattern ===")
	rendezvous := NewRendezvous()
	// meet is the shared body of both participants; only the label, ID and
	// start-up delay differ.
	meet := func(name string, id int, delay time.Duration) {
		defer demo.Done()
		fmt.Printf("Goroutine %s starting\n", name)
		time.Sleep(delay)
		fmt.Printf("Goroutine %s arriving at rendezvous\n", name)
		otherID, isSecond := rendezvous.Arrive(id)
		if isSecond {
			fmt.Printf("Goroutine %s met with Goroutine %d\n", name, otherID)
		} else {
			fmt.Printf("Goroutine %s continuing after rendezvous\n", name)
		}
	}
	demo.Add(2)
	go meet("A", 1, 300*time.Millisecond)
	go meet("B", 2, 100*time.Millisecond)
	demo.Wait()
}
These coordination techniques enable:
- Phased execution: Coordinating multiple goroutines through distinct phases
- Barrier synchronization: Ensuring all goroutines reach a common point before proceeding
- Rendezvous: Coordinating pairs of goroutines to meet at a common point
- Work distribution: Efficiently distributing work across multiple goroutines
Multiplexing and Demultiplexing Channels
Channel multiplexing combines multiple input channels into a single output channel, while demultiplexing does the reverse:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// multiplexChannels merges any number of input channels into one output
// channel. Values are forwarded in whatever order they arrive, and the output
// is closed once every input has been closed and drained.
func multiplexChannels(inputs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(inputs))

	// forward copies one input into the merged stream until it closes.
	forward := func(id int, src <-chan int) {
		defer pending.Done()
		for v := range src {
			fmt.Printf("Multiplexer receiving from input %d: %d\n", id, v)
			merged <- v
		}
	}
	for id, src := range inputs {
		go forward(id, src)
	}

	// Close the merged stream only after the last forwarder has exited.
	go func() {
		pending.Wait()
		close(merged)
		fmt.Println("All input channels closed, multiplexer shutting down")
	}()

	return merged
}
// demultiplexChannel fans a single input channel out to n output channels.
//
// Each value v is routed to output index v mod n, normalised into [0, n) so
// that negative values are routed instead of indexing out of range. This is
// value-based routing, not round-robin. Sends are blocking, so every output
// needs an active consumer or the demultiplexer stalls. All outputs are
// closed once input has been closed and drained.
//
// If n <= 0 there is nowhere to route values: an empty slice is returned and
// input is drained in the background so that producers do not block forever.
func demultiplexChannel(input <-chan int, n int) []<-chan int {
	if n <= 0 {
		go func() {
			for range input {
				// discard: no outputs exist
			}
		}()
		return []<-chan int{}
	}

	outputs := make([]chan int, n)
	// Receive-only views handed back to the caller.
	readOnlyOutputs := make([]<-chan int, n)
	for i := range outputs {
		outputs[i] = make(chan int)
		readOnlyOutputs[i] = outputs[i]
	}

	go func() {
		defer func() {
			for _, ch := range outputs {
				close(ch)
			}
			fmt.Println("Demultiplexer shutting down, all output channels closed")
		}()

		for val := range input {
			// Go's % keeps the sign of the dividend; fold negative remainders
			// back into range.
			outChan := val % n
			if outChan < 0 {
				outChan += n
			}
			fmt.Printf("Demultiplexer sending %d to output %d\n", val, outChan)
			outputs[outChan] <- val
		}
	}()

	return readOnlyOutputs
}
// contentBasedDemux creates one output channel per predicate and copies every
// input value to each output whose predicate accepts it, so a value may reach
// several outputs or none. Sends are blocking and happen in predicate order.
// All outputs are closed once input has been closed and drained.
func contentBasedDemux(input <-chan int, predicates ...func(int) bool) []<-chan int {
	routes := make([]chan int, len(predicates))
	// Receive-only views handed back to the caller.
	exposed := make([]<-chan int, len(predicates))
	for idx := range routes {
		routes[idx] = make(chan int)
		exposed[idx] = routes[idx]
	}

	shutdown := func() {
		for idx := range routes {
			close(routes[idx])
		}
		fmt.Println("Content router shutting down, all output channels closed")
	}

	go func() {
		defer shutdown()
		for v := range input {
			for idx, accepts := range predicates {
				if !accepts(v) {
					continue
				}
				fmt.Printf("Router sending %d to output %d\n", v, idx)
				routes[idx] <- v
			}
		}
	}()

	return exposed
}
// broadcastChannel copies every input value to each of n output channels.
//
// Each output has a buffer of 5. Delivery is non-blocking: when an output's
// buffer is full the value is dropped for that output and a warning is
// printed, so a slow consumer cannot stall the others. All outputs are closed
// once input has been closed and drained.
func broadcastChannel(input <-chan int, n int) []<-chan int {
	const bufferSize = 5

	sinks := make([]chan int, n)
	// Receive-only views handed back to the caller.
	views := make([]<-chan int, n)
	for idx := range sinks {
		sinks[idx] = make(chan int, bufferSize)
		views[idx] = sinks[idx]
	}

	closeAll := func() {
		for idx := range sinks {
			close(sinks[idx])
		}
		fmt.Println("Broadcaster shutting down, all output channels closed")
	}

	go func() {
		defer closeAll()
		for v := range input {
			fmt.Printf("Broadcasting value: %d\n", v)
			for idx := range sinks {
				select {
				case sinks[idx] <- v:
					// delivered
				default:
					// buffer full: drop for this output only
					fmt.Printf("Warning: Output %d buffer full, dropping value %d\n", idx, v)
				}
			}
		}
	}()

	return views
}
// main runs the multiplexing, demultiplexing and content-based routing
// demonstrations in sequence.
//
// Each section is given a fixed two-second window to finish. The demux and
// router consumers register with a WaitGroup, but the demo relies on the
// sleeps rather than waiting on it.
func main() {
	const sectionWindow = 2 * time.Second

	fmt.Println("=== Channel Multiplexing ===")
	sources := make([]chan int, 3)
	for idx := range sources {
		sources[idx] = make(chan int)
		// Each producer emits three values tagged with its own index.
		go func(id int, out chan int) {
			defer close(out)
			for j := 0; j < 3; j++ {
				val := id*10 + j
				fmt.Printf("Producer %d sending: %d\n", id, val)
				out <- val
				time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
			}
		}(idx, sources[idx])
	}

	// multiplexChannels takes receive-only channels.
	receiveOnly := make([]<-chan int, len(sources))
	for idx := range sources {
		receiveOnly[idx] = sources[idx]
	}
	merged := multiplexChannels(receiveOnly...)

	go func() {
		for val := range merged {
			fmt.Printf("Consumer received from multiplexed channel: %d\n", val)
		}
	}()
	time.Sleep(sectionWindow)

	fmt.Println("\n=== Channel Demultiplexing ===")
	demuxInput := make(chan int)
	go func() {
		defer close(demuxInput)
		for i := 0; i < 10; i++ {
			fmt.Printf("Demux producer sending: %d\n", i)
			demuxInput <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Split the stream three ways, one consumer per output.
	var wg sync.WaitGroup
	for idx, out := range demultiplexChannel(demuxInput, 3) {
		wg.Add(1)
		go func(id int, ch <-chan int) {
			defer wg.Done()
			for val := range ch {
				fmt.Printf("Demux consumer %d received: %d\n", id, val)
			}
		}(idx, out)
	}
	time.Sleep(sectionWindow)

	fmt.Println("\n=== Content-Based Routing ===")
	routerInput := make(chan int)

	// Routing predicates and the label printed for each one, in output order.
	isEven := func(n int) bool { return n%2 == 0 }
	isDivisibleBy3 := func(n int) bool { return n%3 == 0 }
	isGreaterThan5 := func(n int) bool { return n > 5 }
	labels := []string{"even", "divisible by 3", "greater than 5"}

	routed := contentBasedDemux(routerInput, isEven, isDivisibleBy3, isGreaterThan5)
	for idx, out := range routed {
		wg.Add(1)
		go func(id int, ch <-chan int) {
			defer wg.Done()
			for val := range ch {
				fmt.Printf("Router consumer %d received %d (%s)\n", id, val, labels[id])
			}
		}(idx, out)
	}

	// Feed the router only after its consumers are in place.
	go func() {
		defer close(routerInput)
		for i := 0; i < 10; i++ {
			fmt.Printf("Router producer sending: %d\n", i)
			routerInput <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()
	time.Sleep(sectionWindow)
}
These multiplexing and demultiplexing patterns enable:
- Channel consolidation: Combining multiple input sources into a single stream
- Load distribution: Distributing work across multiple workers
- Content-based routing: Directing messages based on their content
- Broadcasting: Sending the same message to multiple recipients