Pipeline Fundamentals and Design Principles

At its core, a pipeline consists of a series of stages connected by channels, where each stage receives input from an upstream stage, performs some processing, and sends output to a downstream stage. This simple concept forms the foundation for powerful data processing architectures.

Basic Pipeline Structure

Let’s start with a simple pipeline implementation that processes integers:

package main

import (
	"fmt"
)

// generator creates a channel that emits the numbers from 1 to n
func generator(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// square receives integers from a channel, squares them, and sends the results to a new channel
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// sum receives integers from a channel, sums them, and sends the total on its output channel
func sum(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		total := 0
		for n := range in {
			total += n
		}
		out <- total
	}()
	return out
}

func main() {
	// Create a pipeline: generator -> square -> sum
	c1 := generator(5)
	c2 := square(c1)
	c3 := sum(c2)

	// Output the final result
	fmt.Println("Sum of squares:", <-c3)
}

This example demonstrates the core components of a pipeline:

  1. Stages: Each function (generator, square, sum) represents a stage in the pipeline.
  2. Channels: Each stage is connected to the next via channels, which serve as conduits for data.
  3. Goroutines: Each stage runs in its own goroutine, enabling concurrent processing.
  4. Unidirectional Channel Types: Stages accept receive-only channels (<-chan T) as input and return receive-only channels as output, making the direction of data flow explicit in the type system.

Pipeline Design Principles

When designing pipelines, several key principles should guide your implementation:

1. Single Responsibility Principle

Each stage in a pipeline should have a single, well-defined responsibility. This promotes code reusability, testability, and maintainability.

// Bad: Stage doing too much
func processData(in <-chan Record) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for record := range in {
            // Parse the record
            parsed := parseRecord(record)
            
            // Validate the record
            if !isValid(parsed) {
                continue
            }
            
            // Transform the record
            transformed := transform(parsed)
            
            // Enrich the record with additional data
            enriched := enrich(transformed)
            
            out <- enriched
        }
    }()
    return out
}

// Better: Separate stages with single responsibilities
func parse(in <-chan Record) <-chan ParsedRecord {
    // Implementation
}

func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
    // Implementation
}

func transform(in <-chan ParsedRecord) <-chan TransformedRecord {
    // Implementation
}

func enrich(in <-chan TransformedRecord) <-chan Result {
    // Implementation
}

// Usage:
// pipeline := enrich(transform(validate(parse(records))))
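The stage bodies above are left as stubs. As a minimal sketch of one of them, a validate stage might filter records through a predicate; the ParsedRecord type and the isValid check here are assumptions for illustration:

```go
package main

import "fmt"

type ParsedRecord struct {
	Name string
}

// isValid is a hypothetical predicate; here, a record is valid if it has a name.
func isValid(r ParsedRecord) bool { return r.Name != "" }

// validate passes valid records through and silently drops the rest.
func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
	out := make(chan ParsedRecord)
	go func() {
		defer close(out)
		for r := range in {
			if isValid(r) {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	in := make(chan ParsedRecord, 3)
	in <- ParsedRecord{Name: "a"}
	in <- ParsedRecord{}
	in <- ParsedRecord{Name: "b"}
	close(in)
	for r := range validate(in) {
		fmt.Println(r.Name)
	}
}
```

Because every stage shares the same shape (receive, act, forward), stages like this compose freely in any order that type-checks.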

2. Explicit Error Handling

Errors should be treated as first-class citizens in pipelines, with explicit mechanisms for propagation and handling.

type Result struct {
    Value interface{}
    Err   error
}

func processStage(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for result := range in {
            // Propagate errors from upstream stages
            if result.Err != nil {
                out <- result
                continue
            }
            
            // Process the value and handle any errors
            value, err := process(result.Value)
            out <- Result{Value: value, Err: err}
        }
    }()
    return out
}

3. Cancellation and Cleanup

Pipelines should support graceful termination and resource cleanup, typically using the context package.

func processWithCancellation(ctx context.Context, in <-chan Data) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for {
            select {
            case data, ok := <-in:
                if !ok {
                    return // Input channel closed
                }
                // Process data; guard the send so it cannot
                // block forever if the context is cancelled mid-send
                select {
                case out <- process(data):
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                // Context cancelled, clean up and exit
                return
            }
        }
    }()
    return out
}

4. Backpressure Handling

Pipelines should handle backpressure—the situation where a slow consumer causes upstream stages to slow down or buffer data.

// Using buffered channels to handle temporary backpressure
func bufferingStage(in <-chan Data, bufferSize int) <-chan Result {
    // Buffer helps handle temporary spikes in throughput
    out := make(chan Result, bufferSize)
    go func() {
        defer close(out)
        for data := range in {
            result := process(data)
            out <- result
        }
    }()
    return out
}

5. Resource Management

Pipelines should carefully manage resources like memory, file handles, and network connections.

func resourceManagedStage(in <-chan Request) <-chan Response {
    out := make(chan Response)
    go func() {
        defer close(out)
        
        // Acquire resource
        resource, err := acquireResource()
        if err != nil {
            out <- Response{Err: err}
            return
        }
        defer resource.Close() // Ensure resource is released
        
        for req := range in {
            // Use resource to process request
            resp := resource.Process(req)
            out <- resp
        }
    }()
    return out
}
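A runnable sketch of this idea, with a mock conn type standing in for a real resource such as a database connection (the Request and Response shapes here are assumptions for illustration):

```go
package main

import "fmt"

type Request struct{ N int }
type Response struct{ N int }

// conn is a stand-in for a real resource such as a database connection.
type conn struct{ closed bool }

func (c *conn) Process(r Request) Response { return Response{N: r.N * 10} }
func (c *conn) Close()                     { c.closed = true }

// stage acquires one connection for its whole lifetime and releases it
// when the input channel closes, mirroring resourceManagedStage above.
func stage(in <-chan Request) (<-chan Response, *conn) {
	c := &conn{}
	out := make(chan Response)
	go func() {
		defer close(out)
		defer c.Close() // runs before close(out), so callers observe the release
		for req := range in {
			out <- c.Process(req)
		}
	}()
	return out, c
}

func main() {
	in := make(chan Request, 2)
	in <- Request{N: 1}
	in <- Request{N: 2}
	close(in)
	out, c := stage(in)
	for resp := range out {
		fmt.Println(resp.N)
	}
	fmt.Println("closed:", c.closed)
}
```

Acquiring the resource once per stage, rather than once per item, amortizes setup cost and guarantees exactly one matching release.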

These design principles form the foundation for building robust, maintainable pipeline architectures. In the next sections, we’ll explore more advanced pipeline patterns and implementations.