Go Pipeline Patterns for Data Processing
Build efficient data processing pipelines in Go using channel patterns.
Processing large amounts of data efficiently is a common challenge. Pipeline patterns break complex processing into stages, where each stage does one thing well and passes results to the next stage.
What Are Pipeline Patterns?
Think of an assembly line: each worker performs a specific task and passes the work to the next person. Pipeline patterns work similarly:
- Input Stage: Receives raw data
- Processing Stages: Transform, filter, or enrich data
- Output Stage: Sends results somewhere useful
Each stage runs concurrently, so while stage 1 processes item N, stage 2 can process item N-1, and stage 3 can process item N-2.
Why Use Pipelines?
Pipelines solve several problems:
- Throughput: Process multiple items simultaneously
- Modularity: Each stage has a single responsibility
- Scalability: Add more workers to bottleneck stages
- Testability: Test each stage independently
- Backpressure: Slow producers down naturally when a downstream stage can't keep up
Common Use Cases
Pipeline patterns work well for:
- Log Processing: Parse, filter, and route log entries
- Image Processing: Resize, compress, and store images
- Data ETL: Extract, transform, and load data
- Stream Analytics: Process real-time event streams
- API Processing: Handle, validate, and respond to requests
Basic Pipeline Structure
func pipeline() {
// Stage 1: Generate data
input := make(chan int)
// Stage 2: Process data
processed := make(chan int)
// Connect the stages (generator, processor, and consumer are
// assumed helpers; stage 3, the consumer, reads from processed
// and emits the final results)
go generator(input)
go processor(input, processed)
go consumer(processed)
// A real caller would also wait for the consumer to finish;
// the complete example below shows one way to do that.
}
This guide covers building robust pipelines that handle errors, backpressure, and graceful shutdown.
Pipeline Fundamentals and Design Principles
At its core, a pipeline consists of a series of stages connected by channels, where each stage receives input from an upstream stage, performs some processing, and sends output to a downstream stage. This simple concept forms the foundation for powerful data processing architectures.
Basic Pipeline Structure
Let’s start with a simple pipeline implementation that processes integers:
package main
import (
"fmt"
)
// generator creates a channel that emits the numbers from 1 to n
func generator(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- i
}
}()
return out
}
// square receives integers from a channel, squares them, and sends the results to a new channel
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// sum receives integers from a channel, sums them, and emits the total on a new channel
func sum(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
total := 0
for n := range in {
total += n
}
out <- total
}()
return out
}
func main() {
// Create a pipeline: generator -> square -> sum
c1 := generator(5)
c2 := square(c1)
c3 := sum(c2)
// Output the final result
fmt.Println("Sum of squares:", <-c3)
}
This example demonstrates the core components of a pipeline:
- Stages: Each function (generator, square, sum) represents a stage in the pipeline.
- Channels: Each stage is connected to the next via channels, which serve as conduits for data.
- Goroutines: Each stage runs in its own goroutine, enabling concurrent processing.
- Unidirectional Channel Types: Stages accept input channels as <-chan T and return output channels as <-chan T, making the data flow direction explicit.
Pipeline Design Principles
When designing pipelines, several key principles should guide your implementation:
1. Single Responsibility Principle
Each stage in a pipeline should have a single, well-defined responsibility. This promotes code reusability, testability, and maintainability.
// Bad: Stage doing too much
func processData(in <-chan Record) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for record := range in {
// Parse the record
parsed := parseRecord(record)
// Validate the record
if !isValid(parsed) {
continue
}
// Transform the record
transformed := transform(parsed)
// Enrich the record with additional data
enriched := enrich(transformed)
out <- enriched
}
}()
return out
}
// Better: Separate stages with single responsibilities
func parse(in <-chan Record) <-chan ParsedRecord {
// Implementation
}
func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
// Implementation
}
func transform(in <-chan ParsedRecord) <-chan TransformedRecord {
// Implementation
}
func enrich(in <-chan TransformedRecord) <-chan Result {
// Implementation
}
// Usage:
// pipeline := enrich(transform(validate(parse(records))))
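To make the decomposition concrete, here is how the validate stub might be filled in (a sketch; isValid is the same assumed helper used in the monolithic version):
// validate passes through only the records that satisfy isValid,
// leaving parsing, transformation, and enrichment to other stages
func validate(in <-chan ParsedRecord) <-chan ParsedRecord {
out := make(chan ParsedRecord)
go func() {
defer close(out)
for rec := range in {
if isValid(rec) {
out <- rec
}
}
}()
return out
}
The other stages follow the same shape, differing only in their input and output types and the work done per record.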
2. Explicit Error Handling
Errors should be treated as first-class citizens in pipelines, with explicit mechanisms for propagation and handling.
type Result struct {
Value interface{}
Err error
}
func processStage(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for result := range in {
// Propagate errors from upstream stages
if result.Err != nil {
out <- result
continue
}
// Process the value and handle any errors
value, err := process(result.Value)
out <- Result{Value: value, Err: err}
}
}()
return out
}
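On Go 1.18 and later, generics can make this wrapper type-safe instead of relying on interface{} (a sketch, not part of the original example; the names are ours):
// Result pairs a typed value with an error, avoiding interface{} casts
type Result[T any] struct {
Value T
Err error
}
// mapStage applies f to each successful value and propagates errors
func mapStage[In, Out any](in <-chan Result[In], f func(In) (Out, error)) <-chan Result[Out] {
out := make(chan Result[Out])
go func() {
defer close(out)
for r := range in {
if r.Err != nil {
out <- Result[Out]{Err: r.Err} // propagate upstream errors
continue
}
v, err := f(r.Value)
out <- Result[Out]{Value: v, Err: err}
}
}()
return out
}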
3. Cancellation and Cleanup
Pipelines should support graceful termination and resource cleanup, typically using the context package.
func processWithCancellation(ctx context.Context, in <-chan Data) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for {
select {
case data, ok := <-in:
if !ok {
return // Input channel closed
}
// Process data and send result (note: for complete safety this
// send should itself be wrapped in a select on ctx.Done(), as the
// multi-stage cancellation example later demonstrates)
out <- process(data)
case <-ctx.Done():
// Context cancelled, clean up and exit
return
}
}
}()
return out
}
4. Backpressure Handling
Pipelines should handle backpressure—the situation where a slow consumer causes upstream stages to slow down or buffer data.
// Using buffered channels to handle temporary backpressure
func bufferingStage(in <-chan Data, bufferSize int) <-chan Result {
// Buffer helps handle temporary spikes in throughput
out := make(chan Result, bufferSize)
go func() {
defer close(out)
for data := range in {
result := process(data)
out <- result
}
}()
return out
}
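Note that buffering only absorbs temporary spikes; if the consumer is persistently slower, the buffer fills and the stage blocks again. When dropping items is acceptable, a non-blocking send sheds load instead (a sketch under that assumption, reusing the hypothetical Data, Result, and process from above):
// sheddingStage drops items when its output buffer is full rather
// than blocking the upstream stage
func sheddingStage(in <-chan Data, bufferSize int) <-chan Result {
out := make(chan Result, bufferSize)
go func() {
defer close(out)
for data := range in {
select {
case out <- process(data):
// delivered
default:
// buffer full: drop this item to protect upstream
}
}
}()
return out
}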
5. Resource Management
Pipelines should carefully manage resources like memory, file handles, and network connections.
func resourceManagedStage(in <-chan Request) <-chan Response {
out := make(chan Response)
go func() {
defer close(out)
// Acquire resource
resource, err := acquireResource()
if err != nil {
out <- Response{Err: err}
return
}
defer resource.Close() // Ensure resource is released
for req := range in {
// Use resource to process request
resp := resource.Process(req)
out <- resp
}
}()
return out
}
These design principles form the foundation for building robust, maintainable pipeline architectures. In the next sections, we’ll explore more advanced pipeline patterns and implementations.
Advanced Pipeline Architectures
Building on the fundamentals, we can now explore more sophisticated pipeline architectures that address complex data processing requirements.
Fan-Out/Fan-In Pattern
The fan-out/fan-in pattern distributes work across multiple goroutines and then consolidates the results, enabling parallel processing of independent data items.
package main
import (
"fmt"
"sync"
"time"
)
// generator produces integers from 1 to n
func generator(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
out <- i
}
}()
return out
}
// processor is a stage that simulates CPU-intensive work
func processor(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
// Simulate CPU-intensive work
time.Sleep(100 * time.Millisecond)
result := n * n
fmt.Printf("Processor %d: %d² = %d\n", id, n, result)
out <- result
}
}()
return out
}
// fanOut creates multiple processor stages that read from the same input channel
func fanOut(in <-chan int, numProcessors int) []<-chan int {
outputs := make([]<-chan int, numProcessors)
for i := 0; i < numProcessors; i++ {
outputs[i] = processor(i+1, in)
}
return outputs
}
// fanIn combines multiple input channels into a single output channel
func fanIn(inputs []<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
// Start a goroutine for each input channel
for _, ch := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(ch)
}
// Start a goroutine to close the output channel when all input channels are done
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
startTime := time.Now()
// Create the pipeline
input := generator(10)
processors := fanOut(input, 4)
output := fanIn(processors)
// Collect and sum the results
sum := 0
for n := range output {
sum += n
}
fmt.Printf("Sum: %d\n", sum)
fmt.Printf("Time taken: %v\n", time.Since(startTime))
}
This pattern is particularly useful for CPU-bound tasks where parallel processing can significantly improve throughput. The key components are:
- Fan-Out: Distribute work from a single source to multiple workers
- Parallel Processing: Each worker processes items independently
- Fan-In: Consolidate results from multiple workers into a single stream (note that fan-in does not preserve input order; attach sequence numbers if ordering matters)
Dynamic Pipeline Construction
In real-world applications, pipeline topologies may need to be constructed dynamically based on configuration or runtime conditions. Here’s a pattern for building pipelines dynamically:
package main
import (
"fmt"
"strings"
)
// PipelineStage represents a single stage in a pipeline
type PipelineStage func(<-chan string) <-chan string
// Pipeline represents a series of stages
type Pipeline struct {
stages []PipelineStage
}
// NewPipeline creates a new empty pipeline
func NewPipeline() *Pipeline {
return &Pipeline{stages: []PipelineStage{}}
}
// Add appends a stage to the pipeline
func (p *Pipeline) Add(stage PipelineStage) *Pipeline {
p.stages = append(p.stages, stage)
return p
}
// Run executes the pipeline with the given input
func (p *Pipeline) Run(input <-chan string) <-chan string {
current := input
for _, stage := range p.stages {
current = stage(current)
}
return current
}
// Example pipeline stages
func toUpper(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range in {
out <- strings.ToUpper(s)
}
}()
return out
}
func addPrefix(prefix string) PipelineStage {
return func(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range in {
out <- prefix + s
}
}()
return out
}
}
func addSuffix(suffix string) PipelineStage {
return func(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range in {
out <- s + suffix
}
}()
return out
}
}
func filter(predicate func(string) bool) PipelineStage {
return func(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range in {
if predicate(s) {
out <- s
}
}
}()
return out
}
}
func main() {
// Create input channel
input := make(chan string)
// Build pipeline dynamically
pipeline := NewPipeline()
pipeline.Add(toUpper)
pipeline.Add(addPrefix(">> "))
pipeline.Add(filter(func(s string) bool {
return len(s) > 5
}))
pipeline.Add(addSuffix(" <<"))
// Run the pipeline
output := pipeline.Run(input)
// Feed input in a separate goroutine
go func() {
defer close(input)
words := []string{"hello", "world", "pipeline", "go", "channels"}
for _, word := range words {
input <- word
}
}()
// Collect results
for result := range output {
fmt.Println(result)
}
}
This pattern provides several benefits:
- Composability: Pipeline stages can be combined in different ways
- Reusability: Stages can be reused across different pipelines
- Configurability: Pipelines can be constructed based on configuration
- Testability: Individual stages can be tested in isolation
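For example, a topology could be assembled from configuration by mapping stage names to constructors (a sketch building on the types above; the registry and its keys are our own invention, and fmt must be imported):
// buildFromConfig assembles a Pipeline from an ordered list of stage
// names using a registry of available stages
func buildFromConfig(names []string, registry map[string]PipelineStage) (*Pipeline, error) {
p := NewPipeline()
for _, name := range names {
stage, ok := registry[name]
if !ok {
return nil, fmt.Errorf("unknown stage: %s", name)
}
p.Add(stage)
}
return p, nil
}
A registry might map "upper" to toUpper and "prefix" to addPrefix(">> "), letting the pipeline topology live in a config file rather than in code.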
Bidirectional Pipelines
Sometimes pipelines need to support bidirectional communication, where downstream stages can send feedback or control signals to upstream stages:
package main
import (
"fmt"
"time"
)
// Request represents a work item with a response channel
type Request struct {
Data int
Response chan<- int
}
// Worker processes requests and sends responses back
func worker(requests <-chan Request) {
for req := range requests {
// Simulate processing
time.Sleep(100 * time.Millisecond)
result := req.Data * req.Data
// Send response back through the response channel
req.Response <- result
close(req.Response) // Signal that no more responses will be sent
}
}
func main() {
// Create request channel
requests := make(chan Request)
// Start worker
go worker(requests)
// Send requests and receive responses
for i := 1; i <= 5; i++ {
// Create response channel for this request
respCh := make(chan int)
// Send request with response channel
requests <- Request{
Data: i,
Response: respCh,
}
// Wait for response
resp := <-respCh
fmt.Printf("Request: %d, Response: %d\n", i, resp)
}
close(requests)
}
This pattern enables:
- Request-Response Communication: Each request can receive a dedicated response
- Feedback Loops: Downstream stages can provide feedback to upstream stages
- Dynamic Flow Control: Processing can adapt based on feedback
Multi-Stage Pipeline with Context Cancellation
For long-running pipelines, proper cancellation support is essential. Here’s a pattern that integrates context cancellation across multiple stages:
package main
import (
"context"
"fmt"
"time"
)
// generator produces integers until cancelled
func generator(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
n := 1
for {
select {
case <-ctx.Done():
fmt.Println("Generator cancelled")
return
case out <- n:
n++
time.Sleep(100 * time.Millisecond)
}
}
}()
return out
}
// processor squares numbers and handles cancellation
func processor(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
fmt.Println("Processor cancelled")
return
case n, ok := <-in:
if !ok {
return
}
select {
case <-ctx.Done():
fmt.Println("Processor cancelled")
return
case out <- n * n:
// Value sent successfully
}
}
}
}()
return out
}
// filter only passes even numbers and handles cancellation
func filter(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
fmt.Println("Filter cancelled")
return
case n, ok := <-in:
if !ok {
return
}
if n%2 == 0 {
select {
case <-ctx.Done():
fmt.Println("Filter cancelled")
return
case out <- n:
// Value sent successfully
}
}
}
}
}()
return out
}
func main() {
// Create a context with cancellation
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Build and run the pipeline
gen := generator(ctx)
proc := processor(ctx, gen)
filt := filter(ctx, proc)
// Consume the results
for n := range filt {
fmt.Println("Result:", n)
}
fmt.Println("Pipeline completed")
}
This pattern ensures:
- Graceful Termination: All stages can clean up resources when cancelled
- Propagation of Cancellation: Cancellation signals propagate through the entire pipeline
- Timeout Support: Pipelines can automatically terminate after a specified duration
- Resource Management: Resources are properly released when the pipeline terminates
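The nested selects in each stage above are repetitive. A small generic helper can centralize the cancellation-aware receive-and-forward logic (a sketch using Go 1.18 generics; orDone is our own name for a widely used pattern):
// orDone forwards values from in until in is closed or ctx is
// cancelled, hiding the nested-select boilerplate
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
With orDone, a stage body can shrink to a plain for-range loop over orDone(ctx, in), keeping only its transformation logic; the send on the stage's own output channel still needs a select on ctx.Done() if downstream may stop reading early.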
These advanced pipeline architectures provide powerful tools for building sophisticated data processing systems. In the next section, we’ll explore specific patterns for stream processing.
Stream Processing Patterns
Stream processing involves continuously processing data as it arrives, rather than processing data in batches. Go’s concurrency model is particularly well-suited for implementing stream processing patterns.
Linear Stream Processing
The simplest form of stream processing is a linear pipeline where data flows through a sequence of transformations:
package main
import (
"fmt"
"strings"
"time"
)
// LogEntry represents a log entry in a stream
type LogEntry struct {
Timestamp time.Time
Level string
Message string
}
// Source generates a stream of log entries
func source() <-chan LogEntry {
out := make(chan LogEntry)
go func() {
defer close(out)
levels := []string{"INFO", "WARNING", "ERROR", "DEBUG"}
messages := []string{
"User logged in",
"Failed to connect to database",
"Processing request",
"Cache miss",
"Request completed",
}
for i := 0; i < 20; i++ {
entry := LogEntry{
Timestamp: time.Now().Add(-time.Duration(i) * time.Minute),
Level: levels[i%len(levels)],
Message: messages[i%len(messages)],
}
out <- entry
time.Sleep(100 * time.Millisecond)
}
}()
return out
}
// Filter keeps only entries with specified log levels
func filter(in <-chan LogEntry, allowedLevels ...string) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
defer close(out)
levelSet := make(map[string]bool)
for _, level := range allowedLevels {
levelSet[level] = true
}
for entry := range in {
if levelSet[entry.Level] {
out <- entry
}
}
}()
return out
}
// Enrich adds additional information to log entries
func enrich(in <-chan LogEntry) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
defer close(out)
for entry := range in {
// Add additional context to error messages
if entry.Level == "ERROR" {
entry.Message = fmt.Sprintf("%s (Contact: [email protected])", entry.Message)
}
out <- entry
}
}()
return out
}
// Format converts log entries to formatted strings
func format(in <-chan LogEntry) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for entry := range in {
formatted := fmt.Sprintf(
"[%s] %s: %s",
entry.Timestamp.Format("15:04:05"),
entry.Level,
entry.Message,
)
out <- formatted
}
}()
return out
}
// Sink consumes the final stream and performs an action
func sink(in <-chan string) {
for s := range in {
fmt.Println(s)
}
}
func main() {
// Build the stream processing pipeline
logs := source()
filtered := filter(logs, "ERROR", "WARNING")
enriched := enrich(filtered)
formatted := format(enriched)
// Consume the stream
sink(formatted)
}
This pattern is effective for straightforward transformations where each item is processed independently.
Windowed Stream Processing
Many stream processing applications need to analyze data within time or count-based windows:
package main
import (
"fmt"
"time"
)
// Event represents a data point in a stream
type Event struct {
Timestamp time.Time
Value float64
}
// Window represents a collection of events within a time window
type Window struct {
StartTime time.Time
EndTime time.Time
Events []Event
}
// source generates a stream of events
func source() <-chan Event {
out := make(chan Event)
go func() {
defer close(out)
now := time.Now()
for i := 0; i < 100; i++ {
event := Event{
Timestamp: now.Add(time.Duration(i*100) * time.Millisecond),
Value: float64(i % 10),
}
out <- event
time.Sleep(50 * time.Millisecond)
}
}()
return out
}
// tumblingWindow groups events into non-overlapping time windows of fixed duration
func tumblingWindow(in <-chan Event, windowDuration time.Duration) <-chan Window {
out := make(chan Window)
go func() {
defer close(out)
var currentWindow Window
var windowStarted bool
for event := range in {
// If this is the first event or the event belongs to a new window
if !windowStarted || event.Timestamp.After(currentWindow.EndTime) {
// If we have a window in progress, emit it
if windowStarted {
out <- currentWindow
}
// Start a new window
windowStart := event.Timestamp
windowEnd := windowStart.Add(windowDuration)
currentWindow = Window{
StartTime: windowStart,
EndTime: windowEnd,
Events: []Event{event},
}
windowStarted = true
} else {
// Add event to current window
currentWindow.Events = append(currentWindow.Events, event)
}
}
// Emit the last window if it has events
if windowStarted && len(currentWindow.Events) > 0 {
out <- currentWindow
}
}()
return out
}
// slidingWindow groups events into overlapping time windows
func slidingWindow(in <-chan Event, windowDuration, slideInterval time.Duration) <-chan Window {
out := make(chan Window)
go func() {
defer close(out)
// Buffer of recent events still needed for future windows
var buffer []Event
// Event time of the most recent window emission
var lastEmit time.Time
for event := range in {
// Add new event to buffer
buffer = append(buffer, event)
// Initialize the emission clock on the first event
if lastEmit.IsZero() {
lastEmit = event.Timestamp
}
// A candidate window ends at the current event
windowEnd := event.Timestamp
windowStart := windowEnd.Add(-windowDuration)
// Emit a window once slideInterval has passed since the last emission
if event.Timestamp.Sub(lastEmit) >= slideInterval {
// Collect the buffered events that fall inside the window
window := Window{
StartTime: windowStart,
EndTime: windowEnd,
Events: []Event{},
}
for _, e := range buffer {
if !e.Timestamp.Before(windowStart) && !e.Timestamp.After(windowEnd) {
window.Events = append(window.Events, e)
}
}
// Emit window if it has events
if len(window.Events) > 0 {
out <- window
}
// Record this emission time
lastEmit = event.Timestamp
// Drop events too old to appear in any future window
newBuffer := []Event{}
for _, e := range buffer {
if !e.Timestamp.Before(windowStart) {
newBuffer = append(newBuffer, e)
}
}
buffer = newBuffer
}
}
}()
return out
}
// aggregate computes statistics for each window
func aggregate(in <-chan Window) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for window := range in {
// Calculate average
sum := 0.0
for _, event := range window.Events {
sum += event.Value
}
avg := sum / float64(len(window.Events))
result := fmt.Sprintf(
"Window [%s to %s]: %d events, avg=%.2f",
window.StartTime.Format("15:04:05.000"),
window.EndTime.Format("15:04:05.000"),
len(window.Events),
avg,
)
out <- result
}
}()
return out
}
func main() {
events := source()
// Create two different windowing strategies
tumbling := tumblingWindow(events, 1*time.Second)
tumblingResults := aggregate(tumbling)
// Consume and print results
for result := range tumblingResults {
fmt.Println(result)
}
}
Windowed processing is essential for:
- Time-Based Analysis: Computing statistics over time periods
- Trend Detection: Identifying patterns in streaming data
- Anomaly Detection: Detecting unusual behavior in real-time
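This section opened by mentioning count-based windows as well; a count-based variant is simpler because no clock is involved (a sketch using the same Event and Window types):
// countWindow groups every n consecutive events into a window
func countWindow(in <-chan Event, n int) <-chan Window {
out := make(chan Window)
go func() {
defer close(out)
var batch []Event
for e := range in {
batch = append(batch, e)
if len(batch) == n {
out <- Window{StartTime: batch[0].Timestamp, EndTime: e.Timestamp, Events: batch}
batch = nil // start a fresh slice; the sent window owns the old one
}
}
// Flush any remaining events as a final, partial window
if len(batch) > 0 {
out <- Window{StartTime: batch[0].Timestamp, EndTime: batch[len(batch)-1].Timestamp, Events: batch}
}
}()
return out
}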
Stateful Stream Processing
Some stream processing applications need to maintain state across multiple events:
package main
import (
"fmt"
"time"
)
// Transaction represents a financial transaction
type Transaction struct {
ID string
UserID string
Amount float64
Timestamp time.Time
}
// Alert represents a fraud detection alert
type Alert struct {
UserID string
Reason string
Transactions []Transaction
Timestamp time.Time
}
// source generates a stream of transactions
func source() <-chan Transaction {
out := make(chan Transaction)
go func() {
defer close(out)
users := []string{"user1", "user2", "user3", "user4"}
now := time.Now()
// Generate normal transactions
for i := 0; i < 20; i++ {
userID := users[i%len(users)]
amount := 10.0 + float64(i%5)*20.0
// For user3, generate suspicious transactions
if userID == "user3" && i > 10 {
amount = 500.0 + float64(i%3)*200.0
}
tx := Transaction{
ID: fmt.Sprintf("tx-%d", i),
UserID: userID,
Amount: amount,
Timestamp: now.Add(time.Duration(i*30) * time.Second),
}
out <- tx
time.Sleep(100 * time.Millisecond)
}
}()
return out
}
// fraudDetector maintains state to detect suspicious patterns
func fraudDetector(in <-chan Transaction) <-chan Alert {
out := make(chan Alert)
go func() {
defer close(out)
// State: track recent transactions by user
userTransactions := make(map[string][]Transaction)
// State: track total amount in last hour by user
userAmounts := make(map[string]float64)
for tx := range in {
// Add transaction to user history
userTransactions[tx.UserID] = append(userTransactions[tx.UserID], tx)
// Update total amount (simplified - not actually checking time window)
userAmounts[tx.UserID] += tx.Amount
// Check for suspicious activity
if userAmounts[tx.UserID] > 1000.0 {
// Create alert
alert := Alert{
UserID: tx.UserID,
Reason: "High transaction volume",
Transactions: userTransactions[tx.UserID],
Timestamp: time.Now(),
}
out <- alert
// Reset state for this user
userAmounts[tx.UserID] = 0
delete(userTransactions, tx.UserID)
}
}
}()
return out
}
// alertHandler processes alerts
func alertHandler(in <-chan Alert) {
for alert := range in {
fmt.Printf("ALERT for %s: %s\n", alert.UserID, alert.Reason)
fmt.Printf(" Total transactions: %d\n", len(alert.Transactions))
fmt.Printf(" Latest transaction: %s for $%.2f\n",
alert.Transactions[len(alert.Transactions)-1].ID,
alert.Transactions[len(alert.Transactions)-1].Amount)
}
}
func main() {
// Build the stateful stream processing pipeline
transactions := source()
alerts := fraudDetector(transactions)
// Process alerts
alertHandler(alerts)
}
This pattern enables:
- State Maintenance: Keeping track of information across multiple events
- Pattern Recognition: Identifying complex patterns that span multiple events
- Aggregation: Computing running statistics over a stream of events
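The detector above notes that it does not actually enforce a time window on the per-user totals; unbounded state is a common leak in stateful stages. A small eviction helper fixes that (a sketch; pruneOld is our own name):
// pruneOld keeps only transactions newer than maxAge, so per-user
// history and totals reflect a true sliding time window
func pruneOld(history []Transaction, now time.Time, maxAge time.Duration) []Transaction {
kept := history[:0] // filter in place, reusing the backing array
for _, tx := range history {
if now.Sub(tx.Timestamp) <= maxAge {
kept = append(kept, tx)
}
}
return kept
}
Calling userTransactions[tx.UserID] = pruneOld(userTransactions[tx.UserID], tx.Timestamp, time.Hour) before updating totals, and recomputing the total from the pruned slice, keeps the state bounded.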
These stream processing patterns provide powerful tools for building real-time data processing systems. In the next section, we’ll explore error handling and recovery strategies for pipelines.
Error Handling and Recovery in Pipelines
Robust error handling is critical for production-grade pipeline systems. Let’s explore patterns for handling errors in pipeline architectures.
Error Propagation Patterns
There are several approaches to propagating errors through a pipeline:
package main
import (
"errors"
"fmt"
"math/rand"
"time"
)
// Result wraps a value and an error
type Result struct {
Value interface{}
Err error
}
// source generates integers with occasional errors
func source() <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for i := 0; i < 10; i++ {
// Randomly generate errors
if rand.Float64() < 0.3 {
out <- Result{Err: fmt.Errorf("error generating value %d", i)}
continue
}
out <- Result{Value: i, Err: nil}
time.Sleep(100 * time.Millisecond)
}
}()
return out
}
// transform applies a transformation, propagating any errors
func transform(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for res := range in {
// If we received an error, propagate it
if res.Err != nil {
out <- res
continue
}
// Try to process the value
val, ok := res.Value.(int)
if !ok {
out <- Result{Err: errors.New("expected integer value")}
continue
}
// Randomly fail during processing
if rand.Float64() < 0.2 {
out <- Result{Err: fmt.Errorf("error processing value %d", val)}
continue
}
// Success case
out <- Result{Value: val * val, Err: nil}
}
}()
return out
}
// sink consumes results and handles errors
func sink(in <-chan Result) {
for res := range in {
if res.Err != nil {
fmt.Printf("Error: %v\n", res.Err)
} else {
fmt.Printf("Result: %v\n", res.Value)
}
}
}
func main() {
// Seed the global RNG (unnecessary on Go 1.20+, where rand.Seed is deprecated)
rand.Seed(time.Now().UnixNano())
// Build and run the pipeline
results := transform(source())
sink(results)
}
This pattern demonstrates:
- Error Wrapping: Each value is wrapped with potential error information
- Error Propagation: Errors from upstream stages are passed downstream
- Error Generation: Stages can generate their own errors
- Error Handling: The final stage decides how to handle errors
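An alternative to wrapping every value is to return a separate error channel alongside the data channel, in the style of the Go blog's pipelines article (a sketch; this stage stops at the first error, and the names are ours):
// squareOrErr squares non-negative ints and reports the first
// negative input on a dedicated error channel
func squareOrErr(in <-chan int) (<-chan int, <-chan error) {
out := make(chan int)
errc := make(chan error, 1) // buffered: the stage never blocks on error
go func() {
defer close(out)
defer close(errc)
for n := range in {
if n < 0 {
errc <- fmt.Errorf("negative input: %d", n)
return // stop on first error
}
out <- n * n
}
}()
return out, errc
}
This keeps the data path free of wrappers, at the cost of having two channels to drain per stage.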
Error Recovery Strategies
For long-running pipelines, recovering from errors rather than failing is often desirable:
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
)
// retryableStage attempts to process items with retries
func retryableStage(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
// Try to process with retries
result, err := processWithRetry(n, 3, 100*time.Millisecond)
if err != nil {
fmt.Printf("Failed to process %d after retries: %v\n", n, err)
continue // Skip this item
}
out <- result
}
}()
return out
}
// processWithRetry attempts to process a value with retries
func processWithRetry(n int, maxRetries int, delay time.Duration) (int, error) {
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
// Attempt to process
result, err := process(n)
if err == nil {
return result, nil // Success
}
lastErr = err
fmt.Printf("Attempt %d failed for %d: %v\n", attempt+1, n, err)
// Don't sleep after the last attempt
if attempt < maxRetries {
// Exponential backoff
sleepTime := delay * time.Duration(1<<attempt)
time.Sleep(sleepTime)
}
}
return 0, fmt.Errorf("all retries failed: %w", lastErr)
}
// process simulates a flaky operation
func process(n int) (int, error) {
// Simulate random failures
if rand.Float64() < 0.6 {
return 0, errors.New("random processing error")
}
return n * n, nil
}
// circuitBreakerStage implements the circuit breaker pattern
func circuitBreakerStage(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
// Circuit breaker state
var failures int
var lastFailure time.Time
const maxFailures = 3
const resetTimeout = 5 * time.Second
circuitOpen := false
for n := range in {
// Check if circuit is open
if circuitOpen {
// Check if we should try to reset
if time.Since(lastFailure) > resetTimeout {
fmt.Println("Circuit half-open, attempting reset")
circuitOpen = false
failures = 0
} else {
fmt.Printf("Circuit open, skipping item %d\n", n)
continue
}
}
// Try to process
result, err := process(n)
if err != nil {
failures++
lastFailure = time.Now()
fmt.Printf("Processing failed for %d: %v (failures: %d)\n", n, err, failures)
// Check if we should open the circuit
if failures >= maxFailures {
fmt.Println("Circuit breaker tripped, opening circuit")
circuitOpen = true
}
continue
}
// Success, reset failure count
failures = 0
out <- result
}
}()
return out
}
func main() {
// Seed the global RNG (unnecessary on Go 1.20+, where rand.Seed is deprecated)
rand.Seed(time.Now().UnixNano())
// Create input channel
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
time.Sleep(200 * time.Millisecond)
}
}()
// Create pipeline with retry stage
retryOutput := retryableStage(input)
// Consume results
for result := range retryOutput {
fmt.Printf("Got result: %d\n", result)
}
}
This example demonstrates two important error recovery patterns:
- Retry Pattern: Automatically retry failed operations with exponential backoff
- Circuit Breaker Pattern: Prevent cascading failures by temporarily disabling operations after repeated failures
Partial Failure Handling
In distributed pipelines, handling partial failures is essential:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// BatchResult represents the result of processing a batch of items
type BatchResult struct {
Successful []int
Failed map[int]error
}
// batchProcessor processes items in batches with partial failure handling
func batchProcessor(ctx context.Context, in <-chan int, batchSize int) <-chan BatchResult {
out := make(chan BatchResult)
go func() {
defer close(out)
batch := make([]int, 0, batchSize)
// Helper function to process and send the current batch
processBatch := func() {
if len(batch) == 0 {
return
}
result := BatchResult{
Successful: []int{},
Failed: make(map[int]error),
}
// Process each item in the batch
for _, item := range batch {
// Simulate processing with potential failures
if item%3 == 0 {
result.Failed[item] = fmt.Errorf("failed to process item %d", item)
} else {
result.Successful = append(result.Successful, item)
}
}
// Send batch result
select {
case out <- result:
// Result sent successfully
case <-ctx.Done():
return
}
// Clear the batch
batch = make([]int, 0, batchSize)
}
for {
select {
case item, ok := <-in:
if !ok {
// Input channel closed, process remaining items
processBatch()
return
}
// Add item to batch
batch = append(batch, item)
// Process batch if it's full
if len(batch) >= batchSize {
processBatch()
}
case <-ctx.Done():
// Context cancelled
return
}
}
}()
return out
}
func main() {
// Create context with cancellation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create input channel
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
time.Sleep(100 * time.Millisecond)
}
}()
// Process in batches
results := batchProcessor(ctx, input, 5)
// Handle batch results
for result := range results {
fmt.Printf("Batch processed: %d successful, %d failed\n",
len(result.Successful), len(result.Failed))
if len(result.Successful) > 0 {
fmt.Printf(" Successful items: %v\n", result.Successful)
}
if len(result.Failed) > 0 {
fmt.Println(" Failed items:")
for item, err := range result.Failed {
fmt.Printf(" Item %d: %v\n", item, err)
}
}
}
}
This pattern enables:
- Batch Processing: Process items in groups for efficiency
- Partial Success: Continue processing despite some failures
- Detailed Error Reporting: Track exactly which items failed and why
- Graceful Degradation: The pipeline continues to make progress even with errors
These error handling patterns are essential for building resilient pipeline systems that can recover from failures and continue processing data.
Performance Optimization and Monitoring
To build high-performance pipeline systems, careful optimization and monitoring are essential.
Buffer Sizing and Throughput Optimization
The size of channel buffers can significantly impact pipeline performance:
package main
import (
"fmt"
"sync"
"time"
)
// benchmarkPipeline measures the throughput of a pipeline with different buffer sizes
func benchmarkPipeline(bufferSize int, numItems int) time.Duration {
start := time.Now()
// Create stages with specified buffer size
stage1 := make(chan int, bufferSize)
stage2 := make(chan int, bufferSize)
stage3 := make(chan int, bufferSize)
// Start producer
go func() {
defer close(stage1)
for i := 0; i < numItems; i++ {
stage1 <- i
}
}()
// Start stage 2 processor
go func() {
defer close(stage2)
for item := range stage1 {
// Simulate processing
time.Sleep(1 * time.Millisecond)
stage2 <- item * 2
}
}()
// Start stage 3 processor
go func() {
defer close(stage3)
for item := range stage2 {
// Simulate processing
time.Sleep(2 * time.Millisecond)
stage3 <- item + 1
}
}()
// Consume results
var count int
for range stage3 {
count++
}
return time.Since(start)
}
func main() {
numItems := 1000
bufferSizes := []int{1, 10, 100, 1000}
fmt.Printf("Processing %d items through a 3-stage pipeline\n", numItems)
fmt.Println("Buffer Size | Duration | Items/sec")
fmt.Println("-----------|----------|----------")
for _, size := range bufferSizes {
duration := benchmarkPipeline(size, numItems)
itemsPerSec := float64(numItems) / duration.Seconds()
fmt.Printf("%10d | %8v | %9.2f\n", size, duration.Round(time.Millisecond), itemsPerSec)
}
}
Key optimization principles:
- Buffer Sizing: Larger buffers can improve throughput by reducing blocking, but consume more memory
- Stage Balancing: Match processing speeds across stages to minimize bottlenecks
- Batch Processing: Process items in batches to amortize overhead costs (see the sketch below)
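As a brief illustration of the batching point (a sketch; the batcher name and []int element type are our own choices):
// batcher groups items so downstream stages pay per-batch rather
// than per-item synchronization overhead
func batcher(in <-chan int, size int) <-chan []int {
out := make(chan []int)
go func() {
defer close(out)
batch := make([]int, 0, size)
for item := range in {
batch = append(batch, item)
if len(batch) == size {
out <- batch
batch = make([]int, 0, size) // the sent batch now belongs downstream
}
}
// Flush the final partial batch
if len(batch) > 0 {
out <- batch
}
}()
return out
}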
Pipeline Monitoring and Metrics
Monitoring pipeline performance is essential for identifying bottlenecks and ensuring reliability:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// PipelineMetrics tracks performance metrics for a pipeline stage
type PipelineMetrics struct {
StageName string
ItemsProcessed atomic.Int64
ErrorCount atomic.Int64
ProcessingTime atomic.Int64 // Nanoseconds
LastProcessed atomic.Int64 // Unix timestamp
QueueDepth atomic.Int64
MaxQueueDepth atomic.Int64
TotalWaitTime atomic.Int64 // Nanoseconds
ProcessingCount atomic.Int64 // Number of items currently being processed
}
// NewPipelineMetrics creates a new metrics tracker for a stage
func NewPipelineMetrics(name string) *PipelineMetrics {
return &PipelineMetrics{
StageName: name,
}
}
// RecordProcessingTime records the time taken to process an item
func (m *PipelineMetrics) RecordProcessingTime(duration time.Duration) {
m.ProcessingTime.Add(int64(duration))
m.ItemsProcessed.Add(1)
m.LastProcessed.Store(time.Now().Unix())
}
// RecordError increments the error count
func (m *PipelineMetrics) RecordError() {
m.ErrorCount.Add(1)
}
// RecordQueueDepth updates the queue depth metrics
func (m *PipelineMetrics) RecordQueueDepth(depth int) {
m.QueueDepth.Store(int64(depth))
// Update max queue depth if needed
for {
current := m.MaxQueueDepth.Load()
if int64(depth) <= current {
break
}
if m.MaxQueueDepth.CompareAndSwap(current, int64(depth)) {
break
}
}
}
// RecordWaitTime records the time an item spent waiting in the queue
func (m *PipelineMetrics) RecordWaitTime(duration time.Duration) {
m.TotalWaitTime.Add(int64(duration))
}
// BeginProcessing increments the count of items being processed
func (m *PipelineMetrics) BeginProcessing() {
m.ProcessingCount.Add(1)
}
// EndProcessing decrements the count of items being processed
func (m *PipelineMetrics) EndProcessing() {
m.ProcessingCount.Add(-1)
}
// GetStats returns a snapshot of the current metrics
func (m *PipelineMetrics) GetStats() map[string]interface{} {
itemsProcessed := m.ItemsProcessed.Load()
processingTime := m.ProcessingTime.Load()
var avgProcessingTime float64
if itemsProcessed > 0 {
avgProcessingTime = float64(processingTime) / float64(itemsProcessed) / float64(time.Millisecond)
}
var avgWaitTime float64
if itemsProcessed > 0 {
avgWaitTime = float64(m.TotalWaitTime.Load()) / float64(itemsProcessed) / float64(time.Millisecond)
}
return map[string]interface{}{
"stage_name": m.StageName,
"items_processed": itemsProcessed,
"errors": m.ErrorCount.Load(),
"avg_processing_time": avgProcessingTime,
"last_processed": time.Unix(m.LastProcessed.Load(), 0),
"current_queue_depth": m.QueueDepth.Load(),
"max_queue_depth": m.MaxQueueDepth.Load(),
"avg_wait_time": avgWaitTime,
"active_processors": m.ProcessingCount.Load(),
}
}
// instrumentedStage wraps a processing function with metrics
func instrumentedStage(name string, in <-chan int, processFunc func(int) (int, error)) (<-chan int, *PipelineMetrics) {
metrics := NewPipelineMetrics(name)
out := make(chan int, cap(in)) // Match buffer size
go func() {
defer close(out)
for item := range in {
// Record queue depth
metrics.RecordQueueDepth(len(in))
// Record wait time (simplified - in a real system you'd track per-item timestamps)
waitStart := time.Now()
// Begin processing
metrics.BeginProcessing()
processStart := time.Now()
metrics.RecordWaitTime(processStart.Sub(waitStart))
// Process the item
result, err := processFunc(item)
// Record metrics
metrics.RecordProcessingTime(time.Since(processStart))
metrics.EndProcessing()
if err != nil {
metrics.RecordError()
fmt.Printf("Error in %s: %v\n", name, err)
continue
}
// Send result to output
out <- result
}
}()
return out, metrics
}
// metricsReporter periodically prints metrics
func metricsReporter(metrics []*PipelineMetrics, interval time.Duration, done <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("\n--- Pipeline Metrics ---")
for _, m := range metrics {
stats := m.GetStats()
fmt.Printf("Stage: %s\n", stats["stage_name"])
fmt.Printf(" Processed: %d items (%.2f items/sec)\n",
stats["items_processed"],
float64(stats["items_processed"].(int64))/interval.Seconds())
fmt.Printf(" Errors: %d\n", stats["errors"])
fmt.Printf(" Avg Processing Time: %.2f ms\n", stats["avg_processing_time"])
fmt.Printf(" Queue Depth: %d (max: %d)\n",
stats["current_queue_depth"], stats["max_queue_depth"])
fmt.Printf(" Avg Wait Time: %.2f ms\n", stats["avg_wait_time"])
fmt.Printf(" Active Processors: %d\n", stats["active_processors"])
}
case <-done:
return
}
}
}
func main() {
// Create input channel
input := make(chan int, 100)
// Create instrumented pipeline stages
stage1, metrics1 := instrumentedStage("Processor", input, func(i int) (int, error) {
time.Sleep(10 * time.Millisecond)
return i * 2, nil
})
stage2, metrics2 := instrumentedStage("Transformer", stage1, func(i int) (int, error) {
time.Sleep(15 * time.Millisecond)
return i + 1, nil
})
stage3, metrics3 := instrumentedStage("Validator", stage2, func(i int) (int, error) {
time.Sleep(5 * time.Millisecond)
return i, nil
})
// Start metrics reporter
done := make(chan struct{})
go metricsReporter([]*PipelineMetrics{metrics1, metrics2, metrics3}, 1*time.Second, done)
// Start consumer
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
count := 0
for range stage3 {
count++
}
fmt.Printf("\nProcessed %d items\n", count)
}()
// Feed input
for i := 0; i < 1000; i++ {
input <- i
time.Sleep(5 * time.Millisecond)
}
close(input)
// Wait for pipeline to complete
wg.Wait()
close(done)
}
This monitoring approach provides:
- Real-time Metrics: Track throughput, latency, and error rates in real time
- Bottleneck Identification: Identify stages that are processing slowly or have deep queues
- Resource Utilization: Monitor memory usage through queue depths
- Error Tracking: Track error rates by stage
Memory Optimization Techniques
Memory efficiency is critical for high-throughput pipelines:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Item represents a data item to be processed
type Item struct {
ID int
Data []byte
Extra map[string]interface{}
}
// ItemPool implements an object pool for Items
type ItemPool struct {
pool sync.Pool
}
// NewItemPool creates a new pool for Items
func NewItemPool() *ItemPool {
return &ItemPool{
pool: sync.Pool{
New: func() interface{} {
return &Item{
Data: make([]byte, 1024), // Pre-allocate buffer
Extra: make(map[string]interface{}),
}
},
},
}
}
// Get retrieves an Item from the pool
func (p *ItemPool) Get() *Item {
item := p.pool.Get().(*Item)
// Reset the item (clear the map but keep the allocated memory)
for k := range item.Extra {
delete(item.Extra, k)
}
return item
}
// Put returns an Item to the pool
func (p *ItemPool) Put(item *Item) {
p.pool.Put(item)
}
// memoryEfficientStage processes pooled items in place; the pool is
// accepted so variants of this stage could recycle items they drop
// (this version forwards every item, so it never calls the pool itself)
func memoryEfficientStage(in <-chan *Item, pool *ItemPool) <-chan *Item {
out := make(chan *Item, cap(in))
go func() {
defer close(out)
for item := range in {
// Process the item
item.Extra["processed"] = true
item.Extra["timestamp"] = time.Now()
// Send to output
out <- item
}
}()
return out
}
// printMemStats prints memory statistics
func printMemStats() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc: %v MiB, TotalAlloc: %v MiB, Sys: %v MiB, NumGC: %v\n",
m.Alloc/1024/1024,
m.TotalAlloc/1024/1024,
m.Sys/1024/1024,
m.NumGC)
}
func main() {
// Create item pool
pool := NewItemPool()
// Create channels
input := make(chan *Item, 100)
// Create pipeline
processed := memoryEfficientStage(input, pool)
// Start consumer that returns items to the pool
go func() {
for item := range processed {
// Use the item...
// Return to pool when done
pool.Put(item)
}
}()
// Print initial memory stats
fmt.Println("Initial memory stats:")
printMemStats()
// Process items
for i := 0; i < 100000; i++ {
// Get item from pool
item := pool.Get()
item.ID = i
// Send to pipeline
input <- item
if i%10000 == 0 && i > 0 {
fmt.Printf("\nAfter %d items:\n", i)
printMemStats()
}
}
close(input)
// Final memory stats
fmt.Println("\nFinal memory stats:")
printMemStats()
}
Key memory optimization techniques:
- Object Pooling: Reuse objects to reduce allocation and GC pressure
- Buffer Reuse: Preallocate and reuse buffers for I/O operations
- Batch Processing: Process items in batches to reduce per-item overhead
- Memory Profiling: Regularly profile memory usage to identify leaks
These performance optimization techniques are essential for building high-throughput, low-latency pipeline systems.
Production Implementation Strategies
When deploying pipeline architectures to production environments, several additional considerations come into play.
Graceful Shutdown
Proper shutdown handling ensures that in-flight data is processed and resources are released:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Pipeline represents a multi-stage processing pipeline
type Pipeline struct {
stages []Stage
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// Stage represents a single stage in the pipeline
type Stage struct {
Name string
Process func(context.Context) error
Shutdown func() error
}
// NewPipeline creates a new pipeline with cancellation support
func NewPipeline() *Pipeline {
ctx, cancel := context.WithCancel(context.Background())
return &Pipeline{
ctx: ctx,
cancel: cancel,
}
}
// AddStage adds a processing stage to the pipeline
func (p *Pipeline) AddStage(name string, process func(context.Context) error, shutdown func() error) {
p.stages = append(p.stages, Stage{
Name: name,
Process: process,
Shutdown: shutdown,
})
}
// Start launches all pipeline stages
func (p *Pipeline) Start() {
fmt.Println("Starting pipeline...")
for _, stage := range p.stages {
p.wg.Add(1)
s := stage // Capture for closure
go func() {
defer p.wg.Done()
fmt.Printf("Starting stage: %s\n", s.Name)
err := s.Process(p.ctx)
if err != nil && err != context.Canceled {
fmt.Printf("Error in stage %s: %v\n", s.Name, err)
}
fmt.Printf("Stage completed: %s\n", s.Name)
}()
}
}
// Shutdown gracefully shuts down the pipeline
func (p *Pipeline) Shutdown(timeout time.Duration) error {
fmt.Println("Initiating graceful shutdown...")
// Signal all stages to stop accepting new work
p.cancel()
// Create a timeout context for shutdown
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// Create a channel to signal when all goroutines are done
done := make(chan struct{})
// Wait for all goroutines in a separate goroutine
go func() {
p.wg.Wait()
close(done)
}()
// Wait for completion or timeout
select {
case <-done:
fmt.Println("All stages completed gracefully")
case <-ctx.Done():
return fmt.Errorf("shutdown timed out after %v", timeout)
}
// Call shutdown functions for each stage in reverse order
for i := len(p.stages) - 1; i >= 0; i-- {
stage := p.stages[i]
if stage.Shutdown != nil {
fmt.Printf("Shutting down stage: %s\n", stage.Name)
if err := stage.Shutdown(); err != nil {
fmt.Printf("Error shutting down stage %s: %v\n", stage.Name, err)
}
}
}
return nil
}
func main() {
// Create pipeline
pipeline := NewPipeline()
// Add stages
pipeline.AddStage(
"DataSource",
func(ctx context.Context) error {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("DataSource: Generating data...")
case <-ctx.Done():
fmt.Println("DataSource: Stopping data generation")
return ctx.Err()
}
}
},
func() error {
fmt.Println("DataSource: Closing resources")
return nil
},
)
pipeline.AddStage(
"Processor",
func(ctx context.Context) error {
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("Processor: Processing data...")
case <-ctx.Done():
fmt.Println("Processor: Stopping processing")
return ctx.Err()
}
}
},
func() error {
fmt.Println("Processor: Flushing pending work")
time.Sleep(100 * time.Millisecond) // Simulate flushing
return nil
},
)
// Start the pipeline
pipeline.Start()
// Set up signal handling for graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Wait for termination signal
sig := <-sigCh
fmt.Printf("Received signal: %v\n", sig)
// Perform graceful shutdown
if err := pipeline.Shutdown(5 * time.Second); err != nil {
fmt.Printf("Shutdown error: %v\n", err)
os.Exit(1)
}
fmt.Println("Pipeline shutdown complete")
}
This pattern ensures:
- Orderly Shutdown: Each stage has an opportunity to complete in-flight work
- Resource Cleanup: Resources are released in the correct order
- Timeout Handling: Shutdown doesn’t hang indefinitely
- Signal Handling: The pipeline responds to system signals
Deployment Considerations
When deploying pipeline architectures to production, several factors should be considered:
package main
import (
"context"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
"sync"
"time"
)
// Configuration represents pipeline configuration that can be adjusted
// based on deployment environment
type Configuration struct {
WorkerCount int
BufferSizes int
BatchSize int
ShutdownTimeout time.Duration
EnableProfiling bool
ProfilingDir string
MetricsInterval time.Duration
HealthCheckPort int
ResourceLimits ResourceLimits
}
// ResourceLimits defines resource constraints for the pipeline
type ResourceLimits struct {
MaxMemoryMB int
MaxCPUPercentage int
MaxOpenFiles int
}
// LoadConfiguration loads configuration from environment or config file
func LoadConfiguration() Configuration {
// In a real application, this would load from environment variables,
// config files, or service discovery
return Configuration{
WorkerCount: runtime.NumCPU(),
BufferSizes: 1000,
BatchSize: 100,
ShutdownTimeout: 30 * time.Second,
EnableProfiling: true,
ProfilingDir: "./profiles",
MetricsInterval: 10 * time.Second,
HealthCheckPort: 8080,
ResourceLimits: ResourceLimits{
MaxMemoryMB: 1024,
MaxCPUPercentage: 80,
MaxOpenFiles: 1000,
},
}
}
// setupProfiling configures runtime profiling
func setupProfiling(config Configuration) func() {
if !config.EnableProfiling {
return func() {}
}
// Create profiling directory if it doesn't exist
if err := os.MkdirAll(config.ProfilingDir, 0755); err != nil {
log.Printf("Failed to create profiling directory: %v", err)
return func() {}
}
// Start CPU profiling
cpuFile, err := os.Create(fmt.Sprintf("%s/cpu-%d.pprof", config.ProfilingDir, time.Now().Unix()))
if err != nil {
log.Printf("Failed to create CPU profile: %v", err)
} else {
pprof.StartCPUProfile(cpuFile)
}
// Return cleanup function
return func() {
if cpuFile != nil {
pprof.StopCPUProfile()
cpuFile.Close()
}
// Write heap profile
heapFile, err := os.Create(fmt.Sprintf("%s/heap-%d.pprof", config.ProfilingDir, time.Now().Unix()))
if err != nil {
log.Printf("Failed to create heap profile: %v", err)
return
}
defer heapFile.Close()
if err := pprof.WriteHeapProfile(heapFile); err != nil {
log.Printf("Failed to write heap profile: %v", err)
}
}
}
// startHealthCheck starts a simple health check server
func startHealthCheck(ctx context.Context, port int) {
// In a real application, this would start an HTTP server
// that reports pipeline health status
log.Printf("Health check server started on port %d", port)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Println("Pipeline health: OK")
case <-ctx.Done():
log.Println("Stopping health check server")
return
}
}
}
func main() {
// Load configuration
config := LoadConfiguration()
// Set up profiling
cleanup := setupProfiling(config)
defer cleanup()
// Create context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start health check
go startHealthCheck(ctx, config.HealthCheckPort)
// Create and start pipeline (simplified)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
log.Println("Pipeline started")
// Simulate pipeline running
select {
case <-time.After(30 * time.Second):
log.Println("Pipeline completed normally")
case <-ctx.Done():
log.Println("Pipeline cancelled")
}
}()
// Wait for pipeline to complete
wg.Wait()
log.Println("Application shutting down")
}
Key deployment considerations:
- Configuration Management: Load configuration from environment variables or config files
- Resource Limits: Set appropriate memory and CPU limits
- Monitoring and Profiling: Enable metrics collection and profiling in production
- Health Checks: Implement health checks for container orchestration systems
- Logging: Configure structured logging for observability
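The health-check stub above only logs. In practice a small net/http endpoint is typical (a sketch; the /healthz path and serveHealth name are our own choices, and "net/http" must be imported):
// serveHealth exposes a minimal liveness endpoint and shuts the
// server down when ctx is cancelled
func serveHealth(ctx context.Context, port int) error {
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, "ok")
})
srv := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(shutdownCtx) // stop accepting connections, drain in-flight requests
}()
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
}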
Scaling Strategies
Pipeline architectures can be scaled in various ways:
package main
import (
"fmt"
"sync"
"time"
)
// ScalableStage represents a pipeline stage that can be scaled horizontally
type ScalableStage struct {
name string
process func(int) int
workerPool *WorkerPool
}
// WorkerPool manages a pool of workers for a stage
type WorkerPool struct {
input chan int
output chan int
workers int
queueSize int
wg sync.WaitGroup
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool(workers, queueSize int) *WorkerPool {
return &WorkerPool{
input: make(chan int, queueSize),
output: make(chan int, queueSize),
workers: workers,
queueSize: queueSize,
}
}
// Start launches the worker pool
func (p *WorkerPool) Start(process func(int) int) {
// Start workers
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
for item := range p.input {
// Process item
result := process(item)
p.output <- result
}
}(i + 1)
}
// Close output when all workers are done
go func() {
p.wg.Wait()
close(p.output)
}()
}
// Submit adds an item to the worker pool
func (p *WorkerPool) Submit(item int) {
p.input <- item
}
// Results returns the output channel
func (p *WorkerPool) Results() <-chan int {
return p.output
}
// Stop gracefully shuts down the worker pool
func (p *WorkerPool) Stop() {
close(p.input)
p.wg.Wait()
}
// NewScalableStage creates a new scalable pipeline stage
func NewScalableStage(name string, workers, queueSize int, process func(int) int) *ScalableStage {
return &ScalableStage{
name: name,
process: process,
workerPool: NewWorkerPool(workers, queueSize),
}
}
// Start launches the stage
func (s *ScalableStage) Start() {
fmt.Printf("Starting stage %s with %d workers\n", s.name, s.workerPool.workers)
s.workerPool.Start(s.process)
}
// Submit adds an item to the stage
func (s *ScalableStage) Submit(item int) {
s.workerPool.Submit(item)
}
// Results returns the output channel
func (s *ScalableStage) Results() <-chan int {
return s.workerPool.Results()
}
// Stop gracefully shuts down the stage
func (s *ScalableStage) Stop() {
s.workerPool.Stop()
}
func main() {
// Create scalable pipeline stages
stage1 := NewScalableStage("Parser", 2, 100, func(i int) int {
// Simulate parsing
time.Sleep(10 * time.Millisecond)
return i
})
stage2 := NewScalableStage("Transformer", 4, 100, func(i int) int {
// Simulate transformation (CPU-intensive)
time.Sleep(20 * time.Millisecond)
return i * 2
})
stage3 := NewScalableStage("Validator", 1, 100, func(i int) int {
// Simulate validation
time.Sleep(5 * time.Millisecond)
return i
})
// Start all stages
stage1.Start()
stage2.Start()
stage3.Start()
// Connect stages
go func() {
for result := range stage1.Results() {
stage2.Submit(result)
}
stage2.Stop()
}()
go func() {
for result := range stage2.Results() {
stage3.Submit(result)
}
stage3.Stop()
}()
// Submit items to the first stage
for i := 1; i <= 100; i++ {
stage1.Submit(i)
}
stage1.Stop()
// Collect results from the final stage
var results []int
for result := range stage3.Results() {
results = append(results, result)
}
fmt.Printf("Processed %d items\n", len(results))
}
Key scaling strategies:
- Horizontal Scaling: Add more instances of a stage to handle increased load
- Vertical Scaling: Allocate more resources to each stage
- Dynamic Scaling: Adjust the number of workers based on load
- Partitioning: Divide work across multiple pipelines based on a partition key (see the sketch after this list)
- Load Balancing: Distribute work evenly across workers
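Partitioning deserves a brief sketch, since it is the strategy that preserves per-key ordering while still scaling out (the partition function and modulo key are our own simplifications):
// partition routes items to one of n output channels by key, so all
// items with the same key are handled by the same pipeline instance
func partition(in <-chan int, n int) []<-chan int {
outs := make([]chan int, n)
for i := range outs {
outs[i] = make(chan int)
}
go func() {
defer func() {
for _, ch := range outs {
close(ch)
}
}()
for item := range in {
outs[item%n] <- item // here the item value itself serves as the key
}
}()
// Expose receive-only views of the partitions
ro := make([]<-chan int, n)
for i, ch := range outs {
ro[i] = ch
}
return ro
}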
These production implementation strategies ensure that pipeline architectures can be deployed reliably and scaled effectively in production environments.
The Art of Pipeline Design: Balancing Complexity and Performance
Throughout this article, we’ve explored a wide range of pipeline patterns and techniques for building robust, efficient data processing systems in Go. From basic linear pipelines to sophisticated stream processing architectures, we’ve seen how Go’s concurrency primitives provide powerful tools for expressing complex data flows.
The key to successful pipeline design lies in finding the right balance between complexity and performance. While advanced patterns can significantly improve throughput and resource utilization, they also introduce additional complexity that must be carefully managed. Here are some guiding principles to consider when designing pipeline architectures:
- Start Simple: Begin with the simplest pipeline structure that meets your requirements, and add complexity only when necessary.
- Measure Performance: Use benchmarks and profiling to identify bottlenecks before applying optimizations.
- Consider Failure Modes: Design pipelines with error handling and recovery mechanisms appropriate for your reliability requirements.
- Plan for Observability: Incorporate monitoring and metrics from the beginning to ensure visibility into pipeline behavior.
- Design for Evolution: Create modular pipeline components that can be reconfigured and extended as requirements change.
By applying these principles and the patterns we’ve explored, you can build pipeline architectures that efficiently process data streams while maintaining code clarity and operational reliability. Whether you’re building real-time analytics systems, log processors, or high-throughput microservices, the pipeline patterns in this article provide a powerful toolkit for solving complex data processing challenges in Go.
As you implement these patterns in your own applications, remember that the most elegant solutions often emerge from a deep understanding of both the problem domain and the tools at your disposal. Take the time to understand your data flow requirements, experiment with different pipeline architectures, and continuously refine your approach based on real-world performance and maintainability considerations.
The journey to mastering pipeline patterns is ongoing, but with the foundation provided in this article, you’re well-equipped to design and implement sophisticated data processing systems that leverage the full power of Go’s concurrency model.