Advanced Channel Patterns
Building on the fundamentals, we can now explore more sophisticated channel patterns that solve complex concurrency challenges. These patterns provide reusable solutions for common concurrent programming scenarios.
Select Statement Patterns
The `select` statement is one of Go’s most powerful concurrency primitives, enabling non-blocking operations and coordination between multiple channels:
package main

import (
    "fmt"
    "math/rand"
    "reflect"
    "time"
)

// timeoutOperation demonstrates using select for timeouts
func timeoutOperation() {
    ch := make(chan string)

    go func() {
        // Simulate work that takes a random amount of time
        delay := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(delay)
        ch <- fmt.Sprintf("Operation completed in %v", delay)
    }()

    // Wait for the result or time out
    select {
    case result := <-ch:
        fmt.Println("Success:", result)
    case <-time.After(300 * time.Millisecond):
        fmt.Println("Operation timed out")
    }
}

// nonBlockingReceive demonstrates a non-blocking receive operation
func nonBlockingReceive(ch chan string) {
    select {
    case msg := <-ch:
        fmt.Println("Received message:", msg)
    default:
        fmt.Println("No message available")
    }
}

// nonBlockingSend demonstrates a non-blocking send operation
func nonBlockingSend(ch chan string, msg string) {
    select {
    case ch <- msg:
        fmt.Println("Sent message:", msg)
    default:
        fmt.Println("Cannot send message, channel full or no receivers")
    }
}

// prioritySelect demonstrates channel priority using select
func prioritySelect(high, medium, low chan string) {
    // First, check only the high-priority channel
    select {
    case msg := <-high:
        fmt.Println("High priority:", msg)
        return
    default:
        // Fall through to the next select
    }

    // Next, check the high- and medium-priority channels
    select {
    case msg := <-high:
        fmt.Println("High priority:", msg)
        return
    case msg := <-medium:
        fmt.Println("Medium priority:", msg)
        return
    default:
        // Fall through to the final select
    }

    // Finally, block on all three channels with a timeout
    select {
    case msg := <-high:
        fmt.Println("High priority:", msg)
    case msg := <-medium:
        fmt.Println("Medium priority:", msg)
    case msg := <-low:
        fmt.Println("Low priority:", msg)
    case <-time.After(500 * time.Millisecond):
        fmt.Println("Timed out waiting for messages")
    }
}

// dynamicSelectCases demonstrates how to handle a dynamic number of channels
func dynamicSelectCases(channels []chan int) {
    cases := make([]reflect.SelectCase, len(channels))
    for i, ch := range channels {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        }
    }

    // Add a timeout case
    cases = append(cases, reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(time.After(1 * time.Second)),
    })

    // Wait for any channel to receive
    chosen, value, ok := reflect.Select(cases)
    if chosen == len(cases)-1 {
        fmt.Println("Timed out")
        return
    }
    if ok {
        fmt.Printf("Received value %v from channel %d\n", value.Int(), chosen)
    } else {
        fmt.Printf("Channel %d was closed\n", chosen)
    }
}

func main() {
    // Demonstrate the timeout pattern
    fmt.Println("=== Timeout Pattern ===")
    timeoutOperation()

    // Demonstrate non-blocking operations
    fmt.Println("\n=== Non-blocking Operations ===")
    ch := make(chan string, 1)
    nonBlockingReceive(ch)       // Fails: nothing buffered yet
    nonBlockingSend(ch, "Hello") // Succeeds: buffer has room
    nonBlockingReceive(ch)       // Succeeds: receives "Hello"
    nonBlockingSend(ch, "World") // Succeeds: buffer is empty again
    nonBlockingSend(ch, "Again") // Fails: buffer already holds "World"

    // Demonstrate priority selection
    fmt.Println("\n=== Priority Selection ===")
    high := make(chan string, 1)
    medium := make(chan string, 1)
    low := make(chan string, 1)

    // Try different scenarios
    medium <- "Medium message"
    prioritySelect(high, medium, low)

    // Demonstrate dynamic select cases via the reflect package
    fmt.Println("\n=== Dynamic Select Cases ===")
    channels := make([]chan int, 3)
    for i := range channels {
        channels[i] = make(chan int)
        i := i // Capture the loop variable (no longer needed as of Go 1.22)
        go func() {
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
            channels[i] <- i
        }()
    }
    dynamicSelectCases(channels)
}
Key select patterns:
- Timeout pattern: Combine a work channel with `time.After()` to implement timeouts
- Non-blocking operations: Use the `default` case to make channel operations non-blocking
- Priority selection: Nest multiple select statements to implement channel priority
- Dynamic cases: Use the `reflect` package to handle a dynamic number of channels
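One more select idiom deserves mention: cancellation. A goroutine that watches a done or context channel alongside its data channel can exit promptly when its work is no longer needed. Here is a minimal, self-contained sketch using the standard context package (the consume function and its channels are illustrative names, not part of the example above):

package main

import (
    "context"
    "fmt"
    "time"
)

// consume reads from ch until the channel is closed or the context
// is cancelled, whichever happens first.
func consume(ctx context.Context, ch <-chan string) {
    for {
        select {
        case msg, ok := <-ch:
            if !ok {
                return // Channel closed: no more data
            }
            fmt.Println("Received:", msg)
        case <-ctx.Done():
            fmt.Println("Stopping:", ctx.Err())
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()

    ch := make(chan string)
    go func() {
        for {
            select {
            case ch <- "tick":
                time.Sleep(100 * time.Millisecond)
            case <-ctx.Done():
                return // The producer also honours cancellation
            }
        }
    }()

    consume(ctx, ch) // Prints a few ticks, then "Stopping: context deadline exceeded"
}

A related caveat: each call to time.After allocates a timer that is not reclaimed until it fires, so in long-running loops a reusable time.Timer is preferable (Go 1.23 relaxed this by making unstopped timers collectable).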
Fan-Out and Fan-In Patterns
Fan-out/fan-in is a powerful pattern for parallel processing that distributes work across multiple goroutines and then consolidates the results:
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// worker processes items from the jobs channel and sends results to the results channel
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
        results <- job * 2                                           // Send the result
    }
}

// fanOut distributes work across multiple workers
func fanOut(jobs <-chan int, numWorkers int) <-chan int {
    results := make(chan int)
    var wg sync.WaitGroup

    // Start the workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Close the results channel when all workers are done
    go func() {
        wg.Wait()
        close(results)
        fmt.Println("All workers completed")
    }()

    return results
}

// fanIn merges multiple channels into a single channel
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    multiplexed := make(chan int)

    // Forward values from an input channel to the multiplexed channel
    forward := func(ch <-chan int) {
        defer wg.Done()
        for val := range ch {
            multiplexed <- val
        }
    }

    // Start a goroutine for each input channel
    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }

    // Close the multiplexed channel when all input channels are drained
    go func() {
        wg.Wait()
        close(multiplexed)
    }()

    return multiplexed
}

// fanInCompact is a more compact variant of fanIn that inlines the
// forwarding goroutine instead of using a named helper
func fanInCompact(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, c := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Create the jobs channel and send jobs
    jobs := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        jobs <- i
    }
    close(jobs)

    // Fan out to 3 workers
    fmt.Println("Starting fan-out with 3 workers")
    results := fanOut(jobs, 3)

    // Collect and print the results
    for result := range results {
        fmt.Printf("Got result: %d\n", result)
    }

    // Demonstrate fan-in with multiple channels
    fmt.Println("\nDemonstrating fan-in pattern")
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    // Send values on each channel
    go func() {
        for i := 1; i <= 3; i++ {
            ch1 <- i * 10
            time.Sleep(100 * time.Millisecond)
        }
        close(ch1)
    }()
    go func() {
        for i := 1; i <= 3; i++ {
            ch2 <- i * 100
            time.Sleep(150 * time.Millisecond)
        }
        close(ch2)
    }()
    go func() {
        for i := 1; i <= 3; i++ {
            ch3 <- i * 1000
            time.Sleep(80 * time.Millisecond)
        }
        close(ch3)
    }()

    // Fan in the channels
    merged := fanIn(ch1, ch2, ch3)

    // Print the merged results
    for result := range merged {
        fmt.Printf("Merged result: %d\n", result)
    }
}
The fan-out/fan-in pattern is particularly useful for:
- CPU-bound tasks: Distributing computation across multiple cores (see the sketch after this list)
- I/O-bound tasks: Managing multiple concurrent I/O operations
- Rate limiting: Controlling the degree of parallelism
- Batch processing: Processing large datasets in parallel chunks
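To make the first two points concrete, the sketch below bounds fan-out to the number of available cores using a buffered channel as a slot counter; the inputs slice and the squaring are stand-ins for real data and real work:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    inputs := []int{1, 2, 3, 4, 5, 6, 7, 8}
    results := make([]int, len(inputs))

    // Bound the fan-out to the number of available CPU cores
    sem := make(chan struct{}, runtime.NumCPU())
    var wg sync.WaitGroup

    for i, n := range inputs {
        wg.Add(1)
        sem <- struct{}{} // Acquire a slot before starting work
        go func(i, n int) {
            defer wg.Done()
            defer func() { <-sem }() // Release the slot when done
            results[i] = n * n       // Stand-in for real CPU-bound work
        }(i, n)
    }

    wg.Wait()
    fmt.Println(results)
}

runtime.NumCPU() is a sensible bound for CPU-bound work; for I/O-bound work the right limit usually comes from the downstream resource (connection caps, rate limits) rather than the core count.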
Pipeline Pattern
Pipelines compose a series of processing stages connected by channels, allowing data to flow through multiple transformations:
package main

import (
    "fmt"
    "math/rand"
    "sync"
)

// generator creates a channel that emits the provided integers
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square receives integers, squares them, and sends them to a returned channel
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// filter receives integers and sends only those that satisfy the predicate function
func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
    }()
    return out
}

// merge combines multiple channels into a single channel
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Close the output channel when all input channels are done
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// batchProcessor demonstrates processing data in batches
func batchProcessor(in <-chan int, batchSize int) <-chan []int {
    out := make(chan []int)
    go func() {
        defer close(out)
        batch := make([]int, 0, batchSize)
        for n := range in {
            batch = append(batch, n)
            // When the batch is full, send it and start a new one
            if len(batch) == batchSize {
                out <- batch
                batch = make([]int, 0, batchSize)
            }
        }
        // Send any remaining items in the final batch
        if len(batch) > 0 {
            out <- batch
        }
    }()
    return out
}

func main() {
    // Create a pipeline: generate -> square -> filter -> print
    fmt.Println("=== Basic Pipeline ===")
    c := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    c = square(c)
    c = filter(c, func(n int) bool {
        return n%2 == 0 // Keep only even numbers
    })

    // Consume the output
    for n := range c {
        fmt.Println(n)
    }

    // A more complex pipeline with multiple sources and fan-in
    fmt.Println("\n=== Multi-source Pipeline with Fan-in ===")

    // Create multiple sources
    c1 := generator(1, 2, 3)
    c2 := generator(4, 5, 6)
    c3 := generator(7, 8, 9)

    // Process each source
    c1 = square(c1)
    c2 = square(c2)
    c3 = square(c3)

    // Merge the results
    merged := merge(c1, c2, c3)

    // Consume the merged output
    for n := range merged {
        fmt.Println(n)
    }

    // Demonstrate batch processing
    fmt.Println("\n=== Batch Processing Pipeline ===")

    // Generate 10 random numbers
    source := make(chan int)
    go func() {
        defer close(source)
        for i := 0; i < 10; i++ {
            source <- rand.Intn(100)
        }
    }()

    // Process in batches of 3
    batches := batchProcessor(source, 3)

    // Consume and process the batches
    for batch := range batches {
        fmt.Printf("Processing batch: %v\n", batch)
        sum := 0
        for _, n := range batch {
            sum += n
        }
        fmt.Printf("Batch sum: %d\n", sum)
    }
}
Key pipeline characteristics:
- Composability: Each stage performs a specific transformation and can be composed with other stages
- Unidirectional flow: Data flows in one direction through the pipeline
- Concurrent execution: Each stage runs in its own goroutine
- Bounded stages: Each stage only processes one item at a time, providing natural backpressure
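One caveat: the stages above assume the consumer drains every value. If a consumer stops early, upstream goroutines block forever on their sends and leak. The usual remedy, described in the Go blog's "Pipelines and cancellation" article, is to pass every stage a done channel and wrap each send in a select. A minimal sketch of a cancellable stage (the names are illustrative):

package main

import "fmt"

// square stops early when done is closed, so upstream goroutines
// are not leaked if the consumer quits before draining the channel.
func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    defer close(done)

    in := make(chan int)
    go func() {
        defer close(in)
        for i := 1; ; i++ { // An endless producer
            select {
            case in <- i:
            case <-done:
                return
            }
        }
    }()

    out := square(done, in)
    for i := 0; i < 3; i++ {
        fmt.Println(<-out) // Prints 1, 4, 9, then main returns
    }
    // close(done) runs via defer, unblocking the producer and the stage
}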
Worker Pool Pattern
Worker pools manage a fixed number of goroutines to process tasks from a queue:
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Task represents a unit of work
type Task struct {
    ID     int
    Data   interface{}
    Result interface{}
    Err    error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
    tasks       chan Task
    results     chan Task
    concurrency int
    wg          sync.WaitGroup
    ctx         context.Context
    cancel      context.CancelFunc
}

// NewWorkerPool creates a new worker pool with the specified concurrency
func NewWorkerPool(concurrency int) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    return &WorkerPool{
        tasks:       make(chan Task),
        results:     make(chan Task),
        concurrency: concurrency,
        ctx:         ctx,
        cancel:      cancel,
    }
}

// Start launches the worker pool
func (p *WorkerPool) Start() {
    // Start the workers
    for i := 0; i < p.concurrency; i++ {
        p.wg.Add(1)
        go p.worker(i)
    }

    // Close the results channel when all workers are done
    go func() {
        p.wg.Wait()
        close(p.results)
    }()
}

// worker processes tasks from the task channel
func (p *WorkerPool) worker(id int) {
    defer p.wg.Done()
    for {
        select {
        case task, ok := <-p.tasks:
            if !ok {
                return // Task channel closed
            }
            // Process the task
            fmt.Printf("Worker %d processing task %d\n", id, task.ID)
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work

            // Generate the result (a real application would do actual processing)
            task.Result = task.Data.(int) * 2

            // Send the result, unless the pool has been cancelled
            select {
            case p.results <- task:
                // Result sent successfully
            case <-p.ctx.Done():
                return // Context cancelled
            }
        case <-p.ctx.Done():
            return // Context cancelled
        }
    }
}

// Submit adds a task to the worker pool
func (p *WorkerPool) Submit(task Task) {
    select {
    case p.tasks <- task:
        // Task submitted successfully
    case <-p.ctx.Done():
        // Pool is shutting down; the task is dropped
    }
}

// Results returns the channel of completed tasks
func (p *WorkerPool) Results() <-chan Task {
    return p.results
}

// Stop gracefully shuts down the worker pool: no more tasks may be
// submitted, and workers drain the remaining queue before exiting.
// (Cancelling the context before closing the channel, as is tempting,
// would abort workers mid-task and drop results.)
func (p *WorkerPool) Stop() {
    close(p.tasks)
}

// Cancel aborts the pool immediately; queued and in-flight tasks may be dropped
func (p *WorkerPool) Cancel() {
    p.cancel()
}

// ThrottledWorkerPool extends WorkerPool with rate limiting
type ThrottledWorkerPool struct {
    *WorkerPool
    rate     time.Duration  // Minimum time each queue slot is held per task
    limit    chan struct{}  // Semaphore bounding queued submissions
    submitWG sync.WaitGroup // Tracks in-flight Submit goroutines
}

// NewThrottledWorkerPool creates a new rate-limited worker pool
func NewThrottledWorkerPool(concurrency int, rate time.Duration, maxQueued int) *ThrottledWorkerPool {
    return &ThrottledWorkerPool{
        WorkerPool: NewWorkerPool(concurrency),
        rate:       rate,
        limit:      make(chan struct{}, maxQueued),
    }
}

// Submit adds a task to the throttled worker pool, respecting rate limits
func (p *ThrottledWorkerPool) Submit(task Task) {
    p.limit <- struct{}{} // Block if too many submissions are already queued
    p.submitWG.Add(1)
    go func() {
        defer p.submitWG.Done()
        defer func() { <-p.limit }() // Release the queue slot when done

        // Submit to the underlying pool
        p.WorkerPool.Submit(task)

        // Hold the slot briefly to enforce the rate limit
        time.Sleep(p.rate)
    }()
}

// Stop waits for in-flight submissions to reach the queue, then shuts
// down the underlying pool
func (p *ThrottledWorkerPool) Stop() {
    p.submitWG.Wait()
    p.WorkerPool.Stop()
}

func main() {
    // Create a worker pool with 3 workers
    fmt.Println("=== Basic Worker Pool ===")
    pool := NewWorkerPool(3)
    pool.Start()

    // Collect results concurrently so workers never block on the
    // unbuffered results channel while tasks are still being submitted
    done := make(chan struct{})
    go func() {
        defer close(done)
        for task := range pool.Results() {
            fmt.Printf("Task %d result: %v\n", task.ID, task.Result)
        }
    }()

    // Submit 10 tasks
    for i := 0; i < 10; i++ {
        pool.Submit(Task{
            ID:   i,
            Data: i,
        })
    }

    // Close the task channel to signal no more tasks, then wait for
    // the collector to drain the remaining results
    pool.Stop()
    <-done

    // Demonstrate the throttled worker pool
    fmt.Println("\n=== Throttled Worker Pool ===")
    throttled := NewThrottledWorkerPool(3, 200*time.Millisecond, 5)
    throttled.Start()

    doneThrottled := make(chan struct{})
    go func() {
        defer close(doneThrottled)
        for task := range throttled.Results() {
            fmt.Printf("Throttled task %d result: %v\n", task.ID, task.Result)
        }
    }()

    // Submit tasks rapidly; the semaphore and rate limit pace them
    for i := 0; i < 10; i++ {
        throttled.Submit(Task{
            ID:   i,
            Data: i,
        })
        fmt.Printf("Submitted task %d\n", i)
    }

    // Stop waits for pending submissions before closing the queue
    throttled.Stop()
    <-doneThrottled
}
The worker pool pattern is particularly useful for:
- Controlling concurrency: Limiting the number of concurrent operations to prevent resource exhaustion (see the errgroup sketch after this list)
- Load balancing: Distributing work evenly across available resources
- Backpressure handling: Managing the flow of work when producers are faster than consumers
- Resource management: Efficiently utilizing system resources like CPU cores, network connections, or database connections
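For the common case of "run N tasks with at most K in flight and collect the first error", a hand-rolled pool can often be replaced by golang.org/x/sync/errgroup, whose SetLimit method bounds concurrency. A brief sketch, assuming that module is available:

package main

import (
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.SetLimit(3) // At most 3 tasks run at once

    var mu sync.Mutex
    results := make([]int, 0, 10)

    for i := 0; i < 10; i++ {
        i := i // Capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            v := i * 2 // Stand-in for real work
            mu.Lock()
            results = append(results, v)
            mu.Unlock()
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("error:", err)
    }
    fmt.Println(results)
}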
Channel-Based Semaphores
Channels can be used as semaphores to limit concurrent access to resources:
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Semaphore represents a counting semaphore
type Semaphore chan struct{}

// NewSemaphore creates a new semaphore with the given capacity
func NewSemaphore(capacity int) Semaphore {
    return make(Semaphore, capacity)
}

// Acquire takes n resources from the semaphore, blocking until they are available
func (s Semaphore) Acquire(n int) {
    for i := 0; i < n; i++ {
        s <- struct{}{}
    }
}

// Release returns n resources to the semaphore
func (s Semaphore) Release(n int) {
    for i := 0; i < n; i++ {
        <-s
    }
}

// TryAcquire attempts to acquire n resources without blocking.
// It acquires either all n or none, and returns true on success.
// (It is not atomic: a concurrent caller may grab slots between
// iterations, in which case the partial acquisition is rolled back.)
func (s Semaphore) TryAcquire(n int) bool {
    for i := 0; i < n; i++ {
        select {
        case s <- struct{}{}:
        default:
            s.Release(i) // Roll back the slots acquired so far
            return false
        }
    }
    return true
}

// simulateResourceUsage demonstrates using a semaphore to limit concurrent access
func simulateResourceUsage(id int, sem Semaphore, wg *sync.WaitGroup) {
    defer wg.Done()

    // Try a non-blocking acquire first
    fmt.Printf("Client %d: Attempting to acquire resource\n", id)
    if sem.TryAcquire(1) {
        fmt.Printf("Client %d: Immediately acquired resource\n", id)
    } else {
        fmt.Printf("Client %d: Waiting for resource...\n", id)
        sem.Acquire(1)
        fmt.Printf("Client %d: Eventually acquired resource\n", id)
    }

    // Use the resource
    fmt.Printf("Client %d: Using resource\n", id)
    time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)

    // Release the resource
    fmt.Printf("Client %d: Releasing resource\n", id)
    sem.Release(1)
}

func main() {
    // Create a semaphore with capacity 3
    sem := NewSemaphore(3)

    // Create 10 clients that will try to use the resource
    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go simulateResourceUsage(i, sem, &wg)
        time.Sleep(50 * time.Millisecond) // Stagger client starts
    }

    // Wait for all clients to finish
    wg.Wait()
    fmt.Println("All clients finished")
}
Channel-based semaphores are useful for:
- Connection pooling: Limiting the number of concurrent connections to a resource
- Rate limiting: Controlling the rate of operations
- Resource protection: Preventing resource exhaustion by limiting concurrent access
- Concurrency control: Implementing more complex synchronization patterns
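Before reaching for a hand-rolled semaphore in production code, note that the extended standard library provides golang.org/x/sync/semaphore, a weighted, context-aware implementation. A brief sketch, assuming that module is available:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()
    sem := semaphore.NewWeighted(3) // At most 3 concurrent holders

    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // Acquire blocks until a slot frees up or ctx is cancelled
            if err := sem.Acquire(ctx, 1); err != nil {
                return
            }
            defer sem.Release(1)
            fmt.Printf("Client %d holds a slot\n", id)
            time.Sleep(100 * time.Millisecond) // Simulate using the resource
        }(i)
    }
    wg.Wait()
}

Weighted acquisition (Acquire(ctx, n) with n greater than 1) covers the multi-resource case that the TryAcquire rollback above handles manually.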