Advanced Channel Patterns
Channels are Go’s primary mechanism for communication between goroutines, but in distributed systems, we need to leverage more sophisticated patterns to handle complex coordination scenarios.
Fan-Out/Fan-In Pattern
The fan-out/fan-in pattern distributes work across multiple goroutines and then collects the results:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task is one unit of work travelling through the pipeline. The producer
// fills in ID and Input; a worker records the outcome in Result or Err.
type Task struct {
	// ID identifies the task when its outcome is reported.
	ID int
	// Input is the payload to process; Result is the text produced for it.
	Input, Result string
	// Err is non-nil when processing the task failed.
	Err error
}
// fanOut streams tasks, in slice order, onto a new unbuffered channel from a
// background goroutine and returns the receive side of that channel.
//
// The channel is closed once every task has been handed to a receiver or ctx
// is cancelled, whichever happens first. workerCount is accepted for symmetry
// with the rest of the pipeline but is not consulted here.
func fanOut(ctx context.Context, tasks []Task, workerCount int) <-chan Task {
	out := make(chan Task)

	feed := func() {
		defer close(out)
		done := ctx.Done()
		for i := 0; i < len(tasks); i++ {
			select {
			case <-done:
				return
			case out <- tasks[i]:
			}
		}
	}
	go feed()

	return out
}
// processTask stands in for real work: it waits 100ms and then stamps the
// task with a "Processed: ..." result. If ctx is cancelled before the wait
// elapses, the task is returned with Err set to ctx.Err() and Result untouched.
func processTask(ctx context.Context, task Task) Task {
	const workDuration = 100 * time.Millisecond

	elapsed := time.After(workDuration)
	select {
	case <-ctx.Done():
		task.Err = ctx.Err()
	case <-elapsed:
		task.Result = fmt.Sprintf("Processed: %s", task.Input)
	}
	return task
}
// worker consumes tasks from tasksCh until the channel is closed or ctx is
// cancelled, and publishes each processed task on resultsCh.
//
// Tasks whose ID is a multiple of 7 are reported as failures (Err set, Result
// empty) to simulate occasional errors; every other task gets a Result that
// names the worker identified by id.
func worker(ctx context.Context, id int, tasksCh <-chan Task, resultsCh chan<- Task) {
	for task := range tasksCh {
		// Do not start on a task once cancellation has been requested.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if failed := task.ID%7 == 0; failed {
			task.Result = ""
			task.Err = fmt.Errorf("processing error on task %d", task.ID)
		} else {
			task.Result = fmt.Sprintf("Worker %d processed: %s", id, task.Input)
		}

		// Publish the outcome, giving up if cancellation arrives first.
		select {
		case <-ctx.Done():
			return
		case resultsCh <- task:
		}
	}
}
// fanIn relays everything received on resultsCh onto a single new channel.
//
// It starts workerCount forwarding goroutines that all read from resultsCh.
// Each forwarder stops when resultsCh is closed or ctx is cancelled, and the
// returned channel is closed once every forwarder has stopped.
func fanIn(ctx context.Context, workerCount int, resultsCh chan Task) <-chan Task {
	merged := make(chan Task)

	var forwarders sync.WaitGroup
	forwarders.Add(workerCount)

	// forward moves one item at a time from resultsCh to merged.
	forward := func() {
		defer forwarders.Done()
		for {
			var item Task
			select {
			case <-ctx.Done():
				return
			case received, open := <-resultsCh:
				if !open {
					return
				}
				item = received
			}

			select {
			case <-ctx.Done():
				return
			case merged <- item:
			}
		}
	}

	for started := 0; started < workerCount; started++ {
		go forward()
	}

	// Close the merged stream only after the last forwarder has exited.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}
// distributeAndProcess runs the complete fan-out/fan-in pipeline: it streams
// tasks through fanOut, processes them with workerCount concurrent workers,
// and gathers every outcome into a slice.
//
// A workerCount below 1 is treated as 1. Without that, a negative count would
// panic in sync.WaitGroup.Add, and a count of zero would start no workers,
// return no results, and leave the fanOut goroutine blocked until ctx is
// cancelled.
//
// Results are returned in completion order, which is not necessarily the
// order of tasks. If ctx is cancelled before all tasks are processed, only
// the results produced up to that point are returned.
func distributeAndProcess(ctx context.Context, tasks []Task, workerCount int) []Task {
	if workerCount < 1 {
		workerCount = 1
	}

	// Buffered to hold every possible result so workers never block on send.
	resultsCh := make(chan Task, len(tasks))

	// Fan out: distribute tasks to workers.
	tasksCh := fanOut(ctx, tasks, workerCount)

	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func(workerID int) {
			defer wg.Done()
			worker(ctx, workerID, tasksCh, resultsCh)
		}(i)
	}

	// Closing resultsCh after the last worker exits ends the collection loop.
	go func() {
		wg.Wait()
		close(resultsCh)
	}()

	// Fan in: collect results as they arrive.
	collectedResults := make([]Task, 0, len(tasks))
	for result := range resultsCh {
		collectedResults = append(collectedResults, result)
	}
	return collectedResults
}
// main builds twenty sample tasks, runs them through the pipeline with five
// workers under a five-second deadline, and prints each outcome followed by a
// success/failure summary.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Sample tasks carry IDs 0 through 19.
	const taskCount = 20
	tasks := make([]Task, 0, taskCount)
	for id := 0; id < taskCount; id++ {
		tasks = append(tasks, Task{
			ID:    id,
			Input: fmt.Sprintf("Task %d input", id),
		})
	}

	results := distributeAndProcess(ctx, tasks, 5)

	fmt.Println("Results:")
	var succeeded, failed int
	for _, r := range results {
		if r.Err == nil {
			fmt.Printf("Task %d succeeded: %s\n", r.ID, r.Result)
			succeeded++
			continue
		}
		fmt.Printf("Task %d failed: %v\n", r.ID, r.Err)
		failed++
	}
	fmt.Printf("\nSummary: %d succeeded, %d failed\n", succeeded, failed)
}
This pattern is particularly useful for distributed systems where you need to:
- Process a large number of tasks in parallel
- Handle failures gracefully
- Collect and aggregate results efficiently
- Implement backpressure mechanisms
Multiplexing with Select
In distributed systems, you often need to coordinate multiple channels with different priorities and timeouts:
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Event is a single message emitted by a named part of the system.
type Event struct {
	// Source names the emitter; Type is a free-form classification label.
	Source, Type string
	// Payload carries arbitrary event data.
	Payload interface{}
	// Time is the event's timestamp.
	Time time.Time
}
// EventSource starts a goroutine that produces one Event per interval tick
// and returns the unbuffered channel the events are delivered on.
//
// Every event is stamped with name as its Source, a random type label
// "event-type-0" through "event-type-2", a "data from <name>" payload and the
// current time. If no consumer accepts an event within 50ms it is dropped and
// a warning is printed. The channel is closed when ctx is cancelled. priority
// is accepted but not used by the source itself.
func EventSource(ctx context.Context, name string, interval time.Duration, priority int) <-chan Event {
	out := make(chan Event)

	// newEvent builds the next event for this source.
	newEvent := func() Event {
		return Event{
			Source:  name,
			Type:    fmt.Sprintf("event-type-%d", rand.Intn(3)),
			Payload: fmt.Sprintf("data from %s", name),
			Time:    time.Now(),
		}
	}

	// emit offers ev to the consumer and reports false once ctx is cancelled.
	emit := func(ev Event) bool {
		select {
		case <-ctx.Done():
			return false
		case out <- ev:
			// Delivered.
		case <-time.After(50 * time.Millisecond):
			// The consumer is too slow: drop the event and keep going.
			fmt.Printf("Warning: Dropped event from %s due to backpressure\n", name)
		}
		return true
	}

	go func() {
		defer close(out)

		tick := time.NewTicker(interval)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				if !emit(newEvent()) {
					return
				}
			}
		}
	}()

	return out
}
// PriorityMultiplexer merges several event sources into one stream, always
// forwarding the pending event whose source has the highest priority.
//
// Every event received from a source is held until it has been forwarded, so
// an event from a lower-priority source is delayed, never discarded, when a
// higher-priority event is available at the same time. A source that still
// has an undelivered event is not read again until that event has gone out.
//
// sources maps a source name to its channel; nil channels are ignored. The
// map is copied before the goroutine starts, so the caller's map is never
// modified. priorities maps the same names to their priority: larger values
// win, a missing name counts as 0, any integer (including negative values) is
// valid, and ties are broken arbitrarily.
//
// The returned channel is closed once every source has been closed and all
// held events have been forwarded, or when ctx is cancelled.
func PriorityMultiplexer(ctx context.Context, sources map[string]<-chan Event, priorities map[string]int) <-chan Event {
	multiplexed := make(chan Event)

	// Private copy of the sources that are still open.
	open := make(map[string]<-chan Event, len(sources))
	for name, ch := range sources {
		if ch != nil {
			open[name] = ch
		}
	}

	go func() {
		defer close(multiplexed)

		// pending holds at most one received-but-not-yet-forwarded event per source.
		pending := make(map[string]Event, len(open))

		for len(open) > 0 || len(pending) > 0 {
			// Poll, without blocking, every open source that has nothing pending.
			for name, ch := range open {
				if _, held := pending[name]; held {
					continue
				}
				select {
				case event, ok := <-ch:
					if !ok {
						// Source is closed; stop polling it.
						delete(open, name)
						continue
					}
					pending[name] = event
				default:
					// Nothing available from this source right now.
				}
			}

			if len(pending) == 0 {
				if len(open) == 0 {
					return
				}
				// Nothing to forward yet: back off briefly before polling again.
				select {
				case <-time.After(10 * time.Millisecond):
				case <-ctx.Done():
					return
				}
				continue
			}

			// Pick the pending event with the highest priority. An explicit
			// flag is used instead of a sentinel so any priority value works.
			selectedSource, found := "", false
			for name := range pending {
				if !found || priorities[name] > priorities[selectedSource] {
					selectedSource, found = name, true
				}
			}

			select {
			case multiplexed <- pending[selectedSource]:
				fmt.Printf("Forwarded event from %s (priority %d)\n",
					selectedSource, priorities[selectedSource])
				delete(pending, selectedSource)
			case <-ctx.Done():
				return
			}
		}
	}()

	return multiplexed
}
// main wires three event sources with different rates and priorities into
// the priority multiplexer and prints every event it delivers until the
// five-second deadline expires.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Listed from most to least important: critical system events, user
	// actions, then metrics.
	feeds := []struct {
		name     string
		interval time.Duration
		priority int
	}{
		{"critical", 500 * time.Millisecond, 10},
		{"user", 200 * time.Millisecond, 5},
		{"metrics", 100 * time.Millisecond, 1},
	}

	sources := make(map[string]<-chan Event)
	priorities := make(map[string]int)
	for _, feed := range feeds {
		sources[feed.name] = EventSource(ctx, feed.name, feed.interval, feed.priority)
		priorities[feed.name] = feed.priority
	}

	// Consume the merged stream until it is closed.
	merged := PriorityMultiplexer(ctx, sources, priorities)
	for ev := range merged {
		stamp := ev.Time.Format(time.RFC3339Nano)
		fmt.Printf("Processed: %s event from %s at %v\n",
			ev.Type, ev.Source, stamp)
	}
}
This pattern enables sophisticated event handling in distributed systems by:
- Prioritizing critical events over less important ones
- Implementing backpressure to prevent overwhelming consumers
- Gracefully handling source failures
- Efficiently multiplexing multiple event streams
Timed Channel Operations
In distributed systems, timeouts are crucial for preventing deadlocks and ensuring responsiveness:
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Result carries what a distributed operation produced.
type Result struct {
	// Value is the operation's output; it is nil when the operation failed.
	Value interface{}

	// Err is non-nil when the operation failed or was cancelled.
	Err error
}
// simulateDistributedOperation mimics a call to a remote service with
// variable latency.
//
// It returns a channel that receives exactly one Result and is then closed.
// The simulated call waits for a random duration in [minLatency, maxLatency)
// and then succeeds with "<name> result" or, with a 10% chance, fails with
// "<name> operation failed". If ctx is cancelled first, the Result carries
// ctx.Err() instead.
//
// When maxLatency is not greater than minLatency the latency is exactly
// minLatency; previously that case panicked inside rand.Int63n, which
// requires a positive argument.
func simulateDistributedOperation(ctx context.Context, name string, minLatency, maxLatency time.Duration) <-chan Result {
	resultCh := make(chan Result, 1) // Buffered so the goroutine never blocks on send.

	go func() {
		defer close(resultCh)

		latency := minLatency
		if spread := maxLatency - minLatency; spread > 0 {
			latency += time.Duration(rand.Int63n(int64(spread)))
		}

		// A stoppable timer is released promptly when ctx is cancelled first.
		timer := time.NewTimer(latency)
		defer timer.Stop()

		select {
		case <-timer.C:
			// 10% chance of error.
			if rand.Intn(10) == 0 {
				resultCh <- Result{Err: fmt.Errorf("%s operation failed", name)}
				return
			}
			resultCh <- Result{Value: fmt.Sprintf("%s result", name)}
		case <-ctx.Done():
			resultCh <- Result{Err: ctx.Err()}
		}
	}()

	return resultCh
}
// firstResponse runs every operation concurrently and returns the first
// successful result, cancelling the remaining operations as it returns.
//
// A failing operation does not end the wait: its error is remembered and the
// function keeps waiting for the others. The return value is:
//   - the first Result whose Err is nil, if any operation succeeds in time;
//   - the last error received, if every operation fails;
//   - opCtx's error (context.DeadlineExceeded or the parent's cancellation
//     error) if timeout elapses or ctx is cancelled first, even when an
//     operation ignores its context and never answers;
//   - an error if no operations were supplied.
//
// Each operation receives a context derived from ctx that expires after
// timeout.
func firstResponse(ctx context.Context, timeout time.Duration, operations ...func(context.Context) <-chan Result) Result {
	if len(operations) == 0 {
		return Result{Err: fmt.Errorf("no operations provided")}
	}

	opCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel() // Stops every operation still running when we return.

	// One slot per operation so a goroutine can always deliver its outcome
	// and exit, even after this function has returned.
	outcomes := make(chan Result, len(operations))

	for _, op := range operations {
		go func(operation func(context.Context) <-chan Result) {
			select {
			case result := <-operation(opCtx):
				outcomes <- result
			case <-opCtx.Done():
				// Timed out or superseded; nobody is waiting for this outcome.
			}
		}(op)
	}

	var lastErr error
	for range operations {
		select {
		case result := <-outcomes:
			if result.Err == nil {
				return result
			}
			lastErr = result.Err
		case <-opCtx.Done():
			return Result{Err: opCtx.Err()}
		}
	}

	// Every operation reported a failure.
	return Result{Err: lastErr}
}
// main demonstrates redundant calls to three simulated services: first a
// single request with a 400ms budget, then up to three attempts whose timeout
// grows by 200ms per attempt, stopping at the first success.
func main() {
	// Seed the random number generator.
	rand.Seed(time.Now().UnixNano())

	ctx := context.Background()

	// service adapts simulateDistributedOperation to the operation signature
	// firstResponse expects, fixing the name and latency range.
	service := func(name string, minLatency, maxLatency time.Duration) func(context.Context) <-chan Result {
		return func(ctx context.Context) <-chan Result {
			return simulateDistributedOperation(ctx, name, minLatency, maxLatency)
		}
	}
	fastButUnreliable := service("fast-service", 50*time.Millisecond, 150*time.Millisecond)
	mediumLatency := service("medium-service", 100*time.Millisecond, 300*time.Millisecond)
	slowButReliable := service("slow-service", 200*time.Millisecond, 500*time.Millisecond)

	fmt.Println("Executing distributed operations with redundancy...")
	result := firstResponse(ctx, 400*time.Millisecond, fastButUnreliable, mediumLatency, slowButReliable)
	if result.Err == nil {
		fmt.Printf("Operation succeeded with result: %v\n", result.Value)
	} else {
		fmt.Printf("All operations failed or timed out: %v\n", result.Err)
	}

	fmt.Println("\nExecuting with multiple attempts...")
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("Attempt %d...\n", attempt)

		// Each attempt gets a larger time budget than the previous one.
		timeout := time.Duration(attempt) * 200 * time.Millisecond
		result = firstResponse(ctx, timeout, fastButUnreliable, mediumLatency, slowButReliable)
		if result.Err != nil {
			fmt.Printf("Attempt %d failed: %v\n", attempt, result.Err)
			continue
		}
		fmt.Printf("Success on attempt %d: %v\n", attempt, result.Value)
		break
	}
}
This pattern is essential for distributed systems where:
- Services may have variable latency
- You need to implement redundancy across multiple services
- Graceful degradation is required when services are slow or unavailable
- Timeouts must be carefully managed to prevent cascading failures