Build efficient data processing pipelines in Go using channel patterns.

Processing large amounts of data efficiently is a common challenge. Pipeline patterns break complex processing into stages, where each stage does one thing well and passes its results to the next.

What Are Pipeline Patterns?

Think of an assembly line: each worker performs a specific task and passes the work to the next person. Pipeline patterns work similarly:

  1. Input Stage: Receives raw data
  2. Processing Stages: Transform, filter, or enrich data
  3. Output Stage: Sends results somewhere useful

Each stage runs concurrently, so while stage 1 processes item N, stage 2 can process item N-1, and stage 3 can process item N-2.

Why Use Pipelines?

Pipelines solve several problems:

  • Throughput: Process multiple items simultaneously
  • Modularity: Each stage has a single responsibility
  • Scalability: Add more workers to bottleneck stages
  • Testability: Test each stage independently
  • Backpressure: Handle situations where one stage is slower

Common Use Cases

Pipeline patterns work well for:

  • Log Processing: Parse, filter, and route log entries
  • Image Processing: Resize, compress, and store images
  • Data ETL: Extract, transform, and load data
  • Stream Analytics: Process real-time event streams
  • API Processing: Receive, validate, and respond to requests

Basic Pipeline Structure

func pipeline() {
    // Stage 1: Generate data
    input := make(chan int)
    
    // Stage 2: Process data
    processed := make(chan int)
    
    // Stage 3: Output results
    output := make(chan int)
    
    // Connect the stages. (generator, processor, and consumer are
    // stage functions sketched here; a real program would also wait
    // for the consumer to finish before returning.)
    go generator(input)
    go processor(input, processed)
    go consumer(processed)
}

This guide covers building robust pipelines that handle errors, manage backpressure, and shut down gracefully.


Pipeline Fundamentals and Design Principles

At its core, a pipeline consists of a series of stages connected by channels, where each stage receives input from an upstream stage, performs some processing, and sends output to a downstream stage. This simple concept forms the foundation for powerful data processing architectures.

Basic Pipeline Structure

Let’s start with a simple pipeline implementation that processes integers:

package main

import (
	"fmt"
)

// generator creates a channel that emits the numbers from 1 to n
func generator(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// square receives integers from a channel, squares them, and sends the results to a new channel
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// sum receives integers from a channel, adds them up, and emits the total on a new channel
func sum(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		total := 0
		for n := range in {
			total += n
		}
		out <- total
	}()
	return out
}

func main() {
	// Create a pipeline: generator -> square -> sum
	c1 := generator(5)
	c2 := square(c1)
	c3 := sum(c2)

	// Output the final result
	fmt.Println("Sum of squares:", <-c3)
}

This example demonstrates the core components of a pipeline:

  1. Stages: Each function (generator, square, sum) represents a stage in the pipeline.
  2. Channels: Each stage is connected to the next via channels, which serve as conduits for data.
  3. Goroutines: Each stage runs in its own goroutine, enabling concurrent processing.
  4. Unidirectional Channel Types: Stages take their input as a receive-only channel (<-chan T) and return their output as a receive-only channel, making the direction of data flow explicit; the compiler prevents a consumer from sending on or closing a stage's output.

Pipeline Design Principles

When designing pipelines, several key principles should guide your implementation:

1. Single Responsibility Principle

Each stage in a pipeline should have a single, well-defined responsibility. This promotes code reusability, testability, and maintainability.

// Bad: Stage doing too much
func processData(in <-chan Record) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for record := range in {
            // Parse the record
            parsed := parseRecord(record)
            
            // Validate the record
            if !isValid(parsed) {
                continue
            }
            
            // Transform the record
            transformed := transform(parsed)
            
            // Enrich the record with additional data
            enriched := enrich(transformed)
            
            out <- enriched
        }
    }()
    return out
}

// Better: Separate stages with single responsibilities
func parse(in <-chan Record) <-chan ParsedRecord {
    // Implementation
}

func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
    // Implementation
}

func transform(in <-chan ParsedRecord) <-chan TransformedRecord {
    // Implementation
}

func enrich(in <-chan TransformedRecord) <-chan Result {
    // Implementation
}

// Usage:
// pipeline := enrich(transform(validate(parse(records))))
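
For illustration, here is a minimal sketch of one of these stages, the validate filter, written in the same channel-per-stage shape (ParsedRecord and isValid are assumed from the example above):

func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
    out := make(chan ParsedRecord)
    go func() {
        defer close(out)
        for rec := range in {
            // Drop records that fail validation; pass the rest through
            if isValid(rec) {
                out <- rec
            }
        }
    }()
    return out
}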

2. Explicit Error Handling

Errors should be treated as first-class citizens in pipelines, with explicit mechanisms for propagation and handling.

type Result struct {
    Value interface{}
    Err   error
}

func processStage(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for result := range in {
            // Propagate errors from upstream stages
            if result.Err != nil {
                out <- result
                continue
            }
            
            // Process the value and handle any errors
            value, err := process(result.Value)
            out <- Result{Value: value, Err: err}
        }
    }()
    return out
}

3. Cancellation and Cleanup

Pipelines should support graceful termination and resource cleanup, typically using the context package.

func processWithCancellation(ctx context.Context, in <-chan Data) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for {
            select {
            case data, ok := <-in:
                if !ok {
                    return // Input channel closed
                }
                // Process data and send result
                out <- process(data)
            case <-ctx.Done():
                // Context cancelled, clean up and exit
                return
            }
        }
    }()
    return out
}

4. Backpressure Handling

Pipelines should handle backpressure—the situation where a slow consumer causes upstream stages to slow down or buffer data.

// Using buffered channels to handle temporary backpressure
func bufferingStage(in <-chan Data, bufferSize int) <-chan Result {
    // Buffer helps handle temporary spikes in throughput
    out := make(chan Result, bufferSize)
    go func() {
        defer close(out)
        for data := range in {
            result := process(data)
            out <- result
        }
    }()
    return out
}
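
Buffering only absorbs temporary spikes. If the consumer stays slow indefinitely, a stage can instead shed load; the following sketch (using the same Data, Result, and process placeholders) drops results when the output buffer is full, a deliberate trade-off between completeness and bounded memory:

// lossyStage forwards results but drops them when the output
// buffer is full (sketch)
func lossyStage(in <-chan Data, bufferSize int) <-chan Result {
    out := make(chan Result, bufferSize)
    go func() {
        defer close(out)
        for data := range in {
            select {
            case out <- process(data):
                // Delivered downstream
            default:
                // Buffer full: drop this result to keep the stage moving
            }
        }
    }()
    return out
}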

5. Resource Management

Pipelines should carefully manage resources like memory, file handles, and network connections.

func resourceManagedStage(in <-chan Request) <-chan Response {
    out := make(chan Response)
    go func() {
        defer close(out)
        
        // Acquire resource
        resource, err := acquireResource()
        if err != nil {
            out <- Response{Err: err}
            return
        }
        defer resource.Close() // Ensure resource is released
        
        for req := range in {
            // Use resource to process request
            resp := resource.Process(req)
            out <- resp
        }
    }()
    return out
}

These design principles form the foundation for building robust, maintainable pipeline architectures. In the next sections, we’ll explore more advanced pipeline patterns and implementations.

Advanced Pipeline Architectures

Building on the fundamentals, we can now explore more sophisticated pipeline architectures that address complex data processing requirements.

Fan-Out/Fan-In Pattern

The fan-out/fan-in pattern distributes work across multiple goroutines and then consolidates the results, enabling parallel processing of independent data items.

package main

import (
	"fmt"
	"sync"
	"time"
)

// generator produces integers from 1 to n
func generator(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// processor is a stage that simulates CPU-intensive work
func processor(id int, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate CPU-intensive work
			time.Sleep(100 * time.Millisecond)
			result := n * n
			fmt.Printf("Processor %d: %d² = %d\n", id, n, result)
			out <- result
		}
	}()
	return out
}

// fanOut creates multiple processor stages that read from the same input channel
func fanOut(in <-chan int, numProcessors int) []<-chan int {
	outputs := make([]<-chan int, numProcessors)
	for i := 0; i < numProcessors; i++ {
		outputs[i] = processor(i+1, in)
	}
	return outputs
}

// fanIn combines multiple input channels into a single output channel
func fanIn(inputs []<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	
	// Start a goroutine for each input channel
	for _, ch := range inputs {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				out <- n
			}
		}(ch)
	}
	
	// Start a goroutine to close the output channel when all input channels are done
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

func main() {
	startTime := time.Now()
	
	// Create the pipeline
	input := generator(10)
	processors := fanOut(input, 4)
	output := fanIn(processors)
	
	// Collect and sum the results
	sum := 0
	for n := range output {
		sum += n
	}
	
	fmt.Printf("Sum: %d\n", sum)
	fmt.Printf("Time taken: %v\n", time.Since(startTime))
}

This pattern is particularly useful for CPU-bound tasks where parallel processing can significantly improve throughput. The key components are:

  1. Fan-Out: Distribute work from a single source to multiple workers
  2. Parallel Processing: Each worker processes items independently
  3. Fan-In: Consolidate results from multiple workers into a single stream

Note that fan-in does not preserve input order: results are emitted in completion order. If ordering matters downstream, tag items with sequence numbers before fanning out and reorder after fanning in.

Dynamic Pipeline Construction

In real-world applications, pipeline topologies may need to be constructed dynamically based on configuration or runtime conditions. Here’s a pattern for building pipelines dynamically:

package main

import (
	"fmt"
	"strings"
)

// PipelineStage represents a single stage in a pipeline
type PipelineStage func(<-chan string) <-chan string

// Pipeline represents a series of stages
type Pipeline struct {
	stages []PipelineStage
}

// NewPipeline creates a new empty pipeline
func NewPipeline() *Pipeline {
	return &Pipeline{stages: []PipelineStage{}}
}

// Add appends a stage to the pipeline
func (p *Pipeline) Add(stage PipelineStage) *Pipeline {
	p.stages = append(p.stages, stage)
	return p
}

// Run executes the pipeline with the given input
func (p *Pipeline) Run(input <-chan string) <-chan string {
	current := input
	for _, stage := range p.stages {
		current = stage(current)
	}
	return current
}

// Example pipeline stages
func toUpper(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			out <- strings.ToUpper(s)
		}
	}()
	return out
}

func addPrefix(prefix string) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				out <- prefix + s
			}
		}()
		return out
	}
}

func addSuffix(suffix string) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				out <- s + suffix
			}
		}()
		return out
	}
}

func filter(predicate func(string) bool) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				if predicate(s) {
					out <- s
				}
			}
		}()
		return out
	}
}

func main() {
	// Create input channel
	input := make(chan string)
	
	// Build pipeline dynamically
	pipeline := NewPipeline()
	pipeline.Add(toUpper)
	pipeline.Add(addPrefix(">> "))
	pipeline.Add(filter(func(s string) bool {
		return len(s) > 5
	}))
	pipeline.Add(addSuffix(" <<"))
	
	// Run the pipeline
	output := pipeline.Run(input)
	
	// Feed input in a separate goroutine
	go func() {
		defer close(input)
		words := []string{"hello", "world", "pipeline", "go", "channels"}
		for _, word := range words {
			input <- word
		}
	}()
	
	// Collect results
	for result := range output {
		fmt.Println(result)
	}
}

This pattern provides several benefits:

  1. Composability: Pipeline stages can be combined in different ways
  2. Reusability: Stages can be reused across different pipelines
  3. Configurability: Pipelines can be constructed based on configuration
  4. Testability: Individual stages can be tested in isolation

Bidirectional Pipelines

Sometimes pipelines need to support bidirectional communication, where downstream stages can send feedback or control signals to upstream stages:

package main

import (
	"fmt"
	"time"
)

// Request represents a work item with a response channel
type Request struct {
	Data     int
	Response chan<- int
}

// Worker processes requests and sends responses back
func worker(requests <-chan Request) {
	for req := range requests {
		// Simulate processing
		time.Sleep(100 * time.Millisecond)
		result := req.Data * req.Data
		
		// Send response back through the response channel
		req.Response <- result
		close(req.Response) // Signal that no more responses will be sent
	}
}

func main() {
	// Create request channel
	requests := make(chan Request)
	
	// Start worker
	go worker(requests)
	
	// Send requests and receive responses
	for i := 1; i <= 5; i++ {
		// Create response channel for this request
		respCh := make(chan int)
		
		// Send request with response channel
		requests <- Request{
			Data:     i,
			Response: respCh,
		}
		
		// Wait for response
		resp := <-respCh
		fmt.Printf("Request: %d, Response: %d\n", i, resp)
	}
	
	close(requests)
}

This pattern enables:

  1. Request-Response Communication: Each request can receive a dedicated response
  2. Feedback Loops: Downstream stages can provide feedback to upstream stages
  3. Dynamic Flow Control: Processing can adapt based on feedback

Multi-Stage Pipeline with Context Cancellation

For long-running pipelines, proper cancellation support is essential. Here’s a pattern that integrates context cancellation across multiple stages:

package main

import (
	"context"
	"fmt"
	"time"
)

// generator produces integers until cancelled
func generator(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		n := 1
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			case out <- n:
				n++
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()
	return out
}

// processor squares numbers and handles cancellation
func processor(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Processor cancelled")
				return
			case n, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					fmt.Println("Processor cancelled")
					return
				case out <- n * n:
					// Value sent successfully
				}
			}
		}
	}()
	return out
}

// filter only passes even numbers and handles cancellation
func filter(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Filter cancelled")
				return
			case n, ok := <-in:
				if !ok {
					return
				}
				if n%2 == 0 {
					select {
					case <-ctx.Done():
						fmt.Println("Filter cancelled")
						return
					case out <- n:
						// Value sent successfully
					}
				}
			}
		}
	}()
	return out
}

func main() {
	// Create a context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	
	// Build and run the pipeline
	gen := generator(ctx)
	proc := processor(ctx, gen)
	filt := filter(ctx, proc)
	
	// Consume the results
	for n := range filt {
		fmt.Println("Result:", n)
	}
	
	fmt.Println("Pipeline completed")
}

This pattern ensures:

  1. Graceful Termination: All stages can clean up resources when cancelled
  2. Propagation of Cancellation: Cancellation signals propagate through the entire pipeline
  3. Timeout Support: Pipelines can automatically terminate after a specified duration
  4. Resource Management: Resources are properly released when the pipeline terminates

These advanced pipeline architectures provide powerful tools for building sophisticated data processing systems. In the next section, we’ll explore specific patterns for stream processing.

Stream Processing Patterns

Stream processing involves continuously processing data as it arrives, rather than processing data in batches. Go’s concurrency model is particularly well-suited for implementing stream processing patterns.

Linear Stream Processing

The simplest form of stream processing is a linear pipeline where data flows through a sequence of transformations:

package main

import (
	"fmt"
	"time"
)

// LogEntry represents a log entry in a stream
type LogEntry struct {
	Timestamp time.Time
	Level     string
	Message   string
}

// Source generates a stream of log entries
func source() <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		levels := []string{"INFO", "WARNING", "ERROR", "DEBUG"}
		messages := []string{
			"User logged in",
			"Failed to connect to database",
			"Processing request",
			"Cache miss",
			"Request completed",
		}
		
		for i := 0; i < 20; i++ {
			entry := LogEntry{
				Timestamp: time.Now().Add(-time.Duration(i) * time.Minute),
				Level:     levels[i%len(levels)],
				Message:   messages[i%len(messages)],
			}
			out <- entry
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// Filter keeps only entries with specified log levels
func filter(in <-chan LogEntry, allowedLevels ...string) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		levelSet := make(map[string]bool)
		for _, level := range allowedLevels {
			levelSet[level] = true
		}
		
		for entry := range in {
			if levelSet[entry.Level] {
				out <- entry
			}
		}
	}()
	return out
}

// Enrich adds additional information to log entries
func enrich(in <-chan LogEntry) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for entry := range in {
			// Add additional context to error messages
			if entry.Level == "ERROR" {
				entry.Message = fmt.Sprintf("%s (Contact: oncall@example.com)", entry.Message)
			}
			out <- entry
		}
	}()
	return out
}

// Format converts log entries to formatted strings
func format(in <-chan LogEntry) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for entry := range in {
			formatted := fmt.Sprintf(
				"[%s] %s: %s",
				entry.Timestamp.Format("15:04:05"),
				entry.Level,
				entry.Message,
			)
			out <- formatted
		}
	}()
	return out
}

// Sink consumes the final stream and performs an action
func sink(in <-chan string) {
	for s := range in {
		fmt.Println(s)
	}
}

func main() {
	// Build the stream processing pipeline
	logs := source()
	filtered := filter(logs, "ERROR", "WARNING")
	enriched := enrich(filtered)
	formatted := format(enriched)
	
	// Consume the stream
	sink(formatted)
}

This pattern is effective for straightforward transformations where each item is processed independently.

Windowed Stream Processing

Many stream processing applications need to analyze data within time or count-based windows:

package main

import (
	"fmt"
	"time"
)

// Event represents a data point in a stream
type Event struct {
	Timestamp time.Time
	Value     float64
}

// Window represents a collection of events within a time window
type Window struct {
	StartTime time.Time
	EndTime   time.Time
	Events    []Event
}

// source generates a stream of events
func source() <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out)
		now := time.Now()
		for i := 0; i < 100; i++ {
			event := Event{
				Timestamp: now.Add(time.Duration(i*100) * time.Millisecond),
				Value:     float64(i % 10),
			}
			out <- event
			time.Sleep(50 * time.Millisecond)
		}
	}()
	return out
}

// tumblingWindow groups events into non-overlapping time windows of fixed duration
func tumblingWindow(in <-chan Event, windowDuration time.Duration) <-chan Window {
	out := make(chan Window)
	go func() {
		defer close(out)
		
		var currentWindow Window
		var windowStarted bool
		
		for event := range in {
			// If this is the first event or the event belongs to a new window
			if !windowStarted || event.Timestamp.After(currentWindow.EndTime) {
				// If we have a window in progress, emit it
				if windowStarted {
					out <- currentWindow
				}
				
				// Start a new window
				windowStart := event.Timestamp
				windowEnd := windowStart.Add(windowDuration)
				currentWindow = Window{
					StartTime: windowStart,
					EndTime:   windowEnd,
					Events:    []Event{event},
				}
				windowStarted = true
			} else {
				// Add event to current window
				currentWindow.Events = append(currentWindow.Events, event)
			}
		}
		
		// Emit the last window if it has events
		if windowStarted && len(currentWindow.Events) > 0 {
			out <- currentWindow
		}
	}()
	return out
}

// slidingWindow groups events into overlapping time windows that
// advance by slideInterval
func slidingWindow(in <-chan Event, windowDuration, slideInterval time.Duration) <-chan Window {
	out := make(chan Window)
	go func() {
		defer close(out)
		
		// Buffer holds events that may still fall into a future window
		var buffer []Event
		
		// Track when we last emitted a window
		var lastEmission time.Time
		
		for event := range in {
			// Add new event to buffer
			buffer = append(buffer, event)
			
			if lastEmission.IsZero() {
				lastEmission = event.Timestamp
			}
			
			// Emit a window once the slide interval has elapsed
			if event.Timestamp.Sub(lastEmission) >= slideInterval {
				windowEnd := event.Timestamp
				windowStart := windowEnd.Add(-windowDuration)
				
				// Collect the events that fall inside the window
				window := Window{
					StartTime: windowStart,
					EndTime:   windowEnd,
				}
				for _, e := range buffer {
					if !e.Timestamp.Before(windowStart) && !e.Timestamp.After(windowEnd) {
						window.Events = append(window.Events, e)
					}
				}
				
				// Emit the window if it has events
				if len(window.Events) > 0 {
					out <- window
				}
				
				lastEmission = event.Timestamp
				
				// Drop events too old to appear in any future window
				kept := buffer[:0]
				for _, e := range buffer {
					if !e.Timestamp.Before(windowStart) {
						kept = append(kept, e)
					}
				}
				buffer = kept
			}
		}
	}()
	return out
}

// aggregate computes statistics for each window
func aggregate(in <-chan Window) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for window := range in {
			// Calculate average
			sum := 0.0
			for _, event := range window.Events {
				sum += event.Value
			}
			avg := sum / float64(len(window.Events))
			
			result := fmt.Sprintf(
				"Window [%s to %s]: %d events, avg=%.2f",
				window.StartTime.Format("15:04:05.000"),
				window.EndTime.Format("15:04:05.000"),
				len(window.Events),
				avg,
			)
			out <- result
		}
	}()
	return out
}

func main() {
	events := source()
	
	// One windowing strategy: non-overlapping one-second tumbling windows
	tumbling := tumblingWindow(events, 1*time.Second)
	tumblingResults := aggregate(tumbling)
	
	// Consume and print results
	for result := range tumblingResults {
		fmt.Println(result)
	}
}
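
The main function above exercises only the tumbling strategy; swapping in the sliding window is a small change (a sketch reusing the same source and aggregate, with illustrative durations):

	// Alternative: 2-second windows that slide every 500ms
	sliding := slidingWindow(source(), 2*time.Second, 500*time.Millisecond)
	for result := range aggregate(sliding) {
		fmt.Println(result)
	}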

Windowed processing is essential for:

  1. Time-Based Analysis: Computing statistics over time periods
  2. Trend Detection: Identifying patterns in streaming data
  3. Anomaly Detection: Detecting unusual behavior in real-time

Stateful Stream Processing

Some stream processing applications need to maintain state across multiple events:

package main

import (
	"fmt"
	"time"
)

// Transaction represents a financial transaction
type Transaction struct {
	ID        string
	UserID    string
	Amount    float64
	Timestamp time.Time
}

// Alert represents a fraud detection alert
type Alert struct {
	UserID       string
	Reason       string
	Transactions []Transaction
	Timestamp    time.Time
}

// source generates a stream of transactions
func source() <-chan Transaction {
	out := make(chan Transaction)
	go func() {
		defer close(out)
		users := []string{"user1", "user2", "user3", "user4"}
		now := time.Now()
		
		// Generate normal transactions
		for i := 0; i < 20; i++ {
			userID := users[i%len(users)]
			amount := 10.0 + float64(i%5)*20.0
			
			// For user3, generate suspicious transactions
			if userID == "user3" && i > 10 {
				amount = 500.0 + float64(i%3)*200.0
			}
			
			tx := Transaction{
				ID:        fmt.Sprintf("tx-%d", i),
				UserID:    userID,
				Amount:    amount,
				Timestamp: now.Add(time.Duration(i*30) * time.Second),
			}
			out <- tx
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// fraudDetector maintains state to detect suspicious patterns
func fraudDetector(in <-chan Transaction) <-chan Alert {
	out := make(chan Alert)
	go func() {
		defer close(out)
		
		// State: track recent transactions by user
		userTransactions := make(map[string][]Transaction)
		
		// State: track total amount in last hour by user
		userAmounts := make(map[string]float64)
		
		for tx := range in {
			// Add transaction to user history
			userTransactions[tx.UserID] = append(userTransactions[tx.UserID], tx)
			
			// Update total amount (simplified - not actually checking time window)
			userAmounts[tx.UserID] += tx.Amount
			
			// Check for suspicious activity
			if userAmounts[tx.UserID] > 1000.0 {
				// Create alert carrying the user's recent transactions
				alert := Alert{
					UserID:       tx.UserID,
					Reason:       "High transaction volume",
					Transactions: userTransactions[tx.UserID],
					Timestamp:    time.Now(),
				}
				out <- alert
				
				// Reset state for this user
				userAmounts[tx.UserID] = 0
			}
		}
	}()
	return out
}

// alertHandler processes alerts
func alertHandler(in <-chan Alert) {
	for alert := range in {
		fmt.Printf("ALERT for %s: %s\n", alert.UserID, alert.Reason)
		fmt.Printf("  Total transactions: %d\n", len(alert.Transactions))
		fmt.Printf("  Latest transaction: %s for $%.2f\n",
			alert.Transactions[len(alert.Transactions)-1].ID,
			alert.Transactions[len(alert.Transactions)-1].Amount)
	}
}

func main() {
	// Build the stateful stream processing pipeline
	transactions := source()
	alerts := fraudDetector(transactions)
	
	// Process alerts
	alertHandler(alerts)
}

This pattern enables:

  1. State Maintenance: Keeping track of information across multiple events
  2. Pattern Recognition: Identifying complex patterns that span multiple events
  3. Aggregation: Computing running statistics over a stream of events
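
The detector above deliberately simplifies the time-window check (see the comment in fraudDetector). A true sliding sum would prune old transactions before testing the threshold; here is a sketch with a hypothetical helper:

// sumRecent drops transactions older than the window and returns the
// survivors together with their total amount (hypothetical helper)
func sumRecent(txs []Transaction, now time.Time, window time.Duration) ([]Transaction, float64) {
	cutoff := now.Add(-window)
	recent := txs[:0]
	total := 0.0
	for _, t := range txs {
		if t.Timestamp.After(cutoff) {
			recent = append(recent, t)
			total += t.Amount
		}
	}
	return recent, total
}

Inside fraudDetector, the per-user slice would be replaced with the pruned one and the alert raised when the returned total crosses the threshold.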

These stream processing patterns provide powerful tools for building real-time data processing systems. In the next section, we’ll explore error handling and recovery strategies for pipelines.

Error Handling and Recovery in Pipelines

Robust error handling is critical for production-grade pipeline systems. Let’s explore patterns for handling errors in pipeline architectures.

Error Propagation Patterns

There are several approaches to propagating errors through a pipeline:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// Result wraps a value and an error
type Result struct {
	Value interface{}
	Err   error
}

// source generates integers with occasional errors
func source() <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for i := 0; i < 10; i++ {
			// Randomly generate errors
			if rand.Float64() < 0.3 {
				out <- Result{Err: fmt.Errorf("error generating value %d", i)}
				continue
			}
			
			out <- Result{Value: i, Err: nil}
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// transform applies a transformation, propagating any errors
func transform(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for res := range in {
			// If we received an error, propagate it
			if res.Err != nil {
				out <- res
				continue
			}
			
			// Try to process the value
			val, ok := res.Value.(int)
			if !ok {
				out <- Result{Err: errors.New("expected integer value")}
				continue
			}
			
			// Randomly fail during processing
			if rand.Float64() < 0.2 {
				out <- Result{Err: fmt.Errorf("error processing value %d", val)}
				continue
			}
			
			// Success case
			out <- Result{Value: val * val, Err: nil}
		}
	}()
	return out
}

// sink consumes results and handles errors
func sink(in <-chan Result) {
	for res := range in {
		if res.Err != nil {
			fmt.Printf("Error: %v\n", res.Err)
		} else {
			fmt.Printf("Result: %v\n", res.Value)
		}
	}
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Build and run the pipeline
	results := transform(source())
	sink(results)
}

This pattern demonstrates:

  1. Error Wrapping: Each value is wrapped with potential error information
  2. Error Propagation: Errors from upstream stages are passed downstream
  3. Error Generation: Stages can generate their own errors
  4. Error Handling: The final stage decides how to handle errors

Error Recovery Strategies

For long-running pipelines, recovering from errors rather than failing is often desirable:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryableStage attempts to process items with retries
func retryableStage(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Try to process with retries
			result, err := processWithRetry(n, 3, 100*time.Millisecond)
			if err != nil {
				fmt.Printf("Failed to process %d after retries: %v\n", n, err)
				continue // Skip this item
			}
			out <- result
		}
	}()
	return out
}

// processWithRetry attempts to process a value with retries
func processWithRetry(n int, maxRetries int, delay time.Duration) (int, error) {
	var lastErr error
	
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// Attempt to process
		result, err := process(n)
		if err == nil {
			return result, nil // Success
		}
		
		lastErr = err
		fmt.Printf("Attempt %d failed for %d: %v\n", attempt+1, n, err)
		
		// Don't sleep after the last attempt
		if attempt < maxRetries {
			// Exponential backoff
			sleepTime := delay * time.Duration(1<<attempt)
			time.Sleep(sleepTime)
		}
	}
	
	return 0, fmt.Errorf("all retries failed: %w", lastErr)
}

// process simulates a flaky operation
func process(n int) (int, error) {
	// Simulate random failures
	if rand.Float64() < 0.6 {
		return 0, errors.New("random processing error")
	}
	return n * n, nil
}

// circuitBreakerStage implements the circuit breaker pattern
func circuitBreakerStage(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		
		// Circuit breaker state
		var failures int
		var lastFailure time.Time
		const maxFailures = 3
		const resetTimeout = 5 * time.Second
		circuitOpen := false
		
		for n := range in {
			// Check if circuit is open
			if circuitOpen {
				// Check if we should try to reset
				if time.Since(lastFailure) > resetTimeout {
					fmt.Println("Circuit half-open, attempting reset")
					circuitOpen = false
					failures = 0
				} else {
					fmt.Printf("Circuit open, skipping item %d\n", n)
					continue
				}
			}
			
			// Try to process
			result, err := process(n)
			if err != nil {
				failures++
				lastFailure = time.Now()
				fmt.Printf("Processing failed for %d: %v (failures: %d)\n", n, err, failures)
				
				// Check if we should open the circuit
				if failures >= maxFailures {
					fmt.Println("Circuit breaker tripped, opening circuit")
					circuitOpen = true
				}
				continue
			}
			
			// Success, reset failure count
			failures = 0
			out <- result
		}
	}()
	return out
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create input channel
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 20; i++ {
			input <- i
			time.Sleep(200 * time.Millisecond)
		}
	}()
	
	// Create pipeline with retry stage
	retryOutput := retryableStage(input)
	
	// Consume results
	for result := range retryOutput {
		fmt.Printf("Got result: %d\n", result)
	}
}

This example demonstrates two important error recovery patterns:

  1. Retry Pattern: Automatically retry failed operations with exponential backoff
  2. Circuit Breaker Pattern: Prevent cascading failures by temporarily disabling operations after repeated failures
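
The example's main wires up only the retry stage; the circuit breaker drops into the same spot, as this usage sketch shows:

	// Alternative: route the input through the circuit breaker instead
	cbOutput := circuitBreakerStage(input)
	for result := range cbOutput {
		fmt.Printf("Got result: %d\n", result)
	}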

Partial Failure Handling

In distributed pipelines, handling partial failures is essential:

package main

import (
	"context"
	"fmt"
	"time"
)

// BatchResult represents the result of processing a batch of items
type BatchResult struct {
	Successful []int
	Failed     map[int]error
}

// batchProcessor processes items in batches with partial failure handling
func batchProcessor(ctx context.Context, in <-chan int, batchSize int) <-chan BatchResult {
	out := make(chan BatchResult)
	go func() {
		defer close(out)
		
		batch := make([]int, 0, batchSize)
		
		// Helper function to process and send the current batch
		processBatch := func() {
			if len(batch) == 0 {
				return
			}
			
			result := BatchResult{
				Successful: []int{},
				Failed:     make(map[int]error),
			}
			
			// Process each item in the batch
			for _, item := range batch {
				// Simulate processing with potential failures
				if item%3 == 0 {
					result.Failed[item] = fmt.Errorf("failed to process item %d", item)
				} else {
					result.Successful = append(result.Successful, item)
				}
			}
			
			// Send batch result
			select {
			case out <- result:
				// Result sent successfully
			case <-ctx.Done():
				return
			}
			
			// Clear the batch
			batch = make([]int, 0, batchSize)
		}
		
		for {
			select {
			case item, ok := <-in:
				if !ok {
					// Input channel closed, process remaining items
					processBatch()
					return
				}
				
				// Add item to batch
				batch = append(batch, item)
				
				// Process batch if it's full
				if len(batch) >= batchSize {
					processBatch()
				}
				
			case <-ctx.Done():
				// Context cancelled
				return
			}
		}
	}()
	return out
}

func main() {
	// Create context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create input channel
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 20; i++ {
			input <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()
	
	// Process in batches
	results := batchProcessor(ctx, input, 5)
	
	// Handle batch results
	for result := range results {
		fmt.Printf("Batch processed: %d successful, %d failed\n", 
			len(result.Successful), len(result.Failed))
		
		if len(result.Successful) > 0 {
			fmt.Printf("  Successful items: %v\n", result.Successful)
		}
		
		if len(result.Failed) > 0 {
			fmt.Println("  Failed items:")
			for item, err := range result.Failed {
				fmt.Printf("    Item %d: %v\n", item, err)
			}
		}
	}
}

This pattern enables:

  1. Batch Processing: Process items in groups for efficiency
  2. Partial Success: Continue processing despite some failures
  3. Detailed Error Reporting: Track exactly which items failed and why
  4. Graceful Degradation: The pipeline continues to make progress even with errors
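
Because each BatchResult names exactly which items failed, the consumer can collect them for a second pass. A sketch (the dead-letter handling is illustrative):

	// Collect failed items while consuming batch results
	var failedItems []int
	for result := range results {
		for item := range result.Failed {
			failedItems = append(failedItems, item)
		}
	}
	// failedItems can now be re-submitted through a fresh
	// batchProcessor or written to a dead-letter store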

These error handling patterns are essential for building resilient pipeline systems that can recover from failures and continue processing data.

Performance Optimization and Monitoring

To build high-performance pipeline systems, careful optimization and monitoring are essential.

Buffer Sizing and Throughput Optimization

The size of channel buffers can significantly impact pipeline performance:

package main

import (
	"fmt"
	"time"
)

// benchmarkPipeline measures the throughput of a pipeline with different buffer sizes
func benchmarkPipeline(bufferSize int, numItems int) time.Duration {
	start := time.Now()
	
	// Create stages with specified buffer size
	stage1 := make(chan int, bufferSize)
	stage2 := make(chan int, bufferSize)
	stage3 := make(chan int, bufferSize)
	
	// Start producer
	go func() {
		defer close(stage1)
		for i := 0; i < numItems; i++ {
			stage1 <- i
		}
	}()
	
	// Start stage 2 processor
	go func() {
		defer close(stage2)
		for item := range stage1 {
			// Simulate processing
			time.Sleep(1 * time.Millisecond)
			stage2 <- item * 2
		}
	}()
	
	// Start stage 3 processor
	go func() {
		defer close(stage3)
		for item := range stage2 {
			// Simulate processing
			time.Sleep(2 * time.Millisecond)
			stage3 <- item + 1
		}
	}()
	
	// Consume results
	var count int
	for range stage3 {
		count++
	}
	
	return time.Since(start)
}

func main() {
	numItems := 1000
	bufferSizes := []int{1, 10, 100, 1000}
	
	fmt.Printf("Processing %d items through a 3-stage pipeline\n", numItems)
	fmt.Println("Buffer Size | Duration | Items/sec")
	fmt.Println("-----------|----------|----------")
	
	for _, size := range bufferSizes {
		duration := benchmarkPipeline(size, numItems)
		itemsPerSec := float64(numItems) / duration.Seconds()
		
		fmt.Printf("%10d | %8v | %9.2f\n", size, duration.Round(time.Millisecond), itemsPerSec)
	}
}

Key optimization principles:

  1. Buffer Sizing: Larger buffers can improve throughput by reducing blocking, but consume more memory
  2. Stage Balancing: Match processing speeds across stages to minimize bottlenecks (sketched below)
  3. Batch Processing: Process items in batches to amortize overhead costs
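
Of these, stage balancing often pays off first. In the benchmark above, the 2ms stage is the bottleneck; one way to balance it is to run two copies of that stage over the same input channel and merge their outputs with the fanIn helper from the fan-out/fan-in example. A sketch:

	// Two identical copies of the 2ms stage share stage2; fanIn
	// (from the fan-out/fan-in example) merges their outputs (sketch)
	stageWorker := func() <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for item := range stage2 {
				time.Sleep(2 * time.Millisecond) // Simulate processing
				out <- item + 1
			}
		}()
		return out
	}
	merged := fanIn([]<-chan int{stageWorker(), stageWorker()})
	// The consumer then ranges over merged instead of stage3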

Pipeline Monitoring and Metrics

Monitoring pipeline performance is essential for identifying bottlenecks and ensuring reliability:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// PipelineMetrics tracks performance metrics for a pipeline stage
type PipelineMetrics struct {
	StageName       string
	ItemsProcessed  atomic.Int64
	ErrorCount      atomic.Int64
	ProcessingTime  atomic.Int64 // Nanoseconds
	LastProcessed   atomic.Int64 // Unix timestamp
	QueueDepth      atomic.Int64
	MaxQueueDepth   atomic.Int64
	TotalWaitTime   atomic.Int64 // Nanoseconds
	ProcessingCount atomic.Int64 // Number of items currently being processed
}

// NewPipelineMetrics creates a new metrics tracker for a stage
func NewPipelineMetrics(name string) *PipelineMetrics {
	return &PipelineMetrics{
		StageName: name,
	}
}

// RecordProcessingTime records the time taken to process an item
func (m *PipelineMetrics) RecordProcessingTime(duration time.Duration) {
	m.ProcessingTime.Add(int64(duration))
	m.ItemsProcessed.Add(1)
	m.LastProcessed.Store(time.Now().Unix())
}

// RecordError increments the error count
func (m *PipelineMetrics) RecordError() {
	m.ErrorCount.Add(1)
}

// RecordQueueDepth updates the queue depth metrics
func (m *PipelineMetrics) RecordQueueDepth(depth int) {
	m.QueueDepth.Store(int64(depth))
	
	// Update max queue depth if needed
	for {
		current := m.MaxQueueDepth.Load()
		if int64(depth) <= current {
			break
		}
		if m.MaxQueueDepth.CompareAndSwap(current, int64(depth)) {
			break
		}
	}
}

// RecordWaitTime records the time an item spent waiting in the queue
func (m *PipelineMetrics) RecordWaitTime(duration time.Duration) {
	m.TotalWaitTime.Add(int64(duration))
}

// BeginProcessing increments the count of items being processed
func (m *PipelineMetrics) BeginProcessing() {
	m.ProcessingCount.Add(1)
}

// EndProcessing decrements the count of items being processed
func (m *PipelineMetrics) EndProcessing() {
	m.ProcessingCount.Add(-1)
}

// GetStats returns a snapshot of the current metrics
func (m *PipelineMetrics) GetStats() map[string]interface{} {
	itemsProcessed := m.ItemsProcessed.Load()
	processingTime := m.ProcessingTime.Load()
	
	var avgProcessingTime float64
	if itemsProcessed > 0 {
		avgProcessingTime = float64(processingTime) / float64(itemsProcessed) / float64(time.Millisecond)
	}
	
	var avgWaitTime float64
	if itemsProcessed > 0 {
		avgWaitTime = float64(m.TotalWaitTime.Load()) / float64(itemsProcessed) / float64(time.Millisecond)
	}
	
	return map[string]interface{}{
		"stage_name":          m.StageName,
		"items_processed":     itemsProcessed,
		"errors":              m.ErrorCount.Load(),
		"avg_processing_time": avgProcessingTime,
		"last_processed":      time.Unix(m.LastProcessed.Load(), 0),
		"current_queue_depth": m.QueueDepth.Load(),
		"max_queue_depth":     m.MaxQueueDepth.Load(),
		"avg_wait_time":       avgWaitTime,
		"active_processors":   m.ProcessingCount.Load(),
	}
}

// instrumentedStage wraps a processing function with metrics
func instrumentedStage(name string, in <-chan int, processFunc func(int) (int, error)) (<-chan int, *PipelineMetrics) {
	metrics := NewPipelineMetrics(name)
	out := make(chan int, cap(in)) // Match buffer size
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Record queue depth
			metrics.RecordQueueDepth(len(in))
			
			// Record wait time (simplified - in a real system you'd track per-item timestamps)
			waitStart := time.Now()
			
			// Begin processing
			metrics.BeginProcessing()
			processStart := time.Now()
			metrics.RecordWaitTime(processStart.Sub(waitStart))
			
			// Process the item
			result, err := processFunc(item)
			
			// Record metrics
			metrics.RecordProcessingTime(time.Since(processStart))
			metrics.EndProcessing()
			
			if err != nil {
				metrics.RecordError()
				fmt.Printf("Error in %s: %v\n", name, err)
				continue
			}
			
			// Send result to output
			out <- result
		}
	}()
	
	return out, metrics
}

// metricsReporter periodically prints metrics
func metricsReporter(metrics []*PipelineMetrics, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	start := time.Now()
	
	for {
		select {
		case <-ticker.C:
			fmt.Println("\n--- Pipeline Metrics ---")
			for _, m := range metrics {
				stats := m.GetStats()
				fmt.Printf("Stage: %s\n", stats["stage_name"])
				fmt.Printf("  Processed: %d items (%.2f items/sec average)\n",
					stats["items_processed"],
					float64(stats["items_processed"].(int64))/time.Since(start).Seconds())
				fmt.Printf("  Errors: %d\n", stats["errors"])
				fmt.Printf("  Avg Processing Time: %.2f ms\n", stats["avg_processing_time"])
				fmt.Printf("  Queue Depth: %d (max: %d)\n", 
					stats["current_queue_depth"], stats["max_queue_depth"])
				fmt.Printf("  Avg Wait Time: %.2f ms\n", stats["avg_wait_time"])
				fmt.Printf("  Active Processors: %d\n", stats["active_processors"])
			}
		case <-done:
			return
		}
	}
}

func main() {
	// Create input channel
	input := make(chan int, 100)
	
	// Create instrumented pipeline stages
	stage1, metrics1 := instrumentedStage("Processor", input, func(i int) (int, error) {
		time.Sleep(10 * time.Millisecond)
		return i * 2, nil
	})
	
	stage2, metrics2 := instrumentedStage("Transformer", stage1, func(i int) (int, error) {
		time.Sleep(15 * time.Millisecond)
		return i + 1, nil
	})
	
	stage3, metrics3 := instrumentedStage("Validator", stage2, func(i int) (int, error) {
		time.Sleep(5 * time.Millisecond)
		return i, nil
	})
	
	// Start metrics reporter
	done := make(chan struct{})
	go metricsReporter([]*PipelineMetrics{metrics1, metrics2, metrics3}, 1*time.Second, done)
	
	// Start consumer
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		count := 0
		for range stage3 {
			count++
		}
		fmt.Printf("\nProcessed %d items\n", count)
	}()
	
	// Feed input
	for i := 0; i < 1000; i++ {
		input <- i
		time.Sleep(5 * time.Millisecond)
	}
	close(input)
	
	// Wait for pipeline to complete
	wg.Wait()
	close(done)
}

This monitoring approach provides:

  1. Real-time Metrics: Track throughput, latency, and error rates in real time
  2. Bottleneck Identification: Identify stages that are processing slowly or have deep queues
  3. Resource Utilization: Monitor memory usage through queue depths
  4. Error Tracking: Track error rates by stage
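
Beyond printing to stdout, the same counters can be exposed to scrapers with the standard library's expvar package. A sketch (the port and metric names are illustrative):

import (
	"expvar"
	"net/http"
)

// publishMetrics registers a stage's counters under /debug/vars (sketch)
func publishMetrics(m *PipelineMetrics) {
	expvar.Publish(m.StageName+".items_processed",
		expvar.Func(func() interface{} { return m.ItemsProcessed.Load() }))
	expvar.Publish(m.StageName+".errors",
		expvar.Func(func() interface{} { return m.ErrorCount.Load() }))
}

// serveMetrics serves the default mux, where importing expvar has
// already registered the /debug/vars handler
func serveMetrics(addr string) {
	go func() {
		_ = http.ListenAndServe(addr, nil)
	}()
}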

Memory Optimization Techniques

Memory efficiency is critical for high-throughput pipelines:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Item represents a data item to be processed
type Item struct {
	ID    int
	Data  []byte
	Extra map[string]interface{}
}

// ItemPool implements an object pool for Items
type ItemPool struct {
	pool sync.Pool
}

// NewItemPool creates a new pool for Items
func NewItemPool() *ItemPool {
	return &ItemPool{
		pool: sync.Pool{
			New: func() interface{} {
				return &Item{
					Data:  make([]byte, 1024), // Pre-allocate buffer
					Extra: make(map[string]interface{}),
				}
			},
		},
	}
}

// Get retrieves an Item from the pool
func (p *ItemPool) Get() *Item {
	item := p.pool.Get().(*Item)
	// Reset the item (clear the map but keep the allocated memory)
	for k := range item.Extra {
		delete(item.Extra, k)
	}
	return item
}

// Put returns an Item to the pool
func (p *ItemPool) Put(item *Item) {
	p.pool.Put(item)
}

// memoryEfficientStage processes pooled items; the downstream
// consumer is responsible for returning each item to the pool
func memoryEfficientStage(in <-chan *Item) <-chan *Item {
	out := make(chan *Item, cap(in))
	go func() {
		defer close(out)
		for item := range in {
			// Process the item
			item.Extra["processed"] = true
			item.Extra["timestamp"] = time.Now()
			
			// Send to output
			out <- item
		}
	}()
	return out
}

// printMemStats prints memory statistics
func printMemStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("Alloc: %v MiB, TotalAlloc: %v MiB, Sys: %v MiB, NumGC: %v\n",
		m.Alloc/1024/1024,
		m.TotalAlloc/1024/1024,
		m.Sys/1024/1024,
		m.NumGC)
}

func main() {
	// Create item pool
	pool := NewItemPool()
	
	// Create channels
	input := make(chan *Item, 100)
	
	// Create pipeline
	processed := memoryEfficientStage(input)
	
	// Start consumer that returns items to the pool
	go func() {
		for item := range processed {
			// Use the item...
			
			// Return to pool when done
			pool.Put(item)
		}
	}()
	
	// Print initial memory stats
	fmt.Println("Initial memory stats:")
	printMemStats()
	
	// Process items
	for i := 0; i < 100000; i++ {
		// Get item from pool
		item := pool.Get()
		item.ID = i
		
		// Send to pipeline
		input <- item
		
		if i%10000 == 0 && i > 0 {
			fmt.Printf("\nAfter %d items:\n", i)
			printMemStats()
		}
	}
	
	close(input)
	
	// Final memory stats
	fmt.Println("\nFinal memory stats:")
	printMemStats()
}

Key memory optimization techniques:

  1. Object Pooling: Reuse objects to reduce allocation and GC pressure
  2. Buffer Reuse: Preallocate and reuse buffers for I/O operations
  3. Batch Processing: Process items in batches to reduce per-item overhead
  4. Memory Profiling: Regularly profile memory usage to identify leaks

These performance optimization techniques are essential for building high-throughput, low-latency pipeline systems.

Production Implementation Strategies

When deploying pipeline architectures to production environments, several additional considerations come into play.

Graceful Shutdown

Proper shutdown handling ensures that in-flight data is processed and resources are released:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// Pipeline represents a multi-stage processing pipeline
type Pipeline struct {
	stages []Stage
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
}

// Stage represents a single stage in the pipeline
type Stage struct {
	Name     string
	Process  func(context.Context) error
	Shutdown func() error
}

// NewPipeline creates a new pipeline with cancellation support
func NewPipeline() *Pipeline {
	ctx, cancel := context.WithCancel(context.Background())
	return &Pipeline{
		ctx:    ctx,
		cancel: cancel,
	}
}

// AddStage adds a processing stage to the pipeline
func (p *Pipeline) AddStage(name string, process func(context.Context) error, shutdown func() error) {
	p.stages = append(p.stages, Stage{
		Name:     name,
		Process:  process,
		Shutdown: shutdown,
	})
}

// Start launches all pipeline stages
func (p *Pipeline) Start() {
	fmt.Println("Starting pipeline...")
	
	for _, stage := range p.stages {
		p.wg.Add(1)
		s := stage // Capture for closure
		
		go func() {
			defer p.wg.Done()
			fmt.Printf("Starting stage: %s\n", s.Name)
			
			err := s.Process(p.ctx)
			if err != nil && err != context.Canceled {
				fmt.Printf("Error in stage %s: %v\n", s.Name, err)
			}
			
			fmt.Printf("Stage completed: %s\n", s.Name)
		}()
	}
}

// Shutdown gracefully shuts down the pipeline
func (p *Pipeline) Shutdown(timeout time.Duration) error {
	fmt.Println("Initiating graceful shutdown...")
	
	// Signal all stages to stop accepting new work
	p.cancel()
	
	// Create a timeout context for shutdown
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	
	// Create a channel to signal when all goroutines are done
	done := make(chan struct{})
	
	// Wait for all goroutines in a separate goroutine
	go func() {
		p.wg.Wait()
		close(done)
	}()
	
	// Wait for completion or timeout
	select {
	case <-done:
		fmt.Println("All stages completed gracefully")
	case <-ctx.Done():
		return fmt.Errorf("shutdown timed out after %v", timeout)
	}
	
	// Call shutdown functions for each stage in reverse order
	for i := len(p.stages) - 1; i >= 0; i-- {
		stage := p.stages[i]
		if stage.Shutdown != nil {
			fmt.Printf("Shutting down stage: %s\n", stage.Name)
			if err := stage.Shutdown(); err != nil {
				fmt.Printf("Error shutting down stage %s: %v\n", stage.Name, err)
			}
		}
	}
	
	return nil
}

func main() {
	// Create pipeline
	pipeline := NewPipeline()
	
	// Add stages
	pipeline.AddStage(
		"DataSource",
		func(ctx context.Context) error {
			ticker := time.NewTicker(200 * time.Millisecond)
			defer ticker.Stop()
			
			for {
				select {
				case <-ticker.C:
					fmt.Println("DataSource: Generating data...")
				case <-ctx.Done():
					fmt.Println("DataSource: Stopping data generation")
					return ctx.Err()
				}
			}
		},
		func() error {
			fmt.Println("DataSource: Closing resources")
			return nil
		},
	)
	
	pipeline.AddStage(
		"Processor",
		func(ctx context.Context) error {
			ticker := time.NewTicker(300 * time.Millisecond)
			defer ticker.Stop()
			
			for {
				select {
				case <-ticker.C:
					fmt.Println("Processor: Processing data...")
				case <-ctx.Done():
					fmt.Println("Processor: Stopping processing")
					return ctx.Err()
				}
			}
		},
		func() error {
			fmt.Println("Processor: Flushing pending work")
			time.Sleep(100 * time.Millisecond) // Simulate flushing
			return nil
		},
	)
	
	// Start the pipeline
	pipeline.Start()
	
	// Set up signal handling for graceful shutdown
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	
	// Wait for termination signal
	sig := <-sigCh
	fmt.Printf("Received signal: %v\n", sig)
	
	// Perform graceful shutdown
	if err := pipeline.Shutdown(5 * time.Second); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
		os.Exit(1)
	}
	
	fmt.Println("Pipeline shutdown complete")
}

This pattern ensures:

  1. Orderly Shutdown: Each stage has an opportunity to complete in-flight work
  2. Resource Cleanup: Resources are released in the correct order
  3. Timeout Handling: Shutdown doesn’t hang indefinitely
  4. Signal Handling: The pipeline responds to system signals

Deployment Considerations

When deploying pipeline architectures to production, several factors should be considered:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
	"time"
)

// Configuration represents pipeline configuration that can be adjusted
// based on deployment environment
type Configuration struct {
	WorkerCount      int
	BufferSizes      int
	BatchSize        int
	ShutdownTimeout  time.Duration
	EnableProfiling  bool
	ProfilingDir     string
	MetricsInterval  time.Duration
	HealthCheckPort  int
	ResourceLimits   ResourceLimits
}

// ResourceLimits defines resource constraints for the pipeline
type ResourceLimits struct {
	MaxMemoryMB      int
	MaxCPUPercentage int
	MaxOpenFiles     int
}

// LoadConfiguration loads configuration from environment or config file
func LoadConfiguration() Configuration {
	// In a real application, this would load from environment variables,
	// config files, or service discovery
	return Configuration{
		WorkerCount:     runtime.NumCPU(),
		BufferSizes:     1000,
		BatchSize:       100,
		ShutdownTimeout: 30 * time.Second,
		EnableProfiling: true,
		ProfilingDir:    "./profiles",
		MetricsInterval: 10 * time.Second,
		HealthCheckPort: 8080,
		ResourceLimits: ResourceLimits{
			MaxMemoryMB:      1024,
			MaxCPUPercentage: 80,
			MaxOpenFiles:     1000,
		},
	}
}

// setupProfiling configures runtime profiling
func setupProfiling(config Configuration) func() {
	if !config.EnableProfiling {
		return func() {}
	}
	
	// Create profiling directory if it doesn't exist
	if err := os.MkdirAll(config.ProfilingDir, 0755); err != nil {
		log.Printf("Failed to create profiling directory: %v", err)
		return func() {}
	}
	
	// Start CPU profiling
	cpuFile, err := os.Create(fmt.Sprintf("%s/cpu-%d.pprof", config.ProfilingDir, time.Now().Unix()))
	if err != nil {
		log.Printf("Failed to create CPU profile: %v", err)
	} else if err := pprof.StartCPUProfile(cpuFile); err != nil {
		log.Printf("Failed to start CPU profile: %v", err)
	}
	
	// Return cleanup function
	return func() {
		if cpuFile != nil {
			pprof.StopCPUProfile()
			cpuFile.Close()
		}
		
		// Write heap profile
		heapFile, err := os.Create(fmt.Sprintf("%s/heap-%d.pprof", config.ProfilingDir, time.Now().Unix()))
		if err != nil {
			log.Printf("Failed to create heap profile: %v", err)
			return
		}
		defer heapFile.Close()
		
		if err := pprof.WriteHeapProfile(heapFile); err != nil {
			log.Printf("Failed to write heap profile: %v", err)
		}
	}
}

// startHealthCheck starts a simple health check server
func startHealthCheck(ctx context.Context, port int) {
	// In a real application, this would start an HTTP server
	// that reports pipeline health status
	log.Printf("Health check server started on port %d", port)
	
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			log.Println("Pipeline health: OK")
		case <-ctx.Done():
			log.Println("Stopping health check server")
			return
		}
	}
}

func main() {
	// Load configuration
	config := LoadConfiguration()
	
	// Set up profiling
	cleanup := setupProfiling(config)
	defer cleanup()
	
	// Create context with cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Start health check
	go startHealthCheck(ctx, config.HealthCheckPort)
	
	// Create and start pipeline (simplified)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		log.Println("Pipeline started")
		
		// Simulate pipeline running
		select {
		case <-time.After(30 * time.Second):
			log.Println("Pipeline completed normally")
		case <-ctx.Done():
			log.Println("Pipeline cancelled")
		}
	}()
	
	// Wait for pipeline to complete
	wg.Wait()
	log.Println("Application shutting down")
}

Key deployment considerations:

  1. Configuration Management: Load configuration from environment variables or config files
  2. Resource Limits: Set appropriate memory and CPU limits
  3. Monitoring and Profiling: Enable metrics collection and profiling in production
  4. Health Checks: Implement health checks for container orchestration systems (see the sketch after this list)
  5. Logging: Configure structured logging for observability
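
The health check in the example only logs. In a real deployment it would typically expose an HTTP endpoint for the orchestrator's liveness probe; here is a minimal sketch using net/http (the /healthz path and handler are conventions assumed for illustration):

// startHealthServer exposes a liveness endpoint for orchestrators (sketch)
func startHealthServer(port int) {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	go func() {
		if err := http.ListenAndServe(fmt.Sprintf(":%d", port), mux); err != nil {
			log.Printf("health server stopped: %v", err)
		}
	}()
}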

Scaling Strategies

Pipeline architectures can be scaled in various ways:

package main

import (
	"fmt"
	"sync"
	"time"
)

// ScalableStage represents a pipeline stage that can be scaled horizontally
type ScalableStage struct {
	name       string
	process    func(int) int
	workerPool *WorkerPool
}

// WorkerPool manages a pool of workers for a stage
type WorkerPool struct {
	input     chan int
	output    chan int
	workers   int
	queueSize int
	wg        sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(workers, queueSize int) *WorkerPool {
	return &WorkerPool{
		input:     make(chan int, queueSize),
		output:    make(chan int, queueSize),
		workers:   workers,
		queueSize: queueSize,
	}
}

// Start launches the worker pool
func (p *WorkerPool) Start(process func(int) int) {
	// Start workers
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()
			for item := range p.input {
				// Process item
				result := process(item)
				p.output <- result
			}
		}(i + 1)
	}
	
	// Close output when all workers are done
	go func() {
		p.wg.Wait()
		close(p.output)
	}()
}

// Submit adds an item to the worker pool
func (p *WorkerPool) Submit(item int) {
	p.input <- item
}

// Results returns the output channel
func (p *WorkerPool) Results() <-chan int {
	return p.output
}

// Stop gracefully shuts down the worker pool
func (p *WorkerPool) Stop() {
	close(p.input)
	p.wg.Wait()
}

// NewScalableStage creates a new scalable pipeline stage
func NewScalableStage(name string, workers, queueSize int, process func(int) int) *ScalableStage {
	return &ScalableStage{
		name:       name,
		process:    process,
		workerPool: NewWorkerPool(workers, queueSize),
	}
}

// Start launches the stage
func (s *ScalableStage) Start() {
	fmt.Printf("Starting stage %s with %d workers\n", s.name, s.workerPool.workers)
	s.workerPool.Start(s.process)
}

// Submit adds an item to the stage
func (s *ScalableStage) Submit(item int) {
	s.workerPool.Submit(item)
}

// Results returns the output channel
func (s *ScalableStage) Results() <-chan int {
	return s.workerPool.Results()
}

// Stop gracefully shuts down the stage
func (s *ScalableStage) Stop() {
	s.workerPool.Stop()
}

func main() {
	// Create scalable pipeline stages
	stage1 := NewScalableStage("Parser", 2, 100, func(i int) int {
		// Simulate parsing
		time.Sleep(10 * time.Millisecond)
		return i
	})
	
	stage2 := NewScalableStage("Transformer", 4, 100, func(i int) int {
		// Simulate transformation (CPU-intensive)
		time.Sleep(20 * time.Millisecond)
		return i * 2
	})
	
	stage3 := NewScalableStage("Validator", 1, 100, func(i int) int {
		// Simulate validation
		time.Sleep(5 * time.Millisecond)
		return i
	})
	
	// Start all stages
	stage1.Start()
	stage2.Start()
	stage3.Start()
	
	// Connect stages
	go func() {
		for result := range stage1.Results() {
			stage2.Submit(result)
		}
		stage2.Stop()
	}()
	
	go func() {
		for result := range stage2.Results() {
			stage3.Submit(result)
		}
		stage3.Stop()
	}()
	
	// Submit items to the first stage
	for i := 1; i <= 100; i++ {
		stage1.Submit(i)
	}
	stage1.Stop()
	
	// Collect results from the final stage
	var results []int
	for result := range stage3.Results() {
		results = append(results, result)
	}
	
	fmt.Printf("Processed %d items\n", len(results))
}

Key scaling strategies:

  1. Horizontal Scaling: Add more instances of a stage to handle increased load
  2. Vertical Scaling: Allocate more resources to each stage
  3. Dynamic Scaling: Adjust the number of workers based on load
  4. Partitioning: Divide work across multiple pipelines based on a partition key (sketched below)
  5. Load Balancing: Distribute work evenly across workers
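
Partitioning can be sketched as a router stage that hashes a key to choose one of several downstream pipelines, so all items with the same key land on the same pipeline (hash/fnv is from the standard library; the string key is an assumption for illustration):

// partition routes items across n output channels by key hash (sketch)
func partition(in <-chan string, n int) []chan string {
	outs := make([]chan string, n)
	for i := range outs {
		outs[i] = make(chan string)
	}
	go func() {
		defer func() {
			for _, ch := range outs {
				close(ch)
			}
		}()
		for item := range in {
			h := fnv.New32a()
			h.Write([]byte(item))
			outs[h.Sum32()%uint32(n)] <- item
		}
	}()
	return outs
}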

These production implementation strategies ensure that pipeline architectures can be deployed reliably and scaled effectively in production environments.

The Art of Pipeline Design: Balancing Complexity and Performance

Throughout this article, we’ve explored a wide range of pipeline patterns and techniques for building robust, efficient data processing systems in Go. From basic linear pipelines to sophisticated stream processing architectures, we’ve seen how Go’s concurrency primitives provide powerful tools for expressing complex data flows.

The key to successful pipeline design lies in finding the right balance between complexity and performance. While advanced patterns can significantly improve throughput and resource utilization, they also introduce additional complexity that must be carefully managed. Here are some guiding principles to consider when designing pipeline architectures:

  1. Start Simple: Begin with the simplest pipeline structure that meets your requirements, and add complexity only when necessary.

  2. Measure Performance: Use benchmarks and profiling to identify bottlenecks before applying optimizations.

  3. Consider Failure Modes: Design pipelines with error handling and recovery mechanisms appropriate for your reliability requirements.

  4. Plan for Observability: Incorporate monitoring and metrics from the beginning to ensure visibility into pipeline behavior.

  5. Design for Evolution: Create modular pipeline components that can be reconfigured and extended as requirements change.

By applying these principles and the patterns we’ve explored, you can build pipeline architectures that efficiently process data streams while maintaining code clarity and operational reliability. Whether you’re building real-time analytics systems, log processors, or high-throughput microservices, the pipeline patterns in this article provide a powerful toolkit for solving complex data processing challenges in Go.

As you implement these patterns in your own applications, remember that the most elegant solutions often emerge from a deep understanding of both the problem domain and the tools at your disposal. Take the time to understand your data flow requirements, experiment with different pipeline architectures, and continuously refine your approach based on real-world performance and maintainability considerations.

The journey to mastering pipeline patterns is ongoing, but with the foundation provided in this article, you’re well-equipped to design and implement sophisticated data processing systems that leverage the full power of Go’s concurrency model.