Advanced Go Channel Patterns: Mastering Communication in Concurrent Systems
Explore sophisticated channel patterns that enable elegant, efficient communication between goroutines in complex concurrent applications.
Channels are Go’s primary tool for goroutine communication. Most developers know the basics - sending and receiving values - but advanced patterns unlock much more powerful concurrent designs.
Beyond Basic Channels
Basic channel operations are straightforward:
ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch
But real systems need more sophisticated coordination:
- Pipeline Processing: Chain operations together
- Fan-Out/Fan-In: Distribute work and collect results
- Rate Limiting: Control operation flow
- Timeouts: Handle operations that take too long
- Cancellation: Stop work when it’s no longer needed
Common Channel Pitfalls
Before diving into patterns, understand the traps:
- Deadlocks: Goroutines waiting forever for each other
- Goroutine Leaks: Forgetting to close channels or handle cancellation (see the sketch after this list)
- Race Conditions: Channels don’t solve all concurrency issues
- Blocking Operations: Not handling channel operations that might block
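To make the leak pitfall concrete, here is a minimal sketch (the function names are illustrative, not from any library) contrasting a goroutine that leaks because its send can never complete with a cancellable variant:

package main

import "time"

// leaky starts a goroutine that blocks forever on its send, because
// the caller never receives from ch; the goroutine and its stack leak.
func leaky() {
    ch := make(chan int)
    go func() {
        ch <- 42 // no receiver will ever arrive
    }()
    // Caller returns without receiving from ch.
}

// cancellable gives the goroutine a second way out via done.
func cancellable(done <-chan struct{}) {
    ch := make(chan int)
    go func() {
        select {
        case ch <- 42:
        case <-done: // caller abandoned the result; exit cleanly
        }
    }()
}

func main() {
    leaky() // this goroutine is now stuck for the life of the program
    done := make(chan struct{})
    cancellable(done)
    close(done) // releases the second goroutine
    time.Sleep(100 * time.Millisecond)
}

The done channel gives the goroutine a second select case to exit through, which is the essence of the cancellation patterns covered later.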
Channel Patterns You’ll Learn
This guide covers practical channel patterns:
- Select Patterns: Non-blocking operations and timeouts
- Pipeline Patterns: Chaining processing stages
- Fan Patterns: Distributing and collecting work
- Cancellation Patterns: Stopping work gracefully
- Rate Limiting: Controlling operation frequency
- Worker Pools: Managing goroutines efficiently
Each pattern includes examples showing when and how to use it effectively.
When to Use Channels vs Sync Primitives
Channels excel at:
- Passing data between goroutines
- Coordinating goroutine lifecycles
- Implementing timeouts and cancellation
- Building processing pipelines
Use sync primitives (mutexes, etc.) for:
- Protecting shared state
- Simple coordination (WaitGroup)
- Performance-critical sections
Channel Fundamentals and Best Practices
Before diving into advanced patterns, it’s crucial to establish a solid understanding of channel fundamentals and best practices. These core concepts form the foundation upon which more complex patterns are built.
Channel Types and Directionality
Go channels can be bidirectional or unidirectional, with the latter providing important compile-time safety guarantees:
package main
import (
"fmt"
"time"
)
// produceValues demonstrates a function that only sends on a channel
func produceValues(ch chan<- int) {
for i := 0; i < 5; i++ {
fmt.Printf("Sending: %d\n", i)
ch <- i
time.Sleep(100 * time.Millisecond)
}
close(ch) // Producer is responsible for closing
}
// consumeValues demonstrates a function that only receives from a channel
func consumeValues(ch <-chan int) {
// Using range loop automatically handles channel closure
for val := range ch {
fmt.Printf("Received: %d\n", val)
}
}
func main() {
// Create a bidirectional channel
ch := make(chan int)
// Start producer and consumer goroutines
go produceValues(ch) // Channel converted to send-only
go consumeValues(ch) // Channel converted to receive-only
// Wait for completion
time.Sleep(1 * time.Second)
fmt.Println("Done")
}
This example demonstrates several best practices:
- Explicit directionality: Functions declare whether they intend to send (chan<-) or receive (<-chan), making the code’s intent clear and preventing accidental misuse.
- Producer responsibility: The producer (sender) is responsible for closing the channel when no more values will be sent.
- Range loop for consumers: Using range to receive values automatically handles channel closure.
Buffered vs. Unbuffered Channels
The choice between buffered and unbuffered channels significantly impacts program behavior:
package main
import (
"fmt"
"time"
)
func bufferingDemo() {
fmt.Println("Unbuffered channel demonstration:")
unbuffered := make(chan int)
go func() {
fmt.Println("Sender: Attempting to send")
unbuffered <- 42
fmt.Println("Sender: Send completed")
}()
// Give sender time to attempt sending
time.Sleep(100 * time.Millisecond)
fmt.Println("Receiver: About to receive")
value := <-unbuffered
fmt.Printf("Receiver: Received value %d\n", value)
fmt.Println("\nBuffered channel demonstration:")
buffered := make(chan int, 2)
go func() {
for i := 0; i < 3; i++ {
fmt.Printf("Sender: Sending value %d\n", i)
buffered <- i
fmt.Printf("Sender: Sent value %d\n", i)
}
}()
// Give sender time to send values
time.Sleep(100 * time.Millisecond)
for i := 0; i < 3; i++ {
fmt.Println("Receiver: About to receive")
value := <-buffered
fmt.Printf("Receiver: Received value %d\n", value)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
bufferingDemo()
time.Sleep(500 * time.Millisecond) // Ensure all output is printed
}
Key differences to understand:
- Unbuffered channels (capacity 0) synchronize the sender and receiver—the sender blocks until a receiver is ready to receive the value.
- Buffered channels allow senders to proceed without an immediate receiver, up to the buffer’s capacity.
- Blocking behavior: Once a buffered channel is full, senders block until space becomes available.
Channel Closure and the nil Channel
Understanding channel closure and nil channel behavior is critical for advanced patterns:
package main
import (
"fmt"
"time"
)
func channelClosureDemo() {
ch := make(chan int)
// Sender goroutine
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
fmt.Println("Sender: Channel closed")
}()
// Receiver loop - continues until channel is closed
for {
value, ok := <-ch
if !ok {
fmt.Println("Receiver: Channel closed detected")
break
}
fmt.Printf("Receiver: Got value %d\n", value)
}
// Demonstrate behavior of closed and nil channels
closedCh := make(chan int)
close(closedCh)
// Reading from closed channel returns zero value
val, ok := <-closedCh
fmt.Printf("Reading from closed channel: value=%d, ok=%v\n", val, ok)
// Writing to closed channel panics
// closedCh <- 1 // This would panic
// Nil channel operations block forever
var nilCh chan int // nil channel
// Demonstrate a nil-channel read guarded by a timeout so the program still exits
go func() {
fmt.Println("Attempting to read from nil channel (will block until timeout)")
select {
case <-nilCh: // a nil channel never becomes ready, so this case cannot fire
fmt.Println("unreachable")
case <-time.After(200 * time.Millisecond):
fmt.Println("Timed out waiting on nil channel")
}
}()
}
func main() {
channelClosureDemo()
time.Sleep(500 * time.Millisecond) // Ensure all output is printed
}
Important principles:
- Closed channel behavior:
  - Reading from a closed channel returns the zero value and ok=false
  - Writing to a closed channel causes a panic
  - Closing an already closed channel causes a panic
- Nil channel behavior:
  - Operations on nil channels block forever
  - This property is useful in select statements for disabling cases (see the sketch below)
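That last property deserves a concrete example. A common idiom, sketched below with no extra assumptions, sets a channel variable to nil once it is closed so its select case stops firing:

package main

import "fmt"

func main() {
    a := make(chan int)
    b := make(chan int)
    go func() { a <- 1; a <- 2; close(a) }()
    go func() { b <- 10; close(b) }()

    // Merge both channels; disable each case by nil-ing the channel
    // once it closes, so select never spins on a closed channel.
    for a != nil || b != nil {
        select {
        case v, ok := <-a:
            if !ok {
                a = nil // disables this case permanently
                continue
            }
            fmt.Println("from a:", v)
        case v, ok := <-b:
            if !ok {
                b = nil
                continue
            }
            fmt.Println("from b:", v)
        }
    }
}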
Channel Ownership Principles
Clear channel ownership is essential for preventing concurrency bugs:
package main
import (
"fmt"
"sync"
"time"
)
// ChannelOwner demonstrates the channel ownership pattern
type ChannelOwner struct {
values chan int
done chan struct{}
}
// NewChannelOwner creates and returns a new ChannelOwner
// The constructor is the only place where the channels are created
func NewChannelOwner() *ChannelOwner {
return &ChannelOwner{
values: make(chan int),
done: make(chan struct{}),
}
}
// Start begins producing values and returns a receive-only channel
// The owner starts the producer goroutine and manages its lifecycle
func (co *ChannelOwner) Start() <-chan int {
go func() {
defer close(co.values) // Owner ensures channel is closed
for i := 0; i < 5; i++ {
select {
case co.values <- i:
// Value sent successfully
case <-co.done:
fmt.Println("Producer received cancellation signal")
return
}
}
}()
return co.values // Return receive-only channel to consumers
}
// Stop signals the producer to stop and cleans up resources
func (co *ChannelOwner) Stop() {
close(co.done)
}
func main() {
// Create owner and start production
owner := NewChannelOwner()
valuesCh := owner.Start()
// Consume values
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for value := range valuesCh {
fmt.Printf("Received: %d\n", value)
}
fmt.Println("Consumer finished")
}()
// Let it run for a bit, then stop.
// In a real application, this might be triggered by a timeout or user action.
// Note: main must not receive from valuesCh directly here, or it would
// compete with the consumer goroutine for values.
time.Sleep(250 * time.Millisecond)
owner.Stop()
wg.Wait()
}
Key ownership principles:
- Single writer principle: Only one goroutine should write to a channel
- Clear ownership: The owner creates, writes to, and closes the channel
- Consumers only read: Consumers should only read from channels, never close them
- Encapsulation: Hide channel creation and management inside constructors and methods
These fundamentals provide the foundation for the advanced patterns we’ll explore next. By adhering to these principles, you can avoid many common concurrency pitfalls and build more reliable concurrent systems.
Advanced Channel Patterns
Building on the fundamentals, we can now explore more sophisticated channel patterns that solve complex concurrency challenges. These patterns provide reusable solutions for common concurrent programming scenarios.
Select Statement Patterns
The select statement is one of Go’s most powerful concurrency primitives, enabling non-blocking operations and coordination between multiple channels:
package main
import (
"fmt"
"math/rand"
"reflect"
"time"
)
// timeoutOperation demonstrates using select for timeouts
func timeoutOperation() {
ch := make(chan string)
go func() {
// Simulate work that takes random time
delay := time.Duration(rand.Intn(500)) * time.Millisecond
time.Sleep(delay)
ch <- fmt.Sprintf("Operation completed in %v", delay)
}()
// Wait for result or timeout
select {
case result := <-ch:
fmt.Println("Success:", result)
case <-time.After(300 * time.Millisecond):
fmt.Println("Operation timed out")
}
}
// nonBlockingReceive demonstrates non-blocking channel operations
func nonBlockingReceive(ch chan string) {
select {
case msg := <-ch:
fmt.Println("Received message:", msg)
default:
fmt.Println("No message available")
}
}
// nonBlockingSend demonstrates non-blocking send operation
func nonBlockingSend(ch chan string, msg string) {
select {
case ch <- msg:
fmt.Println("Sent message:", msg)
default:
fmt.Println("Cannot send message, channel full or no receivers")
}
}
// prioritySelect demonstrates channel priority using select
func prioritySelect(high, medium, low chan string) {
for {
select {
case msg := <-high:
fmt.Println("High priority:", msg)
return
default:
// Continue to nested select
}
select {
case msg := <-high:
fmt.Println("High priority:", msg)
return
case msg := <-medium:
fmt.Println("Medium priority:", msg)
return
default:
// Continue to final select
}
select {
case msg := <-high:
fmt.Println("High priority:", msg)
case msg := <-medium:
fmt.Println("Medium priority:", msg)
case msg := <-low:
fmt.Println("Low priority:", msg)
case <-time.After(500 * time.Millisecond):
fmt.Println("Timed out waiting for messages")
}
return
}
}
// dynamicSelectCases demonstrates how to handle a dynamic number of channels
func dynamicSelectCases(channels []chan int) {
cases := make([]reflect.SelectCase, len(channels))
for i, ch := range channels {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
// Add timeout case
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(time.After(1 * time.Second)),
})
// Wait for any channel to receive
chosen, value, ok := reflect.Select(cases)
if chosen == len(cases)-1 {
fmt.Println("Timed out")
return
}
if ok {
fmt.Printf("Received value %v from channel %d\n", value.Int(), chosen)
} else {
fmt.Printf("Channel %d was closed\n", chosen)
}
}
func main() {
// Demonstrate timeout pattern
fmt.Println("=== Timeout Pattern ===")
timeoutOperation()
// Demonstrate non-blocking operations
fmt.Println("\n=== Non-blocking Operations ===")
ch := make(chan string, 1)
nonBlockingReceive(ch)
nonBlockingSend(ch, "Hello")
nonBlockingReceive(ch)
nonBlockingSend(ch, "World") // Channel is now full
// Demonstrate priority selection
fmt.Println("\n=== Priority Selection ===")
high := make(chan string, 1)
medium := make(chan string, 1)
low := make(chan string, 1)
// Try different scenarios
medium <- "Medium message"
prioritySelect(high, medium, low)
// For dynamic select case demo, we need the reflect package
fmt.Println("\n=== Dynamic Select Cases ===")
channels := make([]chan int, 3)
for i := range channels {
channels[i] = make(chan int)
i := i // Create new variable to avoid closure problem
go func() {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
channels[i] <- i
}()
}
dynamicSelectCases(channels)
}
Key select patterns:
- Timeout pattern: Combine a work channel with time.After() to implement timeouts
- Non-blocking operations: Use the default case to make channel operations non-blocking
- Priority selection: Nest multiple select statements to implement channel priority
- Dynamic cases: Use the reflect package to handle a dynamic number of channels
Fan-Out and Fan-In Patterns
Fan-out/fan-in is a powerful pattern for parallel processing that distributes work across multiple goroutines and then consolidates the results:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// worker processes items from input and sends results to output
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
results <- job * 2 // Send result
}
}
// fanOut distributes work across multiple workers
func fanOut(jobs <-chan int, numWorkers int) <-chan int {
results := make(chan int)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Close results channel when all workers are done
go func() {
wg.Wait()
close(results)
fmt.Println("All workers completed")
}()
return results
}
// fanIn merges multiple channels into a single channel
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
multiplexed := make(chan int)
// Function to forward values from input channel to multiplexed channel
forward := func(ch <-chan int) {
defer wg.Done()
for val := range ch {
multiplexed <- val
}
}
// Start a goroutine for each input channel
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
// Close multiplexed channel when all input channels are done
go func() {
wg.Wait()
close(multiplexed)
}()
return multiplexed
}
// fanInInline is an equivalent fan-in that inlines the forwarding
// goroutine rather than using a named helper (no reflection involved)
func fanInInline(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, c := range channels {
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Create jobs channel and send jobs
jobs := make(chan int, 10)
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
// Fan out to 3 workers
fmt.Println("Starting fan-out with 3 workers")
results := fanOut(jobs, 3)
// Collect and print results
for result := range results {
fmt.Printf("Got result: %d\n", result)
}
// Demonstrate fan-in with multiple channels
fmt.Println("\nDemonstrating fan-in pattern")
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
// Send values on each channel
go func() {
for i := 1; i <= 3; i++ {
ch1 <- i * 10
time.Sleep(100 * time.Millisecond)
}
close(ch1)
}()
go func() {
for i := 1; i <= 3; i++ {
ch2 <- i * 100
time.Sleep(150 * time.Millisecond)
}
close(ch2)
}()
go func() {
for i := 1; i <= 3; i++ {
ch3 <- i * 1000
time.Sleep(80 * time.Millisecond)
}
close(ch3)
}()
// Fan-in the channels
merged := fanIn(ch1, ch2, ch3)
// Print merged results
for result := range merged {
fmt.Printf("Merged result: %d\n", result)
}
}
The fan-out/fan-in pattern is particularly useful for:
- CPU-bound tasks: Distributing computation across multiple cores
- I/O-bound tasks: Managing multiple concurrent I/O operations
- Rate limiting: Controlling the degree of parallelism
- Batch processing: Processing large datasets in parallel chunks
Pipeline Pattern
Pipelines compose a series of processing stages connected by channels, allowing data to flow through multiple transformations:
package main
import (
"fmt"
"math/rand"
"sync"
)
// generator creates a channel that emits the provided integers
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// square receives integers, squares them, and sends them to a returned channel
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// filter receives integers and sends only those that satisfy the predicate function
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
// merge combines multiple channels into a single channel
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Close the output channel when all input channels are done
go func() {
wg.Wait()
close(out)
}()
return out
}
// batchProcessor demonstrates processing data in batches
func batchProcessor(in <-chan int, batchSize int) <-chan []int {
out := make(chan []int)
go func() {
defer close(out)
batch := make([]int, 0, batchSize)
for n := range in {
batch = append(batch, n)
// When batch is full, send it and create a new one
if len(batch) == batchSize {
out <- batch
batch = make([]int, 0, batchSize)
}
}
// Send any remaining items in the last batch
if len(batch) > 0 {
out <- batch
}
}()
return out
}
func main() {
// Create a pipeline: generate -> square -> filter -> print
fmt.Println("=== Basic Pipeline ===")
c := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c = square(c)
c = filter(c, func(n int) bool {
return n%2 == 0 // Only even numbers
})
// Consume the output
for n := range c {
fmt.Println(n)
}
// Demonstrate a more complex pipeline with multiple sources and fan-in
fmt.Println("\n=== Multi-source Pipeline with Fan-in ===")
// Create multiple sources
c1 := generator(1, 2, 3)
c2 := generator(4, 5, 6)
c3 := generator(7, 8, 9)
// Process each source
c1 = square(c1)
c2 = square(c2)
c3 = square(c3)
// Merge the results
merged := merge(c1, c2, c3)
// Consume the merged output
for n := range merged {
fmt.Println(n)
}
// Demonstrate batch processing
fmt.Println("\n=== Batch Processing Pipeline ===")
// Generate 10 random numbers
source := make(chan int)
go func() {
defer close(source)
for i := 0; i < 10; i++ {
source <- rand.Intn(100)
}
}()
// Process in batches of 3
batches := batchProcessor(source, 3)
// Consume and process batches
for batch := range batches {
fmt.Printf("Processing batch: %v\n", batch)
sum := 0
for _, n := range batch {
sum += n
}
fmt.Printf("Batch sum: %d\n", sum)
}
}
Key pipeline characteristics:
- Composability: Each stage performs a specific transformation and can be composed with other stages
- Unidirectional flow: Data flows in one direction through the pipeline
- Concurrent execution: Each stage runs in its own goroutine
- Bounded stages: Each stage only processes one item at a time, providing natural backpressure (see the buffered variant sketched below)
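If one stage is much slower than its neighbors, you can relax that tight coupling by giving a stage a small output buffer. A minimal sketch (the buffer size of 4 is illustrative):

package main

import (
    "fmt"
    "time"
)

// bufferedSquare is the square stage from above with a small output
// buffer, letting the producer run ahead of a slow consumer by up to
// buf items before backpressure kicks in.
func bufferedSquare(in <-chan int, buf int) <-chan int {
    out := make(chan int, buf)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    in := make(chan int)
    go func() {
        defer close(in)
        for i := 1; i <= 5; i++ {
            in <- i
        }
    }()
    for v := range bufferedSquare(in, 4) {
        fmt.Println(v)
        time.Sleep(50 * time.Millisecond) // slow consumer
    }
}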
Worker Pool Pattern
Worker pools manage a fixed number of goroutines to process tasks from a queue:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Task represents a unit of work
type Task struct {
ID int
Data interface{}
Result interface{}
Err error
}
// WorkerPool manages a pool of workers
type WorkerPool struct {
tasks chan Task
results chan Task
concurrency int
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// NewWorkerPool creates a new worker pool with the specified concurrency
func NewWorkerPool(concurrency int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
tasks: make(chan Task),
results: make(chan Task),
concurrency: concurrency,
ctx: ctx,
cancel: cancel,
}
}
// Start launches the worker pool
func (p *WorkerPool) Start() {
// Start workers
for i := 0; i < p.concurrency; i++ {
p.wg.Add(1)
go p.worker(i)
}
// Start result collector
go func() {
p.wg.Wait()
close(p.results)
}()
}
// worker processes tasks from the task channel
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return // Task channel closed
}
// Process the task
fmt.Printf("Worker %d processing task %d\n", id, task.ID)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate work
// Generate result (in a real application, this would be actual processing)
task.Result = task.Data.(int) * 2
// Send result
select {
case p.results <- task:
// Result sent successfully
case <-p.ctx.Done():
return // Context cancelled
}
case <-p.ctx.Done():
return // Context cancelled
}
}
}
// Submit adds a task to the worker pool
func (p *WorkerPool) Submit(task Task) {
select {
case p.tasks <- task:
// Task submitted successfully
case <-p.ctx.Done():
// Pool is shutting down
}
}
// Results returns the channel of completed tasks
func (p *WorkerPool) Results() <-chan Task {
return p.results
}
// Stop gracefully shuts down the worker pool. Closing the task channel
// lets workers drain queued tasks; cancelling the context first would
// race with result delivery and could drop completed work.
func (p *WorkerPool) Stop() {
close(p.tasks) // No more tasks; workers exit after draining
}
// Abort shuts the pool down immediately, discarding unprocessed tasks
func (p *WorkerPool) Abort() {
p.cancel()
}
// ThrottledWorkerPool extends WorkerPool with rate limiting
type ThrottledWorkerPool struct {
*WorkerPool
rate time.Duration // Minimum time each queue slot is held, throttling submissions
limit chan struct{} // Semaphore limiting the number of queued submissions
}
// NewThrottledWorkerPool creates a new rate-limited worker pool
func NewThrottledWorkerPool(concurrency int, rate time.Duration, maxQueued int) *ThrottledWorkerPool {
return &ThrottledWorkerPool{
WorkerPool: NewWorkerPool(concurrency),
rate: rate,
limit: make(chan struct{}, maxQueued),
}
}
// Submit adds a task to the throttled worker pool, respecting rate limits
func (p *ThrottledWorkerPool) Submit(task Task) {
// Add to limit before submitting
p.limit <- struct{}{}
go func() {
defer func() { <-p.limit }() // Release limit when done
// Submit to underlying pool
p.WorkerPool.Submit(task)
// Enforce rate limit
time.Sleep(p.rate)
}()
}
func main() {
// Create a worker pool with 3 workers
fmt.Println("=== Basic Worker Pool ===")
pool := NewWorkerPool(3)
pool.Start()
// Submit 10 tasks
for i := 0; i < 10; i++ {
pool.Submit(Task{
ID: i,
Data: i,
})
}
// Close the task channel to signal no more tasks
pool.Stop()
// Collect results
for task := range pool.Results() {
fmt.Printf("Task %d result: %v\n", task.ID, task.Result)
}
// Demonstrate throttled worker pool
fmt.Println("\n=== Throttled Worker Pool ===")
throttled := NewThrottledWorkerPool(3, 200*time.Millisecond, 5)
throttled.Start()
// Submit tasks rapidly, but they'll be rate-limited
for i := 0; i < 10; i++ {
throttled.Submit(Task{
ID: i,
Data: i,
})
fmt.Printf("Submitted task %d\n", i)
}
// Give pending submissions time to drain before stopping; stopping
// while Submits are still queued would race with closing the channel
time.Sleep(3 * time.Second)
// Stop the pool
throttled.Stop()
// Collect results
for task := range throttled.Results() {
fmt.Printf("Throttled task %d result: %v\n", task.ID, task.Result)
}
}
The worker pool pattern is particularly useful for:
- Controlling concurrency: Limiting the number of concurrent operations to prevent resource exhaustion
- Load balancing: Distributing work evenly across available resources
- Backpressure handling: Managing the flow of work when producers are faster than consumers
- Resource management: Efficiently utilizing system resources like CPU cores, network connections, or database connections
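Choosing the pool size is workload-dependent. As a rough heuristic (not a rule), CPU-bound pools are often sized near runtime.NumCPU(), while I/O-bound pools can be larger because workers spend most of their time blocked; the multiplier below is an illustrative assumption:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // CPU-bound work gains little from more workers than cores.
    cpuWorkers := runtime.NumCPU()
    // I/O-bound workers mostly wait, so oversubscribing often helps;
    // the factor of 4 is workload-dependent, not a fixed recommendation.
    ioWorkers := runtime.NumCPU() * 4
    fmt.Println("CPU-bound pool size:", cpuWorkers)
    fmt.Println("I/O-bound pool size:", ioWorkers)
}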
Channel-Based Semaphores
Channels can be used as semaphores to limit concurrent access to resources:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Semaphore represents a counting semaphore
type Semaphore chan struct{}
// Acquire n resources from the semaphore
func (s Semaphore) Acquire(n int) {
for i := 0; i < n; i++ {
s <- struct{}{}
}
}
// Release n resources back to the semaphore
func (s Semaphore) Release(n int) {
for i := 0; i < n; i++ {
<-s
}
}
// TryAcquire attempts to acquire n resources without blocking
// Returns true if successful, false if would block
func (s Semaphore) TryAcquire(n int) bool {
select {
case s <- struct{}{}:
if n > 1 {
// For multiple resources, we need to be careful
// If we can't acquire all, release what we've acquired
if !s.TryAcquire(n - 1) {
<-s // Release the one we just acquired
return false
}
}
return true
default:
return false
}
}
// NewSemaphore creates a new semaphore with the given capacity
func NewSemaphore(capacity int) Semaphore {
return make(Semaphore, capacity)
}
// simulateResourceUsage demonstrates using a semaphore to limit concurrent access
func simulateResourceUsage(id int, sem Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
// Try to acquire the resource
fmt.Printf("Client %d: Attempting to acquire resource\n", id)
// Try non-blocking acquire first
if sem.TryAcquire(1) {
fmt.Printf("Client %d: Immediately acquired resource\n", id)
} else {
fmt.Printf("Client %d: Waiting for resource...\n", id)
sem.Acquire(1)
fmt.Printf("Client %d: Eventually acquired resource\n", id)
}
// Use the resource
fmt.Printf("Client %d: Using resource\n", id)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
// Release the resource
fmt.Printf("Client %d: Releasing resource\n", id)
sem.Release(1)
}
func main() {
// Create a semaphore with capacity 3
sem := NewSemaphore(3)
// Create 10 clients that will try to use the resource
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go simulateResourceUsage(i, sem, &wg)
time.Sleep(50 * time.Millisecond) // Stagger client starts
}
// Wait for all clients to finish
wg.Wait()
fmt.Println("All clients finished")
}
Channel-based semaphores are useful for:
- Connection pooling: Limiting the number of concurrent connections to a resource
- Rate limiting: Controlling the rate of operations
- Resource protection: Preventing resource exhaustion by limiting concurrent access
- Concurrency control: Implementing more complex synchronization patterns
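If you prefer a maintained implementation over the hand-rolled type above, the golang.org/x/sync/semaphore package (an external module) provides a weighted, context-aware semaphore. A minimal sketch, assuming the module is in your go.mod:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()
    sem := semaphore.NewWeighted(3) // capacity 3, like the channel version above

    for i := 1; i <= 5; i++ {
        if err := sem.Acquire(ctx, 1); err != nil {
            fmt.Println("acquire failed:", err) // only on context cancellation
            return
        }
        go func(id int) {
            defer sem.Release(1)
            fmt.Println("client", id, "using resource")
        }(i)
    }
    // Acquiring the full weight waits until every holder has released.
    if err := sem.Acquire(ctx, 3); err == nil {
        fmt.Println("all clients finished")
    }
}

Unlike the channel-based version, Acquire accepts a context, so waiting clients can honor timeouts and cancellation.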
Channel Orchestration Techniques
Beyond individual patterns, channels can be orchestrated to create sophisticated concurrent systems. These techniques combine multiple patterns to solve complex coordination problems.
Timeout and Cancellation Patterns
Proper timeout and cancellation handling is essential for robust concurrent systems:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// timeoutOperation demonstrates a simple timeout pattern
func timeoutOperation(timeout time.Duration) (result string, err error) {
// Create a channel for the operation result
resultCh := make(chan string, 1)
// Start the operation in a goroutine
go func() {
// Simulate work
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
resultCh <- "Operation completed successfully"
}()
// Wait for result or timeout
select {
case result = <-resultCh:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("operation timed out after %v", timeout)
}
}
// contextAwareOperation demonstrates using context for cancellation
func contextAwareOperation(ctx context.Context) (string, error) {
// Create a channel for the operation result
resultCh := make(chan string, 1)
// Start the operation in a goroutine
go func() {
// Simulate work with periodic cancellation checks
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
// Context was cancelled, abort operation
return
case <-time.After(200 * time.Millisecond):
// Continue working
fmt.Println("Operation in progress...")
}
}
resultCh <- "Operation completed successfully"
}()
// Wait for result or cancellation
select {
case result := <-resultCh:
return result, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// multiStageTimeout demonstrates different timeouts for different stages
func multiStageTimeout() error {
// Stage 1: Connect (short timeout)
connectCtx, cancelConnect := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancelConnect()
fmt.Println("Stage 1: Connecting...")
if err := stageOperation(connectCtx, "connect", 300); err != nil {
return fmt.Errorf("connect failed: %w", err)
}
// Stage 2: Process (longer timeout)
processCtx, cancelProcess := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelProcess()
fmt.Println("Stage 2: Processing...")
if err := stageOperation(processCtx, "process", 1000); err != nil {
return fmt.Errorf("process failed: %w", err)
}
// Stage 3: Finalize (medium timeout)
finalizeCtx, cancelFinalize := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelFinalize()
fmt.Println("Stage 3: Finalizing...")
if err := stageOperation(finalizeCtx, "finalize", 500); err != nil {
return fmt.Errorf("finalize failed: %w", err)
}
fmt.Println("All stages completed successfully")
return nil
}
// stageOperation simulates a stage in a multi-stage operation
func stageOperation(ctx context.Context, name string, maxDuration int) error {
// Simulate work with random duration
duration := time.Duration(rand.Intn(maxDuration)) * time.Millisecond
select {
case <-time.After(duration):
fmt.Printf("Stage '%s' completed in %v\n", name, duration)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// propagatingCancellation demonstrates how cancellation propagates through a pipeline
func propagatingCancellation() {
// Create a cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Ensure cancellation in all cases
// Create a pipeline with this context
nums := generateNumbers(ctx, 1, 100)
squares := squareNumbers(ctx, nums)
results := filterNumbers(ctx, squares, func(n int) bool {
return n%10 == 0 // Only numbers divisible by 10
})
// Process results for a while, then cancel
go func() {
time.Sleep(500 * time.Millisecond)
fmt.Println("Cancelling pipeline...")
cancel()
}()
// Consume results until cancellation
for result := range results {
fmt.Printf("Result: %d\n", result)
time.Sleep(100 * time.Millisecond) // Slow consumer
}
fmt.Println("Pipeline terminated")
}
// generateNumbers produces a sequence of integers, respecting context cancellation
func generateNumbers(ctx context.Context, start, end int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := start; i <= end; i++ {
select {
case <-ctx.Done():
fmt.Println("Generator cancelled")
return
case out <- i:
// Successfully sent value
}
}
}()
return out
}
// squareNumbers transforms input numbers by squaring them
func squareNumbers(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done():
fmt.Println("Square operation cancelled")
return
case out <- n * n:
// Successfully sent value
}
}
}()
return out
}
// filterNumbers filters numbers based on a predicate function
func filterNumbers(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
select {
case <-ctx.Done():
fmt.Println("Filter operation cancelled")
return
case out <- n:
// Successfully sent value
}
}
}
}()
return out
}
func main() {
// Demonstrate simple timeout
fmt.Println("=== Simple Timeout Pattern ===")
result, err := timeoutOperation(500 * time.Millisecond)
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Result: %s\n", result)
}
// Demonstrate context cancellation
fmt.Println("\n=== Context Cancellation Pattern ===")
ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond)
defer cancel()
result, err = contextAwareOperation(ctx)
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Result: %s\n", result)
}
// Demonstrate multi-stage timeout
fmt.Println("\n=== Multi-stage Timeout Pattern ===")
if err := multiStageTimeout(); err != nil {
fmt.Printf("Error: %v\n", err)
}
// Demonstrate cancellation propagation
fmt.Println("\n=== Cancellation Propagation Pattern ===")
propagatingCancellation()
}
Key timeout and cancellation patterns:
- Simple timeout: Using select with time.After to implement timeouts
- Context-based cancellation: Using Go’s context package for cancellation propagation
- Multi-stage timeouts: Different timeouts for different stages of an operation
- Cancellation propagation: Ensuring cancellation signals flow through all parts of a system
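These patterns integrate directly with the standard library. For example, attaching a context deadline to an HTTP request causes the transport to abort the request when the deadline expires; a minimal sketch (the URL is a placeholder):

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func main() {
    // Attach a 2-second deadline; the transport aborts the request
    // when the context expires.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com", nil)
    if err != nil {
        fmt.Println("building request:", err)
        return
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("request failed (possibly timed out):", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}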
Coordinating Multiple Goroutines
Complex concurrent systems often require sophisticated coordination between multiple goroutines:
package main
import (
"fmt"
"sync"
"time"
)
// Barrier synchronizes multiple goroutines to a common point
type Barrier struct {
count int
mutex sync.Mutex
cond *sync.Cond
required int
}
// NewBarrier creates a new barrier that waits for the required number of goroutines
func NewBarrier(required int) *Barrier {
b := &Barrier{required: required}
b.cond = sync.NewCond(&b.mutex)
return b
}
// Wait blocks until all goroutines have reached the barrier.
// Note: this simple barrier is not safe for immediate reuse by the same
// goroutines; WorkGroup below uses one Barrier per phase for that reason.
func (b *Barrier) Wait() {
b.mutex.Lock()
defer b.mutex.Unlock()
b.count++
if b.count == b.required {
// Last goroutine to arrive, reset and broadcast
b.count = 0
b.cond.Broadcast()
} else {
// Wait for all goroutines to arrive
b.cond.Wait()
}
}
// WorkGroup coordinates multiple goroutines with phases
type WorkGroup struct {
size int
wg sync.WaitGroup
startChan chan struct{}
doneChan chan struct{}
barriers []*Barrier
}
// NewWorkGroup creates a new work group for coordinating goroutines
func NewWorkGroup(size int, phases int) *WorkGroup {
wg := &WorkGroup{
size: size,
startChan: make(chan struct{}),
doneChan: make(chan struct{}),
barriers: make([]*Barrier, phases),
}
// Create barriers for each phase
for i := 0; i < phases; i++ {
wg.barriers[i] = NewBarrier(size)
}
return wg
}
// Start launches the work group
func (wg *WorkGroup) Start(work func(id, phase int)) {
// Start all workers
wg.wg.Add(wg.size)
for i := 0; i < wg.size; i++ {
go func(id int) {
defer wg.wg.Done()
// Wait for start signal
<-wg.startChan
// Execute work through all phases
for phase := 0; phase < len(wg.barriers); phase++ {
work(id, phase)
wg.barriers[phase].Wait() // Synchronize at barrier
}
}(i)
}
// Signal all goroutines to start
close(wg.startChan)
// Wait for all to complete in a separate goroutine
go func() {
wg.wg.Wait()
close(wg.doneChan)
}()
}
// Wait blocks until all work is complete
func (wg *WorkGroup) Wait() {
<-wg.doneChan
}
// Rendezvous coordinates pairs of goroutines
type Rendezvous struct {
firstArrived chan int
secondArrived chan struct{}
}
// NewRendezvous creates a new rendezvous point
func NewRendezvous() *Rendezvous {
return &Rendezvous{
firstArrived: make(chan int),
secondArrived: make(chan struct{}),
}
}
// Arrive waits for a pair of goroutines to meet
// Returns the ID of the first to arrive if this is the second arrival
func (r *Rendezvous) Arrive(id int) (int, bool) {
select {
case r.firstArrived <- id:
// We're first to arrive, wait for second
<-r.secondArrived
return 0, false
case firstID := <-r.firstArrived:
// We're second to arrive, signal first
close(r.secondArrived)
return firstID, true
}
}
func main() {
// Demonstrate barrier synchronization
fmt.Println("=== Barrier Synchronization ===")
barrier := NewBarrier(3)
for i := 0; i < 3; i++ {
go func(id int) {
fmt.Printf("Goroutine %d starting\n", id)
time.Sleep(time.Duration(id*300) * time.Millisecond) // Different work times
fmt.Printf("Goroutine %d arriving at barrier\n", id)
barrier.Wait()
fmt.Printf("Goroutine %d continuing after barrier\n", id)
}(i)
}
// Demonstrate phased work group
fmt.Println("\n=== Phased Work Group ===")
workGroup := NewWorkGroup(4, 3)
workGroup.Start(func(id, phase int) {
fmt.Printf("Worker %d executing phase %d\n", id, phase)
time.Sleep(time.Duration(100*(id+phase)) * time.Millisecond)
fmt.Printf("Worker %d completed phase %d\n", id, phase)
})
workGroup.Wait()
fmt.Println("All workers completed all phases")
// Demonstrate rendezvous pattern
fmt.Println("\n=== Rendezvous Pattern ===")
rendezvous := NewRendezvous()
go func() {
fmt.Println("Goroutine A starting")
time.Sleep(300 * time.Millisecond)
fmt.Println("Goroutine A arriving at rendezvous")
otherID, isSecond := rendezvous.Arrive(1)
if isSecond {
fmt.Printf("Goroutine A met with Goroutine %d\n", otherID)
} else {
fmt.Println("Goroutine A continuing after rendezvous")
}
}()
go func() {
fmt.Println("Goroutine B starting")
time.Sleep(100 * time.Millisecond)
fmt.Println("Goroutine B arriving at rendezvous")
otherID, isSecond := rendezvous.Arrive(2)
if isSecond {
fmt.Printf("Goroutine B met with Goroutine %d\n", otherID)
} else {
fmt.Println("Goroutine B continuing after rendezvous")
}
}()
// Wait for demonstration to complete
time.Sleep(1 * time.Second)
}
These coordination techniques enable:
- Phased execution: Coordinating multiple goroutines through distinct phases
- Barrier synchronization: Ensuring all goroutines reach a common point before proceeding
- Rendezvous: Coordinating pairs of goroutines to meet at a common point
- Work distribution: Efficiently distributing work across multiple goroutines
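The Barrier above leans on sync.Cond; since this article centers on channels, here is a sketch of a one-shot barrier built from channels alone (unlike the reusable version above, it releases its waiters exactly once):

package main

import (
    "fmt"
    "time"
)

// channelBarrier returns an arrive-and-wait function. Once n goroutines
// have called it, the release channel is closed, unblocking all of them.
func channelBarrier(n int) func() {
    arrivals := make(chan struct{}, n) // buffered so arrivals never block
    release := make(chan struct{})
    go func() {
        for i := 0; i < n; i++ {
            <-arrivals
        }
        close(release) // broadcast: every waiter unblocks
    }()
    return func() {
        arrivals <- struct{}{}
        <-release
    }
}

func main() {
    wait := channelBarrier(3)
    for i := 0; i < 3; i++ {
        go func(id int) {
            time.Sleep(time.Duration(id*100) * time.Millisecond)
            fmt.Printf("goroutine %d at barrier\n", id)
            wait()
            fmt.Printf("goroutine %d released\n", id)
        }(i)
    }
    time.Sleep(time.Second)
}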
Multiplexing and Demultiplexing Channels
Channel multiplexing combines multiple input channels into a single output channel, while demultiplexing does the reverse:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// multiplexChannels combines multiple input channels into a single output channel
func multiplexChannels(inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
// Start a goroutine for each input channel
for i, ch := range inputs {
wg.Add(1)
go func(id int, ch <-chan int) {
defer wg.Done()
for val := range ch {
fmt.Printf("Multiplexer receiving from input %d: %d\n", id, val)
output <- val
}
}(i, ch)
}
// Close the output channel when all input channels are done
go func() {
wg.Wait()
close(output)
fmt.Println("All input channels closed, multiplexer shutting down")
}()
return output
}
// demultiplexChannel splits a single input channel into multiple output channels
func demultiplexChannel(input <-chan int, n int) []<-chan int {
outputs := make([]chan int, n)
for i := range outputs {
outputs[i] = make(chan int)
}
// Convert to read-only channels for return
readOnlyOutputs := make([]<-chan int, n)
for i, ch := range outputs {
readOnlyOutputs[i] = ch
}
// Start the demultiplexer
go func() {
defer func() {
for _, ch := range outputs {
close(ch)
}
fmt.Println("Demultiplexer shutting down, all output channels closed")
}()
// Round-robin distribution
for val := range input {
// Determine which output channel to use based on the value
outChan := val % n
fmt.Printf("Demultiplexer sending %d to output %d\n", val, outChan)
outputs[outChan] <- val
}
}()
return readOnlyOutputs
}
// contentBasedDemux routes messages based on their content
func contentBasedDemux(input <-chan int, predicates ...func(int) bool) []<-chan int {
outputs := make([]chan int, len(predicates))
for i := range outputs {
outputs[i] = make(chan int)
}
// Convert to read-only channels for return
readOnlyOutputs := make([]<-chan int, len(predicates))
for i, ch := range outputs {
readOnlyOutputs[i] = ch
}
// Start the content-based router
go func() {
defer func() {
for _, ch := range outputs {
close(ch)
}
fmt.Println("Content router shutting down, all output channels closed")
}()
for val := range input {
// Send to all matching outputs
for i, predicate := range predicates {
if predicate(val) {
fmt.Printf("Router sending %d to output %d\n", val, i)
outputs[i] <- val
}
}
}
}()
return readOnlyOutputs
}
// broadcastChannel sends each input value to all output channels
func broadcastChannel(input <-chan int, n int) []<-chan int {
outputs := make([]chan int, n)
for i := range outputs {
outputs[i] = make(chan int, 5) // Buffer to prevent blocking
}
// Convert to read-only channels for return
readOnlyOutputs := make([]<-chan int, n)
for i, ch := range outputs {
readOnlyOutputs[i] = ch
}
// Start the broadcaster
go func() {
defer func() {
for _, ch := range outputs {
close(ch)
}
fmt.Println("Broadcaster shutting down, all output channels closed")
}()
for val := range input {
fmt.Printf("Broadcasting value: %d\n", val)
for i, ch := range outputs {
select {
case ch <- val:
// Value sent successfully
default:
// Channel buffer full, log and continue
fmt.Printf("Warning: Output %d buffer full, dropping value %d\n", i, val)
}
}
}
}()
return readOnlyOutputs
}
func main() {
// Create input channels for multiplexing demo
fmt.Println("=== Channel Multiplexing ===")
inputs := make([]chan int, 3)
for i := range inputs {
inputs[i] = make(chan int)
i := i // Create new variable to avoid closure problem
// Start producer for each input channel
go func() {
defer close(inputs[i])
for j := 0; j < 3; j++ {
val := i*10 + j
fmt.Printf("Producer %d sending: %d\n", i, val)
inputs[i] <- val
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}()
}
// Convert to read-only channels for multiplexing
readOnlyInputs := make([]<-chan int, len(inputs))
for i, ch := range inputs {
readOnlyInputs[i] = ch
}
// Multiplex the channels
multiplexed := multiplexChannels(readOnlyInputs...)
// Consume multiplexed output
go func() {
for val := range multiplexed {
fmt.Printf("Consumer received from multiplexed channel: %d\n", val)
}
}()
// Wait for multiplexing demo to complete
time.Sleep(2 * time.Second)
// Demonstrate demultiplexing
fmt.Println("\n=== Channel Demultiplexing ===")
input := make(chan int)
// Start producer for input channel
go func() {
defer close(input)
for i := 0; i < 10; i++ {
fmt.Printf("Demux producer sending: %d\n", i)
input <- i
time.Sleep(100 * time.Millisecond)
}
}()
// Demultiplex into 3 output channels
outputs := demultiplexChannel(input, 3)
// Start consumers for each output channel
var wg sync.WaitGroup
for i, ch := range outputs {
wg.Add(1)
i := i // Create new variable to avoid closure problem
ch := ch
go func() {
defer wg.Done()
for val := range ch {
fmt.Printf("Demux consumer %d received: %d\n", i, val)
}
}()
}
// Wait for demultiplexing demo to complete
time.Sleep(2 * time.Second)
// Demonstrate content-based routing
fmt.Println("\n=== Content-Based Routing ===")
routerInput := make(chan int)
// Define predicates for routing
isEven := func(n int) bool { return n%2 == 0 }
isDivisibleBy3 := func(n int) bool { return n%3 == 0 }
isGreaterThan5 := func(n int) bool { return n > 5 }
// Create router
routedOutputs := contentBasedDemux(routerInput, isEven, isDivisibleBy3, isGreaterThan5)
// Start consumers for each routed output
for i, ch := range routedOutputs {
wg.Add(1)
i := i // Create new variable to avoid closure problem
ch := ch
go func() {
defer wg.Done()
for val := range ch {
var condition string
switch i {
case 0:
condition = "even"
case 1:
condition = "divisible by 3"
case 2:
condition = "greater than 5"
}
fmt.Printf("Router consumer %d received %d (%s)\n", i, val, condition)
}
}()
}
// Send values to the router
go func() {
defer close(routerInput)
for i := 0; i < 10; i++ {
fmt.Printf("Router producer sending: %d\n", i)
routerInput <- i
time.Sleep(100 * time.Millisecond)
}
}()
// Wait for routing demo to complete
time.Sleep(2 * time.Second)
}
These multiplexing and demultiplexing patterns enable:
- Channel consolidation: Combining multiple input sources into a single stream
- Load distribution: Distributing work across multiple workers
- Content-based routing: Directing messages based on their content
- Broadcasting: Sending the same message to multiple recipients
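With Go 1.18+ type parameters, the int-specific multiplexers above can be written once for any element type. A minimal generic sketch (explicit instantiation is used to sidestep inference edge cases with channel directions):

package main

import (
    "fmt"
    "sync"
)

// merge multiplexes any number of channels carrying any element type.
func merge[T any](inputs ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(inputs))
    for _, in := range inputs {
        go func(in <-chan T) {
            defer wg.Done()
            for v := range in {
                out <- v
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    a := make(chan string)
    b := make(chan string)
    go func() { a <- "hello"; close(a) }()
    go func() { b <- "world"; close(b) }()
    // Bidirectional channels convert to <-chan string at the call site.
    for s := range merge[string](a, b) {
        fmt.Println(s)
    }
}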
Error Handling in Channel Communication
Robust concurrent systems require effective error handling strategies. Go’s channel-based concurrency introduces unique challenges for error propagation and handling.
Error Propagation Patterns
There are several patterns for propagating errors through channel-based systems:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Result represents a computation result or error
type Result struct {
Value int
Err error
}
// errorPropagationBasic demonstrates the basic error propagation pattern
func errorPropagationBasic() {
// Create a channel for results
results := make(chan Result)
// Start a worker that might encounter errors
go func() {
defer close(results)
// Simulate work with possible errors
for i := 0; i < 5; i++ {
// 30% chance of error
if rand.Float32() < 0.3 {
results <- Result{Err: fmt.Errorf("error processing item %d", i)}
continue
}
// Successful computation
results <- Result{Value: i * 10}
}
}()
// Process results, handling errors
for result := range results {
if result.Err != nil {
fmt.Printf("Error: %v\n", result.Err)
} else {
fmt.Printf("Success: %d\n", result.Value)
}
}
}
// errorPropagationPipeline demonstrates error propagation through a pipeline
func errorPropagationPipeline() {
// Create a pipeline with error propagation
source := generateWithErrors(1, 10)
processed := processWithErrors(source)
// Consume results
for result := range processed {
if result.Err != nil {
fmt.Printf("Pipeline error: %v\n", result.Err)
} else {
fmt.Printf("Pipeline result: %d\n", result.Value)
}
}
}
// generateWithErrors produces integers with possible errors
func generateWithErrors(start, end int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for i := start; i <= end; i++ {
// 20% chance of error
if rand.Float32() < 0.2 {
out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}
continue
}
// Simulate work
time.Sleep(50 * time.Millisecond)
out <- Result{Value: i}
}
}()
return out
}
// processWithErrors transforms values and propagates errors
func processWithErrors(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for result := range in {
// If input already has an error, propagate it
if result.Err != nil {
out <- result
continue
}
// 10% chance of new error during processing
if rand.Float32() < 0.1 {
out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}
continue
}
// Successful processing
time.Sleep(30 * time.Millisecond)
out <- Result{Value: result.Value * result.Value}
}
}()
return out
}
// errorAggregation demonstrates collecting and aggregating errors
func errorAggregation() {
// Create multiple workers that might produce errors
numWorkers := 5
results := make(chan Result)
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 40% chance of error
if rand.Float32() < 0.4 {
results <- Result{Err: fmt.Errorf("worker %d failed", id)}
return
}
// Successful computation
results <- Result{Value: id * 10}
}(i)
}
// Close results channel when all workers are done
go func() {
wg.Wait()
close(results)
}()
// Collect and aggregate results
var successCount, errorCount int
var errors []error
var values []int
for result := range results {
if result.Err != nil {
errorCount++
errors = append(errors, result.Err)
} else {
successCount++
values = append(values, result.Value)
}
}
// Report aggregated results
fmt.Printf("Successful operations: %d\n", successCount)
fmt.Printf("Failed operations: %d\n", errorCount)
if errorCount > 0 {
fmt.Println("Errors encountered:")
for _, err := range errors {
fmt.Printf(" - %v\n", err)
}
}
if successCount > 0 {
fmt.Printf("Values: %v\n", values)
}
}
// earlyTerminationOnError demonstrates stopping a pipeline on first error
func earlyTerminationOnError() {
// Create a context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a pipeline with early termination
source := generateWithContext(ctx, 1, 20)
processed := processWithContext(ctx, source)
// Consume results until first error
for result := range processed {
if result.Err != nil {
fmt.Printf("Pipeline error: %v\n", result.Err)
fmt.Println("Cancelling pipeline...")
cancel() // Signal all stages to stop
break
}
fmt.Printf("Pipeline result: %d\n", result.Value)
}
// Wait for pipeline to fully terminate
time.Sleep(100 * time.Millisecond)
fmt.Println("Pipeline terminated")
}
// generateWithContext produces integers with possible errors and respects context
func generateWithContext(ctx context.Context, start, end int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for i := start; i <= end; i++ {
// Check for cancellation
select {
case <-ctx.Done():
fmt.Println("Generator cancelled")
return
default:
// Continue processing
}
// 10% chance of error
if rand.Float32() < 0.1 {
select {
case out <- Result{Err: fmt.Errorf("failed to generate item %d", i)}:
// Error sent
case <-ctx.Done():
return
}
continue
}
// Simulate work
time.Sleep(50 * time.Millisecond)
// Send result
select {
case out <- Result{Value: i}:
// Value sent
case <-ctx.Done():
return
}
}
}()
return out
}
// processWithContext transforms values and respects context
func processWithContext(ctx context.Context, in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for {
// Check for cancellation between items
select {
case <-ctx.Done():
fmt.Println("Processor cancelled")
return
default:
// Continue processing
}
// Try to receive next item
var result Result
var ok bool
select {
case result, ok = <-in:
if !ok {
return // Input channel closed
}
case <-ctx.Done():
return // Context cancelled
}
// If input already has an error, propagate it
if result.Err != nil {
select {
case out <- result:
// Error propagated
case <-ctx.Done():
return
}
continue
}
// 15% chance of new error during processing
if rand.Float32() < 0.15 {
select {
case out <- Result{Err: fmt.Errorf("failed to process value %d", result.Value)}:
// Error sent
case <-ctx.Done():
return
}
continue
}
// Successful processing
time.Sleep(30 * time.Millisecond)
select {
case out <- Result{Value: result.Value * result.Value}:
// Result sent
case <-ctx.Done():
return
}
}
}()
return out
}
func main() {
// Seed the global RNG (unnecessary on Go 1.20+, where it is auto-seeded)
rand.Seed(time.Now().UnixNano())
// Demonstrate basic error propagation
fmt.Println("=== Basic Error Propagation ===")
errorPropagationBasic()
// Demonstrate error propagation through a pipeline
fmt.Println("\n=== Pipeline Error Propagation ===")
errorPropagationPipeline()
// Demonstrate error aggregation
fmt.Println("\n=== Error Aggregation ===")
errorAggregation()
// Demonstrate early termination on error
fmt.Println("\n=== Early Termination on Error ===")
earlyTerminationOnError()
}
Key error handling patterns:
- Result type pattern: Combining results and errors in a single struct
- Error propagation: Passing errors through pipeline stages
- Error aggregation: Collecting and summarizing errors from multiple operations
- Early termination: Stopping all processing when an error occurs
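Much of this machinery is prepackaged in golang.org/x/sync/errgroup (an external module), which runs a group of goroutines and returns the first error while cancelling a derived context so siblings can stop early. A minimal sketch:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 5; i++ {
        id := i
        g.Go(func() error {
            select {
            case <-ctx.Done():
                return ctx.Err() // another worker failed; stop early
            default:
            }
            if id == 3 {
                return fmt.Errorf("worker %d failed", id)
            }
            fmt.Println("worker", id, "succeeded")
            return nil
        })
    }
    // Wait returns the first non-nil error from any worker.
    if err := g.Wait(); err != nil {
        fmt.Println("group error:", err)
    }
}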
Performance Optimization and Monitoring
Building high-performance concurrent systems requires careful attention to channel usage patterns and performance characteristics.
Channel Sizing and Buffering Strategies
The size of channel buffers can significantly impact performance:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// benchmarkChannelBuffering measures the performance impact of different buffer sizes
func benchmarkChannelBuffering() {
// Test parameters
numItems := 10000
bufferSizes := []int{0, 1, 10, 100, 1000}
fmt.Println("Testing channel performance with different buffer sizes")
fmt.Println("Buffer Size | Producer Time | Consumer Time | Total Time")
fmt.Println("-----------|---------------|---------------|------------")
for _, bufferSize := range bufferSizes {
// Create channel with specified buffer size
ch := make(chan int, bufferSize)
var wg sync.WaitGroup
wg.Add(2) // One for producer, one for consumer
// Track timing
start := time.Now()
var producerDone time.Time
// Start producer
go func() {
defer wg.Done()
defer close(ch)
for i := 0; i < numItems; i++ {
ch <- i
}
producerDone = time.Now()
}()
// Start consumer
go func() {
defer wg.Done()
count := 0
for range ch {
count++
// Simulate variable processing time
if rand.Intn(100) < 10 {
time.Sleep(10 * time.Microsecond)
}
}
}()
// Wait for both to finish
wg.Wait()
totalTime := time.Since(start)
producerTime := producerDone.Sub(start)
consumerTime := totalTime // the consumer drains last, so its time equals the total
// Report results
fmt.Printf("%-11d | %-13s | %-13s | %s\n",
bufferSize,
producerTime.String(),
consumerTime.String(),
totalTime.String())
}
}
// demonstrateBackpressure shows how buffered channels provide backpressure
func demonstrateBackpressure() {
fmt.Println("\nDemonstrating backpressure with buffered channels")
// Create a channel with limited buffer
bufferSize := 5
ch := make(chan int, bufferSize)
// Start a slow consumer
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for item := range ch {
fmt.Printf("Consumer processing item %d\n", item)
time.Sleep(200 * time.Millisecond) // Slow consumer
}
}()
// Producer tries to send items faster than consumer can process
for i := 0; i < 10; i++ {
fmt.Printf("Producer attempting to send item %d\n", i)
start := time.Now()
ch <- i
elapsed := time.Since(start)
if elapsed > 100*time.Millisecond {
fmt.Printf("Producer blocked for %s while sending item %d (backpressure in action)\n",
elapsed, i)
} else {
fmt.Printf("Producer sent item %d immediately\n", i)
}
time.Sleep(100 * time.Millisecond)
}
close(ch)
wg.Wait()
}
// channelOverheadComparison compares channels to other synchronization methods
func channelOverheadComparison() {
fmt.Println("\nComparing channel overhead to other synchronization methods")
iterations := 100000
// Test mutex-based synchronization
start := time.Now()
var mutex sync.Mutex
var counter int
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
mutex.Lock()
counter++
mutex.Unlock()
}
}()
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
mutex.Lock()
counter++
mutex.Unlock()
}
}()
wg.Wait()
mutexTime := time.Since(start)
// Test channel-based synchronization
start = time.Now()
ch := make(chan int)
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
ch <- 1
}
}()
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
<-ch
}
}()
wg.Wait()
channelTime := time.Since(start)
// Test buffered channel
start = time.Now()
bufferedCh := make(chan int, 1000)
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < iterations;
for i := 0; i < iterations; i++ {
bufferedCh <- 1
}
}()
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
<-bufferedCh
}
}()
wg.Wait()
bufferedChannelTime := time.Since(start)
// Report results
fmt.Printf("Mutex: %s\n", mutexTime)
fmt.Printf("Unbuffered Chan: %s\n", channelTime)
fmt.Printf("Buffered Chan: %s\n", bufferedChannelTime)
}
func main() {
// Benchmark different buffer sizes
benchmarkChannelBuffering()
// Demonstrate backpressure
demonstrateBackpressure()
// Compare channel overhead to other synchronization methods
channelOverheadComparison()
}
Key performance considerations:
- Buffer sizing: Larger buffers can improve throughput but increase memory usage
- Backpressure: Buffered channels naturally implement backpressure when producers outpace consumers
- Overhead comparison: Channels have higher overhead than mutexes but provide more functionality
- Batching: Processing items in batches can reduce channel communication overhead (sketched below)
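The batching point is worth a concrete sketch: sending slices instead of individual items divides the number of channel operations by roughly the batch size (the size of 4 below is illustrative):

package main

import "fmt"

// sendBatched forwards items in fixed-size slices, cutting the number
// of channel operations by roughly the batch size.
func sendBatched(items []int, batchSize int) <-chan []int {
    out := make(chan []int)
    go func() {
        defer close(out)
        for start := 0; start < len(items); start += batchSize {
            end := start + batchSize
            if end > len(items) {
                end = len(items)
            }
            out <- items[start:end] // one send per batch, not per item
        }
    }()
    return out
}

func main() {
    items := make([]int, 10)
    for i := range items {
        items[i] = i
    }
    for batch := range sendBatched(items, 4) {
        fmt.Println("processing batch:", batch)
    }
}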
Monitoring Channel Health
Monitoring channel behavior is essential for identifying bottlenecks and deadlocks:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// ChannelStats tracks statistics about a channel
type ChannelStats struct {
Name string
SendCount int64
ReceiveCount int64
BlockedSends int64
BlockedReceives int64
LastActivity time.Time
mutex sync.Mutex
}
// NewChannelStats creates a new channel statistics tracker
func NewChannelStats(name string) *ChannelStats {
return &ChannelStats{
Name: name,
LastActivity: time.Now(),
}
}
// RecordSend records a send operation
func (cs *ChannelStats) RecordSend(blocked bool) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.SendCount++
if blocked {
cs.BlockedSends++
}
cs.LastActivity = time.Now()
}
// RecordReceive records a receive operation
func (cs *ChannelStats) RecordReceive(blocked bool) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.ReceiveCount++
if blocked {
cs.BlockedReceives++
}
cs.LastActivity = time.Now()
}
// GetStats returns the current statistics
func (cs *ChannelStats) GetStats() map[string]interface{} {
cs.mutex.Lock()
defer cs.mutex.Unlock()
return map[string]interface{}{
"name": cs.Name,
"sends": cs.SendCount,
"receives": cs.ReceiveCount,
"blocked_sends": cs.BlockedSends,
"blocked_receives": cs.BlockedReceives,
"idle_time": time.Since(cs.LastActivity).String(),
}
}
// InstrumentedChannel wraps a channel with monitoring
type InstrumentedChannel struct {
ch chan int
stats *ChannelStats
}
// NewInstrumentedChannel creates a new instrumented channel
func NewInstrumentedChannel(name string, buffer int) *InstrumentedChannel {
return &InstrumentedChannel{
ch: make(chan int, buffer),
stats: NewChannelStats(name),
}
}
// Send sends a value on the channel with instrumentation
func (ic *InstrumentedChannel) Send(value int) {
// Try non-blocking send first
select {
case ic.ch <- value:
ic.stats.RecordSend(false)
default:
// Buffer full: this send will block. It is recorded as blocked before
// sending, a heuristic that may overcount sends that complete quickly.
ic.stats.RecordSend(true)
ic.ch <- value
}
}
// Receive receives a value from the channel with instrumentation
func (ic *InstrumentedChannel) Receive() (int, bool) {
// Try non-blocking receive first
select {
case value, ok := <-ic.ch:
ic.stats.RecordReceive(false)
return value, ok
default:
// Channel empty: this receive will block; recorded as blocked before
// receiving (the same heuristic as Send).
ic.stats.RecordReceive(true)
value, ok := <-ic.ch
return value, ok
}
}
// Close closes the underlying channel
func (ic *InstrumentedChannel) Close() {
close(ic.ch)
}
// GetStats returns the channel statistics
func (ic *InstrumentedChannel) GetStats() map[string]interface{} {
return ic.stats.GetStats()
}
// monitorChannels periodically reports channel statistics
func monitorChannels(channels []*InstrumentedChannel, interval time.Duration, done <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("\nChannel Statistics:")
fmt.Println("-------------------")
for _, ch := range channels {
stats := ch.GetStats()
fmt.Printf("Channel: %s\n", stats["name"])
fmt.Printf(" Sends: %d (blocked: %d)\n", stats["sends"], stats["blocked_sends"])
fmt.Printf(" Receives: %d (blocked: %d)\n", stats["receives"], stats["blocked_receives"])
fmt.Printf(" Idle time: %s\n", stats["idle_time"])
}
// Report goroutine count
fmt.Printf("\nTotal goroutines: %d\n", runtime.NumGoroutine())
case <-done:
return
}
}
}
// simulateChannelWorkload demonstrates channel monitoring
func simulateChannelWorkload() {
// Create instrumented channels
fastChannel := NewInstrumentedChannel("fast", 10)
slowChannel := NewInstrumentedChannel("slow", 5)
// Create done channel for cleanup
done := make(chan struct{})
// Start monitoring
go monitorChannels([]*InstrumentedChannel{fastChannel, slowChannel}, 1*time.Second, done)
// Start producer for fast channel
go func() {
for i := 0; i < 1000; i++ {
fastChannel.Send(i)
time.Sleep(10 * time.Millisecond)
}
fastChannel.Close()
}()
// Start consumer for fast channel
go func() {
for {
_, ok := fastChannel.Receive()
if !ok {
break
}
time.Sleep(20 * time.Millisecond) // Consumer is slower than producer
}
}()
// Start producer for slow channel
go func() {
for i := 0; i < 100; i++ {
slowChannel.Send(i)
time.Sleep(50 * time.Millisecond)
}
slowChannel.Close()
}()
// Start consumer for slow channel
go func() {
for {
_, ok := slowChannel.Receive()
if !ok {
break
}
time.Sleep(10 * time.Millisecond) // Consumer is faster than producer
}
}()
// Run simulation for 10 seconds
time.Sleep(10 * time.Second)
close(done)
}
func main() {
simulateChannelWorkload()
}
Key monitoring techniques:
- Instrumented channels: Wrapping channels with monitoring code
- Operation tracking: Recording send and receive operations
- Blocked operation detection: Identifying when operations block
- Idle time tracking: Detecting potentially deadlocked channels
- Goroutine count monitoring: Identifying potential goroutine leaks
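Beyond wrapping channels, the built-in len and cap functions give a cheap point-in-time view of a buffered channel's occupancy; a buffer that stays near capacity usually means the consumer is falling behind. A small sketch (the 80% alert threshold is an arbitrary illustration, not a standard value):
package main
import "fmt"
// reportOccupancy prints how full a buffered channel is right now.
// len(ch) is only a snapshot: it can change the instant after it is read.
func reportOccupancy(name string, ch chan int) {
occupancy := float64(len(ch)) / float64(cap(ch))
fmt.Printf("%s: %d/%d slots used (%.0f%%)\n", name, len(ch), cap(ch), occupancy*100)
if occupancy > 0.8 { // arbitrary alert threshold for illustration
fmt.Printf("%s: buffer nearly full, consumer may be falling behind\n", name)
}
}
func main() {
ch := make(chan int, 10)
for i := 0; i < 9; i++ {
ch <- i
}
reportOccupancy("jobs", ch)
}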
Channel Memory Management
Efficient channel memory management is crucial for high-performance systems:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// demonstrateChannelMemoryUsage shows memory usage patterns of channels
func demonstrateChannelMemoryUsage() {
fmt.Println("=== Channel Memory Usage ===")
// Print initial memory stats
printMemStats("Initial")
// Create many small channels
fmt.Println("\nCreating 100,000 small channels...")
smallChannels := make([]chan int, 100000)
for i := range smallChannels {
smallChannels[i] = make(chan int, 1)
}
// Print memory stats after creating small channels
printMemStats("After small channels")
// Create a few large channels
fmt.Println("\nCreating 10 large channels (buffer size 10,000)...")
largeChannels := make([]chan int, 10)
for i := range largeChannels {
largeChannels[i] = make(chan int, 10000)
}
// Print memory stats after creating large channels
printMemStats("After large channels")
// Fill the large channels
fmt.Println("\nFilling large channels...")
for _, ch := range largeChannels {
for i := 0; i < 10000; i++ {
ch <- i
}
}
// Print memory stats after filling large channels
printMemStats("After filling large channels")
// Clear references to allow garbage collection
fmt.Println("\nClearing references...")
smallChannels = nil
largeChannels = nil
// Force garbage collection
runtime.GC()
// Print final memory stats
printMemStats("After garbage collection")
}
// printMemStats prints current memory statistics
func printMemStats(label string) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("%s:\n", label)
fmt.Printf(" Alloc: %.2f MB\n", float64(m.Alloc)/1024/1024)
fmt.Printf(" Sys: %.2f MB\n", float64(m.Sys)/1024/1024)
fmt.Printf(" NumGC: %d\n", m.NumGC)
}
// objectPool demonstrates reusing channel values to reduce allocations
type objectPool struct {
pool chan []byte
bufferSize int // size of each buffer handed out by the pool
}
// newObjectPool creates a new object pool
func newObjectPool(size int, bufferSize int) *objectPool {
p := &objectPool{
pool: make(chan []byte, size),
bufferSize: bufferSize,
}
// Pre-allocate objects
for i := 0; i < size; i++ {
p.pool <- make([]byte, bufferSize)
}
return p
}
// get retrieves an object from the pool or creates a new one if none available
func (p *objectPool) get() []byte {
select {
case obj := <-p.pool:
return obj
default:
// Pool is empty, create a new object of the configured size
return make([]byte, p.bufferSize)
}
}
// put returns an object to the pool
func (p *objectPool) put(obj []byte) {
// Clear the buffer for reuse
for i := range obj {
obj[i] = 0
}
select {
case p.pool <- obj:
// Object returned to pool
default:
// Pool is full, let the object be garbage collected
}
}
// demonstrateObjectPooling shows how to reduce allocations with object pooling
func demonstrateObjectPooling() {
fmt.Println("\n=== Object Pooling ===")
// Create a pool of 100 byte slices, each 4KB
pool := newObjectPool(100, 4096)
// Benchmark without pooling
printMemStats("Before non-pooled operations")
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Allocate and use a new buffer each time
buf := make([]byte, 4096)
for i := range buf {
buf[i] = byte(i % 256)
}
}()
}
wg.Wait()
nonPooledTime := time.Since(start)
printMemStats("After non-pooled operations")
// Force garbage collection
runtime.GC()
// Benchmark with pooling
printMemStats("Before pooled operations")
start = time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Get buffer from pool
buf := pool.get()
// Use the buffer
for i := range buf {
buf[i] = byte(i % 256)
}
// Return buffer to pool
pool.put(buf)
}()
}
wg.Wait()
pooledTime := time.Since(start)
printMemStats("After pooled operations")
// Report timing results
fmt.Printf("\nNon-pooled time: %s\n", nonPooledTime)
fmt.Printf("Pooled time: %s\n", pooledTime)
fmt.Printf("Improvement: %.2f%%\n", 100*(1-float64(pooledTime)/float64(nonPooledTime)))
}
func main() {
// Demonstrate channel memory usage
demonstrateChannelMemoryUsage()
// Demonstrate object pooling
demonstrateObjectPooling()
}
Key memory management techniques:
- Buffer sizing: Choosing appropriate buffer sizes to balance memory usage and performance
- Object pooling: Reusing objects to reduce allocation and garbage collection overhead (a sync.Pool alternative is sketched after this list)
- Memory monitoring: Tracking memory usage to identify leaks and inefficiencies
- Pre-allocation: Allocating channels and buffers upfront to reduce dynamic allocations
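For comparison, the standard library's sync.Pool implements the same pooling idea without a dedicated channel. It cooperates with the garbage collector, which may empty the pool between collections, so it suits transient scratch buffers rather than objects that must stay warm. A minimal sketch:
package main
import (
"fmt"
"sync"
)
func main() {
// bufPool hands out 4 KB buffers; New runs only when the pool is empty.
bufPool := sync.Pool{
New: func() interface{} { return make([]byte, 4096) },
}
buf := bufPool.Get().([]byte)
copy(buf, "hello") // use the buffer
bufPool.Put(buf) // return it for reuse
reused := bufPool.Get().([]byte)
fmt.Println(len(reused)) // 4096; note the contents are not cleared automatically
}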
Production Implementation Strategies
Implementing channel-based concurrency patterns in production systems requires careful consideration of reliability, maintainability, and performance.
Graceful Shutdown Patterns
Proper shutdown handling is essential for production systems:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Worker represents a long-running worker goroutine
type Worker struct {
id int
jobs <-chan int
results chan<- int
ctx context.Context
cancel context.CancelFunc
gracePeriod time.Duration
done chan struct{} // closed when the worker goroutine exits
wg *sync.WaitGroup
}
// NewWorker creates a new worker
func NewWorker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) *Worker {
ctx, cancel := context.WithCancel(context.Background())
return &Worker{
id: id,
jobs: jobs,
results: results,
ctx: ctx,
cancel: cancel,
gracePeriod: 2 * time.Second,
done: make(chan struct{}),
wg: wg,
}
}
// Start begins the worker's processing loop
func (w *Worker) Start() {
w.wg.Add(1)
go func() {
defer w.wg.Done()
for {
select {
case <-w.ctx.Done():
fmt.Printf("Worker %d shutting down\n", w.id)
return
case job, ok := <-w.jobs:
if !ok {
fmt.Printf("Worker %d: job channel closed\n", w.id)
return
}
// Process the job
fmt.Printf("Worker %d processing job %d\n", w.id, job)
time.Sleep(500 * time.Millisecond) // Simulate work
// Try to send result, respecting cancellation
select {
case w.results <- job * 2:
// Result sent successfully
case <-w.ctx.Done():
fmt.Printf("Worker %d: cancelled while sending result\n", w.id)
return
}
}
}
}()
}
// Stop signals the worker to stop and waits for graceful shutdown
func (w *Worker) Stop() {
fmt.Printf("Stopping worker %d (grace period: %v)\n", w.id, w.gracePeriod)
// Signal worker to stop
w.cancel()
// Wait for either the worker to finish or the grace period to expire
select {
case <-w.done:
fmt.Printf("Worker %d stopped gracefully\n", w.id)
case <-time.After(w.gracePeriod):
fmt.Printf("Grace period expired for worker %d\n", w.id)
}
}
// WorkerPool manages a pool of workers
type WorkerPool struct {
workers []*Worker
jobs chan int
results chan int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
shutdownWg sync.WaitGroup
shutdownOnce sync.Once // ensures Shutdown runs at most once
gracePeriod time.Duration
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool(numWorkers int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
workers: make([]*Worker, numWorkers),
jobs: make(chan int),
results: make(chan int),
ctx: ctx,
cancel: cancel,
gracePeriod: 5 * time.Second,
}
}
// Start launches the worker pool
func (wp *WorkerPool) Start() {
// Start workers
for i := 0; i < len(wp.workers); i++ {
wp.workers[i] = NewWorker(i, wp.jobs, wp.results, &wp.wg)
wp.workers[i].Start()
}
// Start result collector
wp.shutdownWg.Add(1)
go func() {
defer wp.shutdownWg.Done()
for {
select {
case result, ok := <-wp.results:
if !ok {
return
}
fmt.Printf("Got result: %d\n", result)
case <-wp.ctx.Done():
fmt.Println("Result collector shutting down")
return
}
}
}()
}
// SubmitJob adds a job to the pool
func (wp *WorkerPool) SubmitJob(job int) error {
select {
case wp.jobs <- job:
return nil
case <-wp.ctx.Done():
return fmt.Errorf("worker pool is shutting down")
}
}
// Shutdown gracefully shuts down the worker pool. The sync.Once guard makes
// it safe to call from both main and the signal handler without a double
// close of the jobs channel.
func (wp *WorkerPool) Shutdown() {
wp.shutdownOnce.Do(func() {
fmt.Printf("Initiating graceful shutdown (grace period: %v)\n", wp.gracePeriod)
// Signal shutdown
wp.cancel()
// Close the jobs channel to signal no more jobs
close(wp.jobs)
// Wait for either all workers to finish or the grace period to expire
doneChannel := make(chan struct{})
go func() {
wp.wg.Wait()
close(doneChannel)
}()
select {
case <-doneChannel:
fmt.Println("All workers completed gracefully")
case <-time.After(wp.gracePeriod):
fmt.Println("Grace period expired, some workers may still be running")
}
// Close the results channel. Note that if the grace period expired, a
// straggling worker could still panic sending on this closed channel.
close(wp.results)
// Wait for the result collector to finish
wp.shutdownWg.Wait()
fmt.Println("Worker pool shutdown complete")
})
}
// setupGracefulShutdown sets up signal handling for graceful shutdown
func setupGracefulShutdown(shutdown func()) {
// Create channel to receive OS signals
sigs := make(chan os.Signal, 1)
// Register for SIGINT (Ctrl+C) and SIGTERM
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
// Start goroutine to handle signals
go func() {
sig := <-sigs
fmt.Printf("\nReceived signal: %s\n", sig)
shutdown()
}()
}
func main() {
// Create a worker pool with 3 workers
pool := NewWorkerPool(3)
// Set up graceful shutdown
setupGracefulShutdown(pool.Shutdown)
// Start the worker pool
pool.Start()
// Submit some jobs
fmt.Println("Submitting jobs...")
for i := 1; i <= 10; i++ {
if err := pool.SubmitJob(i); err != nil {
fmt.Printf("Error submitting job: %v\n", err)
}
}
// Wait for a bit to let some processing happen
time.Sleep(3 * time.Second)
// Initiate graceful shutdown
fmt.Println("Initiating shutdown...")
pool.Shutdown()
fmt.Println("Main function exiting")
}
Key shutdown patterns:
- Context cancellation: Using context to signal shutdown to all components
- Grace periods: Allowing components time to finish current work
- Signal handling: Responding to OS signals for clean shutdown (a signal.NotifyContext variant is sketched after this list)
- Resource cleanup: Ensuring all resources are properly released
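Since Go 1.16, signal.NotifyContext collapses the signal-channel boilerplate above into a cancellable context, which composes naturally with the context-based workers in this example. A minimal sketch (the two-second timer merely stands in for real work):
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
"time"
)
func main() {
// ctx is cancelled on SIGINT or SIGTERM; stop releases the signal handler.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
select {
case <-ctx.Done():
fmt.Println("signal received, shutting down") // e.g. call pool.Shutdown() here
case <-time.After(2 * time.Second):
fmt.Println("work finished before any signal arrived")
}
}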
Error Handling and Recovery
Robust error handling and recovery are essential for production systems:
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// RecoverableWorker represents a worker that can recover from panics
type RecoverableWorker struct {
id int
jobs <-chan int
results chan<- Result
errors chan<- error
restartDelay time.Duration
maxRestarts int
wg *sync.WaitGroup
}
// Result represents a job result or error
type Result struct {
JobID int
Value int
Worker int
}
// NewRecoverableWorker creates a new worker that can recover from panics
func NewRecoverableWorker(id int, jobs <-chan int, results chan<- Result, errCh chan<- error, wg *sync.WaitGroup) *RecoverableWorker {
return &RecoverableWorker{
id: id,
jobs: jobs,
results: results,
errors: errCh,
restartDelay: 1 * time.Second,
maxRestarts: 3,
wg: wg,
}
}
// Start begins the worker's processing loop with recovery
func (w *RecoverableWorker) Start() {
w.wg.Add(1)
go func() {
defer w.wg.Done()
restarts := 0
for restarts <= w.maxRestarts {
if restarts > 0 {
log.Printf("Worker %d: restarting (%d/%d) after delay of %v",
w.id, restarts, w.maxRestarts, w.restartDelay)
time.Sleep(w.restartDelay)
}
// Run the worker with panic recovery
if w.runWithRecovery() {
// Normal exit, no need to restart
return
}
restarts++
}
log.Printf("Worker %d: exceeded maximum restarts (%d)", w.id, w.maxRestarts)
w.errors <- fmt.Errorf("worker %d exceeded maximum restarts (%d)", w.id, w.maxRestarts)
}()
}
// runWithRecovery runs the worker's main loop with panic recovery
// Returns true if the worker exited normally, false if it panicked
func (w *RecoverableWorker) runWithRecovery() (normalExit bool) {
defer func() {
if r := recover(); r != nil {
log.Printf("Worker %d: recovered from panic: %v", w.id, r)
normalExit = false
}
}()
for job := range w.jobs {
// Simulate random panics
if rand.Float32() < 0.1 {
panic(fmt.Sprintf("simulated panic processing job %d", job))
}
// Simulate work
time.Sleep(200 * time.Millisecond)
// Simulate random errors
if rand.Float32() < 0.2 {
w.errors <- fmt.Errorf("worker %d: error processing job %d", w.id, job)
continue
}
// Send successful result
w.results <- Result{
JobID: job,
Value: job * 2,
Worker: w.id,
}
}
return true // Normal exit
}
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
failures int
threshold int
resetTimeout time.Duration
halfOpenTimeout time.Duration
lastFailure time.Time
state string
mutex sync.Mutex
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
threshold: threshold,
resetTimeout: resetTimeout,
halfOpenTimeout: 5 * time.Second,
state: "closed",
}
}
// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(operation func() error) error {
cb.mutex.Lock()
// Check if circuit is open
if cb.state == "open" {
// Check if reset timeout has elapsed
if time.Since(cb.lastFailure) > cb.resetTimeout {
log.Println("Circuit half-open, allowing trial request")
cb.state = "half-open"
} else {
cb.mutex.Unlock()
return errors.New("circuit breaker is open")
}
}
cb.mutex.Unlock()
// Execute the operation
err := operation()
cb.mutex.Lock()
defer cb.mutex.Unlock()
if err != nil {
// Operation failed
cb.failures++
cb.lastFailure = time.Now()
if cb.state == "half-open" || cb.failures >= cb.threshold {
// Open the circuit
cb.state = "open"
log.Printf("Circuit opened: %v failures (threshold: %d)", cb.failures, cb.threshold)
}
return err
}
// Operation succeeded: reset the consecutive-failure count
if cb.state == "half-open" {
// Close the circuit again
cb.state = "closed"
log.Println("Circuit closed: trial request succeeded")
}
cb.failures = 0
return nil
}
// State returns the current state of the circuit breaker
func (cb *CircuitBreaker) State() string {
cb.mutex.Lock()
defer cb.mutex.Unlock()
return cb.state
}
// demonstrateCircuitBreaker shows how to use a circuit breaker with channels
func demonstrateCircuitBreaker() {
// Create a circuit breaker
cb := NewCircuitBreaker(3, 5*time.Second)
// Create channels
requests := make(chan int, 10)
results := make(chan Result)
errCh := make(chan error)
// Start worker
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for req := range requests {
// Use circuit breaker to protect the operation
err := cb.Execute(func() error {
// Simulate an unreliable operation
if rand.Float32() < 0.7 {
return fmt.Errorf("simulated error processing request %d", req)
}
// Successful operation
results <- Result{JobID: req, Value: req * 10}
return nil
})
if err != nil {
errCh <- err
}
}
}()
// Start error and result handlers
go func() {
for err := range errCh {
log.Printf("Error: %v", err)
}
}()
go func() {
for result := range results {
log.Printf("Result: %+v", result)
}
}()
// Send requests
log.Println("Sending requests...")
for i := 1; i <= 20; i++ {
requests <- i
log.Printf("Request %d sent, circuit state: %s", i, cb.State())
time.Sleep(500 * time.Millisecond)
}
close(requests)
wg.Wait()
}
func main() {
// Seed the global random source (unnecessary on Go 1.20+, where it is
// seeded automatically and rand.Seed is deprecated)
rand.Seed(time.Now().UnixNano())
// Demonstrate recoverable workers
log.Println("=== Recoverable Workers ===")
// Create channels
jobs := make(chan int, 10)
results := make(chan Result)
errCh := make(chan error)
// Create and start workers
var wg sync.WaitGroup
numWorkers := 3
for i := 0; i < numWorkers; i++ {
worker := NewRecoverableWorker(i, jobs, results, errCh, &wg)
worker.Start()
}
// Start error handler
go func() {
for err := range errCh {
log.Printf("Error: %v", err)
}
}()
// Start result handler
go func() {
for result := range results {
log.Printf("Result: Job %d = %d (Worker %d)",
result.JobID, result.Value, result.Worker)
}
}()
// Send jobs
for i := 1; i <= 20; i++ {
jobs <- i
}
// Close jobs channel and wait for workers to finish
close(jobs)
wg.Wait()
// Demonstrate circuit breaker
log.Println("\n=== Circuit Breaker ===")
demonstrateCircuitBreaker()
}
Key error handling patterns:
- Panic recovery: Recovering from panics to prevent goroutine crashes
- Worker restart: Automatically restarting failed workers
- Circuit breaker: Preventing cascading failures by failing fast
- Error propagation: Sending errors through dedicated channels
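A widely used alternative to hand-rolled error channels is golang.org/x/sync/errgroup, which collects the first error from a group of goroutines and cancels a shared context so the others can stop early. A minimal sketch (requires the x/sync module; the failing task here is contrived):
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
for i := 1; i <= 3; i++ {
i := i // capture the loop variable (required before Go 1.22)
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err() // another task already failed; stop early
default:
}
if i == 2 {
return fmt.Errorf("task %d failed", i)
}
return nil
})
}
// Wait blocks until all goroutines return and yields the first error.
if err := g.Wait(); err != nil {
fmt.Println("group error:", err)
}
}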
Testing Concurrent Code
Testing concurrent code requires specialized techniques:
package main
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
// Result represents a value produced by the pipeline's sink stage
type Result struct {
Value int
}
// Pipeline represents a simple data processing pipeline
type Pipeline struct {
source func(ctx context.Context) <-chan int
transform func(ctx context.Context, in <-chan int) <-chan int
sink func(ctx context.Context, in <-chan int) <-chan Result
}
// TestPipeline demonstrates testing a concurrent pipeline
func TestPipeline(t *testing.T) {
// Create a test pipeline
p := Pipeline{
source: func(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= 5; i++ {
select {
case out <- i:
case <-ctx.Done():
return
}
}
}()
return out
},
transform: func(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * 2:
case <-ctx.Done():
return
}
}
}()
return out
},
sink: func(ctx context.Context, in <-chan int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for n := range in {
select {
case out <- Result{Value: n}:
case <-ctx.Done():
return
}
}
}()
return out
},
}
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// Run the pipeline
source := p.source(ctx)
transformed := p.transform(ctx, source)
results := p.sink(ctx, transformed)
// Collect and verify results
var actual []int
for result := range results {
actual = append(actual, result.Value)
}
// Verify results
expected := []int{2, 4, 6, 8, 10}
if len(actual) != len(expected) {
t.Errorf("Expected %d results, got %d", len(expected), len(actual))
}
for i, v := range actual {
if v != expected[i] {
t.Errorf("Expected %d at position %d, got %d", expected[i], i, v)
}
}
}
// TestRaceConditions demonstrates testing for race conditions
func TestRaceConditions(t *testing.T) {
// Create a shared counter
var counter int
var mutex sync.Mutex
// Create a WaitGroup to synchronize goroutines
var wg sync.WaitGroup
// Launch multiple goroutines to increment the counter
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Properly synchronized increment
mutex.Lock()
counter++
mutex.Unlock()
// Incorrectly synchronized increment would be:
// counter++ // This would cause a race condition
}()
}
// Wait for all goroutines to finish
wg.Wait()
// Verify the counter value
if counter != 100 {
t.Errorf("Expected counter to be 100, got %d", counter)
}
// Note: This test should be run with the -race flag:
// go test -race
}
// TestDeadlockDetection demonstrates testing for deadlocks
func TestDeadlockDetection(t *testing.T) {
// Create a channel for communication
ch := make(chan int)
// Set a timeout to detect deadlocks
timeout := time.After(500 * time.Millisecond)
// Start a goroutine that sends a value
go func() {
ch <- 42
}()
// Try to receive with a timeout
select {
case val := <-ch:
fmt.Printf("Received: %d\n", val)
case <-timeout:
t.Fatal("Deadlock detected: timed out waiting for channel send/receive")
}
// Example of a potential deadlock (commented out)
/*
unbufferedCh := make(chan int) // Unbuffered channel
// This would deadlock if uncommented:
// unbufferedCh <- 1 // Send without a receiver
// Instead, use a goroutine:
go func() {
unbufferedCh <- 1
}()
// And receive with a timeout:
select {
case <-unbufferedCh:
// Success
case <-time.After(500 * time.Millisecond):
t.Fatal("Deadlock detected")
}
*/
}
func main() {
// The Test functions above belong in a _test.go file; calling them
// directly with a nil *testing.T would panic on the first failure.
fmt.Println("Run these functions as tests with 'go test -race'")
}
Key testing techniques:
- Timeout-based testing: Using timeouts to detect deadlocks and hangs
- Race detection: Using Go’s race detector to find race conditions
- Context cancellation: Testing proper cancellation handling
- Deterministic testing: Creating reproducible tests for concurrent code
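One further technique worth adding to a concurrent test suite is a goroutine leak check: compare runtime.NumGoroutine before and after the code under test (libraries such as go.uber.org/goleak automate a more robust version of this). A rough sketch; the raw count is only a heuristic, so the code sleeps briefly to let finished goroutines exit:
package main
import (
"fmt"
"runtime"
"time"
)
// checkLeaks reports whether fn appears to have left goroutines behind.
// This is a heuristic: runtime background goroutines can shift the count.
func checkLeaks(fn func()) {
before := runtime.NumGoroutine()
fn()
time.Sleep(100 * time.Millisecond) // give finished goroutines time to exit
after := runtime.NumGoroutine()
if after > before {
fmt.Printf("possible leak: %d goroutines before, %d after\n", before, after)
} else {
fmt.Println("no goroutine leak detected")
}
}
func main() {
checkLeaks(func() {
ch := make(chan int) // never received from
go func() { ch <- 1 }() // this goroutine blocks forever: a leak
time.Sleep(50 * time.Millisecond)
})
}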