Performance Optimization and Monitoring

To build high-performance pipeline systems, careful optimization and monitoring are essential.

Buffer Sizing and Throughput Optimization

The size of channel buffers can significantly impact pipeline performance:

package main

import (
	"fmt"
	"time"
)

// benchmarkPipeline runs numItems through a 3-stage pipeline with the given channel buffer size and returns the elapsed time
func benchmarkPipeline(bufferSize int, numItems int) time.Duration {
	start := time.Now()
	
	// Create stages with specified buffer size
	stage1 := make(chan int, bufferSize)
	stage2 := make(chan int, bufferSize)
	stage3 := make(chan int, bufferSize)
	
	// Start producer
	go func() {
		defer close(stage1)
		for i := 0; i < numItems; i++ {
			stage1 <- i
		}
	}()
	
	// Start stage 2 processor
	go func() {
		defer close(stage2)
		for item := range stage1 {
			// Simulate processing
			time.Sleep(1 * time.Millisecond)
			stage2 <- item * 2
		}
	}()
	
	// Start stage 3 processor
	go func() {
		defer close(stage3)
		for item := range stage2 {
			// Simulate processing
			time.Sleep(2 * time.Millisecond)
			stage3 <- item + 1
		}
	}()
	
	// Consume results
	var count int
	for range stage3 {
		count++
	}
	
	return time.Since(start)
}

func main() {
	numItems := 1000
	bufferSizes := []int{1, 10, 100, 1000}
	
	fmt.Printf("Processing %d items through a 3-stage pipeline\n", numItems)
	fmt.Println("Buffer Size | Duration | Items/sec")
	fmt.Println("-----------|----------|----------")
	
	for _, size := range bufferSizes {
		duration := benchmarkPipeline(size, numItems)
		itemsPerSec := float64(numItems) / duration.Seconds()
		
		fmt.Printf("%10d | %8v | %9.2f\n", size, duration.Round(time.Millisecond), itemsPerSec)
	}
}

Key optimization principles:

  1. Buffer Sizing: Larger buffers reduce blocking between stages at the cost of memory; they help most when per-item processing times are uneven, since with perfectly steady stages (as in the benchmark above) the slowest stage sets the pace regardless of buffer size
  2. Stage Balancing: Match processing speeds across stages so that no single stage becomes the bottleneck
  3. Batch Processing: Group items into batches to amortize per-item overhead (see the sketch below)
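
As an illustration of the third principle, here is a minimal batching stage. The batchStage helper is a hypothetical addition rather than part of the benchmark above; it groups incoming items into slices of up to batchSize so that downstream stages pay channel and scheduling overhead once per batch instead of once per item:

// batchStage groups items from in into slices of at most batchSize.
// Downstream stages receive whole batches, amortizing channel-send
// and scheduling costs across batchSize items.
func batchStage(in <-chan int, batchSize int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		batch := make([]int, 0, batchSize)
		for item := range in {
			batch = append(batch, item)
			if len(batch) == batchSize {
				out <- batch
				batch = make([]int, 0, batchSize)
			}
		}
		// Flush any remaining items when the input closes
		if len(batch) > 0 {
			out <- batch
		}
	}()
	return out
}

Allocating a fresh slice after each send matters: the consumer now owns the batch it received, so reusing the same backing array would race with downstream reads.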

Pipeline Monitoring and Metrics

Monitoring pipeline performance is essential for identifying bottlenecks and ensuring reliability:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// PipelineMetrics tracks performance metrics for a pipeline stage
type PipelineMetrics struct {
	StageName       string
	ItemsProcessed  atomic.Int64
	ErrorCount      atomic.Int64
	ProcessingTime  atomic.Int64 // Nanoseconds
	LastProcessed   atomic.Int64 // Unix timestamp
	QueueDepth      atomic.Int64
	MaxQueueDepth   atomic.Int64
	TotalWaitTime   atomic.Int64 // Nanoseconds
	ProcessingCount atomic.Int64 // Number of items currently being processed
}

// NewPipelineMetrics creates a new metrics tracker for a stage
func NewPipelineMetrics(name string) *PipelineMetrics {
	return &PipelineMetrics{
		StageName: name,
	}
}

// RecordProcessingTime records the time taken to process an item
func (m *PipelineMetrics) RecordProcessingTime(duration time.Duration) {
	m.ProcessingTime.Add(int64(duration))
	m.ItemsProcessed.Add(1)
	m.LastProcessed.Store(time.Now().Unix())
}

// RecordError increments the error count
func (m *PipelineMetrics) RecordError() {
	m.ErrorCount.Add(1)
}

// RecordQueueDepth updates the queue depth metrics
func (m *PipelineMetrics) RecordQueueDepth(depth int) {
	m.QueueDepth.Store(int64(depth))
	
	// Update max queue depth if needed
	for {
		current := m.MaxQueueDepth.Load()
		if int64(depth) <= current {
			break
		}
		if m.MaxQueueDepth.CompareAndSwap(current, int64(depth)) {
			break
		}
	}
}

// RecordWaitTime records the time an item spent waiting in the queue
func (m *PipelineMetrics) RecordWaitTime(duration time.Duration) {
	m.TotalWaitTime.Add(int64(duration))
}

// BeginProcessing increments the count of items being processed
func (m *PipelineMetrics) BeginProcessing() {
	m.ProcessingCount.Add(1)
}

// EndProcessing decrements the count of items being processed
func (m *PipelineMetrics) EndProcessing() {
	m.ProcessingCount.Add(-1)
}

// GetStats returns a snapshot of the current metrics
func (m *PipelineMetrics) GetStats() map[string]interface{} {
	itemsProcessed := m.ItemsProcessed.Load()
	processingTime := m.ProcessingTime.Load()
	
	var avgProcessingTime float64
	if itemsProcessed > 0 {
		avgProcessingTime = float64(processingTime) / float64(itemsProcessed) / float64(time.Millisecond)
	}
	
	var avgWaitTime float64
	if itemsProcessed > 0 {
		avgWaitTime = float64(m.TotalWaitTime.Load()) / float64(itemsProcessed) / float64(time.Millisecond)
	}
	
	return map[string]interface{}{
		"stage_name":          m.StageName,
		"items_processed":     itemsProcessed,
		"errors":              m.ErrorCount.Load(),
		"avg_processing_time": avgProcessingTime,
		"last_processed":      time.Unix(m.LastProcessed.Load(), 0),
		"current_queue_depth": m.QueueDepth.Load(),
		"max_queue_depth":     m.MaxQueueDepth.Load(),
		"avg_wait_time":       avgWaitTime,
		"active_processors":   m.ProcessingCount.Load(),
	}
}

// instrumentedStage wraps a processing function with metrics
func instrumentedStage(name string, in <-chan int, processFunc func(int) (int, error)) (<-chan int, *PipelineMetrics) {
	metrics := NewPipelineMetrics(name)
	out := make(chan int, cap(in)) // Match buffer size
	
	go func() {
		defer close(out)
		
		waitStart := time.Now()
		for item := range in {
			// Record how long the stage sat blocked waiting for input
			// (simplified - a real system would timestamp each item as
			// it is enqueued upstream and measure per-item queue time)
			metrics.RecordWaitTime(time.Since(waitStart))
			
			// Record queue depth
			metrics.RecordQueueDepth(len(in))
			
			// Process the item
			metrics.BeginProcessing()
			processStart := time.Now()
			result, err := processFunc(item)
			metrics.RecordProcessingTime(time.Since(processStart))
			metrics.EndProcessing()
			
			if err != nil {
				metrics.RecordError()
				fmt.Printf("Error in %s: %v\n", name, err)
			} else {
				// Send result to output
				out <- result
			}
			waitStart = time.Now()
		}
	}()
	
	return out, metrics
}

// metricsReporter periodically prints metrics
func metricsReporter(metrics []*PipelineMetrics, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	
	// Remember the previous count per stage so the reported rate
	// reflects the most recent interval, not the cumulative total
	prevCounts := make([]int64, len(metrics))
	
	for {
		select {
		case <-ticker.C:
			fmt.Println("\n--- Pipeline Metrics ---")
			for i, m := range metrics {
				stats := m.GetStats()
				processed := stats["items_processed"].(int64)
				rate := float64(processed-prevCounts[i]) / interval.Seconds()
				prevCounts[i] = processed
				fmt.Printf("Stage: %s\n", stats["stage_name"])
				fmt.Printf("  Processed: %d items (%.2f items/sec)\n", processed, rate)
				fmt.Printf("  Errors: %d\n", stats["errors"])
				fmt.Printf("  Avg Processing Time: %.2f ms\n", stats["avg_processing_time"])
				fmt.Printf("  Queue Depth: %d (max: %d)\n", 
					stats["current_queue_depth"], stats["max_queue_depth"])
				fmt.Printf("  Avg Wait Time: %.2f ms\n", stats["avg_wait_time"])
				fmt.Printf("  Active Processors: %d\n", stats["active_processors"])
			}
		case <-done:
			return
		}
	}
}

func main() {
	// Create input channel
	input := make(chan int, 100)
	
	// Create instrumented pipeline stages
	stage1, metrics1 := instrumentedStage("Processor", input, func(i int) (int, error) {
		time.Sleep(10 * time.Millisecond)
		return i * 2, nil
	})
	
	stage2, metrics2 := instrumentedStage("Transformer", stage1, func(i int) (int, error) {
		time.Sleep(15 * time.Millisecond)
		return i + 1, nil
	})
	
	stage3, metrics3 := instrumentedStage("Validator", stage2, func(i int) (int, error) {
		time.Sleep(5 * time.Millisecond)
		return i, nil
	})
	
	// Start metrics reporter
	done := make(chan struct{})
	go metricsReporter([]*PipelineMetrics{metrics1, metrics2, metrics3}, 1*time.Second, done)
	
	// Start consumer
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		count := 0
		for range stage3 {
			count++
		}
		fmt.Printf("\nProcessed %d items\n", count)
	}()
	
	// Feed input
	for i := 0; i < 1000; i++ {
		input <- i
		time.Sleep(5 * time.Millisecond)
	}
	close(input)
	
	// Wait for pipeline to complete
	wg.Wait()
	close(done)
}

This monitoring approach provides:

  1. Real-time Metrics: Track throughput, latency, and error rates as the pipeline runs
  2. Bottleneck Identification: Spot stages that process slowly or accumulate deep queues (see the helper sketched below)
  3. Resource Utilization: Estimate memory pressure from queue depths
  4. Error Tracking: Attribute errors to the stage that produced them
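
To make bottleneck identification concrete, a small helper can rank stages by average processing time using the PipelineMetrics type defined above. findBottleneck is a hypothetical sketch, not part of the program itself:

// findBottleneck returns the stage with the highest average
// processing time per item, or nil if nothing has been processed yet.
func findBottleneck(metrics []*PipelineMetrics) *PipelineMetrics {
	var worst *PipelineMetrics
	var worstAvg float64
	for _, m := range metrics {
		items := m.ItemsProcessed.Load()
		if items == 0 {
			continue
		}
		avg := float64(m.ProcessingTime.Load()) / float64(items)
		if avg > worstAvg {
			worstAvg = avg
			worst = m
		}
	}
	return worst
}

Pairing this with queue depth gives a fuller picture: a stage with both a high average processing time and a consistently deep input queue is almost certainly the constraint on overall throughput.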

Memory Optimization Techniques

Memory efficiency is critical for high-throughput pipelines:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Item represents a data item to be processed
type Item struct {
	ID    int
	Data  []byte
	Extra map[string]interface{}
}

// ItemPool implements an object pool for Items
type ItemPool struct {
	pool sync.Pool
}

// NewItemPool creates a new pool for Items
func NewItemPool() *ItemPool {
	return &ItemPool{
		pool: sync.Pool{
			New: func() interface{} {
				return &Item{
					Data:  make([]byte, 1024), // Pre-allocate buffer
					Extra: make(map[string]interface{}),
				}
			},
		},
	}
}

// Get retrieves an Item from the pool
func (p *ItemPool) Get() *Item {
	item := p.pool.Get().(*Item)
	// Reset the item (clear the map but keep the allocated memory)
	for k := range item.Extra {
		delete(item.Extra, k)
	}
	return item
}

// Put returns an Item to the pool
func (p *ItemPool) Put(item *Item) {
	p.pool.Put(item)
}

// memoryEfficientStage processes pooled items in place; the final
// consumer is responsible for returning each item to the pool
func memoryEfficientStage(in <-chan *Item) <-chan *Item {
	out := make(chan *Item, cap(in))
	go func() {
		defer close(out)
		for item := range in {
			// Process the item
			item.Extra["processed"] = true
			item.Extra["timestamp"] = time.Now()
			
			// Send to output
			out <- item
		}
	}()
	return out
}

// printMemStats prints memory statistics
func printMemStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("Alloc: %v MiB, TotalAlloc: %v MiB, Sys: %v MiB, NumGC: %v\n",
		m.Alloc/1024/1024,
		m.TotalAlloc/1024/1024,
		m.Sys/1024/1024,
		m.NumGC)
}

func main() {
	// Create item pool
	pool := NewItemPool()
	
	// Create channels
	input := make(chan *Item, 100)
	
	// Create pipeline
	processed := memoryEfficientStage(input)
	
	// Start consumer that returns items to the pool
	consumerDone := make(chan struct{})
	go func() {
		defer close(consumerDone)
		for item := range processed {
			// Use the item...
			
			// Return to pool when done
			pool.Put(item)
		}
	}()
	
	// Print initial memory stats
	fmt.Println("Initial memory stats:")
	printMemStats()
	
	// Process items
	for i := 0; i < 100000; i++ {
		// Get item from pool
		item := pool.Get()
		item.ID = i
		
		// Send to pipeline
		input <- item
		
		if i%10000 == 0 && i > 0 {
			fmt.Printf("\nAfter %d items:\n", i)
			printMemStats()
		}
	}
	
	close(input)
	
	// Wait for the consumer to drain the pipeline so the final stats
	// reflect a quiescent program
	<-consumerDone
	
	// Final memory stats
	fmt.Println("\nFinal memory stats:")
	printMemStats()
}

Key memory optimization techniques:

  1. Object Pooling: Reuse objects to reduce allocation churn and GC pressure
  2. Buffer Reuse: Preallocate and reuse buffers for I/O operations
  3. Batch Processing: Process items in batches to reduce per-item overhead
  4. Memory Profiling: Profile memory regularly to catch leaks early (a pprof sketch follows)
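
For the fourth technique, Go's standard library makes heap profiling straightforward. A minimal sketch, assuming the pipeline runs long enough to inspect (the port choice is arbitrary):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Serve the profiling endpoints alongside the pipeline; inspect
	// the heap with: go tool pprof http://localhost:6060/debug/pprof/heap
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	
	// ... start and run the pipeline here ...
	select {} // block so the profiler stays reachable in this sketch
}

Comparing heap profiles captured before and after a sustained load burst shows whether pooled objects are genuinely being reused or are leaking into long-lived references.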

These performance optimization techniques are essential for building high-throughput, low-latency pipeline systems.